How to consume agent execution as a typed event stream. Pairs with the v0.5.0 streaming premortem for the design rationale.
```kotlin
import agents_engine.runtime.events.AgentEvent
import agents_engine.runtime.events.session

val session = myAgent.session(input)
session.events.collect { event ->
    when (event) {
        is AgentEvent.SkillStarted -> log("→ ${event.skillName}")
        is AgentEvent.Token -> render(event.text) // mid-loop
        is AgentEvent.ToolCallStarted -> log("tool: ${event.toolName} (${event.callId})")
        is AgentEvent.ToolCallArgumentsDelta -> previewArgs(event.callId, event.deltaJson)
        is AgentEvent.ToolCallFinished -> if (event.isError) err(event) else showResult(event)
        is AgentEvent.SkillCompleted -> log("✓ ${event.skillName} (${event.tokensUsed?.total ?: '?'} tokens)")
        is AgentEvent.Completed -> done(event.output, event.tokensUsed)
        is AgentEvent.Failed -> err(event.cause)
    }
}

// OR: skip the events, just wait for the typed output.
val output: OUT = myAgent.session(input).await()
```

Each agent.session(input) call starts a fresh invocation. events is a cold Flow<AgentEvent<OUT>>: collecting it twice would run the agent twice. Use events.shareIn(...) if you need multiple collectors.
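To make the fan-out case concrete, here is a minimal sketch of sharing one run between two collectors. It assumes kotlinx.coroutines is on the classpath and uses a stand-in Flow<String> in place of session.events; fakeEvents, collectTwiceShared, and the string event names are hypothetical and not part of the library. A SharedFlow never completes on its own, so each collector stops itself at the terminal event.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Counts how many times the upstream (the "agent run") is started.
val runs = AtomicInteger(0)

// Hypothetical stand-in for session.events: a cold flow that starts a new run per collection.
fun fakeEvents(): Flow<String> = flow {
    runs.incrementAndGet()
    emit("SkillStarted")
    emit("Token")
    emit("Completed")
}

// Two collectors observe the same single upstream run.
fun collectTwiceShared(): List<List<String>> = runBlocking {
    val sharingScope = CoroutineScope(Dispatchers.Default)
    // Lazily: the upstream starts when the first collector subscribes.
    // replay = 16 lets a late collector catch up; sized for this toy trace only.
    val shared = fakeEvents().shareIn(sharingScope, SharingStarted.Lazily, replay = 16)
    val results = List(2) {
        async {
            // Pass each event through, then stop after the terminal one.
            shared.transformWhile<String, String> { event ->
                emit(event)
                event != "Completed"
            }.toList()
        }
    }.awaitAll()
    sharingScope.cancel() // release the sharing coroutine once both collectors are done
    results
}
```

The replay size and the SharingStarted policy are choices for this sketch; a real session would need a replay buffer large enough for its event volume, or collectors that all subscribe before the run starts.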
All subtypes carry an agentId: String field — the name of the agent that produced the event. (Composition operators don't yet flow events through; see the composition note below.) Only Completed is parameterized on the agent's OUT; everything else is AgentEvent<Nothing> so events flow through any AgentSession<OUT>.
| Event | Fires when | Carries |
|---|---|---|
| SkillStarted | Before the resolved skill executes | skillName |
| Token | LLM streams a content chunk | skillName, text |
| ToolCallStarted | Streaming adapter sees a new tool call | skillName, callId, toolName |
| ToolCallArgumentsDelta | Each fragment of streamed tool-call args | callId, deltaJson |
| ToolCallFinished | After the agentic loop runs the executor | callId, toolName, arguments, result, isError |
| SkillCompleted | Skill body has returned | skillName, tokensUsed (cumulative across all LLM turns of this skill; null for implementedBy) |
| Completed<OUT> | Terminal success, emitted exactly once | output, tokensUsed |
| Failed | Terminal failure, emitted exactly once before the exception propagates | cause |
implementedBy skills: only SkillStarted → SkillCompleted → Completed. No Token or ToolCall* (no LLM round-trip). tokensUsed is always null.
Agentic skills (LLM-driven): the full set fires. Token events arrive incrementally as the model streams (proof in AgentSessionIncrementalArrivalTest).
Completed and Failed are mutually exclusive. A session emits exactly one of them as its terminal event.
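These ordering rules can be checked mechanically on a recorded trace. The following is a minimal sketch under stated assumptions: the Ev hierarchy is a simplified stand-in for AgentEvent (not the library's types), and isWellFormed and isImplementedByShape are hypothetical helpers.

```kotlin
// Simplified stand-ins for the event kinds described above; not the library's types.
sealed interface Ev
data class SkillStarted(val skillName: String) : Ev
data class Token(val text: String) : Ev
data class SkillCompleted(val skillName: String) : Ev
data class Completed(val output: String) : Ev
data class Failed(val cause: Throwable) : Ev

// True when the trace has exactly one terminal event and that event is the last one.
fun isWellFormed(trace: List<Ev>): Boolean {
    val terminals = trace.count { it is Completed || it is Failed }
    val last = trace.lastOrNull()
    return terminals == 1 && (last is Completed || last is Failed)
}

// Shape of a successful implementedBy run: bracket events only, no Token or ToolCall*.
fun isImplementedByShape(trace: List<Ev>): Boolean =
    trace.size == 3 &&
        trace[0] is SkillStarted &&
        trace[1] is SkillCompleted &&
        trace[2] is Completed
```

A check like this is mostly useful in tests, where a collected event list can be asserted against the contract before looking at payloads.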
All three first-party adapters override ModelClient.chatStream with native wire-level streaming. Numbers below are from the live integration tests under ./gradlew integrationTest against real APIs.
| Provider | Protocol | File | Live measurement (count 1–10 prompt) |
|---|---|---|---|
| Ollama | NDJSON | OllamaClient.chatStream | 19 chunks / 84ms gap (gpt-oss:120b-cloud) |
| Anthropic | SSE with named events + indexed content blocks | ClaudeClient.chatStream | 2 chunks / 27ms gap (claude-haiku-4-5) |
| OpenAI | SSE with [DONE] terminator | OpenAiClient.chatStream | 19 chunks / 202ms gap (gpt-4o-mini) |
Custom ModelClient implementations don't need to override chatStream — the default impl wraps chat() and emits one bundled chunk sequence. That's fine for non-streaming providers; it just won't show incremental arrival.
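The wrap-by-default pattern can be pictured roughly as follows. This is only a sketch: SketchClient, Chunk, TextDelta, End, EchoClient, and collectChunks are hypothetical names with assumed shapes (the real ModelClient and its chunk types are richer), and kotlinx.coroutines is assumed to be available.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical chunk types; the names echo the adapter tests, the shapes are assumed.
sealed interface Chunk
data class TextDelta(val text: String) : Chunk
object End : Chunk

// Hypothetical client interface, much smaller than the real ModelClient.
interface SketchClient {
    fun chat(prompt: String): String

    // Default: one blocking chat() call, re-emitted as a single bundled chunk sequence.
    fun chatStream(prompt: String): Flow<Chunk> = flow {
        emit(TextDelta(chat(prompt)))
        emit(End)
    }
}

// A non-streaming provider only implements chat() and inherits chatStream.
class EchoClient : SketchClient {
    override fun chat(prompt: String): String = "echo: $prompt"
}

// Collects the whole stream; with the default wrap, all text arrives in one TextDelta.
fun collectChunks(client: SketchClient, prompt: String): List<Chunk> =
    runBlocking { client.chatStream(prompt).toList() }
```

With this shape, a consumer written against chatStream works unchanged for both kinds of provider; the only observable difference is that the full text arrives in one chunk.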
Anthropic's SSE can interleave chunks across content blocks (text vs tool_use) — both have an index and chunks for different indices arrive mixed. ClaudeClient.chatStream tracks blocks in a Map<Int, BlockState> and routes each delta to the right block's id/builder. This is what ToolCall.callId was designed for; the test ClaudeClientChatStreamTest > interleaved text and tool_use blocks emit correctly keyed by callId pins it.
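The index-keyed routing described above can be sketched in a few lines. The Delta, BlockStart, and BlockDelta types and the route function are hypothetical simplifications for illustration; they are not ClaudeClient's internals, and only the Map<Int, BlockState> idea is taken from the text.

```kotlin
// Hypothetical, simplified stream deltas; a real adapter would parse these from SSE events.
sealed interface Delta { val index: Int }
data class BlockStart(override val index: Int, val kind: String, val callId: String? = null) : Delta
data class BlockDelta(override val index: Int, val fragment: String) : Delta

// Per-block accumulator, keyed by the content block's index.
class BlockState(val kind: String, val callId: String?) {
    val builder = StringBuilder()
}

// Route each delta to the block it belongs to, regardless of arrival order.
fun route(deltas: List<Delta>): Map<Int, BlockState> {
    val blocks = mutableMapOf<Int, BlockState>()
    for (d in deltas) {
        when (d) {
            is BlockStart -> blocks[d.index] = BlockState(d.kind, d.callId)
            // A delta for an index that was never started is ignored in this sketch.
            is BlockDelta -> blocks[d.index]?.builder?.append(d.fragment)
        }
    }
    return blocks
}
```

Because each fragment is appended to the state for its own index, a text block and a tool_use block can arrive interleaved and still end up as two intact strings.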
Token usage on streamed responses requires stream_options.include_usage: true in the request. OpenAiClient.buildRequestJson(stream = true) sets it automatically; OpenAI then sends a final usage-only delta before [DONE].
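On the wire, the option is a nested object in the Chat Completions request body. The sketch below hand-builds such a body as a string to stay dependency-free; buildStreamingBody is a hypothetical helper, not OpenAiClient.buildRequestJson, and it performs no JSON escaping.

```kotlin
// Hypothetical helper: builds a minimal streaming request body.
// Assumes model and prompt contain no characters that need JSON escaping.
fun buildStreamingBody(model: String, prompt: String): String = """
    {
      "model": "$model",
      "stream": true,
      "stream_options": { "include_usage": true },
      "messages": [ { "role": "user", "content": "$prompt" } ]
    }
""".trimIndent()
```

With the option set, the usage-only chunk that precedes [DONE] carries the usage object and an empty choices array, so a parser should not assume every chunk has a delta.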
SkillCompleted.tokensUsed and Completed.tokensUsed carry a cumulative TokenUsage summed across every LLM turn of the skill — promptTokens and completionTokens summed independently. For a single-turn run, this equals that turn's usage; for a multi-turn loop, it's the total billed for the skill.
```kotlin
// Collect once: events is cold, so read the output and the usage from the same run.
val completed = agent.session(input).events
    .toList()
    .filterIsInstance<AgentEvent.Completed<*>>()
    .single()
val output = completed.output
completed.tokensUsed
// → TokenUsage(promptTokens=147, completionTokens=63), or null if the provider didn't report
```

implementedBy skills: always null (no LLM).
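The cumulative rule amounts to a field-by-field sum over the per-turn usages. A minimal sketch, assuming a stand-in TokenUsage with only the two fields shown above plus a derived total; cumulative is a hypothetical helper, and skipping turns that reported no usage is an assumption of this sketch, not documented library behavior.

```kotlin
// Stand-in mirroring the two fields shown above; the library's TokenUsage may carry more.
data class TokenUsage(val promptTokens: Int, val completionTokens: Int) {
    val total: Int get() = promptTokens + completionTokens
}

// Sums per-turn usage field by field; returns null when no turn reported usage.
// Assumption: turns without usage are skipped rather than nulling the whole sum.
fun cumulative(turns: List<TokenUsage?>): TokenUsage? =
    turns.filterNotNull().reduceOrNull { acc, u ->
        TokenUsage(
            promptTokens = acc.promptTokens + u.promptTokens,
            completionTokens = acc.completionTokens + u.completionTokens
        )
    }
```

For example, two turns of (100, 40) and (47, 23) add up to promptTokens=147 and completionTokens=63, while a single-turn run returns that turn's usage unchanged.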
Today's contract: cancelling the coroutine collecting events terminates the Flow promptly. The session's outer scope is a SupervisorJob with Dispatchers.Unconfined; cancellation propagates through the channel-backed Flow.
Step-3 gap (deferred to step 4): the agent invocation itself may run to completion in the background when cancelled, because:
- implementedBy lambdas are (IN) -> OUT: pure synchronous code with no suspension points. Coroutine cancellation can only fire at suspension points, so Thread.sleep or a tight loop won't be interrupted.
- Native streaming adapters use HttpClient.send(BodyHandlers.ofInputStream()), which blocks inside BufferedReader.readLine(). Same issue: coroutine cancel doesn't interrupt the blocking read.
AgentSessionCancellationTest documents the current contract: collector cancellation returns under 500ms even while a 2-second Thread.sleep is still running in the skill body. Step 4 migrates the adapters to sendAsync so HTTP cancellation propagates properly; that's also when synchronous-skill cancellation will be addressed (likely via explicit session.cancel()).
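The gap is easy to reproduce outside the library. The sketch below is a standalone illustration, not the code of AgentSessionCancellationTest; blockingBodySurvivesCancel is a hypothetical name, and kotlinx.coroutines is assumed to be available. It cancels a coroutine whose body is blocked in Thread.sleep and shows that the body still runs to the end.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if a blocking body ran to completion even though its job was cancelled.
fun blockingBodySurvivesCancel(): Boolean {
    val finished = AtomicBoolean(false)
    runBlocking {
        val started = CompletableDeferred<Unit>()
        val job = launch(Dispatchers.Default) {
            started.complete(Unit)   // signal that the body is running
            Thread.sleep(300)        // blocking call: no suspension point for cancel to act on
            finished.set(true)       // still reached after cancel()
        }
        started.await()              // make sure cancel() happens while the body is running
        job.cancel()                 // marks the job as cancelling and returns immediately
        job.join()                   // completes only once the blocking body has returned
    }
    return finished.get()
}
```

The caller regains control as soon as cancel() returns, which matches the collector-side contract above; the work itself keeps a thread busy until the blocking call finishes.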
For contributors navigating the streaming test surface:
| File | Pins |
|---|---|
| AgentSessionBasicEventsTest | implementedBy happy path: three ordered bracket events |
| AgentSessionIntegrationTest | failure path (identity-preserved cause), concurrent sessions, agentic-stub bracketing with Token, tool-call event sequence with shared callId, tokensUsed single-turn, tokensUsed cumulative across two turns |
| AgentSessionLiveTest | live π to 20 decimals against Ollama: full20=true end-to-end |
| AgentSessionCancellationTest | collector cancel returns under 500ms even with a 2-second sleeping skill |
| AgentSessionIncrementalArrivalTest | timing proof: first Token ≥100ms before Completed under a delayed-chunk stub |
| ModelClientChatStreamDefaultTest | default chatStream wrap of non-streaming chat(): Text and ToolCalls cases |

| File | Pins |
|---|---|
| OllamaClientChatStreamTest | NDJSON: TextDelta sequence + End with usage; tool-call triple; empty-content skip |
| OllamaClientChatStreamLiveTest | live Ollama: multiple chunks with measurable timing gap |
| ClaudeClientChatStreamTest | SSE text-only; tool_use with input_json_delta accumulation; interleaved text + tool_use blocks correctly keyed by callId |
| ClaudeClientChatStreamLiveTest | live Anthropic: multiple chunks with usage |
| OpenAiClientChatStreamTest | SSE text-only with usage-only final delta; tool-call with call_* id reused across deltas |
| OpenAiClientChatStreamLiveTest | live OpenAI: multiple chunks with usage |
22 test methods across 12 files. The non-live tests run under ./gradlew test; the live ones run under ./gradlew integrationTest (tagged live-llm).
Pipeline / Branch / wrap / Swarm do not yet flow events through to a parent session. Only leaf agents (the agent you directly call .session(input) on) expose streaming. A composed pipeline still works end-to-end with pipeline(input) returning the typed output, but pipeline.session(input) is not yet defined.
This is the next v0.5.0 milestone after step 3. The shape per the premortem: agentId on every event already namespaces by source agent — a Pipeline session would flow events from each inner agent with their respective agentIds, plus its own bracket events.
- Composition flow-through (above).
- HTTP cancellation mid-read: blocking InputStream isn't coroutine-cancellable.
- Synchronous skill body cancellation: implementedBy lambdas can't be interrupted.
- Provider-specific limits: Ollama bundles tool-call args in one final chunk (no progressive input_json_delta); only Anthropic streams tool args progressively today.
See docs/roadmap.md Phase 2 Secondary for the planned closure of each.