Skip to content

Delivery Bus

The delivery bus lets external subscribers receive events from a run without being part of the responder chain.

DeliveryBus

kando.responders.delivery.DeliveryBus()

In-process subscriber registry. Thread-safety is the caller's responsibility.

Source code in kando/responders/delivery.py
def __init__(self) -> None:
    """Create a bus with no subscriptions registered."""
    # Subscriptions are stored in a plain list; it starts out empty.
    self._subscriptions: list[Subscription] = []

deliver(event)

Dispatch event to all matching subscribers. Returns list of reached subscriber names.

Source code in kando/responders/delivery.py
def deliver(self, event: KandoEvent) -> list[str]:
    """Dispatch event to all matching subscribers.

    A subscription matches when its pattern is empty (wildcard) or contains
    ``event.type``. Callbacks are invoked synchronously, in the order the
    subscriptions are stored.

    Args:
        event: the event handed to each matching callback.

    Returns:
        Names of the subscribers whose callback returned without raising.
        A subscriber whose callback raised is left out of the list.
    """
    import logging  # function-scope import keeps this block self-contained

    reached: list[str] = []
    # Iterate over a snapshot so a callback that subscribes during delivery
    # cannot extend the loop it is currently running in.
    for sub in tuple(self._subscriptions):
        if sub.pattern and event.type not in sub.pattern:
            continue
        try:
            sub.callback(event)
        except Exception:
            # Isolate subscriber errors so one bad callback cannot crash the
            # runtime loop, but record the failure instead of hiding it.
            logging.getLogger(__name__).exception(
                "Delivery subscriber %r failed on event type %r",
                sub.name,
                event.type,
            )
        else:
            reached.append(sub.name)
    return reached

subscribe(callback, name, pattern=None)

Register a callback.

Parameters:

Name Type Description Default
callback Callback

callable(event) called synchronously on each matching event.

required
name str

unique name for this subscription (used in delivery receipts).

required
pattern frozenset[str] | set[str] | None

set of event type strings to match. None / empty matches all types.

None
Source code in kando/responders/delivery.py
def subscribe(
    self,
    callback: Callback,
    name: str,
    pattern: frozenset[str] | set[str] | None = None,
) -> None:
    """Add a subscription to the bus.

    Args:
        callback: callable(event) invoked synchronously for each matching event.
        name: name reported for this subscription in delivery results;
            intended to be unique (uniqueness is not checked here).
        pattern: event type strings to match. None or an empty set is stored
            as an empty frozenset, which matches all types.
    """
    # Normalise every falsy pattern (None, empty set) to the empty wildcard.
    event_types = frozenset(pattern or ())
    entry = Subscription(name=name, pattern=event_types, callback=callback)
    self._subscriptions.append(entry)

unsubscribe(name)

Remove a subscription by name. Returns True if it existed.

Source code in kando/responders/delivery.py
def unsubscribe(self, name: str) -> bool:
    """Drop every subscription registered under ``name``.

    Returns:
        True if at least one subscription was removed, False otherwise.
    """
    kept = [sub for sub in self._subscriptions if sub.name != name]
    removed = len(kept) != len(self._subscriptions)
    # Always rebind to the filtered list, even when nothing was removed.
    self._subscriptions = kept
    return removed

create_delivery_responder

kando.responders.delivery.create_delivery_responder(bus)

Return a Responder that delivers every event to the given bus.

Usage::

bus = DeliveryBus()
bus.subscribe(my_webhook_fn, name="webhook", pattern={"run.completed"})
runtime = Runtime(ledger=..., responders=[..., create_delivery_responder(bus)])
Source code in kando/responders/delivery.py
def create_delivery_responder(bus: DeliveryBus) -> Responder:
    """Build a Responder that forwards every event it receives to ``bus``.

    The responder is named ``"delivery.bus"``, uses an empty pattern, and its
    handler returns an empty iterator, so it produces no follow-up events.

    Usage::

        bus = DeliveryBus()
        bus.subscribe(my_webhook_fn, name="webhook", pattern={"run.completed"})
        runtime = Runtime(ledger=..., responders=[..., create_delivery_responder(bus)])
    """
    def _deliver(event: KandoEvent, world: World) -> Iterator[KandoEvent]:
        # ``world`` is unused and the list of reached subscribers is discarded.
        bus.deliver(event)
        no_events: list[KandoEvent] = []
        return iter(no_events)

    match_all = frozenset()  # empty = match all event types
    return Responder(name="delivery.bus", pattern=match_all, fn=_deliver)

Usage

from kando.responders.delivery import DeliveryBus, create_delivery_responder
from kando.runtime import Runtime

bus = DeliveryBus()

# Log all events
bus.subscribe(lambda e: print(f"[event] {e.type} {e.id}"), name="logger")

# Webhook on budget.exhausted only
def notify_slack(event):
    """Post a budget-exhausted alert to the Slack webhook."""
    # Callbacks run synchronously inside the event loop, and requests applies
    # no timeout by default, so bound the call to keep a stalled endpoint
    # from blocking the run indefinitely.
    requests.post(
        SLACK_URL,
        json={"text": f"Budget exhausted: {event.data['reasons']}"},
        timeout=5,
    )

bus.subscribe(notify_slack, name="slack-alert", pattern={"budget.exhausted"})

# Plug into the runtime
runtime = Runtime(
    ledger=store,
    responders=[*create_kit(), create_delivery_responder(bus)],
)
world = runtime.run(seed)

Pattern matching

pattern value Matches
None or empty frozenset() All event types (wildcard)
frozenset({"object.created"}) Only object.created events
frozenset({"budget.exhausted", "branch.created"}) Either type

Notes

  • Callbacks fire synchronously during the event loop — keep them fast.
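  • Exceptions raised by a callback are caught inside deliver() and do not halt the run; the failing subscriber is simply omitted from the returned list of reached names. Handle and report errors inside the callback if you need to act on them.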
  • The delivery responder itself emits no new events — it cannot create feedback loops.
  • For async delivery (webhooks, queues), use threading.Thread inside the callback.