This page documents the Python SDK interface for running a Summoner client via SummonerClient. It focuses on how to use the class and its methods, and what behavior to expect when you call them.
A Summoner client connects to a Summoner server over TCP, continuously receives newline-delimited messages, runs user-defined handlers registered via decorators (for receive/send/hooks/state sync), and optionally emits messages back to the server. The client also supports reconnection, fallback, agent travel (switching host/port at runtime), and sender behavior for reactive event data, payload admission filters, timed senders, and DNA replay of sender scheduling rules.
SummonerClient is the primary SDK entry point for running a client process. It handles configuration loading (from a file path or in-memory dict), logger initialization, termination signal handling (where supported), handler registration, and the overall client lifecycle (connect, run loops, shutdown).
def __init__(self, name: Optional[str] = None) -> NoneCreates a client instance and prepares internal state for running client sessions.
-
Sets a logical
nameused for logging. -
Creates a dedicated asyncio event loop for this client instance and sets it as the current loop.
-
Initializes internal registries and locks for:
- route registration (receivers and senders),
- hook registration (send/receive validation hooks),
- active task tracking,
- connection intent (travel/quit).
-
Initializes flow support (
Flow) used for route parsing and state-driven activation.
- Type:
Optional[str] - Meaning: A human-readable identifier for logs and diagnostics.
- Default behavior: If
nameis not a string, a placeholder is used ("<client:no-name>").
This constructor returns a SummonerClient instance.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")def run(
self,
host: str = "127.0.0.1",
port: int = 8888,
config_path: Optional[str] = None,
config_dict: Optional[dict[str, Any]] = None,
) -> NoneStarts the client and blocks the calling thread until the client stops.
At a high level, run(...) does six things:
- Loads configuration (from
config_dictorconfig_path). - Applies configuration to internal runtime settings (logger, reconnection, receiver/sender parameters).
- Initializes flow parsing metadata.
- Installs termination signal handlers (where supported).
- Awaits completion of all decorator registrations scheduled before runtime.
- Runs the client session loop with reconnection and fallback behavior.
This method is the normal entry point for SDK usage.
- Type:
str - Meaning: Initial target host to connect to.
- Default:
"127.0.0.1"
- Type:
int - Meaning: Initial target port to connect to.
- Default:
8888
- Type:
Optional[str] - Meaning: Path to a JSON configuration file.
- When used: Used when
config_dictis not provided.
- Type:
Optional[dict[str, Any]] - Meaning: In-memory configuration dictionary.
- Precedence: If provided, it is used instead of
config_path. - Validation: Must be a
dictorNone. Any other type raisesTypeError.
This method expects configuration keys such as:
-
host,port(optional): default connection target (may be overridden byrun(host, port)at the SDK call site). -
logger(optional): logging configuration. -
hyper_parameters(optional): client runtime tuning, including:hyper_parameters.reconnectionhyper_parameters.receiverhyper_parameters.sender
The full set of configuration keys is documented in the configuration guide for the client.
- Returns
None. - This call blocks until the client exits.
- The client attempts a clean shutdown on
KeyboardInterruptand cancellation.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
client.run(host="127.0.0.1", port=8888)from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
client.run(
host="127.0.0.1",
port=8888,
config_path="client.json",
)from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
client.run(
host="127.0.0.1",
port=8888,
config_dict={
"logger": {"log_level": "INFO", "enable_console_log": True},
"hyper_parameters": {
"reconnection": {"primary_retry_limit": 3, "retry_delay_seconds": 2},
"receiver": {"max_bytes_per_line": 65536, "read_timeout_seconds": None},
"sender": {"concurrency_limit": 20},
},
},
)def flow(self) -> FlowReturns the Flow object owned by this client. This is the object used to define and parse routes and to enable flow-driven activation.
None.
Returns a Flow instance.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
flow = client.flow()
# flow.enable(...) or flow.add(...) depending on your Flow APIdef initialize(self) -> NoneInitializes flow metadata by compiling route patterns used by the flow parser.
This is called by run(...) as part of startup. You usually do not need to call it directly unless you embed the client lifecycle manually.
None.
Returns None.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
client.initialize()def upload_states(self) -> Callable[[Callable[[], Awaitable[Any]]], Callable[[], Awaitable[Any]]]Decorator used to register an async function that returns the current state snapshot for the flow system.
If flow is enabled, the receiver loop can call this function to obtain state, build a StateTape, compute activations, and schedule receivers accordingly.
This decorator must be used before client.run().
Returns a decorator. The decorated function is stored on the client as the upload-state provider.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
@client.upload_states()
async def upload():
return {"stage": "idle", "room": "alpha"}def download_states(self) -> Callable[[Callable[[Any], Awaitable[Any]]], Callable[[Any], Awaitable[Any]]]Decorator used to register an async function that receives a StateTape-compatible payload after receiver batches execute.
If flow is enabled, the client updates the tape during receiver execution and then calls this function with the updated snapshot.
This decorator must be used before client.run().
Returns a decorator. The decorated function is stored on the client as the download-state consumer.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
@client.download_states()
async def download(tape):
# Persist tape to memory/DB if desired
return Nonedef hook(
self,
direction: Direction,
priority: Union[int, tuple[int, ...]] = (),
) -> Callable[[Callable[[Optional[Union[str, dict]]], Awaitable[Optional[Union[str, dict]]]]], Callable[..., Any]]Decorator used to register an async hook that runs on payloads:
- Direction.RECEIVE: after a message is received and decoded, before receiver handlers run.
- Direction.SEND: before a payload is encoded and sent to the server.
Hooks are ordered by priority. A hook may:
- return a transformed payload (string/dict/other supported payload types), or
- return
Noneto drop the payload (skip further processing for that payload).
This decorator must be used before client.run() (recommended) although registration is scheduled safely.
- Type:
Direction - Meaning: Whether the hook applies on receive or send.
- Type:
Union[int, tuple[int, ...]] - Meaning: Ordering key for hook execution. Lower priorities run earlier (based on the SDK's ordering rule).
- Default:
()
Returns a decorator.
from summoner.client import SummonerClient
from summoner.protocol.process import Direction
client = SummonerClient(name="summoner:client")
@client.hook(Direction.RECEIVE, priority=0)
async def drop_empty(payload):
if payload is None:
return None
return payloadfrom summoner.client import SummonerClient
from summoner.protocol.process import Direction
client = SummonerClient(name="summoner:client")
@client.hook(Direction.SEND, priority=0)
async def normalize(payload):
if isinstance(payload, dict) and "type" not in payload:
payload["type"] = "message"
return payloaddef receive(
self,
route: str,
priority: Union[int, tuple[int, ...]] = (),
) -> Callable[[Callable[[Union[str, dict]], Awaitable[Optional[Event]]]], Callable[..., Any]]Decorator used to register an async receiver handler that is called when messages are received.
Receivers are grouped and executed in batches by priority. When flow is enabled, the route may be parsed and used for activation logic; otherwise the raw route is used as the index key.
If the handler returns an Event, that event may also carry data. Flow-aware sender logic can forward that payload to reactive senders registered with use_data=True, and those senders may further qualify it with when_data before the sender runs.
The decorated function must:
- be
async - accept exactly one argument (the payload)
- return
Optional[Event](orNone)
- Type:
str - Meaning: Logical route string used for indexing and (optionally) flow parsing.
- Type:
Union[int, tuple[int, ...]] - Meaning: Batch ordering key.
- Default:
()
Returns a decorator.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
@client.receive(route="chat.message", priority=0)
async def on_message(payload):
# payload is typically a dict or string depending on upstream encoding
return Nonedef send(
self,
route: str,
multi: bool = False,
on_triggers: Optional[set[Signal]] = None,
on_actions: Optional[set[Type]] = None,
use_data: bool = False,
data_mode: Optional[str] = None,
every: Optional[float] = None,
run_while: Any = None,
when_data: Any = None,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Any]]Decorator used to register an async sender handler that produces outbound payloads.
Senders are executed by the sender loop and enqueue output payloads to be encoded and written to the server connection.
If multi=False, the sender returns a single payload or None. If multi=True, it returns a list of payloads (or None to stay quiet for that run).
Returning None means "send nothing this cycle." Senders should return payloads, not yield them. Generator-based senders are not supported.
When flow is enabled, on_triggers and/or on_actions make a sender reactive to activation events. From an SDK point of view, those filters decide when the sender is eligible to run.
If you set use_data=True, the sender receives the matched event payload (Event.data) as its single argument.
data_mode controls how that payload is delivered:
"live"keeps shared-reference semantics, so later mutations may still be visible when the sender runs."snapshot"gives the sender a stable delivered payload, which is usually safer when the data may be mutated later or buffered for timed delivery.
Compared with earlier releases, the snapshot path keeps the same public behavior while doing less unnecessary copy work inside the SDK.
when_data=... is an optional synchronous payload filter for reactive use_data=True senders. It is evaluated on the same payload the sender would receive, before the sender coroutine is queued.
Two runtime details matter for advanced reactive senders:
- With untimed reactive senders,
use_data=Truekeeps one invocation per matching event. Matching events are not collapsed into one run because the event payload itself is part of the sender input. - For reactive timed senders, cadence does not arm the sender by itself. A matching event must arm the runtime first; after that,
run_whilecan keep the sender active or stop it.
every=<seconds> turns the sender into a timed sender. Once admitted, it is scheduled independently from the untimed batch loop.
run_while=... lets a timed sender keep running only while a bool or callable condition allows it.
- Untimed sender: no
every; runs in the legacy round-based sender loop. - Reactive sender: uses
on_triggersand/oron_actions; runs only for matching events. - Timed sender: uses
every; runs on a cadence once scheduled.
Registration enforces a few groups of rules.
Basic type checks
routemust be a string.multianduse_datamust be booleans.everymust beNoneor a positive number.on_triggersmust beNoneor aset[Signal].on_actionsmust beNoneor a set ofAction.MOVE,Action.STAY, and/orAction.TEST.
Flow-related rules
use_data=Truerequires a non-empty reactive filter andclient.flow().activate()before registration.- Timed reactive senders also require
client.flow().activate()before registration.
Option coupling
data_moderequiresuse_data=True.run_whilerequiresevery.when_datarequiresuse_data=True.when_datamust beNoneor a synchronous callable that accepts exactly one payload argument.
- If
use_data=False, the sender must accept zero parameters. - If
use_data=True, the sender must accept exactly one parameter. - When
use_data=True, the SDK enforces the parameter count but intentionally does not require a specific annotation type.
- Type:
str - Meaning: Logical route string used for indexing and (optionally) flow parsing.
- Type:
bool - Meaning: Whether the sender returns multiple payloads per invocation.
- Default:
False
- Type:
Optional[set[Signal]] - Meaning: Optional trigger set that gates sender execution in flow-enabled mode.
- Type:
Optional[set[Type]] - Meaning: Optional action set that gates sender execution in flow-enabled mode. This is validated against allowed action event classes.
- Type:
bool - Meaning: Whether the sender receives the matched event payload as a single argument.
- Default:
False - Important rule: Requires a non-empty reactive filter and flow activation before registration.
- Type:
Optional[str] - Meaning: Transfer policy for event payloads when
use_data=True. - Accepted values:
None,"live","snapshot" - Default behavior: Normalizes to
"live"whenuse_data=True.
- Type:
Any - Meaning: Optional synchronous payload admission predicate for reactive
use_data=Truesenders. - Accepted forms:
Noneor a callable that accepts one payload argument and returns a truthy/falsey value. - Important rule: Evaluated before sender invocation and only valid with
use_data=True.
- Type:
Optional[float] - Meaning: Repeat cadence in seconds for timed senders.
- Default:
None - Validation: Must be positive when provided.
- Type:
Any - Meaning: Optional timed-sender guard. May be
None, a bool, or a callable returningbool/ awaitablebool. - Important rule: Only valid when
everyis set.
Returns a decorator.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
pending_message = {"kind": "chat", "data": "hello"}
@client.send(route="chat.send")
async def send_one():
if pending_message is None:
return None
return pending_messagefrom summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
@client.send(route="chat.send", multi=True)
async def send_many():
return [{"data": "a"}, {"data": "b"}, {"data": "c"}]from summoner.client import SummonerClient
from summoner.protocol.triggers import load_triggers, Action
client = SummonerClient(name="summoner:client")
Trigger = load_triggers(text="OK\n minor")
client.flow().activate()
@client.send(
route="chat_send",
on_actions={Action.STAY},
on_triggers={Trigger.minor},
use_data=True,
data_mode="snapshot",
)
async def reply(event_data):
return {"seen": event_data}from summoner.client import SummonerClient
from summoner.protocol.triggers import load_triggers, Action
client = SummonerClient(name="summoner:client")
Trigger = load_triggers(text="OK\n minor")
client.flow().activate()
def ready_only(data):
return data.get("status") == "ready"
@client.send(
route="chat_send",
on_actions={Action.STAY},
on_triggers={Trigger.minor},
use_data=True,
when_data=ready_only,
)
async def reply_ready(event_data):
return {"seen": event_data["id"]}from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
keep_running = True
@client.send(route="heartbeat", every=5.0, run_while=lambda: keep_running)
async def heartbeat():
return {"kind": "heartbeat"}async def travel_to(self, host: str, port: int) -> NoneRequests that the client travel to a new server address (host/port). This sets an internal intent flag checked by the session loops.
In typical use, you call this from within a receiver or sender handler to migrate the client to another server endpoint.
- Type:
str - Meaning: Destination host.
- Type:
int - Meaning: Destination port.
An awaitable coroutine. Returns None.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
@client.receive("control.travel")
async def on_travel(payload):
await client.travel_to(host="127.0.0.1", port=9999)
return Noneasync def quit(self) -> NoneRequests that the client exit cleanly. This sets an internal intent flag that is checked by the session loops. In typical use, you call this from within a receiver or sender handler.
None.
An awaitable coroutine. Returns None.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
@client.receive("control.quit")
async def on_quit(payload):
await client.quit()
return Nonedef dna(self, include_context: bool = False) -> strSerializes this client's registered behavior (decorated handlers and related metadata) into a JSON string called "DNA".
At the SDK level, DNA is used to support cloning and merging workflows by capturing:
- handler type (receive/send/hook/upload_states/download_states),
- route keys and priorities (where applicable),
- handler source code,
- sender-side behavior metadata such as
use_data,data_mode,when_data,every, and serialized callable specs for timed guards and payload filters, - optional context metadata when
include_context=True.
For @send(...) handlers, DNA now writes the full sender contract, including:
multion_triggersandon_actionsby nameuse_datadata_modeeveryrun_while_kindrun_while_valuerun_while_namerun_while_sourcewhen the SDK can serialize fallback source textwhen_data_kindwhen_data_valuewhen_data_namewhen_data_sourcewhen the SDK can serialize fallback source text
When include_context=True, DNA also scans registered handlers and callable run_while / when_data helpers so translation and merger workflows can recover referenced imports, globals, recipes, and explicit "missing" bindings.
- Type:
bool - Meaning: If
True, includes a__context__header with best-effort imports/globals/recipes/missing bindings used by handlers. - Default:
False
Returns a JSON string.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
dna_json = client.dna()from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
dna_json = client.dna(include_context=True)def set_termination_signals(self) -> NoneInstalls process termination signal handlers on non-Windows platforms.
-
Registers handlers for:
- SIGINT (Ctrl+C)
- SIGTERM (process termination)
-
Each handler triggers
shutdown().
On Windows, signal handler installation is skipped.
None.
Returns None.
You normally do not call this directly because run(...) calls it as part of normal startup.
def shutdown(self) -> NoneTriggers client shutdown by cancelling all tasks in the client's event loop.
This is typically invoked through signal handlers or process interruption.
None.
Returns None.
from summoner.client import SummonerClient
client = SummonerClient(name="summoner:client")
client.shutdown()async def run_client(self, host: str = "127.0.0.1", port: int = 8888) -> NoneRuns the client's reconnection state machine.
This coroutine:
- attempts a "primary stage" connection loop using the provided
host/port, - on repeated failures, falls back to configured default host/port (if configured),
- exits cleanly on
/quit, - restarts primary behavior on
/travel.
In typical SDK usage, you do not call this directly; it is run by run(...).
- Type:
str - Meaning: Primary connection host.
- Default:
"127.0.0.1"
- Type:
int - Meaning: Primary connection port.
- Default:
8888
An awaitable coroutine. Returns None when the client finishes.
This method is usually called internally by run(...).
async def handle_session(self, host: str = "127.0.0.1", port: int = 8888) -> NoneRuns a single connected session: one receiver loop and one sender loop concurrently.
- Opens a TCP connection to the current host/port (including dynamic overrides from travel).
- Starts background send workers to execute sender handlers.
- Runs
message_receiver_loop(...)andmessage_sender_loop(...)concurrently. - Ends the session when one side completes (disconnect, travel, quit), then cancels the other side.
- Closes the connection and cleans up workers and tracked tasks.
- Clears timed-sender runtime state both at session start and during cleanup so timed senders do not leak scheduling state across reconnects.
In typical SDK usage, you do not call this directly; it is used as part of the reconnection logic.
- Type:
str - Meaning: Session host (used as fallback if the client has no dynamic host override).
- Default:
"127.0.0.1"
- Type:
int - Meaning: Session port (used as fallback if the client has no dynamic port override).
- Default:
8888
An awaitable coroutine. Returns when the session ends.
This method is usually called internally by run_client(...).
async def message_receiver_loop(
self,
reader: asyncio.StreamReader,
stop_event: asyncio.Event,
) -> NoneContinuously reads messages from the server, applies receive hooks, and dispatches receiver handlers.
At a high level:
- Reads one newline-delimited message (with size and timeout controls).
- Decodes the message into a relayed payload type.
- Applies receiving hooks in priority order.
- Runs receiver handlers in batches (by priority). If flow is enabled, batches may be activation-driven.
- If flow is enabled, forwards activation events across the event bridge to the sender side, including any
Event.datapayload carried by the receiver result. - Exits when
stop_eventis set, the server disconnects, or the task is cancelled.
- Type:
asyncio.StreamReader - Meaning: Read side of the TCP connection.
- Type:
asyncio.Event - Meaning: Cooperative termination signal shared with the sender loop.
An awaitable coroutine. Returns when the session ends or raises ServerDisconnected on EOF, depending on shutdown path.
This method is usually called internally by handle_session(...).
async def message_sender_loop(
self,
writer: asyncio.StreamWriter,
stop_event: asyncio.Event,
) -> NoneContinuously schedules and enqueues sender handlers for execution, then ensures outbound payloads are written to the server.
At a high level:
-
Starts two internal loops together:
- an untimed batch loop that preserves the legacy round-based sender contract,
- a timed scheduler loop that services
every=...senders independently.
-
The untimed batch loop:
- snapshots the sender registry,
- consumes pending receiver events,
- matches reactive senders by route and event filters,
- keeps one invocation per matching event when
use_data=True, - evaluates
when_databefore enqueueing ause_data=Truesender invocation, - enqueues a bounded batch of send invocations,
- waits for that untimed batch to finish before starting the next untimed round.
-
The timed scheduler loop:
- lets non-reactive timed senders become due immediately on the first scheduler pass,
- arms timed reactive senders from matching events,
- does not treat
run_whileas an arming mechanism on its own, - tracks one runtime record per sender registration,
- polls
run_while, - emits due invocations on cadence,
- applies
when_datato buffered payloads one by one before invoking ause_data=Truesender, - optionally replays buffered payloads when
use_data=True, which means one due tick can emit more than one invocation if several payloads were buffered.
-
Both loops share the same bounded send queue and writer workers.
-
batch_drainstill controls when the socket is flushed, but timed senders are no longer forced to wait for unrelated untimed batches before becoming due. -
On shutdown, the loop cancels any in-flight
run_whiletasks and shuts down both internal loops cleanly.
- Type:
asyncio.StreamWriter - Meaning: Write side of the TCP connection.
- Type:
asyncio.Event - Meaning: Cooperative termination signal shared with the receiver loop.
An awaitable coroutine. Returns when the session ends.
This method is usually called internally by handle_session(...).
from summoner.client import SummonerClient
from summoner.protocol.process import Direction
client = SummonerClient(name="summoner:client")
@client.receive("chat.message", priority=0)
async def on_message(payload):
client.logger.info(f"received: {payload!r}")
return None
@client.send("chat.send")
async def send_message():
# Return a payload to send; returning None sends nothing this cycle
return {"kind": "chat", "data": "hello"}
client.run(
host="127.0.0.1",
port=8888,
config_dict={
"logger": {"log_level": "INFO", "enable_console_log": True},
"hyper_parameters": {
"sender": {"concurrency_limit": 10, "queue_maxsize": 10},
"receiver": {"read_timeout_seconds": None},
},
},
)- The client connects to the server at
127.0.0.1:8888. - Incoming messages are passed through receive hooks (if any) and then dispatched to
@receivehandlers. - Outbound senders are executed by background workers and their payloads are written to the server connection.
« Previous: Summoner.client | Next: Summoner.client configuration guide »