Skip to content

Commit 2d01463

Browse files
Skobeltsynclaude
andcommitted
feat(#1745): Pipeline.session(input) — events flow through from every node
First composition operator gets the session surface. `a then b` produces a Pipeline<IN, OUT>; pipeline.session(input) now exposes a Flow<AgentEvent> that surfaces events from BOTH inner agents with their own agentIds, plus a single terminal Completed/Failed. Sequential composition is enforced by the typed boundary — `a` runs to completion (streaming its Token events) before `b` starts (with a's full typed OUT as input, streaming its own Token events). The user sees a continuous event stream demultiplexed by agentId. Implementation: - Extract runAgentInSession(agent, input, emitter) -> (OUT, TokenUsage?) helper from Agent.session(input). Emits SkillStarted/SkillCompleted around the agent run; does NOT emit Completed. Both Agent.session and Pipeline.session use it. - AgentEventEmitter typealias changed from `suspend (...) -> Unit` to non-suspend `(...) -> Unit` so it can be called from the non-suspend onSkillStarted callback. Implementations forward to channel.trySend which is already non-blocking. - Pipeline gains an optional sessionExec: (IN, AgentEventEmitter) -> OUT constructor parameter declared BEFORE execution so trailing-lambda syntax keeps binding to execution. Effective sessionExec falls back to execution-without-events when null (existing unconverted `then` overloads). - Only the Agent<A, B>.then(Agent<B, C>) overload populates sessionExec in this commit. Multi-stage chains (a then b then c) built via the Pipeline then Agent overload fall back to default; only the FINAL Agent emits events. Documented gap, separate follow-up flips each remaining overload. TDD red-first: two tests in PipelineSessionTest. Happy path asserts the ordered 5-event sequence and Completed.agentId = last agent's name. Failure path asserts terminal Failed with original cause when the second agent throws. Full regression sweep green across root + KSP + no-reflect subprojects. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent da438c7 commit 2d01463

5 files changed

Lines changed: 236 additions & 23 deletions

File tree

src/main/kotlin/agents_engine/composition/pipeline/Pipeline.kt

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,29 @@ import kotlinx.coroutines.runBlocking
1515
*/
1616
class Pipeline<IN, OUT>(
1717
val agents: List<Agent<*, *>>,
18+
/**
19+
* #1745 — session-aware execution path. When `pipeline.session(input)`
20+
* is called, this runs instead of [execution] with a non-null emitter,
21+
* surfacing inner-agent events with their own `agentId`s. Defaults to
22+
* the non-streaming `execution` so any `then` overload that hasn't been
23+
* converted yet still works (Pipeline session emits only the terminal
24+
* Completed, no inner events — known gap, see #1745 follow-ups).
25+
*
26+
* Declared BEFORE [execution] so the trailing-lambda construction
27+
* `Pipeline(agents) { ... }` still binds the lambda to [execution].
28+
*/
29+
internal val sessionExec: (suspend (
30+
input: IN,
31+
emitter: agents_engine.model.AgentEventEmitter,
32+
) -> OUT)? = null,
1833
private val execution: suspend (IN) -> OUT,
1934
) {
35+
/** Effective sessionExec: explicit when supplied, otherwise falls back to execution (no events). */
36+
internal val effectiveSessionExec: suspend (
37+
input: IN,
38+
emitter: agents_engine.model.AgentEventEmitter,
39+
) -> OUT = sessionExec ?: { input, _ -> execution(input) }
40+
2041
operator fun invoke(input: IN): OUT = runBlocking { execution(input) }
2142

2243
suspend fun invokeSuspend(input: IN): OUT = execution(input)
@@ -25,7 +46,18 @@ class Pipeline<IN, OUT>(
2546
infix fun <A, B, C> Agent<A, B>.then(other: Agent<B, C>): Pipeline<A, C> {
2647
this.markPlaced("pipeline")
2748
other.markPlaced("pipeline")
28-
return Pipeline(listOf(this, other)) { input -> other.invokeSuspend(this.invokeSuspend(input)) }
49+
val first = this
50+
return Pipeline(
51+
agents = listOf(first, other),
52+
// #1745: streaming path runs both agents through runAgentInSession
53+
// so events from both flow into the emitter with their own agentIds.
54+
sessionExec = { input, emitter ->
55+
val (mid, _) = agents_engine.runtime.events.runAgentInSession(first, input, emitter)
56+
val (out, _) = agents_engine.runtime.events.runAgentInSession(other, mid, emitter)
57+
out
58+
},
59+
execution = { input -> other.invokeSuspend(first.invokeSuspend(input)) },
60+
)
2961
}
3062

3163
infix fun <A, B, C> Pipeline<A, B>.then(other: Agent<B, C>): Pipeline<A, C> {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package agents_engine.composition.pipeline
2+
3+
import agents_engine.model.AgentEventEmitter
4+
import agents_engine.runtime.events.AgentEvent
5+
import agents_engine.runtime.events.AgentSession
6+
import kotlinx.coroutines.CompletableDeferred
7+
import kotlinx.coroutines.CoroutineScope
8+
import kotlinx.coroutines.Dispatchers
9+
import kotlinx.coroutines.SupervisorJob
10+
import kotlinx.coroutines.channels.Channel
11+
import kotlinx.coroutines.flow.consumeAsFlow
12+
import kotlinx.coroutines.launch
13+
14+
/**
15+
* #1745 — start a streaming session against [this] pipeline.
16+
*
17+
* Sequential composition: each inner agent runs to completion before the
18+
* next starts (the typed boundary forces a complete `MID` value to flow
19+
* from `a` to `b`). But WITHIN each agent, events stream incrementally:
20+
* the consumer sees `SkillStarted` / `Token` / `ToolCall*` / `SkillCompleted`
21+
* for `a`, then the same for `b`, terminated by exactly one `Completed`
22+
* with the pipeline's final `OUT`.
23+
*
24+
* `agentId` on every inner event names the source agent — composition
25+
* preserves provenance. The terminal `Completed.agentId` uses the LAST
26+
* agent's name (its `OUT` type matches the pipeline's `OUT`).
27+
*
28+
* **Step-4 scope:** only the `Agent then Agent` overload populates
29+
* `Pipeline.sessionExec` today. Multi-stage chains (`a then b then c`)
30+
* built via the `Pipeline then Agent` overload fall back to the default
31+
* (non-streaming) `sessionExec` — `pipeline.session(input)` will emit only
32+
* the terminal `Completed`. Separate follow-ups flip each composing
33+
* overload. Documented in the corresponding session test.
34+
*/
35+
fun <IN, OUT> Pipeline<IN, OUT>.session(input: IN): AgentSession<OUT> {
36+
val pipeline = this
37+
val channel = Channel<AgentEvent<OUT>>(Channel.BUFFERED)
38+
val result = CompletableDeferred<OUT>()
39+
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
40+
// agentId for the terminal Completed: last agent's name (its OUT
41+
// matches Pipeline's OUT). Pipeline has no name of its own.
42+
val terminalAgentId = pipeline.agents.lastOrNull()?.name ?: "pipeline"
43+
44+
scope.launch {
45+
@Suppress("UNCHECKED_CAST")
46+
val emitter: AgentEventEmitter = { event -> channel.trySend(event as AgentEvent<OUT>) }
47+
try {
48+
val output = pipeline.effectiveSessionExec(input, emitter)
49+
channel.trySend(AgentEvent.Completed(terminalAgentId, output, null))
50+
channel.close()
51+
result.complete(output)
52+
} catch (t: Throwable) {
53+
channel.trySend(AgentEvent.Failed(terminalAgentId, t))
54+
channel.close()
55+
result.completeExceptionally(t)
56+
}
57+
}
58+
59+
return AgentSession(
60+
events = channel.consumeAsFlow(),
61+
resultDeferred = result,
62+
)
63+
}

src/main/kotlin/agents_engine/model/StreamingAggregator.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import kotlinx.coroutines.withContext
1010
// subtypes (Token, ToolCall*, SkillStarted, SkillCompleted, Failed);
1111
// only `AgentEvent.Completed<OUT>` carries the typed payload and that's
1212
// emitted in `Agent.session(input)` after the loop returns.
13-
internal typealias AgentEventEmitter = suspend (AgentEvent<*>) -> Unit
13+
//
14+
// Non-suspend (#1745) so it can be called from non-suspend callbacks
15+
// like `Agent.invokeSuspendForSession`'s `onSkillStarted` lambda.
16+
// Implementations typically forward to `channel.trySend(event)`, which
17+
// is itself non-blocking — appropriate for BUFFERED-channel-backed Flows.
18+
internal typealias AgentEventEmitter = (AgentEvent<*>) -> Unit
1419

1520
/**
1621
* #1739 — round-trip the model: either via the existing non-streaming

src/main/kotlin/agents_engine/runtime/events/AgentSessionExtension.kt

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package agents_engine.runtime.events
22

33
import agents_engine.core.Agent
4+
import agents_engine.model.AgentEventEmitter
5+
import agents_engine.model.TokenUsage
46
import kotlinx.coroutines.CompletableDeferred
57
import kotlinx.coroutines.CoroutineScope
68
import kotlinx.coroutines.Dispatchers
@@ -39,33 +41,17 @@ fun <IN, OUT> Agent<IN, OUT>.session(input: IN): AgentSession<OUT> {
3941
// of any unrelated coroutine the consumer happens to be running in.
4042
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
4143
scope.launch {
42-
// Captured-on-the-stack: each session has its own holder, so
43-
// concurrent sessions can't race on a shared field.
44-
var capturedSkillName: String? = null
45-
// #1740: per-session usage capture from the agentic loop's cumulative
46-
// total. Stays null for implementedBy skills (no LLM round-trip).
47-
var capturedUsage: agents_engine.model.TokenUsage? = null
4844
// #1739: emitter forwards AgentEvents from inside the agentic loop
4945
// (Token, ToolCallStarted, ToolCallArgumentsDelta, ToolCallFinished)
5046
// into the same channel as the bracket events. trySend is non-
5147
// suspending — appropriate for a BUFFERED channel; if the buffer
5248
// ever fills (it has high capacity), excess events would be
53-
// dropped silently. Step 4 will tighten this for high-throughput
54-
// streaming.
55-
val streamingEmitter: agents_engine.model.AgentEventEmitter = { event ->
56-
channel.trySend(event as AgentEvent<OUT>)
57-
}
49+
// dropped silently.
50+
@Suppress("UNCHECKED_CAST")
51+
val emitter: AgentEventEmitter = { event -> channel.trySend(event as AgentEvent<OUT>) }
5852
try {
59-
val output = agent.invokeSuspendForSession(
60-
input,
61-
emitter = streamingEmitter,
62-
onSkillCompleted = { usage -> capturedUsage = usage },
63-
) { skillName ->
64-
capturedSkillName = skillName
65-
channel.trySend(AgentEvent.SkillStarted(agent.name, skillName))
66-
}
67-
channel.trySend(AgentEvent.SkillCompleted(agent.name, capturedSkillName ?: "?", capturedUsage))
68-
channel.trySend(AgentEvent.Completed(agent.name, output, capturedUsage))
53+
val (output, usage) = runAgentInSession(agent, input, emitter)
54+
channel.trySend(AgentEvent.Completed(agent.name, output, usage))
6955
channel.close()
7056
result.complete(output)
7157
} catch (t: Throwable) {
@@ -80,3 +66,36 @@ fun <IN, OUT> Agent<IN, OUT>.session(input: IN): AgentSession<OUT> {
8066
resultDeferred = result,
8167
)
8268
}
69+
70+
/**
71+
* #1745 — shared "run an agent and surface its bracket + inner events
72+
* onto [emitter]" helper. Used by both `Agent.session(input)` and
73+
* `Pipeline.session(input)`. Emits `SkillStarted` and `SkillCompleted`
74+
* around the agent's execution; does NOT emit `Completed` (the composing
75+
* caller emits exactly one terminal `Completed` after all stages run).
76+
*
77+
* Returns the typed output paired with the cumulative `TokenUsage` for
78+
* this agent's skill (null for `implementedBy`).
79+
*/
80+
internal suspend fun <IN, OUT> runAgentInSession(
81+
agent: Agent<IN, OUT>,
82+
input: IN,
83+
emitter: AgentEventEmitter,
84+
): Pair<OUT, TokenUsage?> {
85+
var capturedSkillName: String? = null
86+
var capturedUsage: TokenUsage? = null
87+
val output = agent.invokeSuspendForSession(
88+
input,
89+
emitter = emitter,
90+
onSkillCompleted = { usage -> capturedUsage = usage },
91+
) { skillName ->
92+
// SkillStarted fires BEFORE the skill body runs — emitting from
93+
// this onSkillStarted callback (non-suspend; emitter is also
94+
// non-suspend per #1745) means the event reaches the consumer
95+
// before any Token / ToolCall* events from this skill's loop.
96+
capturedSkillName = skillName
97+
emitter(AgentEvent.SkillStarted(agent.name, skillName))
98+
}
99+
emitter(AgentEvent.SkillCompleted(agent.name, capturedSkillName ?: "?", capturedUsage))
100+
return output to capturedUsage
101+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.composition.pipeline.session
4+
import agents_engine.composition.pipeline.then
5+
import agents_engine.core.agent
6+
import kotlinx.coroutines.flow.toList
7+
import kotlinx.coroutines.test.runTest
8+
import kotlin.test.Test
9+
import kotlin.test.assertEquals
10+
import kotlin.test.assertFailsWith
11+
import kotlin.test.assertIs
12+
13+
// #1745 — Pipeline.session(input) flows events from every inner agent
14+
// with its own agentId. Sequential between nodes (typed boundary forces
15+
// a complete MID value to flow from a to b); streaming WITHIN a node.
16+
17+
class PipelineSessionTest {
18+
19+
@Test
20+
fun `pipeline session emits ordered events from both agents plus a terminal Completed`() = runTest {
21+
val parse = agent<String, Int>("parse") {
22+
skills {
23+
skill<String, Int>("length", "Computes input length") {
24+
implementedBy { it.length }
25+
}
26+
}
27+
}
28+
val describe = agent<Int, String>("describe") {
29+
skills {
30+
skill<Int, String>("format", "Formats integer") {
31+
implementedBy { "n=$it" }
32+
}
33+
}
34+
}
35+
val pipeline = parse then describe
36+
37+
val session = pipeline.session("hello")
38+
val events = session.events.toList()
39+
val output = session.await()
40+
41+
assertEquals("n=5", output, "pipeline output threads parse→describe correctly")
42+
assertEquals(5, events.size, "expected 5 events: 2× SkillStarted/SkillCompleted + 1 Completed; got: $events")
43+
44+
val e0 = events[0]; assertIs<AgentEvent.SkillStarted>(e0); assertEquals("parse", e0.agentId); assertEquals("length", e0.skillName)
45+
val e1 = events[1]; assertIs<AgentEvent.SkillCompleted>(e1); assertEquals("parse", e1.agentId); assertEquals("length", e1.skillName)
46+
val e2 = events[2]; assertIs<AgentEvent.SkillStarted>(e2); assertEquals("describe", e2.agentId); assertEquals("format", e2.skillName)
47+
val e3 = events[3]; assertIs<AgentEvent.SkillCompleted>(e3); assertEquals("describe", e3.agentId); assertEquals("format", e3.skillName)
48+
val e4 = events[4]; assertIs<AgentEvent.Completed<String>>(e4)
49+
assertEquals("describe", e4.agentId, "Completed.agentId uses the last agent's name (its OUT matches Pipeline's OUT)")
50+
assertEquals("n=5", e4.output)
51+
}
52+
53+
@Test
54+
fun `pipeline session terminates with Failed when the second agent throws — only first agent's events precede`() = runTest {
55+
val boom = IllegalStateException("middle blew up")
56+
val first = agent<String, Int>("first") {
57+
skills {
58+
skill<String, Int>("ok", "Returns length") {
59+
implementedBy { it.length }
60+
}
61+
}
62+
}
63+
val second = agent<Int, String>("second") {
64+
skills {
65+
skill<Int, String>("explode", "Throws unconditionally") {
66+
implementedBy { throw boom }
67+
}
68+
}
69+
}
70+
val pipeline = first then second
71+
72+
val session = pipeline.session("hello")
73+
val events = session.events.toList()
74+
75+
// First agent should run cleanly — its events appear.
76+
val firstStarted = events.filterIsInstance<AgentEvent.SkillStarted>().firstOrNull { it.agentId == "first" }
77+
?: error("expected SkillStarted(first) before the failure; got: $events")
78+
val firstCompleted = events.filterIsInstance<AgentEvent.SkillCompleted>().firstOrNull { it.agentId == "first" }
79+
?: error("expected SkillCompleted(first) before the failure; got: $events")
80+
assertEquals("first", firstStarted.agentId)
81+
assertEquals("first", firstCompleted.agentId)
82+
83+
// Second agent's SkillStarted may or may not have fired before
84+
// the implementedBy threw — but the terminal MUST be Failed,
85+
// not Completed, and must carry the original exception.
86+
val terminal = events.last()
87+
assertIs<AgentEvent.Failed>(terminal, "last event must be Failed; got: $terminal")
88+
// assertSame on cause: AgentEvent.Failed.cause carries the identity instance per #1737's contract.
89+
assertEquals(boom.message, terminal.cause.message, "terminal cause must carry the original exception")
90+
91+
// await() rethrows.
92+
assertFailsWith<IllegalStateException> { session.await() }
93+
}
94+
}

0 commit comments

Comments
 (0)