Ledger

The ledger is the append-only event log for a single agent run.

LedgerStore (interface)

kando.ledger.interface.LedgerStore

Bases: ABC

Abstract base for all ledger backends. High-level modules depend on this interface rather than on concrete backends.

append(events) abstractmethod

Append events and return the new total position (length of ledger after append).

Source code in kando/ledger/interface.py
@abstractmethod
def append(self, events: list[KandoEvent]) -> int:
    """Append events and return the new total position (length of ledger after append)."""
    ...

read(from_position=0) abstractmethod

Yield events starting at from_position.

Source code in kando/ledger/interface.py
@abstractmethod
def read(self, from_position: int = 0) -> Iterator[KandoEvent]:
    """Yield events starting at from_position."""
    ...

read_all()

Yield all events from the beginning.

Source code in kando/ledger/interface.py
def read_all(self) -> Iterator[KandoEvent]:
    """Yield all events from the beginning."""
    return self.read(from_position=0)

stream_name() abstractmethod

Return the stream identity for this ledger (e.g. 'run:abc-123').

Source code in kando/ledger/interface.py
@abstractmethod
def stream_name(self) -> str:
    """Return the stream identity for this ledger (e.g. 'run:abc-123')."""
    ...
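
To illustrate the contract above, here is a minimal sketch of a custom backend. It is self-contained so that it runs without kando installed: `LedgerStoreSketch` is a stand-in that mirrors the signatures documented above, `ListLedger` is a hypothetical backend, and events are plain dicts rather than real `KandoEvent` objects. The `run:<run_id>` naming is also an assumption based on the docstring example. In real code you would subclass `kando.ledger.interface.LedgerStore` instead.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import Any

Event = dict[str, Any]  # stand-in for KandoEvent (assumption)


class LedgerStoreSketch(ABC):
    """Stand-in mirroring the LedgerStore signatures documented above."""

    @abstractmethod
    def append(self, events: list[Event]) -> int: ...

    @abstractmethod
    def read(self, from_position: int = 0) -> Iterator[Event]: ...

    def read_all(self) -> Iterator[Event]:
        return self.read(from_position=0)

    @abstractmethod
    def stream_name(self) -> str: ...


class ListLedger(LedgerStoreSketch):
    """Hypothetical backend that keeps events in a Python list."""

    def __init__(self, run_id: str) -> None:
        self._run_id = run_id
        self._events: list[Event] = []

    def append(self, events: list[Event]) -> int:
        self._events.extend(events)
        return len(self._events)  # new total position

    def read(self, from_position: int = 0) -> Iterator[Event]:
        # The slice is a copy, so appends made after reading starts
        # do not affect a reader that is already in progress.
        yield from self._events[from_position:]

    def stream_name(self) -> str:
        # Assumed naming scheme, based on the 'run:abc-123' example.
        return f"run:{self._run_id}"


ledger = ListLedger("abc-123")
first = ledger.append([{"type": "started"}, {"type": "step"}])
second = ledger.append([{"type": "finished"}])

# Passing a previously returned position reads only what came after it.
tail = list(ledger.read(from_position=first))
```

Because `append` returns the ledger length, a caller can keep that value and later pass it as `from_position` to pick up only newer events.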

MemoryLedgerStore

In-process ledger for testing and exploration. No persistence.

kando.ledger.memory.MemoryLedgerStore(run_id)

Bases: LedgerStore

In-process ledger for testing — no persistence.

Source code in kando/ledger/memory.py
def __init__(self, run_id: str) -> None:
    self._run_id = run_id
    self._events: list[KandoEvent] = []
    self._lock = threading.Lock()

from kando.ledger.memory import MemoryLedgerStore

# event1 and event2 are KandoEvent instances created elsewhere
store = MemoryLedgerStore("my-run")
store.append([event1, event2])
all_events = list(store.read_all())
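
The constructor above creates a `threading.Lock`, which suggests that appends and reads are serialized. The sketch below shows one way that could look. It is an assumption about the approach, not the actual `MemoryLedgerStore` code; `LockedLedger`, `worker`, and the integer events are hypothetical.

```python
import threading
from collections.abc import Iterator


class LockedLedger:
    """Hypothetical list-backed ledger guarded by a lock."""

    def __init__(self) -> None:
        self._events: list[int] = []
        self._lock = threading.Lock()

    def append(self, events: list[int]) -> int:
        # Extend and measure under the same lock so the returned
        # position always matches this caller's append.
        with self._lock:
            self._events.extend(events)
            return len(self._events)

    def read(self, from_position: int = 0) -> Iterator[int]:
        # Copy under the lock, then yield without holding it.
        with self._lock:
            snapshot = self._events[from_position:]
        yield from snapshot


ledger = LockedLedger()
positions: list[int] = []


def worker(start: int) -> None:
    for i in range(100):
        positions.append(ledger.append([start + i]))


threads = [threading.Thread(target=worker, args=(n * 100,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

events = list(ledger.read())
```

With four writers appending one event at a time, every event is stored exactly once and every returned position is unique, which is the property the lock is there to protect.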

EventStreamLedgerStore

Durable ledger backed by EventStoreDB.

kando.ledger.stream.EventStreamLedgerStore(run_id, uri=None)

Bases: LedgerStore

Durable ledger backed by an EventStoreDB stream.

Source code in kando/ledger/stream.py
def __init__(self, run_id: str, uri: str | None = None) -> None:
    self._run_id = run_id
    self._client = EventStoreDBClient(uri=_make_uri(uri))
    self._length: int | None = None

import os

# Set the connection URL before constructing the store
os.environ["EVENTSTORE_URL"] = "http://localhost:2113"

from kando.ledger.stream import EventStreamLedgerStore

# `event` is a KandoEvent instance created elsewhere
store = EventStreamLedgerStore("my-run-001")
store.append([event])
for e in store.read(from_position=0):
    print(e.id, e.type)

Requires EventStoreDB

Install with pip install -e ".[stream]" and start EventStoreDB with:

docker compose up -d eventstore
export EVENTSTORE_URL=http://localhost:2113
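
The usage example passes no `uri` and sets `EVENTSTORE_URL` instead, which suggests the constructor falls back to that environment variable. Below is a minimal sketch of that precedence, assuming an explicit argument wins over the environment. `resolve_uri` is a hypothetical helper, not the library's `_make_uri`, whose actual behavior (defaults, scheme handling) is not shown in this section.

```python
import os
from typing import Optional


def resolve_uri(uri: Optional[str] = None) -> str:
    """Hypothetical resolution order: explicit argument, then EVENTSTORE_URL."""
    if uri is not None:
        return uri
    env_uri = os.environ.get("EVENTSTORE_URL")
    if env_uri:
        return env_uri
    # Assumption: fail loudly rather than guess a default endpoint.
    raise RuntimeError("No EventStoreDB URI: pass uri= or set EVENTSTORE_URL")


os.environ["EVENTSTORE_URL"] = "http://localhost:2113"
from_env = resolve_uri()

# "eventstore.internal" is a made-up host used only for illustration.
explicit = resolve_uri("http://eventstore.internal:2113")
```

Failing when neither source is set is a design choice of this sketch; the real store may instead fall back to a built-in default.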


Snapshot

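Snapshots are checkpoints of the world state. They allow fast startup: load the latest checkpoint and replay only the events recorded after it, instead of replaying the full ledger.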

from kando.world.snapshot import save_snapshot, load_snapshot

# Save a checkpoint after processing N events
# (`world` is the current projected world state)
save_snapshot(run_id="my-run", world=world, position=100)

# Load on next startup; fall back to full replay if absent
result = load_snapshot("my-run")
if result:
    world, position = result
    # replay only events after `position`
else:
    # `reproject` and `store` are assumed to be defined elsewhere
    world = reproject(store)

Set KANDO_SNAPSHOT_DIR to control where snapshots are stored (default: .kando_snapshots).
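
The checkpoint-and-resume flow can be sketched end to end without kando. Everything here is an assumption made for illustration: the world is a plain dict of counters, snapshots are JSON files, and `save_snapshot_sketch`, `load_snapshot_sketch`, and `apply_event` are hypothetical stand-ins for the real functions, whose storage format is not documented in this section.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def apply_event(world: dict, event: dict) -> None:
    """Toy projection: count events by type."""
    world[event["type"]] = world.get(event["type"], 0) + 1


def save_snapshot_sketch(directory: Path, run_id: str, world: dict, position: int) -> None:
    directory.mkdir(parents=True, exist_ok=True)
    payload = {"world": world, "position": position}
    (directory / f"{run_id}.json").write_text(json.dumps(payload))


def load_snapshot_sketch(directory: Path, run_id: str) -> Optional[tuple]:
    path = directory / f"{run_id}.json"
    if not path.exists():
        return None
    payload = json.loads(path.read_text())
    return payload["world"], payload["position"]


# A list stands in for the ledger; indices play the role of positions.
events = [{"type": "step"}] * 5 + [{"type": "tool_call"}] * 2
snapshot_dir = Path(tempfile.mkdtemp()) / ".kando_snapshots"

# First run: project the first five events, then checkpoint.
world: dict = {}
for event in events[:5]:
    apply_event(world, event)
save_snapshot_sketch(snapshot_dir, "my-run", world, position=5)

# Next startup: resume from the checkpoint, or start from scratch.
result = load_snapshot_sketch(snapshot_dir, "my-run")
if result:
    world, position = result
else:
    world, position = {}, 0

replayed = 0
for event in events[position:]:
    apply_event(world, event)
    replayed += 1
```

Only the two events after the checkpoint are replayed here. Setting `position` to 0 in the fallback branch lets both paths share the same replay loop.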