|
| 1 | +# Streaming agents |
| 2 | + |
| 3 | +How to consume agent execution as a typed event stream. Pairs with the [v0.5.0 streaming premortem](premortem-0.5.0-streaming.md) for the design rationale. |
| 4 | + |
| 5 | +## Quick start |
| 6 | + |
| 7 | +```kotlin |
| 8 | +import agents_engine.runtime.events.AgentEvent |
| 9 | +import agents_engine.runtime.events.session |
| 10 | + |
| 11 | +val session = myAgent.session(input) |
| 12 | + |
| 13 | +session.events.collect { event -> |
| 14 | + when (event) { |
| 15 | + is AgentEvent.SkillStarted -> log("→ ${event.skillName}") |
| 16 | + is AgentEvent.Token -> render(event.text) // mid-loop |
| 17 | + is AgentEvent.ToolCallStarted -> log("tool: ${event.toolName} (${event.callId})") |
| 18 | + is AgentEvent.ToolCallArgumentsDelta -> previewArgs(event.callId, event.deltaJson) |
| 19 | + is AgentEvent.ToolCallFinished -> if (event.isError) err(event) else showResult(event) |
| 20 | + is AgentEvent.SkillCompleted -> log("✓ ${event.skillName} (${event.tokensUsed?.total ?: '?'} tokens)") |
| 21 | + is AgentEvent.Completed -> done(event.output, event.tokensUsed) |
| 22 | + is AgentEvent.Failed -> err(event.cause) |
| 23 | + } |
| 24 | +} |
| 25 | + |
| 26 | +// OR: skip the events, just wait for the typed output. |
| 27 | +val output: OUT = myAgent.session(input).await() |
| 28 | +``` |
| 29 | + |
| 30 | +Each `agent.session(input)` call starts a fresh invocation. `events` is a cold `Flow<AgentEvent<OUT>>` — collecting it twice would run the agent twice. Use `events.shareIn(...)` if you need multiple collectors. |
| 31 | + |
| 32 | +## The AgentEvent hierarchy |
| 33 | + |
| 34 | +All subtypes carry an `agentId: String` field — the name of the agent that produced the event. (Composition operators don't yet flow events through; see the [composition note](#composition) below.) Only `Completed` is parameterized on the agent's `OUT`; everything else is `AgentEvent<Nothing>` so events flow through any `AgentSession<OUT>`. |
| 35 | + |
| 36 | +| Event | Fires when | Carries | |
| 37 | +|---|---|---| |
| 38 | +| `SkillStarted` | Before the resolved skill executes | `skillName` | |
| 39 | +| `Token` | LLM streams a content chunk | `skillName`, `text` | |
| 40 | +| `ToolCallStarted` | Streaming adapter sees a new tool call | `skillName`, `callId`, `toolName` | |
| 41 | +| `ToolCallArgumentsDelta` | Each fragment of streamed tool-call args | `callId`, `deltaJson` | |
| 42 | +| `ToolCallFinished` | After the agentic loop runs the executor | `callId`, `toolName`, `arguments`, `result`, `isError` | |
| 43 | +| `SkillCompleted` | Skill body has returned | `skillName`, `tokensUsed` (cumulative across all LLM turns of this skill; null for `implementedBy`) | |
| 44 | +| `Completed<OUT>` | Terminal success — emitted exactly once | `output`, `tokensUsed` | |
| 45 | +| `Failed` | Terminal failure — emitted exactly once before the exception propagates | `cause` | |
| 46 | + |
| 47 | +**`implementedBy` skills:** only `SkillStarted` → `SkillCompleted` → `Completed`. No `Token` or `ToolCall*` (no LLM round-trip). `tokensUsed` is always null. |
| 48 | + |
| 49 | +**Agentic skills (LLM-driven):** the full set fires. `Token` events arrive incrementally as the model streams (proof in `AgentSessionIncrementalArrivalTest`). |
| 50 | + |
| 51 | +**`Completed` and `Failed` are mutually exclusive.** A session emits exactly one of them as its terminal event. |
| 52 | + |
| 53 | +## Provider streaming status |
| 54 | + |
| 55 | +All three first-party adapters override `ModelClient.chatStream` with native wire-level streaming. Numbers below are from the live integration tests under `./gradlew integrationTest` against real APIs. |
| 56 | + |
| 57 | +| Provider | Protocol | File | Live measurement (count 1–10 prompt) | |
| 58 | +|---|---|---|---| |
| 59 | +| Ollama | NDJSON | `OllamaClient.chatStream` | 19 chunks / 84ms gap (gpt-oss:120b-cloud) | |
| 60 | +| Anthropic | SSE with named events + indexed content blocks | `ClaudeClient.chatStream` | 2 chunks / 27ms gap (claude-haiku-4-5) | |
| 61 | +| OpenAI | SSE with `[DONE]` terminator | `OpenAiClient.chatStream` | 19 chunks / 202ms gap (gpt-4o-mini) | |
| 62 | + |
| 63 | +Custom `ModelClient` implementations don't need to override `chatStream` — the default impl wraps `chat()` and emits one bundled chunk sequence. That's fine for non-streaming providers; it just won't show incremental arrival. |
| 64 | + |
| 65 | +### Anthropic-specific: interleaved content blocks |
| 66 | + |
| 67 | +Anthropic's SSE can interleave chunks across content blocks (text vs tool_use) — both have an `index` and chunks for different indices arrive mixed. `ClaudeClient.chatStream` tracks blocks in a `Map<Int, BlockState>` and routes each delta to the right block's id/builder. This is what `ToolCall.callId` was designed for; the test `ClaudeClientChatStreamTest > interleaved text and tool_use blocks emit correctly keyed by callId` pins it. |
| 68 | + |
| 69 | +### OpenAI-specific: usage opt-in |
| 70 | + |
| 71 | +Token usage on streamed responses requires `stream_options.include_usage: true` in the request. `OpenAiClient.buildRequestJson(stream = true)` sets it automatically; OpenAI then sends a final usage-only delta before `[DONE]`. |
| 72 | + |
| 73 | +## TokenUsage in events |
| 74 | + |
| 75 | +`SkillCompleted.tokensUsed` and `Completed.tokensUsed` carry a cumulative `TokenUsage` summed across every LLM turn of the skill — `promptTokens` and `completionTokens` summed independently. For a single-turn run, this equals that turn's usage; for a multi-turn loop, it's the total billed for the skill. |
| 76 | + |
| 77 | +```kotlin |
| 78 | +val session = agent.session(input) |
| 79 | +val output = session.await() |
| 80 | +session.events.toList().filterIsInstance<AgentEvent.Completed<*>>().single().tokensUsed |
| 81 | +// → TokenUsage(promptTokens=147, completionTokens=63), or null if the provider didn't report |
| 82 | +``` |
| 83 | + |
| 84 | +`implementedBy` skills: always null (no LLM). |
| 85 | + |
| 86 | +## Cancellation |
| 87 | + |
| 88 | +Today's contract: **cancelling the coroutine collecting `events` terminates the Flow promptly.** The session's outer scope is a `SupervisorJob` with `Dispatchers.Unconfined`; cancellation propagates through the channel-backed Flow. |
| 89 | + |
| 90 | +**Step-3 gap (deferred to step 4):** the agent invocation itself may run to completion in the background when cancelled, because: |
| 91 | + |
| 92 | +- `implementedBy` lambdas are `(IN) -> OUT` — pure synchronous code with no suspension points. Coroutine cancellation can only fire at suspension points, so `Thread.sleep` or a tight loop won't be interrupted. |
| 93 | +- Native streaming adapters use `HttpClient.send(BodyHandlers.ofInputStream())` which blocks inside `BufferedReader.readLine()`. Same issue — coroutine cancel doesn't interrupt the blocking read. |
| 94 | + |
| 95 | +`AgentSessionCancellationTest` documents the current contract: collector cancellation returns under 500ms even while a 2-second `Thread.sleep` is still running in the skill body. Step 4 migrates the adapters to `sendAsync` so HTTP cancellation propagates properly; that's also when synchronous-skill cancellation will be addressed (likely via explicit `session.cancel()`). |
| 96 | + |
| 97 | +## Test coverage map |
| 98 | + |
| 99 | +For contributors navigating the streaming test surface: |
| 100 | + |
| 101 | +### Session API |
| 102 | + |
| 103 | +| File | Pins | |
| 104 | +|---|---| |
| 105 | +| `AgentSessionBasicEventsTest` | implementedBy happy path — three ordered bracket events | |
| 106 | +| `AgentSessionIntegrationTest` | failure path (identity-preserved cause), concurrent sessions, agentic-stub bracketing with Token, tool-call event sequence with shared callId, tokensUsed single-turn, tokensUsed cumulative across two turns | |
| 107 | +| `AgentSessionLiveTest` | live π to 20 decimals against Ollama — `full20=true` end-to-end | |
| 108 | +| `AgentSessionCancellationTest` | collector cancel returns under 500ms even with a 2-second sleeping skill | |
| 109 | +| `AgentSessionIncrementalArrivalTest` | timing proof — first Token ≥100ms before Completed under a delayed-chunk stub | |
| 110 | +| `ModelClientChatStreamDefaultTest` | default `chatStream` wrap of non-streaming `chat()` — Text and ToolCalls cases | |
| 111 | + |
| 112 | +### Adapter streaming (provider-level chunk parsing) |
| 113 | + |
| 114 | +| File | Pins | |
| 115 | +|---|---| |
| 116 | +| `OllamaClientChatStreamTest` | NDJSON: TextDelta sequence + End with usage; tool-call triple; empty-content skip | |
| 117 | +| `OllamaClientChatStreamLiveTest` | live Ollama — multiple chunks with measurable timing gap | |
| 118 | +| `ClaudeClientChatStreamTest` | SSE text-only; tool_use with `input_json_delta` accumulation; interleaved text + tool_use blocks correctly keyed by callId | |
| 119 | +| `ClaudeClientChatStreamLiveTest` | live Anthropic — multiple chunks with usage | |
| 120 | +| `OpenAiClientChatStreamTest` | SSE text-only with usage-only final delta; tool-call with `call_*` id reused across deltas | |
| 121 | +| `OpenAiClientChatStreamLiveTest` | live OpenAI — multiple chunks with usage | |
| 122 | + |
| 123 | +22 test methods across 12 files. The non-live tests run under `./gradlew test`; the live ones run under `./gradlew integrationTest` (tagged `live-llm`). |
| 124 | + |
| 125 | +## Composition |
| 126 | + |
| 127 | +`Pipeline` / `Branch` / `wrap` / `Swarm` do **not** yet flow events through to a parent session. Only leaf agents (the agent you directly call `.session(input)` on) expose streaming. A composed pipeline still works end-to-end with `pipeline(input)` returning the typed output, but `pipeline.session(input)` is not yet defined. |
| 128 | + |
| 129 | +This is the next v0.5.0 milestone after step 3. The shape per the premortem: `agentId` on every event already namespaces by source agent — a Pipeline session would flow events from each inner agent with their respective `agentId`s, plus its own bracket events. |
| 130 | + |
| 131 | +## Known gaps (post-step-3, pre-v0.5.0-release) |
| 132 | + |
| 133 | +- **Composition flow-through** (above). |
| 134 | +- **HTTP cancellation** mid-read — blocking InputStream isn't coroutine-cancellable. |
| 135 | +- **Synchronous skill body cancellation** — `implementedBy` lambdas can't be interrupted. |
| 136 | +- **Provider-specific limits** — Ollama bundles tool-call args in one final chunk (no progressive `input_json_delta`); only Anthropic streams tool args progressively today. |
| 137 | + |
| 138 | +See [`docs/roadmap.md`](roadmap.md) Phase 2 *Secondary* for the planned closure of each. |
0 commit comments