Skip to content

Commit aa06e2a

Browse files
Skobeltsynclaude
andcommitted
feat(#1746): multi-stage Pipeline session — events flow through every stage
Flips two of the Pipeline-involving `then` overloads to populate `sessionExec`, so 3+ stage chains now stream events from ALL stages instead of only the final Agent: - Pipeline<A,B>.then(Agent<B,C>) — chains the inner Pipeline's effectiveSessionExec into runAgentInSession for the trailing Agent - Pipeline<A,B>.then(Pipeline<B,C>) — chains both pipelines' effectiveSessionExec Out of scope: overloads involving Forum, Parallel, Branch, Loop — those operators don't have session support yet. Their related `then` overloads stay at the default non-streaming fallback until each operator gains its own session impl. TDD red-first: new test `three-stage pipeline emits events from all three agents` in PipelineSessionTest. Before flip, the 3-stage chain emitted only the terminal Completed (1 event); after, all 7 events fire (3× SkillStarted+SkillCompleted + 1 terminal). Full regression sweep green across root + KSP + no-reflect. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2d01463 commit aa06e2a

2 files changed

Lines changed: 57 additions & 2 deletions

File tree

src/main/kotlin/agents_engine/composition/pipeline/Pipeline.kt

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,20 @@ infix fun <A, B, C> Agent<A, B>.then(other: Agent<B, C>): Pipeline<A, C> {
6262

6363
infix fun <A, B, C> Pipeline<A, B>.then(other: Agent<B, C>): Pipeline<A, C> {
6464
other.markPlaced("pipeline")
65-
return Pipeline(agents + other) { input -> other.invokeSuspend(this.invokeSuspend(input)) }
65+
val inner = this
66+
return Pipeline(
67+
agents = inner.agents + other,
68+
// #1746: chain the inner Pipeline's streaming output into the new
69+
// Agent's session run. Inner Pipeline's effectiveSessionExec
70+
// forwards events from each of its stages; runAgentInSession then
71+
// emits the trailing Agent's bracket events.
72+
sessionExec = { input, emitter ->
73+
val mid = inner.effectiveSessionExec(input, emitter)
74+
val (out, _) = agents_engine.runtime.events.runAgentInSession(other, mid, emitter)
75+
out
76+
},
77+
execution = { input -> other.invokeSuspend(inner.invokeSuspend(input)) },
78+
)
6679
}
6780

6881
infix fun <A, B, C> Agent<A, B>.then(other: Forum<B, C>): Pipeline<A, C> {
@@ -75,7 +88,16 @@ infix fun <A, B, C> Pipeline<A, B>.then(other: Forum<B, C>): Pipeline<A, C> {
7588
}
7689

7790
infix fun <A, B, C> Pipeline<A, B>.then(other: Pipeline<B, C>): Pipeline<A, C> {
78-
return Pipeline(agents + other.agents) { input -> other.invokeSuspend(this.invokeSuspend(input)) }
91+
val left = this
92+
return Pipeline(
93+
agents = left.agents + other.agents,
94+
// #1746: chain both pipelines' streaming exec paths.
95+
sessionExec = { input, emitter ->
96+
val mid = left.effectiveSessionExec(input, emitter)
97+
other.effectiveSessionExec(mid, emitter)
98+
},
99+
execution = { input -> other.invokeSuspend(left.invokeSuspend(input)) },
100+
)
79101
}
80102

81103
infix fun <A, B, C> Agent<A, B>.then(other: Parallel<B, C>): Pipeline<A, List<C>> {

src/test/kotlin/agents_engine/runtime/events/PipelineSessionTest.kt

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,39 @@ class PipelineSessionTest {
5050
assertEquals("n=5", e4.output)
5151
}
5252

53+
@Test
54+
fun `three-stage pipeline emits events from all three agents — proves Pipeline then Agent overload flows events through`() = runTest {
55+
// a then b then c is left-associative: (a then b) then c, which goes
56+
// through the Pipeline<*, *>.then(Agent<*, *>) overload. Before #1746
57+
// that overload's sessionExec fell back to the default (no events from
58+
// a or b) — only c's events appeared. After: all three.
59+
val a = agent<String, Int>("a") {
60+
skills { skill<String, Int>("len", "Length") { implementedBy { it.length } } }
61+
}
62+
val b = agent<Int, Int>("b") {
63+
skills { skill<Int, Int>("doubled", "Doubles") { implementedBy { it * 2 } } }
64+
}
65+
val c = agent<Int, String>("c") {
66+
skills { skill<Int, String>("describe", "Describe") { implementedBy { "n=$it" } } }
67+
}
68+
val pipeline = a then b then c
69+
70+
val session = pipeline.session("hello")
71+
val events = session.events.toList()
72+
val output = session.await()
73+
74+
assertEquals("n=10", output, "pipeline output: 5 → 10 → \"n=10\"")
75+
assertEquals(7, events.size, "expected 3× SkillStarted/SkillCompleted + 1 Completed; got: $events")
76+
77+
// Ordered: each agent's pair appears in chain order before the next agent runs.
78+
val agentIds = events.filterIsInstance<AgentEvent.SkillStarted>().map { it.agentId }
79+
assertEquals(listOf("a", "b", "c"), agentIds, "SkillStarted events must fire in chain order; got: $agentIds")
80+
81+
val completed = events.last()
82+
assertIs<AgentEvent.Completed<String>>(completed)
83+
assertEquals("c", completed.agentId, "terminal Completed uses the LAST agent's name")
84+
}
85+
5386
@Test
5487
fun `pipeline session terminates with Failed when the second agent throws — only first agent's events precede`() = runTest {
5588
val boom = IllegalStateException("middle blew up")

0 commit comments

Comments
 (0)