Streaming agents

How to consume agent execution as a typed event stream. Pairs with the v0.5.0 streaming premortem for the design rationale.

Quick start

import agents_engine.runtime.events.AgentEvent
import agents_engine.runtime.events.session

val session = myAgent.session(input)

session.events.collect { event ->
    when (event) {
        is AgentEvent.SkillStarted             -> log("${event.skillName}")
        is AgentEvent.Token                    -> render(event.text)            // mid-loop
        is AgentEvent.ToolCallStarted          -> log("tool: ${event.toolName} (${event.callId})")
        is AgentEvent.ToolCallArgumentsDelta   -> previewArgs(event.callId, event.deltaJson)
        is AgentEvent.ToolCallFinished         -> if (event.isError) err(event) else showResult(event)
        is AgentEvent.SkillCompleted           -> log("${event.skillName} (${event.tokensUsed?.total ?: '?'} tokens)")
        is AgentEvent.Completed                -> done(event.output, event.tokensUsed)
        is AgentEvent.Failed                   -> err(event.cause)
    }
}

// OR: skip the events, just wait for the typed output.
val output: OUT = myAgent.session(input).await()

Each agent.session(input) call starts a fresh invocation. events is a cold Flow<AgentEvent<OUT>> — collecting it twice would run the agent twice. Use events.shareIn(...) if you need multiple collectors.
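The cold-flow behaviour can be reproduced without the agent runtime. The sketch below assumes only kotlinx.coroutines; `fakeEvents` is a hypothetical stand-in for `session.events`, and the `replay = 3` and `SharingStarted.Lazily` settings are illustrative choices, not the library's recommendation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for session.events: a cold flow that counts how often its body runs.
fun fakeEvents(runs: AtomicInteger): Flow<String> = flow {
    runs.incrementAndGet()
    emit("SkillStarted")
    emit("SkillCompleted")
    emit("Completed")
}

// Collecting the cold flow twice executes the producer twice.
fun runsWhenCollectedTwice(): Int = runBlocking {
    val runs = AtomicInteger(0)
    val events = fakeEvents(runs)
    events.toList()
    events.toList()
    runs.get()
}

// Sharing the flow lets two collectors observe a single run.
fun runsWhenShared(): Int = runBlocking {
    val runs = AtomicInteger(0)
    // Separate Job so the sharing coroutine can be cancelled without cancelling runBlocking.
    val scope = CoroutineScope(coroutineContext + Job())
    val shared = fakeEvents(runs).shareIn(scope, SharingStarted.Lazily, replay = 3)
    // A shared flow never completes, so each collector stops after the three expected events.
    val first = async { shared.take(3).toList() }
    val second = async { shared.take(3).toList() }
    first.await()
    second.await()
    scope.cancel()
    runs.get()
}

fun main() {
    println(runsWhenCollectedTwice()) // 2
    println(runsWhenShared())         // 1
}
```

Because a shared flow never completes on its own, real collectors would typically stop on the terminal event (Completed or Failed) rather than on a fixed count.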

The AgentEvent hierarchy

All subtypes carry an agentId: String field — the name of the agent that produced the event. (Composition operators don't yet flow events through; see the composition note below.) Only Completed is parameterized on the agent's OUT; everything else is AgentEvent<Nothing> so events flow through any AgentSession<OUT>.

| Event | Fires when | Carries |
|---|---|---|
| SkillStarted | Before the resolved skill executes | skillName |
| Token | LLM streams a content chunk | skillName, text |
| ToolCallStarted | Streaming adapter sees a new tool call | skillName, callId, toolName |
| ToolCallArgumentsDelta | Each fragment of streamed tool-call args | callId, deltaJson |
| ToolCallFinished | After the agentic loop runs the executor | callId, toolName, arguments, result, isError |
| SkillCompleted | Skill body has returned | skillName, tokensUsed (cumulative across all LLM turns of this skill; null for implementedBy) |
| Completed<OUT> | Terminal success; emitted exactly once | output, tokensUsed |
| Failed | Terminal failure; emitted exactly once before the exception propagates | cause |

implementedBy skills: only SkillStarted → SkillCompleted → Completed. No Token or ToolCall* (no LLM round-trip). tokensUsed is always null.

Agentic skills (LLM-driven): the full set fires. Token events arrive incrementally as the model streams (proof in AgentSessionIncrementalArrivalTest).

Completed and Failed are mutually exclusive. A session emits exactly one of them as its terminal event.
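As a rough illustration of the shape described above, the sketch below mirrors part of the hierarchy and checks the "exactly one terminal event" rule. `SketchEvent`, its field types, and `hasSingleTerminalEvent` are hypothetical names for this example; the real AgentEvent declarations may differ.

```kotlin
// Hypothetical mirror of part of the event table; field types are assumptions for illustration.
sealed interface SketchEvent<out OUT> {
    val agentId: String

    data class SkillStarted(override val agentId: String, val skillName: String) : SketchEvent<Nothing>
    data class Token(override val agentId: String, val skillName: String, val text: String) : SketchEvent<Nothing>
    data class SkillCompleted(override val agentId: String, val skillName: String) : SketchEvent<Nothing>

    // Only the success event is parameterized on the output type.
    data class Completed<OUT>(override val agentId: String, val output: OUT) : SketchEvent<OUT>
    data class Failed(override val agentId: String, val cause: Throwable) : SketchEvent<Nothing>
}

fun isTerminal(e: SketchEvent<*>): Boolean =
    e is SketchEvent.Completed<*> || e is SketchEvent.Failed

// True when exactly one terminal event exists and it is the last event in the list.
fun hasSingleTerminalEvent(events: List<SketchEvent<*>>): Boolean =
    events.count(::isTerminal) == 1 && events.lastOrNull()?.let(::isTerminal) == true

fun main() {
    val ok = listOf(
        SketchEvent.SkillStarted("greeter", "greet"),
        SketchEvent.SkillCompleted("greeter", "greet"),
        SketchEvent.Completed("greeter", "hello"),
    )
    println(hasSingleTerminalEvent(ok))             // true
    println(hasSingleTerminalEvent(ok.dropLast(1))) // false: no terminal event
}
```

Typing the non-terminal events as `SketchEvent<Nothing>` with a covariant `out OUT` is what lets them appear in a list or stream of any output type.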

Provider streaming status

All three first-party adapters override ModelClient.chatStream with native wire-level streaming. Numbers below are from the live integration tests under ./gradlew integrationTest against real APIs.

| Provider | Protocol | File | Live measurement (count 1–10 prompt) |
|---|---|---|---|
| Ollama | NDJSON | OllamaClient.chatStream | 19 chunks / 84ms gap (gpt-oss:120b-cloud) |
| Anthropic | SSE with named events + indexed content blocks | ClaudeClient.chatStream | 2 chunks / 27ms gap (claude-haiku-4-5) |
| OpenAI | SSE with [DONE] terminator | OpenAiClient.chatStream | 19 chunks / 202ms gap (gpt-4o-mini) |

Custom ModelClient implementations don't need to override chatStream — the default impl wraps chat() and emits one bundled chunk sequence. That's fine for non-streaming providers; it just won't show incremental arrival.
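A minimal sketch of that default-wrap idea follows. Everything here is hypothetical: `SketchModelClient` and `SketchChunk` are invented for illustration, and a plain `Sequence` stands in for whatever streaming type the real `chatStream` returns.

```kotlin
// Hypothetical chunk types; TextDelta and End echo names that appear in the test descriptions.
sealed interface SketchChunk {
    data class TextDelta(val text: String) : SketchChunk
    object End : SketchChunk
}

// Hypothetical client interface; Sequence is a dependency-free stand-in for the real stream type.
interface SketchModelClient {
    fun chat(prompt: String): String

    // Default: call chat() once and replay the whole reply as a single delta, then End.
    fun chatStream(prompt: String): Sequence<SketchChunk> = sequence {
        yield(SketchChunk.TextDelta(chat(prompt)))
        yield(SketchChunk.End)
    }
}

// A non-streaming provider only implements chat() and inherits chatStream.
class EchoClient : SketchModelClient {
    override fun chat(prompt: String) = "echo: $prompt"
}

fun main() {
    val chunks = EchoClient().chatStream("hi").toList()
    println(chunks.size)  // 2
    println(chunks.first()) // TextDelta(text=echo: hi)
}
```

A consumer sees the same chunk types either way; the only observable difference is that the full text arrives in one delta instead of many.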

Anthropic-specific: interleaved content blocks

Anthropic's SSE can interleave chunks across content blocks (text vs tool_use) — both have an index and chunks for different indices arrive mixed. ClaudeClient.chatStream tracks blocks in a Map<Int, BlockState> and routes each delta to the right block's id/builder. This is what ToolCall.callId was designed for; the test ClaudeClientChatStreamTest > interleaved text and tool_use blocks emit correctly keyed by callId pins it.
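The routing idea can be sketched in isolation. The `BlockDelta` shape and this simplified `BlockState` are assumptions made for the example, not the adapter's actual types; the point is only that each delta is appended to the accumulator for its block index.

```kotlin
// Hypothetical delta shape: index identifies the content block the fragment belongs to.
data class BlockDelta(val index: Int, val kind: String, val fragment: String)

// Simplified per-block accumulator, loosely modelled on the BlockState idea described above.
class BlockState(val kind: String) {
    val buffer = StringBuilder()
}

// Routes each delta to the accumulator for its block index, keeping first-seen block order.
fun assembleBlocks(deltas: List<BlockDelta>): Map<Int, BlockState> {
    val blocks = LinkedHashMap<Int, BlockState>()
    for (d in deltas) {
        blocks.getOrPut(d.index) { BlockState(d.kind) }.buffer.append(d.fragment)
    }
    return blocks
}

fun main() {
    // Fragments for block 0 (text) and block 1 (tool_use) arrive mixed.
    val deltas = listOf(
        BlockDelta(0, "text", "Checking "),
        BlockDelta(1, "tool_use", "{\"city\":"),
        BlockDelta(0, "text", "the weather."),
        BlockDelta(1, "tool_use", "\"Oslo\"}"),
    )
    for ((index, block) in assembleBlocks(deltas)) {
        println("$index ${block.kind}: ${block.buffer}")
    }
    // 0 text: Checking the weather.
    // 1 tool_use: {"city":"Oslo"}
}
```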

OpenAI-specific: usage opt-in

Token usage on streamed responses requires stream_options.include_usage: true in the request. OpenAiClient.buildRequestJson(stream = true) sets it automatically; OpenAI then sends a final usage-only delta before [DONE].
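For orientation, here is a hedged sketch of both halves: a request body carrying the opt-in flag, and a reader that stops at the `[DONE]` sentinel. `buildStreamingBody` and `readSsePayloads` are hypothetical helpers written for this example (with deliberately minimal JSON escaping), not the adapter's own functions.

```kotlin
// Hypothetical request builder. Escaping covers only backslashes and quotes, for brevity.
fun buildStreamingBody(model: String, userText: String): String {
    val escaped = userText.replace("\\", "\\\\").replace("\"", "\\\"")
    return "{\"model\":\"$model\",\"stream\":true," +
        "\"stream_options\":{\"include_usage\":true}," +
        "\"messages\":[{\"role\":\"user\",\"content\":\"$escaped\"}]}"
}

// Collects SSE data payloads in order, stopping at the [DONE] sentinel.
fun readSsePayloads(lines: Sequence<String>): List<String> =
    lines
        .filter { it.startsWith("data:") }
        .map { it.removePrefix("data:").trim() }
        .takeWhile { it != "[DONE]" }
        .toList()

fun main() {
    println(buildStreamingBody("gpt-4o-mini", "count to 3"))

    // Illustrative wire lines: one content delta, then the usage-only delta, then [DONE].
    val lines = sequenceOf(
        "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\"}}]}",
        "",
        "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":5,\"completion_tokens\":1,\"total_tokens\":6}}",
        "",
        "data: [DONE]",
    )
    val payloads = readSsePayloads(lines)
    println(payloads.size)                      // 2
    println(payloads.last().contains("usage"))  // true
}
```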

TokenUsage in events

SkillCompleted.tokensUsed and Completed.tokensUsed carry a cumulative TokenUsage summed across every LLM turn of the skill — promptTokens and completionTokens summed independently. For a single-turn run, this equals that turn's usage; for a multi-turn loop, it's the total billed for the skill.

// events is a cold Flow, so collect it once; calling await() and then collecting events may run the agent twice.
val completed = agent.session(input).events.toList()
    .filterIsInstance<AgentEvent.Completed<*>>()
    .single()
val output = completed.output
completed.tokensUsed
// → TokenUsage(promptTokens=147, completionTokens=63), or null if the provider didn't report

implementedBy skills: always null (no LLM).
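The cumulative rule amounts to a field-wise sum over the turns. The sketch below uses a hypothetical `TokenUsage` with the field names mentioned in this document; treating `total` as prompt plus completion, and returning null when no turn reported usage, are assumptions for illustration.

```kotlin
// Hypothetical TokenUsage; field names follow this document, the definition of total is assumed.
data class TokenUsage(val promptTokens: Int, val completionTokens: Int) {
    val total: Int get() = promptTokens + completionTokens

    // Prompt and completion counts are summed independently.
    operator fun plus(other: TokenUsage) =
        TokenUsage(promptTokens + other.promptTokens, completionTokens + other.completionTokens)
}

// Sums per-turn usage; null when no turn reported usage (an assumed policy).
fun cumulativeUsage(turns: List<TokenUsage?>): TokenUsage? =
    turns.filterNotNull().reduceOrNull { acc, usage -> acc + usage }

fun main() {
    val turns = listOf(TokenUsage(100, 40), TokenUsage(47, 23))
    println(cumulativeUsage(turns))        // TokenUsage(promptTokens=147, completionTokens=63)
    println(cumulativeUsage(turns)?.total) // 210
    println(cumulativeUsage(emptyList()))  // null
}
```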

Cancellation

Today's contract: cancelling the coroutine collecting events terminates the Flow promptly. The session's outer scope is a SupervisorJob with Dispatchers.Unconfined; cancellation propagates through the channel-backed Flow.

Step-3 gap (deferred to step 4): the agent invocation itself may run to completion in the background when cancelled, because:

  • implementedBy lambdas are (IN) -> OUT — pure synchronous code with no suspension points. Coroutine cancellation can only fire at suspension points, so Thread.sleep or a tight loop won't be interrupted.
  • Native streaming adapters use HttpClient.send(BodyHandlers.ofInputStream()) which blocks inside BufferedReader.readLine(). Same issue — coroutine cancel doesn't interrupt the blocking read.

AgentSessionCancellationTest documents the current contract: collector cancellation returns under 500ms even while a 2-second Thread.sleep is still running in the skill body. Step 4 migrates the adapters to sendAsync so HTTP cancellation propagates properly; that's also when synchronous-skill cancellation will be addressed (likely via explicit session.cancel()).
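The underlying reason is easy to reproduce outside the runtime. This sketch, which assumes only kotlinx.coroutines, cancels a coroutine and reports whether its body still ran to the end; `bodyFinishedDespiteCancel` is a hypothetical helper, and unlike the session contract it joins the job so the outcome can be observed.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Starts `body` in a coroutine, cancels it once it is running,
// and reports whether the body still reached its end.
fun bodyFinishedDespiteCancel(body: suspend () -> Unit): Boolean = runBlocking {
    val finished = AtomicBoolean(false)
    val started = CompletableDeferred<Unit>()
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        body()
        finished.set(true)
    }
    started.await()      // make sure the coroutine is running before cancelling
    job.cancelAndJoin()  // request cancellation, then wait for the job to end
    finished.get()
}

fun main() {
    // Blocking sleep has no suspension point, so the cancel is never observed.
    println(bodyFinishedDespiteCancel { Thread.sleep(300) }) // true
    // delay() is a suspension point, so it throws CancellationException on cancel.
    println(bodyFinishedDespiteCancel { delay(300) })        // false
}
```

The same reasoning applies to a blocking `readLine()` on an HTTP body: without a suspension point or an interrupt, the cancel request has nothing to act on.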

Test coverage map

For contributors navigating the streaming test surface:

Session API

| File | Pins |
|---|---|
| AgentSessionBasicEventsTest | implementedBy happy path: three ordered bracket events |
| AgentSessionIntegrationTest | failure path (identity-preserved cause), concurrent sessions, agentic-stub bracketing with Token, tool-call event sequence with shared callId, tokensUsed single-turn, tokensUsed cumulative across two turns |
| AgentSessionLiveTest | live π to 20 decimals against Ollama; full20=true end-to-end |
| AgentSessionCancellationTest | collector cancel returns under 500ms even with a 2-second sleeping skill |
| AgentSessionIncrementalArrivalTest | timing proof: first Token ≥100ms before Completed under a delayed-chunk stub |
| ModelClientChatStreamDefaultTest | default chatStream wrap of non-streaming chat(): Text and ToolCalls cases |

Adapter streaming (provider-level chunk parsing)

| File | Pins |
|---|---|
| OllamaClientChatStreamTest | NDJSON: TextDelta sequence + End with usage; tool-call triple; empty-content skip |
| OllamaClientChatStreamLiveTest | live Ollama: multiple chunks with measurable timing gap |
| ClaudeClientChatStreamTest | SSE text-only; tool_use with input_json_delta accumulation; interleaved text + tool_use blocks correctly keyed by callId |
| ClaudeClientChatStreamLiveTest | live Anthropic: multiple chunks with usage |
| OpenAiClientChatStreamTest | SSE text-only with usage-only final delta; tool-call with call_* id reused across deltas |
| OpenAiClientChatStreamLiveTest | live OpenAI: multiple chunks with usage |

22 test methods across 12 files. The non-live tests run under ./gradlew test; the live ones run under ./gradlew integrationTest (tagged live-llm).

Composition

Pipeline / Branch / wrap / Swarm do not yet flow events through to a parent session. Only leaf agents (the agent you directly call .session(input) on) expose streaming. A composed pipeline still works end-to-end with pipeline(input) returning the typed output, but pipeline.session(input) is not yet defined.

This is the next v0.5.0 milestone after step 3. The shape per the premortem: agentId on every event already namespaces by source agent — a Pipeline session would flow events from each inner agent with their respective agentIds, plus its own bracket events.

Known gaps (post-step-3, pre-v0.5.0-release)

  • Composition flow-through (above).
  • HTTP cancellation mid-read — blocking InputStream isn't coroutine-cancellable.
  • Synchronous skill body cancellation: implementedBy lambdas can't be interrupted.
  • Provider-specific limits — Ollama bundles tool-call args in one final chunk (no progressive input_json_delta); only Anthropic streams tool args progressively today.

See docs/roadmap.md Phase 2 Secondary for the planned closure of each.