Skip to content

Commit de6bb04

Browse files
Skobeltsynclaude
andcommitted
feat(#1751): Forum.session(input) — participants concurrent, captain last
Forum's session runs participants concurrently (events interleave by agentId, like Parallel) then runs the captain sequentially (events stream next). Terminal Completed has the captain's agentId. Preserves ForumReturnException short-circuit: if a participant or captain calls forum_return, the captain doesn't run (or is interrupted); terminal Completed carries the captured value cast through castForumReturnInternal. The mention listener still fires per-agent — the streaming session is purely additive. Forum gains: - outType, castOut, captainTakesTranscript exposed as `internal` so the session extension can read them (they remain private to the outside world). - castForumReturnInternal — visible wrapper around the private castForumReturn. - fireMentionListener — visible wrapper around the private mention callback. - ForumReturnException is now internal (was private at file scope) so the extension can catch it. TDD red-first: 2 participants + captain via the `*` builder. Asserts all three agents emit SkillStarted, captain's bracket follows both participants' SkillCompleted, terminal Completed has captain's agentId + the verdict. Full regression sweep green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent efbbedb commit de6bb04

3 files changed

Lines changed: 188 additions & 4 deletions

File tree

src/main/kotlin/agents_engine/composition/forum/Forum.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,18 @@ data class ForumTranscript<IN>(
3030

3131
class Forum<IN, OUT>(
3232
val agents: List<Agent<IN, *>>,
33-
private val outType: KClass<*>,
34-
private val castOut: (Any?) -> OUT,
35-
private val captainTakesTranscript: Boolean = false,
33+
internal val outType: KClass<*>,
34+
internal val castOut: (Any?) -> OUT,
35+
internal val captainTakesTranscript: Boolean = false,
3636
) {
3737
private var mentionListener: ((agentName: String, output: Any?) -> Unit)? = null
3838

39+
internal fun fireMentionListener(name: String, output: Any?) {
40+
mentionListener?.invoke(name, output)
41+
}
42+
43+
internal fun castForumReturnInternal(raw: Any?): OUT = castForumReturn(raw)
44+
3945
fun onMentionEmitted(block: (agentName: String, output: Any?) -> Unit) {
4046
mentionListener = block
4147
}
@@ -98,7 +104,7 @@ class Forum<IN, OUT>(
98104
}
99105
}
100106

101-
private class ForumReturnException(val value: Any?) : RuntimeException()
107+
internal class ForumReturnException(val value: Any?) : RuntimeException()
102108

103109
private fun buildForumReturnTool(): agents_engine.model.ToolDef =
104110
agents_engine.model.ToolDef(
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package agents_engine.composition.forum
2+
3+
import agents_engine.core.Agent
4+
import agents_engine.model.AgentEventEmitter
5+
import agents_engine.runtime.events.AgentEvent
6+
import agents_engine.runtime.events.AgentSession
7+
import agents_engine.runtime.events.runAgentInSession
8+
import kotlinx.coroutines.CompletableDeferred
9+
import kotlinx.coroutines.CoroutineScope
10+
import kotlinx.coroutines.Dispatchers
11+
import kotlinx.coroutines.SupervisorJob
12+
import kotlinx.coroutines.async
13+
import kotlinx.coroutines.awaitAll
14+
import kotlinx.coroutines.channels.Channel
15+
import kotlinx.coroutines.coroutineScope
16+
import kotlinx.coroutines.flow.consumeAsFlow
17+
import kotlinx.coroutines.launch
18+
import kotlinx.coroutines.withContext
19+
20+
/**
21+
* #1751 — start a streaming session against [this] forum.
22+
*
23+
* Participants run concurrently — their events stream into the shared
24+
* emitter and interleave by arrival order (like Parallel). After every
25+
* participant completes, the captain runs sequentially; the captain's
26+
* events stream next. Terminal `Completed(agentId=captain.name, output)`.
27+
*
28+
* Preserves the `ForumReturnException` short-circuit: if a participant
29+
* (or, less commonly, the captain) calls `forum_return`, the captain
30+
* doesn't run; terminal `Completed` carries the captured value cast
31+
* through `castForumReturnInternal`.
32+
*
33+
* Mention listener still fires per-agent (forum.onMentionEmitted) — the
34+
* streaming session is purely additive to the existing observability.
35+
*/
36+
fun <IN, OUT> Forum<IN, OUT>.session(input: IN): AgentSession<OUT> {
37+
val forum = this
38+
val channel = Channel<AgentEvent<OUT>>(Channel.BUFFERED)
39+
val result = CompletableDeferred<OUT>()
40+
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Unconfined)
41+
val captain = forum.agents.last()
42+
val participants = forum.agents.dropLast(1)
43+
44+
scope.launch {
45+
@Suppress("UNCHECKED_CAST")
46+
val emitter: AgentEventEmitter = { event -> channel.trySend(event as AgentEvent<OUT>) }
47+
try {
48+
val verdict: OUT = try {
49+
val contributions = withContext(Dispatchers.Default) {
50+
coroutineScope {
51+
participants.map { participant ->
52+
async {
53+
@Suppress("UNCHECKED_CAST")
54+
val out = runAgentInSession(
55+
participant as Agent<IN, Any?>,
56+
input,
57+
emitter,
58+
).first
59+
forum.fireMentionListener(participant.name, out)
60+
ParticipantContribution(participant.name, out)
61+
}
62+
}.awaitAll()
63+
}
64+
}
65+
val captainVerdict: OUT = if (forum.captainTakesTranscript) {
66+
val transcript = ForumTranscript(originalInput = input, contributions = contributions)
67+
@Suppress("UNCHECKED_CAST")
68+
runAgentInSession(
69+
captain as Agent<ForumTranscript<IN>, OUT>,
70+
transcript,
71+
emitter,
72+
).first
73+
} else {
74+
@Suppress("UNCHECKED_CAST")
75+
runAgentInSession(
76+
captain as Agent<IN, OUT>,
77+
input,
78+
emitter,
79+
).first
80+
}
81+
forum.fireMentionListener(captain.name, captainVerdict)
82+
captainVerdict
83+
} catch (e: ForumReturnException) {
84+
forum.castForumReturnInternal(e.value)
85+
}
86+
87+
channel.trySend(AgentEvent.Completed(captain.name, verdict, null))
88+
channel.close()
89+
result.complete(verdict)
90+
} catch (t: Throwable) {
91+
channel.trySend(AgentEvent.Failed(captain.name, t))
92+
channel.close()
93+
result.completeExceptionally(t)
94+
}
95+
}
96+
97+
return AgentSession(
98+
events = channel.consumeAsFlow(),
99+
resultDeferred = result,
100+
)
101+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.composition.forum.session
4+
import agents_engine.composition.forum.times
5+
import agents_engine.core.agent
6+
import kotlinx.coroutines.flow.toList
7+
import kotlinx.coroutines.test.runTest
8+
import kotlin.test.Test
9+
import kotlin.test.assertEquals
10+
import kotlin.test.assertIs
11+
import kotlin.test.assertTrue
12+
13+
// #1751 — Forum session streams events from all participants
14+
// concurrently, then from the captain. Terminal Completed has the
15+
// captain's agentId.
16+
17+
class ForumSessionTest {
18+
19+
@Test
20+
fun `forum session emits participants' events then captain's events with captain as terminal agentId`() = runTest {
21+
val analyst = agent<String, String>("analyst") {
22+
skills {
23+
skill<String, String>("analyze", "Analyzes") {
24+
implementedBy { "analysis: $it" }
25+
}
26+
}
27+
}
28+
val critic = agent<String, String>("critic") {
29+
skills {
30+
skill<String, String>("critique", "Critiques") {
31+
implementedBy { "critique: $it" }
32+
}
33+
}
34+
}
35+
val captain = agent<String, String>("captain") {
36+
skills {
37+
skill<String, String>("verdict", "Final verdict") {
38+
implementedBy { "verdict: $it" }
39+
}
40+
}
41+
}
42+
val forum = analyst * critic * captain
43+
44+
val session = forum.session("topic")
45+
val events = session.events.toList()
46+
val output = session.await()
47+
48+
assertEquals("verdict: topic", output)
49+
50+
// Both participants' SkillStarted/SkillCompleted appear.
51+
val starts = events.filterIsInstance<AgentEvent.SkillStarted>()
52+
assertEquals(
53+
setOf("analyst", "critic", "captain"),
54+
starts.map { it.agentId }.toSet(),
55+
"every forum agent must emit SkillStarted; got: $starts",
56+
)
57+
58+
// Captain's bracket comes AFTER both participants completed.
59+
val captainStartIdx = events.indexOfFirst { it is AgentEvent.SkillStarted && it.agentId == "captain" }
60+
val analystCompletedIdx = events.indexOfFirst { it is AgentEvent.SkillCompleted && it.agentId == "analyst" }
61+
val criticCompletedIdx = events.indexOfFirst { it is AgentEvent.SkillCompleted && it.agentId == "critic" }
62+
assertTrue(
63+
analystCompletedIdx < captainStartIdx,
64+
"analyst.SkillCompleted must precede captain.SkillStarted; got events: $events",
65+
)
66+
assertTrue(
67+
criticCompletedIdx < captainStartIdx,
68+
"critic.SkillCompleted must precede captain.SkillStarted; got events: $events",
69+
)
70+
71+
// Terminal Completed.
72+
val terminal = events.last()
73+
assertIs<AgentEvent.Completed<String>>(terminal)
74+
assertEquals("captain", terminal.agentId, "Forum's terminal Completed uses the captain's name")
75+
assertEquals("verdict: topic", terminal.output)
76+
}
77+
}

0 commit comments

Comments
 (0)