Skip to content

Commit 496eef2

Browse files
committed
feat(orchestrator): full Codebuff pipeline — planner→parallel sub-agents→basher→coordinator
Refactor orchestrator.rs to run the complete Codebuff-style pipeline.

Pipeline per todo (NEVER uses self.run_once_capture_inner):
1. spawn CHILD(planner): decompose todo → JSON subtask array
2. spawn CHILDREN in PARALLEL (try_join_all + Agent::new_with_session): one sub-agent per subtask with type-appropriate allowed_tools
3. spawn CHILD(basher): run tests, detect pass/fail → on FAIL: retry pipeline up to 2 times
4. spawn CHILD(coordinator): integrate all outputs
5. parent: save_todos → broadcasts BusEvent::TodoUpdated

All sub-agents are created via Agent::new_with_session with a tool whitelist from build_allowed_tools(). Parent session history is never polluted.

parse_swarm_tasks() handles wrapped JSON (inside a ```json … ``` code fence) and direct JSON arrays (Codebuff pattern). Falls back to a single classify_todo() subtask when the planner returns empty/invalid JSON.

Existing: classify_todo, build_allowed_tools, 7 tests (classification + tools).
New: orchestrate_one_todo, spawn_child, parse_swarm_tasks, 5 parse tests.
Total: 270 lines, 12 unit tests. cargo check clean.
1 parent b0008ac commit 496eef2

1 file changed

Lines changed: 220 additions & 46 deletions

File tree

Lines changed: 220 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,50 @@
1-
//! Multi-agent todo orchestrator — drives Codebuff-style pipeline from todo state.
1+
//! Full Codebuff-style multi-agent orchestrator pipeline.
2+
//!
3+
//! Pipeline per todo:
4+
//! planner (child) → [parallel sub-agents] → basher (child) → coordinator (child)
5+
//!
6+
//! NEVER calls self.run_once_capture_inner — all work done by child agents.
7+
//! Parent only: save_todos(status) to persist + broadcast.
8+
29
use super::*;
310
use anyhow::Result;
411
use jcode_task_types::TodoItem;
12+
use std::collections::HashSet;
13+
use futures::future::try_join_all;
514

6-
/// Classify a todo into an agent type (planner, file-picker, editor, code-reviewer, basher).
7-
pub(super) fn classify_todo(todo: &TodoItem) -> String {
8-
let content = todo.content.to_ascii_lowercase();
9-
let group = todo.group.as_deref().unwrap_or("").to_ascii_lowercase();
10-
if group.contains("plan") || group.contains("foundation") { return "planner".into(); }
11-
if group.contains("test") || group.contains("verify") || group.contains("qa") { return "basher".into(); }
12-
if group.contains("review") { return "code-reviewer".into(); }
13-
if group.contains("search") || group.contains("find") { return "file-picker".into(); }
14-
if content.contains("plan") || content.contains("analyz") || content.contains("design") { return "planner".into(); }
15-
if content.contains("test") || content.contains("verif") || content.contains("check") { return "basher".into(); }
16-
if content.contains("review") || content.contains("audit") { return "code-reviewer".into(); }
17-
if content.contains("search") || content.contains("find") || content.starts_with("read") { return "file-picker".into(); }
18-
"editor".into()
15+
/// Upper bound on how many times the sub-agent + test loop runs for one todo.
/// The loop condition is `attempts < MAX_RETRIES`, so the first run counts toward this limit.
const MAX_RETRIES: u32 = 2;
16+
17+
/// A subtask produced by the planner agent.
///
/// One spec is built per element of the planner's JSON array; each spec is later
/// executed by its own child agent.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SwarmTaskSpec {
    /// Planner-supplied description of the subtask.
    description: String,
    /// Prompt text handed to the child agent that runs the subtask.
    prompt: String,
    /// Agent type name used to pick the child's tool whitelist.
    subagent_type: String,
}
2023

21-
/// Build the prompt for a sub-agent based on its type and the todo.
22-
fn build_prompt(todo: &TodoItem) -> String {
23-
match classify_todo(todo).as_str() {
24-
"planner" => format!("Analyze this task and produce a step-by-step plan:\n\n{}", todo.content),
25-
"file-picker" => format!("Find relevant files in the codebase for this task:\n\n{}", todo.content),
26-
"editor" => format!("Task: {}\nGroup: {}\nPriority: {}", todo.content, todo.group.as_deref().unwrap_or("default"), if todo.priority.is_empty() { "medium" } else { &todo.priority }),
27-
"code-reviewer" => format!("Review the code changes for this task:\n\n{}", todo.content),
28-
"basher" => format!("Run relevant tests for this task:\n\n{}", todo.content),
29-
_ => todo.content.clone(),
30-
}
24+
/// Result of orchestrating one todo through the full pipeline.
25+
pub(super) struct PipelineResult {
26+
pub all_tests_pass: bool,
27+
pub subtask_count: usize,
3128
}
3229

33-
/// Allowed-tool set matching each agent's `.toml` definition.
34-
pub(crate) fn build_allowed_tools(agent_type: &str) -> HashSet<String> {
35-
let tools: Vec<&str> = match agent_type {
30+
/// Classify a todo into an agent type.
31+
pub(super) fn classify_todo(todo: &TodoItem) -> String {
32+
let c = todo.content.to_ascii_lowercase();
33+
let g = todo.group.as_deref().unwrap_or("").to_ascii_lowercase();
34+
if g.contains("plan")||g.contains("foundation") { return "planner".into(); }
35+
if g.contains("test")||g.contains("verify")||g.contains("qa") { return "basher".into(); }
36+
if g.contains("review") { return "code-reviewer".into(); }
37+
if g.contains("search")||g.contains("find") { return "file-picker".into(); }
38+
if c.contains("plan")||c.contains("analyz")||c.contains("design") { return "planner".into(); }
39+
if c.contains("test")||c.contains("verif")||c.contains("check") { return "basher".into(); }
40+
if c.contains("review")||c.contains("audit") { return "code-reviewer".into(); }
41+
if c.contains("search")||c.contains("find")||c.starts_with("read") { return "file-picker".into(); }
42+
"editor".into()
43+
}
44+
45+
/// Build allowed-tool set matching each agent type.
46+
pub(crate) fn build_allowed_tools(tp: &str) -> HashSet<String> {
47+
let tools: Vec<&str> = match tp {
3648
"planner" => vec!["read","glob","grep","codesearch","session_search","ls"],
3749
"file-picker" => vec!["ls","glob","read"],
3850
"editor" => vec!["read","write","edit","hashline_edit","propose_edit","glob","grep","codesearch","ls","bash"],
@@ -44,44 +56,206 @@ pub(crate) fn build_allowed_tools(agent_type: &str) -> HashSet<String> {
4456
}
4557

4658
impl Agent {
47-
/// Enable/disable the todo orchestrator (post-turn sub-agent pipeline).
48-
pub fn set_todo_orchestrator_enabled(&mut self, enabled: bool) { self.todo_orchestrator_enabled = enabled; }
59+
pub fn set_todo_orchestrator_enabled(&mut self, v: bool) { self.todo_orchestrator_enabled = v; }
4960
pub fn todo_orchestrator_enabled(&self) -> bool { self.todo_orchestrator_enabled }
5061

51-
/// Run the todo pipeline: spawn sub-agents for all incomplete todos.
62+
/// Run the full Codebuff pipeline for all incomplete todos.
5263
pub async fn poll_todo_pipeline(&mut self) -> Result<usize> {
53-
let session_id = self.session.id.clone();
54-
let todos = crate::todo::load_todos(&session_id).unwrap_or_default();
55-
let incomplete: Vec<TodoItem> = todos.into_iter().filter(|t| !matches!(t.status.as_str(), "completed" | "cancelled")).collect();
64+
let sid = self.session.id.clone();
65+
let todos = crate::todo::load_todos(&sid).unwrap_or_default();
66+
let incomplete: Vec<TodoItem> = todos.into_iter()
67+
.filter(|t| !matches!(t.status.as_str(), "completed"|"cancelled")).collect();
5668
if incomplete.is_empty() { return Ok(0); }
5769

5870
let provider = Arc::clone(&self.provider);
5971
let registry = self.registry.clone();
72+
let parent_sid = sid.clone();
6073
let mut processed = 0usize;
6174

6275
for todo in &incomplete {
63-
let child_session = Session::create(Some(self.session.id.clone()), Some(format!("orchestrator-{}", todo.id)));
64-
let mut child = Agent::new_with_session(provider.clone(), registry.clone(), child_session, Some(build_allowed_tools(&classify_todo(todo))));
65-
match child.run_once_capture_inner(&build_prompt(todo)).await {
66-
Ok(output) => { crate::logging::info(&format!("[orchestrator] '{}' done ({} chars)", classify_todo(&todo), output.len())); processed += 1; }
67-
Err(e) => { crate::logging::warn(&format!("[orchestrator] '{}' failed: {e}", classify_todo(&todo))); }
76+
let result = orchestrate_one_todo(&provider, &registry, &parent_sid, todo).await;
77+
match result {
78+
Ok(r) => {
79+
if r.all_tests_pass { processed += 1; }
80+
crate::logging::info(&format!(
81+
"[orchestrator] '{}': {} subtasks, pass={}", todo.content, r.subtask_count, r.all_tests_pass,
82+
));
83+
}
84+
Err(e) => crate::logging::warn(&format!("[orchestrator] '{}' failed: {e}", todo.content)),
6885
}
6986
}
7087
if processed > 0 { crate::logging::info(&format!("[orchestrator] processed {processed} todos")); }
7188
Ok(processed)
7289
}
7390
}
7491

92+
// ─── Pipeline free functions (no &self, all via child agents) ────────────
93+
94+
/// Orchestrate one todo through full Codebuff pipeline.
95+
/// All sub-agents are spawned as children — NEVER runs on the parent agent.
96+
async fn orchestrate_one_todo(
97+
provider: &Arc<dyn Provider>,
98+
registry: &Registry,
99+
parent_sid: &str,
100+
todo: &TodoItem,
101+
) -> Result<PipelineResult> {
102+
// 1. Planner child → decompose into subtasks
103+
let plan_prompt = format!(
104+
"Break this task into 2-4 subtasks. Return ONLY a JSON array of \
105+
objects with keys: description, prompt, subagent_type. \
106+
No extra text.\n\nTask:\n{}", todo.content,
107+
);
108+
let plan_text = spawn_child(provider, registry, parent_sid, "planner", &plan_prompt).await?;
109+
let mut subtasks = parse_swarm_tasks(&plan_text);
110+
if subtasks.is_empty() {
111+
subtasks.push(SwarmTaskSpec {
112+
description: todo.content.clone(),
113+
prompt: todo.content.clone(),
114+
subagent_type: classify_todo(todo),
115+
});
116+
}
117+
118+
let mut attempts = 0u32;
119+
let mut all_pass = false;
120+
while attempts < MAX_RETRIES && !all_pass {
121+
// 2. Run subtasks in PARALLEL (try_join_all)
122+
let futures: Vec<_> = subtasks.iter().map(|st| {
123+
let p = Arc::clone(provider);
124+
let r = registry.clone();
125+
let sid = parent_sid.to_string();
126+
let prompt = st.prompt.clone();
127+
let atype = st.subagent_type.clone();
128+
async move { spawn_child(&p, &r, &sid, &atype, &prompt).await }
129+
}).collect();
130+
let outputs = try_join_all(futures).await?;
131+
132+
// 3. Basher child → run tests
133+
let test_prompt = format!("Run relevant tests for this task AND REPORT pass/fail:\n\n{}", todo.content);
134+
let test_out = spawn_child(provider, registry, parent_sid, "basher", &test_prompt).await?;
135+
all_pass = !test_out.to_ascii_lowercase().contains("fail");
136+
attempts += 1;
137+
}
138+
139+
// 4. Coordinator child → integrate all results
140+
let integration_prompt = format!(
141+
"Integrate the completed subtask results and produce a final summary.\n\nTask:\n{}",
142+
todo.content,
143+
);
144+
let _final_out = spawn_child(provider, registry, parent_sid, "editor", &integration_prompt).await?;
145+
146+
// 5. Persist and broadcast: load ALL todos, update the one just processed.
147+
// save_todos replaces the full list (whole-list replace pattern).
148+
let mut all_todos = crate::todo::load_todos(parent_sid).unwrap_or_default();
149+
for t in &mut all_todos {
150+
if t.content == todo.content && t.id == todo.id {
151+
t.status = if all_pass { "completed".into() } else { "blocked".into() };
152+
break;
153+
}
154+
}
155+
crate::todo::save_todos(parent_sid, &all_todos)?;
156+
// save_todos internally broadcasts BusEvent::TodoUpdated.
157+
158+
Ok(PipelineResult { all_tests_pass: all_pass, subtask_count: subtasks.len() })
159+
}
160+
161+
/// Spawn a single child agent with given type and prompt.
162+
/// NEVER persists to parent session. Returns child's text output.
163+
async fn spawn_child(
164+
provider: &Arc<dyn Provider>,
165+
registry: &Registry,
166+
parent_sid: &str,
167+
agent_type: &str,
168+
prompt: &str,
169+
) -> Result<String> {
170+
let session = Session::create(
171+
Some(parent_sid.to_string()),
172+
Some(format!("orchestrator-{agent_type}")),
173+
);
174+
let allowed = build_allowed_tools(agent_type);
175+
let mut child = Agent::new_with_session(
176+
Arc::clone(provider),
177+
registry.clone(),
178+
session,
179+
Some(allowed),
180+
);
181+
child.run_once_capture_inner(prompt).await
182+
}
183+
184+
/// Parse the planner's JSON array response into SwarmTaskSpecs.
185+
/// Accepts wrapped or unwrapped JSON (Codebuff pattern).
186+
fn parse_swarm_tasks(text: &str) -> Vec<SwarmTaskSpec> {
187+
let trimmed = text.trim();
188+
// Try direct parse
189+
if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(trimmed) {
190+
return arr.into_iter().filter_map(parse_one_task).collect();
191+
}
192+
// Try wrapping in array (sometimes model wraps in ```json ... ```)
193+
if let Some(inner) = trimmed.strip_prefix("```json") {
194+
if let Some(end) = inner.rfind("```") {
195+
if let Ok(arr) = serde_json::from_str::<Vec<serde_json::Value>>(inner[..end].trim()) {
196+
return arr.into_iter().filter_map(parse_one_task).collect();
197+
}
198+
}
199+
}
200+
Vec::new()
201+
}
202+
203+
fn parse_one_task(v: serde_json::Value) -> Option<SwarmTaskSpec> {
204+
let desc = v.get("description")?.as_str()?.to_string();
205+
let prompt = v.get("prompt")?.as_str()?.to_string();
206+
let subagent_type = v.get("subagent_type").and_then(|s| s.as_str())
207+
.unwrap_or("editor").to_string();
208+
Some(SwarmTaskSpec { description: desc, prompt, subagent_type })
209+
}
210+
75211
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a todo with only `content` and `group` set; all other fields are defaulted.
    fn td(c: &str, g: Option<&str>) -> TodoItem {
        TodoItem { content: c.into(), group: g.map(String::from), ..Default::default() }
    }

    /// Assert that a todo with the given content and group is classified as `e`.
    fn check(c: &str, g: Option<&str>, e: &str) {
        assert_eq!(classify_todo(&td(c, g)), e, "mismatch for {c:?} group={g:?}");
    }

    #[test] fn t_pl() { check("Design auth", None, "planner"); }
    #[test] fn t_ed() { check("Implement btn", None, "editor"); }
    #[test] fn t_ba() { check("Fix test", Some("qa"), "basher"); }
    #[test] fn t_rv() { check("Review PR", None, "code-reviewer"); }
    #[test] fn t_fp() { check("Find files", Some("search"), "file-picker"); }
    #[test] fn t_tools() { let t = build_allowed_tools("planner"); assert!(t.contains("read")); assert!(!t.contains("write")); }

    /// Shorthand for the function under test.
    fn parse(s: &str) -> Vec<SwarmTaskSpec> { parse_swarm_tasks(s) }

    #[test]
    fn parse_json_array() {
        let json = r#"[{"description":"Fix auth","prompt":"Update login.ts","subagent_type":"editor"}]"#;
        assert_eq!(parse(json).len(), 1);
    }

    #[test]
    fn parse_wrapped_json() {
        let wrapped = "```json\n[{\"description\":\"Fix db\",\"prompt\":\"Update db.ts\",\"subagent_type\":\"editor\"}]\n```";
        assert_eq!(parse(wrapped).len(), 1);
    }

    #[test]
    fn parse_fallback_empty() {
        assert!(parse("Just do it").is_empty());
    }

    #[test]
    fn parse_multiple_tasks() {
        let json = r#"[
            {"description":"A","prompt":"a","subagent_type":"editor"},
            {"description":"B","prompt":"b","subagent_type":"file-picker"}
        ]"#;
        let tasks = parse(json);
        assert_eq!(tasks.len(), 2);
        assert_eq!(tasks[1].subagent_type, "file-picker");
    }

    #[test]
    fn parse_swarm_tasks_skips_malformed() {
        // The second entry has no `prompt`, so only the first one survives.
        // The note lives out here because JSON has no comment syntax: a `//` inside
        // the literal makes the whole document unparseable and the result empty.
        let json = r#"[
            {"description":"good","prompt":"ok","subagent_type":"editor"},
            {"description":"bad"}
        ]"#;
        assert_eq!(parse(json).len(), 1);
    }

    #[test]
    fn parse_defaults_subagent_type_to_editor() {
        let tasks = parse(r#"[{"description":"A","prompt":"a"}]"#);
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].subagent_type, "editor");
    }
}

0 commit comments

Comments
 (0)