Skip to content

Commit a015315

Browse files
committed
Forums pt 2
1 parent ec978ae commit a015315

3 files changed

Lines changed: 489 additions & 3 deletions

File tree

src/main/kotlin/agents_engine/composition/forum/Forum.kt

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,40 @@
11
package agents_engine.composition.forum
22

33
import agents_engine.core.*
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.async
6+
import kotlinx.coroutines.runBlocking
47

58
class Forum<IN, OUT>(
69
val agents: List<Agent<*, *>>,
7-
)
10+
) {
11+
private var mentionListener: ((agentName: String, output: Any?) -> Unit)? = null
12+
13+
fun onMentionEmitted(block: (agentName: String, output: Any?) -> Unit) {
14+
mentionListener = block
15+
}
16+
17+
@Suppress("UNCHECKED_CAST")
18+
operator fun invoke(input: IN): OUT = runBlocking(Dispatchers.Default) {
19+
val forumInput = input as Any?
20+
val participants = agents.dropLast(1)
21+
val captain = agents.last() as Agent<Any?, OUT>
22+
23+
// All participants process the input concurrently
24+
participants.map { agent ->
25+
async {
26+
val output = (agent as Agent<Any?, Any?>)(forumInput)
27+
mentionListener?.invoke(agent.name, output)
28+
output
29+
}
30+
}.map { it.await() }
31+
32+
// Captain delivers the final verdict
33+
val verdict = captain(forumInput)
34+
mentionListener?.invoke(captain.name, verdict)
35+
verdict
36+
}
37+
}
838

939
operator fun <A, B, C> Agent<A, B>.times(other: Agent<*, C>): Forum<A, C> {
1040
this.markPlaced("forum")

src/main/kotlin/agents_engine/composition/pipeline/Pipeline.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ infix fun <A, B, C> Pipeline<A, B>.then(other: Agent<B, C>): Pipeline<A, C> {
2626

2727
infix fun <A, B, C> Agent<A, B>.then(other: Forum<B, C>): Pipeline<A, C> {
2828
this.markPlaced("pipeline")
29-
return Pipeline(listOf(this) + other.agents) { error("Forum execution not yet implemented") }
29+
return Pipeline(listOf(this) + other.agents) { input -> other(this(input)) }
3030
}
3131

3232
infix fun <A, B, C> Pipeline<A, B>.then(other: Forum<B, C>): Pipeline<A, C> {
33-
return Pipeline(agents + other.agents) { error("Forum execution not yet implemented") }
33+
return Pipeline(agents + other.agents) { input -> other(this(input)) }
3434
}
3535

3636
infix fun <A, B, C> Pipeline<A, B>.then(other: Pipeline<B, C>): Pipeline<A, C> {

0 commit comments

Comments
 (0)