Skip to content

Commit c9f8da9

Browse files
committed
fix(multi_agents): normalise for stability
1 parent 6159cb4 commit c9f8da9

6 files changed

Lines changed: 299 additions & 21 deletions

File tree

codex-rs/core/src/agent/control_tests.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::config::ConfigBuilder;
99
use crate::context::ContextualUserFragment;
1010
use crate::context::SubagentNotification;
1111
use crate::init_state_db;
12+
use crate::thread_manager::StartThreadOptions;
1213
use assert_matches::assert_matches;
1314
use codex_features::Feature;
1415
use codex_login::CodexAuth;
@@ -20,6 +21,7 @@ use codex_protocol::models::ResponseItem;
2021
use codex_protocol::protocol::CompactedItem;
2122
use codex_protocol::protocol::ErrorEvent;
2223
use codex_protocol::protocol::EventMsg;
24+
use codex_protocol::protocol::InitialHistory;
2325
use codex_protocol::protocol::InterAgentCommunication;
2426
use codex_protocol::protocol::SessionSource;
2527
use codex_protocol::protocol::SubAgentSource;
@@ -1701,6 +1703,86 @@ async fn multi_agent_v2_completion_wakes_direct_parent() {
17011703
));
17021704
}
17031705

1706+
#[tokio::test]
1707+
async fn multi_agent_v2_terminal_turn_event_wakes_direct_parent() {
1708+
let harness = AgentControlHarness::new().await;
1709+
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
1710+
let mut tester_config = harness.config.clone();
1711+
let _ = tester_config.features.enable(Feature::MultiAgentV2);
1712+
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
1713+
let tester_path = worker_path.join("tester").expect("tester path");
1714+
let tester_thread = harness
1715+
.manager
1716+
.start_thread_with_options(StartThreadOptions {
1717+
config: tester_config,
1718+
initial_history: InitialHistory::New,
1719+
session_source: Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
1720+
parent_thread_id: worker_thread_id,
1721+
depth: 2,
1722+
agent_path: Some(tester_path.clone()),
1723+
agent_nickname: None,
1724+
agent_role: Some("explorer".to_string()),
1725+
})),
1726+
thread_source: None,
1727+
dynamic_tools: Vec::new(),
1728+
persist_extended_history: false,
1729+
metrics_service_name: None,
1730+
parent_trace: None,
1731+
environments: Vec::new(),
1732+
})
1733+
.await
1734+
.expect("tester thread should start")
1735+
.thread;
1736+
let tester_turn = tester_thread.codex.session.new_default_turn().await;
1737+
tester_thread
1738+
.codex
1739+
.session
1740+
.send_event(
1741+
tester_turn.as_ref(),
1742+
EventMsg::TurnComplete(TurnCompleteEvent {
1743+
turn_id: tester_turn.sub_id.clone(),
1744+
last_agent_message: Some("done".to_string()),
1745+
completed_at: None,
1746+
duration_ms: None,
1747+
time_to_first_token_ms: None,
1748+
}),
1749+
)
1750+
.await;
1751+
1752+
let expected_message = crate::session_prefix::format_subagent_notification_message(
1753+
tester_path.as_str(),
1754+
&AgentStatus::Completed(Some("done".to_string())),
1755+
);
1756+
let expected = (
1757+
worker_thread_id,
1758+
Op::InterAgentCommunication {
1759+
communication: InterAgentCommunication::new(
1760+
tester_path,
1761+
worker_path,
1762+
Vec::new(),
1763+
expected_message,
1764+
/*trigger_turn*/ true,
1765+
),
1766+
},
1767+
);
1768+
1769+
timeout(Duration::from_secs(5), async {
1770+
loop {
1771+
let captured = harness
1772+
.manager
1773+
.captured_ops()
1774+
.into_iter()
1775+
.find(|entry| *entry == expected);
1776+
if captured == Some(expected.clone()) {
1777+
break;
1778+
}
1779+
sleep(Duration::from_millis(10)).await;
1780+
}
1781+
})
1782+
.await
1783+
.expect("terminal turn event should wake the direct parent");
1784+
}
1785+
17041786
#[tokio::test]
17051787
async fn completion_watcher_notifies_parent_when_child_is_missing() {
17061788
let harness = AgentControlHarness::new().await;

codex-rs/core/src/session/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1705,7 +1705,7 @@ impl Session {
17051705
parent_agent_path,
17061706
Vec::new(),
17071707
message,
1708-
/*trigger_turn*/ false,
1708+
/*trigger_turn*/ true,
17091709
);
17101710
if let Err(err) = self
17111711
.services

codex-rs/core/src/tools/handlers/multi_agents/spawn.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ async fn handle_spawn_agent(
189189
)
190190
.await;
191191
let new_thread_id = result?.thread_id;
192+
session
193+
.input_queue
194+
.accept_mailbox_delivery_for_current_turn(&session.active_turn, &turn.sub_id)
195+
.await;
192196
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
193197
turn.session_telemetry.counter(
194198
"codex.multi_agent.spawn",

codex-rs/core/src/tools/handlers/multi_agents_spec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ fn spawn_agent_common_properties_v2(agent_type_description: &str) -> BTreeMap<St
594594
(
595595
"fork_turns".to_string(),
596596
JsonSchema::string(Some(
597-
"Optional number of turns to fork. Defaults to `all`. Use `none`, `all`, or a positive integer string such as `3` to fork only the most recent turns."
597+
"Optional number of turns to fork. Defaults to `all`, or `none` when agent_type, model, or reasoning_effort is set. Use `none`, `all`, or a positive integer string such as `3` to fork only the most recent turns."
598598
.to_string(),
599599
)),
600600
),

codex-rs/core/src/tools/handlers/multi_agents_tests.rs

Lines changed: 184 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::config::AgentRoleConfig;
44
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
55
use crate::function_tool::FunctionCallError;
66
use crate::init_state_db;
7+
use crate::session::TurnInput;
78
use crate::session::tests::make_session_and_context;
89
use crate::session_prefix::format_subagent_notification_message;
910
use crate::thread_manager::thread_store_from_config;
@@ -92,6 +93,30 @@ fn parse_agent_id(id: &str) -> ThreadId {
9293
ThreadId::from_string(id).expect("agent id should be valid")
9394
}
9495

96+
#[derive(Clone, Copy)]
97+
struct MailboxDeliveryTestTask;
98+
99+
impl crate::tasks::SessionTask for MailboxDeliveryTestTask {
100+
fn kind(&self) -> crate::state::TaskKind {
101+
crate::state::TaskKind::Regular
102+
}
103+
104+
fn span_name(&self) -> &'static str {
105+
"session_task.mailbox_delivery_test"
106+
}
107+
108+
async fn run(
109+
self: Arc<Self>,
110+
_session: Arc<crate::tasks::SessionTaskContext>,
111+
_ctx: Arc<TurnContext>,
112+
_input: Vec<TurnInput>,
113+
cancellation_token: CancellationToken,
114+
) -> Option<String> {
115+
cancellation_token.cancelled().await;
116+
None
117+
}
118+
}
119+
95120
fn thread_manager() -> ThreadManager {
96121
ThreadManager::with_models_provider_for_tests(
97122
CodexAuth::from_api_key("dummy"),
@@ -294,6 +319,132 @@ async fn spawn_agent_uses_explorer_role_and_preserves_approval_policy() {
294319
assert_eq!(snapshot.model_provider_id, "ollama");
295320
}
296321

322+
#[tokio::test]
323+
async fn spawn_agent_reopens_mailbox_delivery_for_current_turn() {
324+
let (mut session, mut turn) = make_session_and_context().await;
325+
let manager = thread_manager();
326+
let root = manager
327+
.start_thread((*turn.config).clone())
328+
.await
329+
.expect("root thread should start");
330+
session.services.agent_control = manager.agent_control();
331+
session.conversation_id = root.thread_id;
332+
turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
333+
parent_thread_id: session.conversation_id,
334+
depth: 0,
335+
agent_path: Some(AgentPath::root()),
336+
agent_nickname: None,
337+
agent_role: None,
338+
});
339+
let communication = InterAgentCommunication::new(
340+
AgentPath::try_from("/root/worker").expect("worker path should parse"),
341+
AgentPath::root(),
342+
Vec::new(),
343+
"queued child update".to_string(),
344+
/*trigger_turn*/ false,
345+
);
346+
let session = Arc::new(session);
347+
let turn = Arc::new(turn);
348+
session
349+
.spawn_task(Arc::clone(&turn), Vec::new(), MailboxDeliveryTestTask)
350+
.await;
351+
session
352+
.input_queue
353+
.defer_mailbox_delivery_to_next_turn(&session.active_turn, &turn.sub_id)
354+
.await;
355+
session
356+
.input_queue
357+
.enqueue_mailbox_communication(communication.clone())
358+
.await;
359+
360+
SpawnAgentHandler::default()
361+
.handle(invocation(
362+
session.clone(),
363+
turn.clone(),
364+
"spawn_agent",
365+
function_payload(json!({
366+
"message": "inspect this repo",
367+
"agent_type": "explorer"
368+
})),
369+
))
370+
.await
371+
.expect("spawn_agent should succeed");
372+
373+
assert_eq!(
374+
session
375+
.input_queue
376+
.get_pending_input(&session.active_turn)
377+
.await,
378+
vec![TurnInput::ResponseInputItem(
379+
communication.to_response_input_item()
380+
)],
381+
);
382+
session.abort_all_tasks(TurnAbortReason::Replaced).await;
383+
}
384+
385+
#[tokio::test]
386+
async fn multi_agent_v2_spawn_agent_reopens_mailbox_delivery_for_current_turn() {
387+
let (mut session, mut turn) = make_session_and_context().await;
388+
let manager = thread_manager();
389+
let root = manager
390+
.start_thread((*turn.config).clone())
391+
.await
392+
.expect("root thread should start");
393+
session.services.agent_control = manager.agent_control();
394+
session.conversation_id = root.thread_id;
395+
turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
396+
parent_thread_id: session.conversation_id,
397+
depth: 0,
398+
agent_path: Some(AgentPath::root()),
399+
agent_nickname: None,
400+
agent_role: None,
401+
});
402+
let communication = InterAgentCommunication::new(
403+
AgentPath::try_from("/root/worker").expect("worker path should parse"),
404+
AgentPath::root(),
405+
Vec::new(),
406+
"queued child update".to_string(),
407+
/*trigger_turn*/ false,
408+
);
409+
let session = Arc::new(session);
410+
let turn = Arc::new(turn);
411+
session
412+
.spawn_task(Arc::clone(&turn), Vec::new(), MailboxDeliveryTestTask)
413+
.await;
414+
session
415+
.input_queue
416+
.defer_mailbox_delivery_to_next_turn(&session.active_turn, &turn.sub_id)
417+
.await;
418+
session
419+
.input_queue
420+
.enqueue_mailbox_communication(communication.clone())
421+
.await;
422+
423+
SpawnAgentHandlerV2::default()
424+
.handle(invocation(
425+
session.clone(),
426+
turn.clone(),
427+
"spawn_agent",
428+
function_payload(json!({
429+
"message": "inspect this repo",
430+
"task_name": "worker"
431+
})),
432+
))
433+
.await
434+
.expect("spawn_agent should succeed");
435+
436+
assert_eq!(
437+
session
438+
.input_queue
439+
.get_pending_input(&session.active_turn)
440+
.await,
441+
vec![TurnInput::ResponseInputItem(
442+
communication.to_response_input_item()
443+
)],
444+
);
445+
session.abort_all_tasks(TurnAbortReason::Replaced).await;
446+
}
447+
297448
#[tokio::test]
298449
async fn spawn_agent_fork_context_rejects_agent_type_override() {
299450
let (mut session, mut turn) = make_session_and_context().await;
@@ -409,8 +560,9 @@ async fn multi_agent_v2_spawn_fork_turns_all_rejects_agent_type_override() {
409560
}
410561

411562
#[tokio::test]
412-
async fn multi_agent_v2_spawn_defaults_to_full_fork_and_rejects_child_model_overrides() {
563+
async fn multi_agent_v2_spawn_defaults_to_new_thread_for_agent_type_override() {
413564
let (mut session, mut turn) = make_session_and_context().await;
565+
let role_name = install_role_with_model_override(&mut turn).await;
414566
let manager = thread_manager();
415567
let root = manager
416568
.start_thread((*turn.config).clone())
@@ -425,28 +577,45 @@ async fn multi_agent_v2_spawn_defaults_to_full_fork_and_rejects_child_model_over
425577
.expect("test config should allow feature update");
426578
turn.config = Arc::new(config);
427579

428-
let err = SpawnAgentHandlerV2::default()
580+
let session = Arc::new(session);
581+
let turn = Arc::new(turn);
582+
let output = SpawnAgentHandlerV2::default()
429583
.handle(invocation(
430-
Arc::new(session),
431-
Arc::new(turn),
584+
session.clone(),
585+
turn.clone(),
432586
"spawn_agent",
433587
function_payload(json!({
434588
"message": "inspect this repo",
435-
"task_name": "fork_context_v2",
436-
"model": "gpt-5-child-override",
437-
"reasoning_effort": "low"
589+
"task_name": "role_default",
590+
"agent_type": role_name
438591
})),
439592
))
440593
.await
441-
.err()
442-
.expect("default full fork should reject child model overrides");
443-
444-
assert_eq!(
445-
err,
446-
FunctionCallError::RespondToModel(
447-
"Full-history forked agents inherit the parent agent type, model, and reasoning effort; omit agent_type, model, and reasoning_effort, or spawn without a full-history fork.".to_string(),
594+
.expect("implicit new-thread spawn should allow agent_type overrides");
595+
let (content, _) = expect_text_output(output);
596+
let result: serde_json::Value =
597+
serde_json::from_str(&content).expect("spawn_agent result should be json");
598+
assert_eq!(result["task_name"], "/root/role_default");
599+
let agent_id = session
600+
.services
601+
.agent_control
602+
.resolve_agent_reference(
603+
session.conversation_id,
604+
&turn.session_source,
605+
"role_default",
448606
)
449-
);
607+
.await
608+
.expect("spawned task name should resolve");
609+
let snapshot = manager
610+
.get_thread(agent_id)
611+
.await
612+
.expect("spawned agent thread should exist")
613+
.config_snapshot()
614+
.await;
615+
616+
assert_eq!(snapshot.model, "gpt-5-role-override");
617+
assert_eq!(snapshot.model_provider_id, "ollama");
618+
assert_eq!(snapshot.reasoning_effort, Some(ReasoningEffort::Minimal));
450619
}
451620

452621
#[tokio::test]

0 commit comments

Comments
 (0)