Skip to content

Commit 8173f24

Browse files
Skobeltsynclaude
andcommitted
feat(#1736): AgentEvent sealed hierarchy + AgentSession + Agent.session(input)
Second v0.5.0 streaming step. Consumer-facing layer on top of #1722's LlmChunk foundation. TDD red-first. Adds: - AgentEvent sealed interface (Token / ToolCallStarted / ToolCallArgumentsDelta / ToolCallFinished / SkillStarted / SkillCompleted / Completed<OUT> / Failed). agentId on every subtype for composition provenance. - AgentSession<OUT> with cold `events: Flow<AgentEvent<OUT>>` and `suspend fun await()`. - Free function `Agent<IN, OUT>.session(input)` driving a Channel-backed Flow + CompletableDeferred from a dedicated SupervisorJob scope. Closure-captured skill-name holder is per-session — concurrent sessions can't race. Backward compat: existing Agent.invokeSuspend(input) now delegates to a new internal invokeSuspendForSession(input, onSkillStarted) with a no-op callback. Same logic, byte-for-byte behavior; only the new entry point on Agent surfaces the skill name into the event flow. Step 2 scope (deliberate): emits ONLY SkillStarted → SkillCompleted → Completed (or Failed). Token / ToolCall* subtypes are defined in the hierarchy but NOT YET emitted — that's step 3, where the agentic loop is rewired onto a FlowCollector<AgentEvent>. tokensUsed stays null for the same reason. Full root + KSP + no-reflect smoke suites green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 287c964 commit 8173f24

5 files changed

Lines changed: 290 additions & 1 deletion

File tree

src/main/kotlin/agents_engine/core/Agent.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,20 @@ class Agent<IN, OUT>(
252252
* which lets parent-scope cancellation and `withTimeout` propagate cleanly into
253253
* the agentic loop. The blocking [invoke] is a thin shim over this.
254254
*/
255-
suspend fun invokeSuspend(input: IN): OUT {
255+
suspend fun invokeSuspend(input: IN): OUT = invokeSuspendForSession(input) { /* no-op */ }
256+
257+
/**
258+
* #1736 — session-aware sibling of [invokeSuspend]. Same logic, plus an
259+
* extra [onSkillStarted] callback fired after skill resolution and before
260+
* execution. Existing `invokeSuspend` delegates with a no-op callback, so
261+
* backward-compat is byte-for-byte; this entry point is only called by
262+
* `Agent.session(input)` to surface the skill name into the event flow.
263+
*/
264+
internal suspend fun invokeSuspendForSession(input: IN, onSkillStarted: (String) -> Unit): OUT {
256265
try {
257266
val skill = resolveSkill(input)
258267
skillChosenListener?.invoke(skill.name)
268+
onSkillStarted(skill.name)
259269
return if (skill.isAgentic) {
260270
castOut(executeAgentic(this, skill, input))
261271
} else {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.model.TokenUsage
4+
5+
/**
6+
* #1736 — typed event emitted from `Agent.session(input).events` while the
7+
* agentic loop runs. See [the v0.5.0 streaming premortem](../../../../docs/premortem-0.5.0-streaming.md)
8+
* for the full design rationale.
9+
*
10+
* The sealed hierarchy is complete here so consumers can write exhaustive
11+
* `when` matches today. **Not every subtype is emitted yet.** v0.5.0 step 2
12+
* surfaces only [SkillStarted], [SkillCompleted], [Completed], and [Failed]
13+
* — enough for the consumer surface to be useful for `implementedBy`-style
14+
* agents. [Token], [ToolCallStarted], [ToolCallArgumentsDelta], and
15+
* [ToolCallFinished] land in step 3 when the agentic loop is rewired onto a
16+
* `FlowCollector<AgentEvent>`.
17+
*
18+
* Every event carries [agentId] — the name of the agent that produced it.
19+
* Composition operators (`then`, `Pipeline`, `Branch`, `wrap`, `Swarm`)
20+
* preserve provenance via this field so a consumer collecting from a
21+
* composed pipeline can still tell which agent emitted which event.
22+
*
23+
* Only [Completed] carries the typed `OUT` payload; every other subtype
24+
* is `AgentEvent<Nothing>` so events flow through any `AgentSession<OUT>`
25+
* regardless of OUT.
26+
*/
27+
sealed interface AgentEvent<out OUT> {
28+
/** The agent that produced this event. For composed pipelines this is the inner agent's name, not the composition's. */
29+
val agentId: String
30+
31+
/**
32+
* A chunk of LLM-streamed text from a single skill turn. Providers chunk at
33+
* their own granularity — [text] may be a single token or a multi-token
34+
* chunk; the framework passes through as-is.
35+
*
36+
* Not yet emitted (step 3 — agentic loop rewire).
37+
*/
38+
data class Token(
39+
override val agentId: String,
40+
val skillName: String,
41+
val text: String,
42+
) : AgentEvent<Nothing>
43+
44+
/**
45+
* A new tool call has begun streaming. [callId] is unique per call within a
46+
* session; [ToolCallArgumentsDelta] and [ToolCallFinished] for the same call
47+
* share this id.
48+
*
49+
* Not yet emitted (step 3).
50+
*/
51+
data class ToolCallStarted(
52+
override val agentId: String,
53+
val skillName: String,
54+
val callId: String,
55+
val toolName: String,
56+
) : AgentEvent<Nothing>
57+
58+
/**
59+
* Partial tool-call arguments JSON. Multiple deltas may arrive per call for
60+
* providers that stream argument JSON (Anthropic, OpenAI). Non-streaming
61+
* providers emit one delta with the full JSON.
62+
*
63+
* Not yet emitted (step 3).
64+
*/
65+
data class ToolCallArgumentsDelta(
66+
override val agentId: String,
67+
val callId: String,
68+
val deltaJson: String,
69+
) : AgentEvent<Nothing>
70+
71+
/**
72+
* Tool call complete — arguments parsed, executor invoked, result captured.
73+
* [isError] flags executor exceptions that `onError` swallowed (loop keeps
74+
* going); when [isError] is true and `onError` rethrew, the session emits
75+
* [Failed] instead.
76+
*
77+
* Not yet emitted (step 3).
78+
*/
79+
data class ToolCallFinished(
80+
override val agentId: String,
81+
val callId: String,
82+
val toolName: String,
83+
val arguments: Map<String, Any?>,
84+
val result: Any?,
85+
val isError: Boolean,
86+
) : AgentEvent<Nothing>
87+
88+
/** Agent has resolved a skill and is about to execute it (typed-tool dispatch OR an `implementedBy` lambda). */
89+
data class SkillStarted(
90+
override val agentId: String,
91+
val skillName: String,
92+
) : AgentEvent<Nothing>
93+
94+
/**
95+
* Skill execution complete. [tokensUsed] reports the cumulative LLM token
96+
* usage for this skill turn, or null for `implementedBy` skills (no LLM)
97+
* and for v0.5.0 step 2 (token threading lands in step 3).
98+
*/
99+
data class SkillCompleted(
100+
override val agentId: String,
101+
val skillName: String,
102+
val tokensUsed: TokenUsage?,
103+
) : AgentEvent<Nothing>
104+
105+
/** Terminal success — carries the typed output of the agent invocation. Emitted exactly once on the happy path. */
106+
data class Completed<out OUT>(
107+
override val agentId: String,
108+
val output: OUT,
109+
val tokensUsed: TokenUsage?,
110+
) : AgentEvent<OUT>
111+
112+
/**
113+
* Terminal failure — emitted exactly once on the error path, BEFORE the
114+
* exception propagates. Consumers collecting `events.toList()` see this
115+
* as the last element; `session.await()` rethrows [cause].
116+
*/
117+
data class Failed(
118+
override val agentId: String,
119+
val cause: Throwable,
120+
) : AgentEvent<Nothing>
121+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package agents_engine.runtime.events
2+
3+
import kotlinx.coroutines.Deferred
4+
import kotlinx.coroutines.flow.Flow
5+
6+
/**
7+
* #1736 — handle to an in-flight agent invocation. Returned by
8+
* `agent.session(input)`. Carries a cold [events] flow and a terminal
9+
* [await] entry point.
10+
*
11+
* Cold flow: each call to `agent.session(...)` starts a fresh invocation,
12+
* regardless of whether you've collected from a previous session's [events].
13+
* To share one invocation's events across multiple collectors, use
14+
* `events.shareIn(scope, ...)`.
15+
*
16+
* Cancellation: cancelling the coroutine collecting [events] cancels the
17+
* agent invocation. Cancelling the coroutine calling [await] does the same.
18+
* Either path propagates into the upstream LLM HTTP call (step 3 hardens
19+
* this once native streaming adapters land).
20+
*/
21+
class AgentSession<OUT> internal constructor(
22+
val events: Flow<AgentEvent<OUT>>,
23+
private val resultDeferred: Deferred<OUT>,
24+
) {
25+
/**
26+
* Awaits the agent's typed output. Throws the original exception (NOT
27+
* wrapped) if the invocation failed — the [AgentEvent.Failed] event
28+
* still appears in [events] as the terminal element.
29+
*/
30+
suspend fun await(): OUT = resultDeferred.await()
31+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.core.Agent
4+
import kotlinx.coroutines.CompletableDeferred
5+
import kotlinx.coroutines.CoroutineScope
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.SupervisorJob
8+
import kotlinx.coroutines.channels.Channel
9+
import kotlinx.coroutines.flow.consumeAsFlow
10+
import kotlinx.coroutines.launch
11+
12+
/**
13+
* #1736 — start a streaming session against [this] agent.
14+
*
15+
* Returns an [AgentSession] whose [AgentSession.events] is a cold flow of
16+
* typed [AgentEvent]s and whose [AgentSession.await] surfaces the typed
17+
* output (or rethrows the failure).
18+
*
19+
* **Step 2 scope (intentional):** for `implementedBy` skills the emitted
20+
* sequence is `SkillStarted → SkillCompleted → Completed`. For agentic
21+
* skills the same three events bracket the agentic loop, but [AgentEvent.Token]
22+
* and `ToolCall*` events are NOT yet surfaced — that's step 3, where the
23+
* agentic loop itself is rewired onto a `FlowCollector<AgentEvent>`.
24+
*
25+
* Cold flow: each call starts a fresh invocation. To replay events to
26+
* multiple collectors, wrap with `events.shareIn(...)`.
27+
*/
28+
fun <IN, OUT> Agent<IN, OUT>.session(input: IN): AgentSession<OUT> {
29+
val agent = this
30+
// BUFFERED keeps event production decoupled from consumer pace; an
31+
// implementedBy skill can complete and queue all four events before
32+
// the collector starts pulling. Step 3 may tune this for the
33+
// token-streaming case.
34+
val channel = Channel<AgentEvent<OUT>>(Channel.BUFFERED)
35+
val result = CompletableDeferred<OUT>()
36+
37+
// Dedicated scope per session so a cancelled collector doesn't leak
38+
// the result coroutine. SupervisorJob keeps the session independent
39+
// of any unrelated coroutine the consumer happens to be running in.
40+
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
41+
scope.launch {
42+
// Captured-on-the-stack: each session has its own holder, so
43+
// concurrent sessions can't race on a shared field. Step 3's
44+
// agentic-loop rewire moves skill-name tracking into the
45+
// FlowCollector chain proper.
46+
var capturedSkillName: String? = null
47+
try {
48+
val output = agent.invokeSuspendForSession(input) { skillName ->
49+
capturedSkillName = skillName
50+
channel.trySend(AgentEvent.SkillStarted(agent.name, skillName))
51+
}
52+
channel.trySend(AgentEvent.SkillCompleted(agent.name, capturedSkillName ?: "?", null))
53+
channel.trySend(AgentEvent.Completed(agent.name, output, null))
54+
channel.close()
55+
result.complete(output)
56+
} catch (t: Throwable) {
57+
channel.trySend(AgentEvent.Failed(agent.name, t))
58+
channel.close()
59+
result.completeExceptionally(t)
60+
}
61+
}
62+
63+
return AgentSession(
64+
events = channel.consumeAsFlow(),
65+
resultDeferred = result,
66+
)
67+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.core.agent
4+
import kotlinx.coroutines.flow.toList
5+
import kotlinx.coroutines.test.runTest
6+
import kotlin.test.Test
7+
import kotlin.test.assertEquals
8+
import kotlin.test.assertIs
9+
import kotlin.test.assertNull
10+
11+
// #1736 — TDD red-first for the v0.5.0 streaming consumer surface.
12+
//
13+
// The narrowest meaningful test: an `implementedBy` skill on a typed
14+
// agent, invoked via `agent.session(input)`, must produce ordered
15+
// `SkillStarted → SkillCompleted → Completed` events and the deferred
16+
// terminal must return the typed output. No LLM is involved — Token
17+
// and ToolCall* events are defined in the hierarchy but NOT emitted
18+
// in this step (step 3 will rewire the agentic loop to surface them).
19+
//
20+
// If the agentic-loop rewire later changes event semantics for
21+
// `implementedBy` skills, this test fires. The contract is: zero
22+
// Token / ToolCall events for non-agentic skills, by construction.
23+
24+
class AgentSessionBasicEventsTest {
25+
26+
@Test
27+
fun `session emits SkillStarted then SkillCompleted then Completed for an implementedBy skill`() = runTest {
28+
val echoAgent = agent<String, String>("echo") {
29+
skills {
30+
skill<String, String>("uppercase", "Uppercases the input") {
31+
implementedBy { it.uppercase() }
32+
}
33+
}
34+
}
35+
36+
val session = echoAgent.session("hello")
37+
val events = session.events.toList()
38+
val output = session.await()
39+
40+
assertEquals("HELLO", output, "session.await() must return the typed agent output")
41+
assertEquals(3, events.size, "expected 3 events for the implementedBy happy path; got: $events")
42+
43+
val started = events[0]
44+
assertIs<AgentEvent.SkillStarted>(started, "first event must be SkillStarted; got: $started")
45+
assertEquals("echo", started.agentId)
46+
assertEquals("uppercase", started.skillName)
47+
48+
val completed = events[1]
49+
assertIs<AgentEvent.SkillCompleted>(completed, "second event must be SkillCompleted; got: $completed")
50+
assertEquals("echo", completed.agentId)
51+
assertEquals("uppercase", completed.skillName)
52+
assertNull(completed.tokensUsed, "SkillCompleted.tokensUsed stays null until step 3 threads it through executeAgentic")
53+
54+
val terminal = events[2]
55+
assertIs<AgentEvent.Completed<String>>(terminal, "third event must be Completed<String>; got: $terminal")
56+
assertEquals("echo", terminal.agentId)
57+
assertEquals("HELLO", terminal.output)
58+
assertNull(terminal.tokensUsed)
59+
}
60+
}

0 commit comments

Comments
 (0)