Skip to content

Commit 449e465

Browse files
Skobeltsynclaude
andcommitted
feat(#1739): v0.5.0 step 3 — agentic-loop rewire emits Token + ToolCall* events
Third v0.5.0 streaming step. TDD red-first. ToolCall gains callId: String? = null. Nullable + default keeps the existing constructor signature working; non-streaming providers leave it empty. Streaming aggregator stamps it from LlmChunk.ToolCallStarted so the loop carries the same id through to AgentEvent.ToolCallFinished. New chatOrStream(client, messages, agentId, skillName, emitter): - emitter == null → withContext(IO) { client.chat(messages) } as before - emitter != null → collects client.chatStream(messages), emits Token for each TextDelta and ToolCallStarted / ToolCallArgumentsDelta for the matching LlmChunks. Provider-side LlmChunk.ToolCallFinished is bookkeeping; the consumer-facing AgentEvent.ToolCallFinished fires later in executeAgentic after the tool executor returns. executeAgentic gains emitter: AgentEventEmitter? = null. After executeToolWithBudget returns, emits ToolCallFinished with the executor's result and isError=false. Try/catch around the executor emits isError=true and rethrows so the loop's outer error path is preserved. Agent.invokeSuspendForSession plumbs the emitter through. Existing Agent.invokeSuspend keeps the byte-for-byte non-streaming path (emitter null). Agent.session(input) wires a trySend-based emitter into the channel that fronts the events Flow. Default ModelClient.chatStream honors call.callId when provided (synthesizes UUID only when the non-streaming chat() path returned a ToolCall without one). Preserves explicit ids end-to-end. Tests (red-first): - AgentSessionIntegrationTest's agentic-stub bracketing flipped from 3 events to 4 — now expects a Token("done") between SkillStarted and SkillCompleted. ToolCall* still absent under a no-tool stub. - New tool-call test exercises a two-turn stub (ToolCalls → Text), asserts shared callId between Started and Finished, single Token from the final text turn, strict ordering. - New AgentSessionIncrementalArrivalTest: ModelClient overrides chatStream to insert delay(50) between TextDelta chunks. Collects events with arrival timestamps. Asserts first Token arrives at least 100ms before Completed — proves incremental flow vs. batch-at-end. Verified: full root + KSP + no-reflect green; live π test still hits full20=true through the new chatOrStream path. tokensUsed threading on SkillCompleted/Completed deferred (separate follow-up). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e8b7437 commit 449e465

7 files changed

Lines changed: 367 additions & 20 deletions

File tree

src/main/kotlin/agents_engine/core/Agent.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,22 +252,30 @@ class Agent<IN, OUT>(
252252
* which lets parent-scope cancellation and `withTimeout` propagate cleanly into
253253
* the agentic loop. The blocking [invoke] is a thin shim over this.
254254
*/
255-
suspend fun invokeSuspend(input: IN): OUT = invokeSuspendForSession(input) { /* no-op */ }
255+
suspend fun invokeSuspend(input: IN): OUT = invokeSuspendForSession(input, emitter = null) { /* no-op */ }
256256

257257
/**
258258
* #1736 — session-aware sibling of [invokeSuspend]. Same logic, plus an
259259
* extra [onSkillStarted] callback fired after skill resolution and before
260260
* execution. Existing `invokeSuspend` delegates with a no-op callback, so
261261
* backward-compat is byte-for-byte; this entry point is only called by
262262
* `Agent.session(input)` to surface the skill name into the event flow.
263+
*
264+
* #1739 — when [emitter] is non-null, the agentic loop streams via
265+
* `chatStream` and surfaces `Token` / `ToolCall*` events through it.
266+
* Non-agentic skills ignore the emitter (they have no LLM round-trip).
263267
*/
264-
internal suspend fun invokeSuspendForSession(input: IN, onSkillStarted: (String) -> Unit): OUT {
268+
internal suspend fun invokeSuspendForSession(
269+
input: IN,
270+
emitter: agents_engine.model.AgentEventEmitter? = null,
271+
onSkillStarted: (String) -> Unit,
272+
): OUT {
265273
try {
266274
val skill = resolveSkill(input)
267275
skillChosenListener?.invoke(skill.name)
268276
onSkillStarted(skill.name)
269277
return if (skill.isAgentic) {
270-
castOut(executeAgentic(this, skill, input))
278+
castOut(executeAgentic(this, skill, input, emitter = emitter))
271279
} else {
272280
castOut(executors[skill.name]!!(input))
273281
}

src/main/kotlin/agents_engine/model/AgenticLoop.kt

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ suspend fun <IN> executeAgentic(
2828
* races on concurrent invocation of the same pipeline).
2929
*/
3030
effectivePrompt: String = agent.prompt,
31+
/**
32+
* #1739: optional AgentEvent emitter. When non-null, the loop streams
33+
* via `client.chatStream(...)`, surfaces `Token` / `ToolCallStarted` /
34+
* `ToolCallArgumentsDelta` events from chunks, and emits
35+
* `ToolCallFinished` after each tool executor runs. When null, the
36+
* loop uses `client.chat(...)` byte-for-byte as before — non-streaming
37+
* callers (`Agent.invoke`, `Agent.invokeSuspend`) pay no overhead.
38+
*/
39+
emitter: AgentEventEmitter? = null,
3140
): Any {
3241
val config = requireNotNull(agent.modelConfig) {
3342
"Agent '${agent.name}' has no model configured. Add a model { } block."
@@ -152,7 +161,7 @@ suspend fun <IN> executeAgentic(
152161
elapsedNanos.toDouble() / budget.maxDuration.inWholeNanoseconds,
153162
)
154163

155-
val response = withContext(Dispatchers.IO) { client.chat(messages) }
164+
val response = chatOrStream(client, messages, agent.name, skill.name, emitter)
156165
turns++
157166
maybeFireThreshold(BudgetReason.TURNS, turns.toDouble() / budget.maxTurns)
158167

@@ -213,9 +222,46 @@ suspend fun <IN> executeAgentic(
213222
"Tool '${call.name}' is not allowed for skill '${skill.name}'. " +
214223
"Allowed: ${allowedToolMap.keys}"
215224
)
216-
val result = executeToolWithBudget(agent, tool, call, budget)
225+
val result = try {
226+
executeToolWithBudget(agent, tool, call, budget)
227+
} catch (t: Throwable) {
228+
// #1739: tool executor threw and onError didn't recover.
229+
// Surface a ToolCallFinished event with isError=true so
230+
// consumers see the failure, then rethrow — the loop's
231+
// outer error path takes over (session emits Failed).
232+
if (emitter != null && call.callId != null) {
233+
emitter(
234+
agents_engine.runtime.events.AgentEvent.ToolCallFinished(
235+
agentId = agent.name,
236+
callId = call.callId,
237+
toolName = call.name,
238+
arguments = call.arguments,
239+
result = t.message,
240+
isError = true,
241+
)
242+
)
243+
}
244+
throw t
245+
}
217246
if (isKnowledge) agent.knowledgeUsedListener?.invoke(call.name, result?.toString() ?: "")
218247
else agent.toolUseListener?.invoke(call.name, call.arguments, result)
248+
// #1739: emit ToolCallFinished on the success path with the
249+
// executor's return value. callId is the one the streaming
250+
// aggregator stamped on this ToolCall — null only when the
251+
// emitter is null (no event work needed) or the non-streaming
252+
// path produced a ToolCall without one.
253+
if (emitter != null && call.callId != null) {
254+
emitter(
255+
agents_engine.runtime.events.AgentEvent.ToolCallFinished(
256+
agentId = agent.name,
257+
callId = call.callId,
258+
toolName = call.name,
259+
arguments = call.arguments,
260+
result = result,
261+
isError = false,
262+
)
263+
)
264+
}
219265
val toolMessage = if (tool.untrustedOutput) {
220266
wrapUntrustedToolResult(tool.name, result)
221267
} else {

src/main/kotlin/agents_engine/model/ModelClient.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ data class ToolCall(
1111
val arguments: Map<String, Any?> = emptyMap(),
1212
val rawArguments: String? = null,
1313
val invalidArgumentsError: String? = null,
14+
/**
15+
* #1739 — provider-side call identifier. Set by streaming adapters
16+
* (Anthropic SSE `tool_use_id`, OpenAI `tool_call_id`, MCP) so the
17+
* agentic loop can correlate the chunks of one tool call back to a
18+
* single `AgentEvent.ToolCallStarted` / `ToolCallFinished` pair, even
19+
* under interleaved streaming. Nullable with default null — non-
20+
* streaming providers that don't surface an explicit id can leave
21+
* this empty.
22+
*/
23+
val callId: String? = null,
1424
)
1525

1626
/**
@@ -64,7 +74,12 @@ fun interface ModelClient {
6474
}
6575
is LlmResponse.ToolCalls -> {
6676
response.calls.forEach { call ->
67-
val callId = java.util.UUID.randomUUID().toString()
77+
// #1739: honor the provider's callId when supplied; synthesize
78+
// only when the non-streaming `chat()` path returned a ToolCall
79+
// without one. This keeps explicit ids stable end-to-end so
80+
// AgentEvent.ToolCallStarted and ToolCallFinished can be
81+
// matched by consumers.
82+
val callId = call.callId ?: java.util.UUID.randomUUID().toString()
6883
emit(LlmChunk.ToolCallStarted(callId, call.name))
6984
emit(LlmChunk.ToolCallArgumentsDelta(callId, call.rawArguments ?: ""))
7085
emit(LlmChunk.ToolCallFinished(callId, call.arguments))
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package agents_engine.model
2+
3+
import agents_engine.runtime.events.AgentEvent
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.flow.collect
6+
import kotlinx.coroutines.withContext
7+
8+
// #1739 — emitter shape used to plumb AgentEvents out of the agentic
9+
// loop. `AgentEvent<*>` because the loop only ever produces non-`OUT`
10+
// subtypes (Token, ToolCall*, SkillStarted, SkillCompleted, Failed);
11+
// only `AgentEvent.Completed<OUT>` carries the typed payload and that's
12+
// emitted in `Agent.session(input)` after the loop returns.
13+
internal typealias AgentEventEmitter = suspend (AgentEvent<*>) -> Unit
14+
15+
/**
16+
* #1739 — round-trip the model: either via the existing non-streaming
17+
* `chat()` path (when [emitter] is null — byte-for-byte the old
18+
* behavior) or via `chatStream()` aggregated into the same `LlmResponse`
19+
* the agentic loop expects, emitting `AgentEvent` chunks as they arrive.
20+
*
21+
* Aggregation strategy:
22+
* - `TextDelta` chunks are concatenated into a final `LlmResponse.Text`.
23+
* Each delta also fires an `AgentEvent.Token`.
24+
* - `ToolCallStarted` records `callId` -> `toolName` in arrival order.
25+
* Fires `AgentEvent.ToolCallStarted`.
26+
* - `ToolCallArgumentsDelta` fires the matching `AgentEvent` with the
27+
* same `callId` (consumers can stream JSON-arg deltas to a UI today
28+
* even though the default `chatStream` impl coalesces them into one).
29+
* - `ToolCallFinished` (provider-side) records final arguments per
30+
* `callId`. **No `AgentEvent.ToolCallFinished` fires here** — that
31+
* one needs the executor's `result`, which the agentic loop produces
32+
* after this function returns. The loop emits it then.
33+
* - `End` carries optional `tokenUsage` into the returned `LlmResponse`.
34+
*
35+
* Interleaving safety: even if a provider's native streaming adapter
36+
* later interleaves chunks across multiple tool calls (Anthropic SSE
37+
* does this), the `callId` field on each chunk routes the delta to the
38+
* right pending entry. `ToolCall.callId` propagates into the final
39+
* `LlmResponse.ToolCalls` so the loop's `ToolCallFinished` event uses
40+
* the same id.
41+
*/
42+
internal suspend fun chatOrStream(
43+
client: ModelClient,
44+
messages: List<LlmMessage>,
45+
agentId: String,
46+
skillName: String,
47+
emitter: AgentEventEmitter?,
48+
): LlmResponse {
49+
if (emitter == null) {
50+
return withContext(Dispatchers.IO) { client.chat(messages) }
51+
}
52+
val textBuilder = StringBuilder()
53+
val callOrder = mutableListOf<String>()
54+
val pendingNames = mutableMapOf<String, String>()
55+
val pendingArgs = mutableMapOf<String, Map<String, Any?>>()
56+
var tokenUsage: TokenUsage? = null
57+
58+
client.chatStream(messages).collect { chunk ->
59+
when (chunk) {
60+
is LlmChunk.TextDelta -> {
61+
textBuilder.append(chunk.text)
62+
emitter(AgentEvent.Token(agentId, skillName, chunk.text))
63+
}
64+
is LlmChunk.ToolCallStarted -> {
65+
callOrder += chunk.callId
66+
pendingNames[chunk.callId] = chunk.toolName
67+
emitter(AgentEvent.ToolCallStarted(agentId, skillName, chunk.callId, chunk.toolName))
68+
}
69+
is LlmChunk.ToolCallArgumentsDelta -> {
70+
emitter(AgentEvent.ToolCallArgumentsDelta(agentId, chunk.callId, chunk.deltaJson))
71+
}
72+
is LlmChunk.ToolCallFinished -> {
73+
// Bookkeeping only — the consumer-facing AgentEvent.ToolCallFinished
74+
// fires AFTER the agentic loop runs the tool executor and has a result.
75+
pendingArgs[chunk.callId] = chunk.arguments
76+
}
77+
is LlmChunk.End -> {
78+
tokenUsage = chunk.tokenUsage
79+
}
80+
}
81+
}
82+
83+
return if (callOrder.isNotEmpty()) {
84+
val calls = callOrder.map { callId ->
85+
ToolCall(
86+
name = pendingNames[callId] ?: error("LlmChunk.ToolCallStarted missing for callId=$callId"),
87+
arguments = pendingArgs[callId] ?: emptyMap(),
88+
callId = callId,
89+
)
90+
}
91+
LlmResponse.ToolCalls(calls, tokenUsage)
92+
} else {
93+
LlmResponse.Text(textBuilder.toString(), tokenUsage)
94+
}
95+
}

src/main/kotlin/agents_engine/runtime/events/AgentSessionExtension.kt

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,20 @@ fun <IN, OUT> Agent<IN, OUT>.session(input: IN): AgentSession<OUT> {
4040
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
4141
scope.launch {
4242
// Captured-on-the-stack: each session has its own holder, so
43-
// concurrent sessions can't race on a shared field. Step 3's
44-
// agentic-loop rewire moves skill-name tracking into the
45-
// FlowCollector chain proper.
43+
// concurrent sessions can't race on a shared field.
4644
var capturedSkillName: String? = null
45+
// #1739: emitter forwards AgentEvents from inside the agentic loop
46+
// (Token, ToolCallStarted, ToolCallArgumentsDelta, ToolCallFinished)
47+
// into the same channel as the bracket events. trySend is non-
48+
// suspending — appropriate for a BUFFERED channel; if the buffer
49+
// ever fills (it has high capacity), excess events would be
50+
// dropped silently. Step 4 will tighten this for high-throughput
51+
// streaming.
52+
val streamingEmitter: agents_engine.model.AgentEventEmitter = { event ->
53+
channel.trySend(event as AgentEvent<OUT>)
54+
}
4755
try {
48-
val output = agent.invokeSuspendForSession(input) { skillName ->
56+
val output = agent.invokeSuspendForSession(input, emitter = streamingEmitter) { skillName ->
4957
capturedSkillName = skillName
5058
channel.trySend(AgentEvent.SkillStarted(agent.name, skillName))
5159
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.core.agent
4+
import agents_engine.model.LlmChunk
5+
import agents_engine.model.LlmMessage
6+
import agents_engine.model.LlmResponse
7+
import agents_engine.model.ModelClient
8+
import kotlinx.coroutines.delay
9+
import kotlinx.coroutines.flow.Flow
10+
import kotlinx.coroutines.flow.flow
11+
import kotlinx.coroutines.runBlocking
12+
import kotlin.test.Test
13+
import kotlin.test.assertIs
14+
import kotlin.test.assertTrue
15+
16+
/**
17+
* #1739 — proves that AgentEvent.Token events arrive INCREMENTALLY during
18+
* the agentic loop, not batched-at-end.
19+
*
20+
* The premortem flagged this as the load-bearing claim of streaming. Step
21+
* 2's tests only checked event *ordering* via `events.toList()`, which
22+
* buffers everything — a fully-batched implementation would have passed.
23+
*
24+
* Approach: a custom ModelClient overrides `chatStream` to emit chunks
25+
* with deliberate `delay(50)` between them. We collect events with
26+
* arrival timestamps and assert the first Token's arrival lands well
27+
* before Completed's. If `chatOrStream` accidentally aggregates and
28+
* batch-emits, the gap collapses and this test fires.
29+
*
30+
* Uses `runBlocking` (real clock) — runTest's virtual time defeats the
31+
* timing-based assertion this test is built on.
32+
*/
33+
class AgentSessionIncrementalArrivalTest {
34+
35+
/**
36+
* Streaming stub: emits four TextDelta chunks with 50ms between each,
37+
* then End. Total wire-time ≈ 150ms minimum.
38+
*/
39+
private val incrementalStub = object : ModelClient {
40+
override fun chat(messages: List<LlmMessage>): LlmResponse =
41+
error("incrementalStub forces the streaming path; chat() must not be called")
42+
43+
override suspend fun chatStream(messages: List<LlmMessage>): Flow<LlmChunk> = flow {
44+
emit(LlmChunk.TextDelta("alpha "))
45+
delay(50)
46+
emit(LlmChunk.TextDelta("beta "))
47+
delay(50)
48+
emit(LlmChunk.TextDelta("gamma "))
49+
delay(50)
50+
emit(LlmChunk.TextDelta("delta"))
51+
emit(LlmChunk.End(tokenUsage = null))
52+
}
53+
}
54+
55+
@Test
56+
fun `Token events arrive incrementally while the stream produces chunks, not batched at the end`() = runBlocking {
57+
val streamingAgent = agent<String, String>("inc") {
58+
prompt("Incremental stub.")
59+
model { ollama("llama3"); client = incrementalStub }
60+
skills {
61+
skill<String, String>("recite", "Streams four words") { tools() }
62+
}
63+
}
64+
65+
val session = streamingAgent.session("kick")
66+
67+
val startNs = System.nanoTime()
68+
var firstTokenMs: Long? = null
69+
var completedMs: Long? = null
70+
session.events.collect { event ->
71+
val elapsedMs = (System.nanoTime() - startNs) / 1_000_000
72+
when (event) {
73+
is AgentEvent.Token -> if (firstTokenMs == null) firstTokenMs = elapsedMs
74+
is AgentEvent.Completed<*> -> completedMs = elapsedMs
75+
else -> {}
76+
}
77+
}
78+
79+
// Both arrival timestamps must have been recorded.
80+
val first = firstTokenMs ?: error("never observed a Token event")
81+
val last = completedMs ?: error("never observed a Completed event")
82+
83+
// Gap >= 100ms means at least two delays elapsed between the first
84+
// Token arriving and Completed — proves incremental flow. The actual
85+
// gap should be ~150ms (three delays); 100ms gives slack for CI noise.
86+
val gap = last - first
87+
assertTrue(
88+
gap >= 100,
89+
"expected first Token to arrive at least 100ms before Completed (proof of incremental flow); " +
90+
"got first=${first}ms, completed=${last}ms, gap=${gap}ms",
91+
)
92+
93+
// Final assembled output spans all four chunks.
94+
val output = session.await()
95+
assertIs<String>(output)
96+
assertTrue("alpha beta gamma delta" in output, "expected full assembled text; got: \"$output\"")
97+
}
98+
}

0 commit comments

Comments
 (0)