Skip to content

Writing a Kit

A kit is a Python module with two exported functions:

def create_kit() -> list[Responder]: ...
def seed_from_goal(goal: str, run_id: str) -> list[KandoEvent]: ...  # optional

Minimal kit

# kits/planning/kit.py
from __future__ import annotations
from typing import Iterator
from kando.responders.base import Responder
from kando.schema.events import KandoEvent, OBJECT_CREATED, make_event
from kando.world.graph import World

TASK = "Task"
SUBTASK = "Subtask"

_COUNTER = 0

def _on_task_created(event: KandoEvent, world: World) -> Iterator[KandoEvent]:
    if event.data.get("type") != TASK:
        return
    global _COUNTER
    _COUNTER += 1
    task_id = event.data["id"]
    yield make_event(
        type=OBJECT_CREATED,
        source=event.source,
        actor="planning.decomposer",
        cause=[event.id],
        data={"id": f"subtask-{_COUNTER}", "type": SUBTASK,
              "data": {"text": "Step 1", "parent": task_id}},
    )

def create_kit() -> list[Responder]:
    return [
        Responder(
            name="planning.on_task_created",
            pattern=frozenset({OBJECT_CREATED}),
            fn=_on_task_created,
        ),
    ]

Best practices

Use domain-typed object data

# Good: typed data dict
{"id": "task-001", "type": "Task", "data": {"title": "Research", "priority": 1}}

# Avoid: loose untyped fields at top level
{"task_title": "Research", "priority": 1}

Guard on object type in every responder

Responders receive ALL object.created events. Always check event.data.get("type") first:

def _on_company_created(event: KandoEvent, world: World):
    if event.data.get("type") != "Company":
        return   # ← this is critical
    ...

Use world.objects for lookups, not event data

After the event fires, the object is already applied to the world. Reference it from world.objects when you need other objects' data:

claim_id = event.data["data"]["claim_id"]
claim_obj = world.objects.get(claim_id)
if not claim_obj:
    return  # guard against events that arrive before their parents

Access the LLM cache from world.context

def _llm_responder(event: KandoEvent, world: World):
    cache = world.context.get("cache")  # LLMCache or None
    if cache:
        cached = cache.get({"model": "x", "prompt": "..."})
        if cached:
            # serve from cache

_COUNTER for ID generation

Use a module-level counter for stable, monotonic IDs. Reset it in tests if needed.

Seed events

from datetime import datetime, timezone
from kando.schema.events import KandoEvent, OBJECT_CREATED

def seed_from_goal(goal: str, run_id: str) -> list[KandoEvent]:
    return [KandoEvent(
        id=f"task.created-{run_id[:8]}",
        type=OBJECT_CREATED,
        source=f"run:{run_id}",
        actor="cli",
        cause=[],                          # root event — no cause
        timestamp=datetime.now(timezone.utc),
        data={"id": f"task-{run_id[:8]}", "type": "Task",
              "data": {"title": goal}},
    )]

Edge logic

Register logic that fires automatically when a relation of a given type is created:

from kando.responders.edge import edge_logic
from kando.schema.events import KandoEvent, OBJECT_CREATED, make_event
from kando.world.graph import World

@edge_logic("blocks")
def on_blocks(event: KandoEvent, world: World):
    """When a 'blocks' relation is created, pause the blocked task."""
    blocked_id = event.data.get("target_id")
    if blocked_id and blocked_id in world.objects:
        yield make_event(
            type=OBJECT_CREATED, source=event.source,
            actor="edge.blocks", cause=[event.id],
            data={"id": f"pause-{blocked_id}", "type": "PauseSignal",
                  "data": {"task_id": blocked_id}},
            run_id_counter=0,
        )

Testing a kit

from kando.ledger.memory import MemoryLedgerStore
from kando.runtime import Runtime
from kits.planning.kit import create_kit, seed_from_goal, TASK, SUBTASK

def test_task_creates_subtask():
    store = MemoryLedgerStore("test-run")
    seed = seed_from_goal("Build feature X", "testrun001")
    world = Runtime(ledger=store, responders=create_kit()).run(seed)
    subtasks = [o for o in world.objects.values() if o.type == SUBTASK]
    assert len(subtasks) == 1
    assert subtasks[0].data["parent"] == f"task-testrun0"

Delivery integration

from kando.responders.delivery import DeliveryBus, create_delivery_responder

bus = DeliveryBus()
bus.subscribe(print, name="logger")                         # log all events
bus.subscribe(webhook_fn, name="webhook",
              pattern={"budget.exhausted", "object.created"})

runtime = Runtime(
    ledger=store,
    responders=[*create_kit(), create_delivery_responder(bus)],
)