Delivery Bus¶
The delivery bus lets external subscribers receive events from a run without being part of the responder chain.
DeliveryBus¶
kando.responders.delivery.DeliveryBus()
¶
In-process subscriber registry. Thread-safety is the caller's responsibility.
Source code in kando/responders/delivery.py
deliver(event)
¶
Dispatch event to all matching subscribers. Returns list of reached subscriber names.
Source code in kando/responders/delivery.py
subscribe(callback, name, pattern=None)
¶
Register a callback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
callback
|
Callback
|
callable(event) called synchronously on each matching event. |
required |
name
|
str
|
unique name for this subscription (used in delivery receipts). |
required |
pattern
|
frozenset[str] | set[str] | None
|
set of event type strings to match. None / empty matches all types. |
None
|
Source code in kando/responders/delivery.py
unsubscribe(name)
¶
Remove a subscription by name. Returns True if it existed.
Source code in kando/responders/delivery.py
create_delivery_responder¶
kando.responders.delivery.create_delivery_responder(bus)
¶
Return a Responder that delivers every event to the given bus.
Usage::
bus = DeliveryBus()
bus.subscribe(my_webhook_fn, name="webhook", pattern={"run.completed"})
runtime = Runtime(ledger=..., responders=[..., create_delivery_responder(bus)])
Source code in kando/responders/delivery.py
Usage¶
from kando.responders.delivery import DeliveryBus, create_delivery_responder
from kando.runtime import Runtime
bus = DeliveryBus()
# Log all events
bus.subscribe(lambda e: print(f"[event] {e.type} {e.id}"), name="logger")
# Webhook on budget.exhausted only
def notify_slack(event):
requests.post(SLACK_URL, json={"text": f"Budget exhausted: {event.data['reasons']}"})
bus.subscribe(notify_slack, name="slack-alert", pattern={"budget.exhausted"})
# Plug into the runtime
runtime = Runtime(
ledger=store,
responders=[*create_kit(), create_delivery_responder(bus)],
)
world = runtime.run(seed)
Pattern matching¶
pattern value |
Matches |
|---|---|
None or empty frozenset() |
All event types (wildcard) |
frozenset({"object.created"}) |
Only object.created events |
frozenset({"budget.exhausted", "branch.created"}) |
Either type |
Notes¶
- Callbacks fire synchronously during the event loop — keep them fast.
- Callbacks that raise exceptions will propagate and halt the run. Wrap in
try/exceptif needed. - The delivery responder itself emits no new events — it cannot create feedback loops.
- For async delivery (webhooks, queues), use
threading.Threadinside the callback.