diff --git a/codex-rs/mock-acp-agent/src/main.rs b/codex-rs/mock-acp-agent/src/main.rs index 62b16568e..6cdfaa255 100644 --- a/codex-rs/mock-acp-agent/src/main.rs +++ b/codex-rs/mock-acp-agent/src/main.rs @@ -312,6 +312,75 @@ impl acp::Agent for MockAgent { } } + // Support interleaved text and tool calls to test for duplicate message bug + // This sends text DURING the tool call, which should trigger the bug where + // the incomplete ExecCell gets flushed to history, creating duplicates. + if std::env::var("MOCK_AGENT_INTERLEAVED_TOOL_CALL").is_ok() { + eprintln!("Mock agent: sending interleaved tool call sequence"); + + let tool_call_id = acp::ToolCallId("interleaved-tool-001".to_string().into()); + + // Step 1: Send tool call (begin) + self.send_tool_call( + session_id.clone(), + acp::ToolCall { + id: tool_call_id.clone(), + title: "Executing interleaved command".to_string(), + kind: acp::ToolKind::Execute, + status: acp::ToolCallStatus::Pending, + content: vec![], + locations: vec![], + raw_input: Some(json!({"command": "test"})), + raw_output: None, + meta: None, + }, + ) + .await?; + + // Small delay to ensure the begin event is processed + sleep(Duration::from_millis(50)).await; + + // Step 2: Send text DURING the tool call - this triggers the bug! + // When this text arrives, handle_streaming_delta calls flush_active_cell() + // which moves the incomplete ExecCell to history. + self.send_text_chunk(session_id.clone(), "Processing command...") + .await?; + + // Small delay + sleep(Duration::from_millis(50)).await; + + // Step 3: Send tool call completion + // At this point, the ExecCell is no longer in active_cell, so a new one + // will be created, resulting in duplicate entries. 
+ self.send_tool_call_update( + session_id.clone(), + acp::ToolCallUpdate { + id: tool_call_id.clone(), + fields: acp::ToolCallUpdateFields { + title: None, + kind: None, + status: Some(acp::ToolCallStatus::Completed), + content: Some(vec![acp::ToolCallContent::Content { + content: acp::ContentBlock::Text(acp::TextContent { + text: "Command completed".to_string(), + annotations: None, + meta: None, + }), + }]), + locations: None, + raw_input: None, + raw_output: Some(json!({"exit_code": 0})), + }, + meta: None, + }, + ) + .await?; + + // Final text + self.send_text_chunk(session_id.clone(), "Interleaved test done.") + .await?; + } + // Support sending tool calls for testing ACP tool call display if std::env::var("MOCK_AGENT_SEND_TOOL_CALL").is_ok() { eprintln!("Mock agent: sending tool call sequence"); diff --git a/codex-rs/tui-pty-e2e/tests/acp_tool_calls.rs b/codex-rs/tui-pty-e2e/tests/acp_tool_calls.rs index 3d7041cf0..9ff73c60f 100644 --- a/codex-rs/tui-pty-e2e/tests/acp_tool_calls.rs +++ b/codex-rs/tui-pty-e2e/tests/acp_tool_calls.rs @@ -159,6 +159,98 @@ fn test_acp_tool_call_completion_rendered_in_tui() { insta::assert_snapshot!("acp_tool_call_echo", normalize_for_input_snapshot(contents)); } +/// Test that ACP tool calls do NOT appear twice (once as Running, once as Ran) +/// +/// This test verifies that when a tool call completes, there is only ONE entry +/// in the TUI output, not duplicate entries showing both "Running" and "Ran" +/// states. The expected behavior is that the "Running" state should be +/// updated in-place to become "Ran" when the tool call completes. +/// +/// ## Bug being tested: +/// When agent text streams while a tool call is active, the incomplete ExecCell +/// gets flushed to history. Then when the tool call completes, a new ExecCell +/// is created, resulting in duplicate entries: +/// 1. "Running ..." (flushed incomplete cell) +/// 2. "Ran ..." 
(new completed cell) +/// +/// This test uses MOCK_AGENT_INTERLEAVED_TOOL_CALL which sends text DURING +/// the tool call to trigger this exact scenario. +#[test] +fn test_acp_tool_call_no_duplicate_messages() { + // Configure mock agent to send interleaved text and tool calls + // This triggers the bug by sending text DURING the tool call execution + let config = SessionConfig::new() + .with_model("mock-model".to_owned()) + .with_agent_env("MOCK_AGENT_INTERLEAVED_TOOL_CALL", "1"); + + let mut session = + TuiSession::spawn_with_config(24, 80, config).expect("Failed to spawn codex in ACP mode"); + + // Wait for startup + session + .wait_for_text("›", TIMEOUT) + .expect("ACP mode should start"); + + std::thread::sleep(TIMEOUT_INPUT); + + // Send a prompt to trigger the interleaved tool call + session.send_str("Test interleaved").unwrap(); + std::thread::sleep(TIMEOUT_INPUT); + session.send_key(Key::Enter).unwrap(); + + // Wait for the final text which means everything completed + session + .wait_for_text("Interleaved test done", Duration::from_secs(10)) + .expect("Should receive completion response"); + + std::thread::sleep(TIMEOUT_PRESNAPSHOT); + + let contents = session.screen_contents(); + + // Count occurrences of the tool title "Executing interleaved command" + // It should appear exactly ONCE (in the completed "Ran" form) + let tool_title = "Executing interleaved command"; + let count = contents.matches(tool_title).count(); + + assert_eq!( + count, 1, + "Tool call '{}' should appear exactly once, but appeared {} times.\n\ + This indicates duplicate messages (both 'Running' and 'Ran' states visible).\n\ + Screen contents:\n{}", + tool_title, count, contents + ); + + // Also verify we see "Ran" (completed state) + assert!( + contents.contains("Ran"), + "Should show completed 'Ran' state. 
Screen contents:\n{}", + contents + ); + + // Verify we don't have both "Running" AND "Ran" for this tool call + // (which would indicate duplicates) + let has_running = contents + .lines() + .any(|line| line.contains("Running") && line.contains("Executing interleaved")); + let has_ran = contents + .lines() + .any(|line| line.contains("Ran") && line.contains("Executing interleaved")); + + assert!( + !(has_running && has_ran), + "Should NOT have both 'Running' and 'Ran' states for the same tool call.\n\ + This indicates duplicate messages.\n\ + Screen contents:\n{}", + contents + ); + + // Snapshot for visual verification + insta::assert_snapshot!( + "acp_tool_call_no_duplicates", + normalize_for_input_snapshot(contents) + ); +} + /// Snapshot test for ACP tool call rendering /// /// This captures the exact visual rendering of an ACP tool call diff --git a/codex-rs/tui-pty-e2e/tests/snapshots/acp_tool_calls__acp_tool_call_no_duplicates.snap b/codex-rs/tui-pty-e2e/tests/snapshots/acp_tool_calls__acp_tool_call_no_duplicates.snap new file mode 100644 index 000000000..14ce89a7f --- /dev/null +++ b/codex-rs/tui-pty-e2e/tests/snapshots/acp_tool_calls__acp_tool_call_no_duplicates.snap @@ -0,0 +1,28 @@ +--- +source: tui-pty-e2e/tests/acp_tool_calls.rs +assertion_line: 248 +expression: normalize_for_input_snapshot(contents) +--- +■ Operation 'ListCustomPrompts' is not supported in ACP mode + + +› Test interleaved + + +■ Operation 'AddToHistory' is not supported in ACP mode + +─ Worked for 0s ──────────────────────────────────────────────────────────────── + +• Test message 1Test message 2 + +─ Worked for 0s ──────────────────────────────────────────────────────────────── + +• Processing command...Interleaved test done. + +• Ran 'Executing interleaved command' + └ (no output) + + +› [DEFAULT_PROMPT] + + 100% context left · ? 
for shortcuts diff --git a/codex-rs/tui-pty-e2e/tests/snapshots/startup__startup_welcome_dimensions_40x120.snap b/codex-rs/tui-pty-e2e/tests/snapshots/startup__startup_welcome_dimensions_40x120.snap index 1c4556637..bd6a9d5f8 100644 --- a/codex-rs/tui-pty-e2e/tests/snapshots/startup__startup_welcome_dimensions_40x120.snap +++ b/codex-rs/tui-pty-e2e/tests/snapshots/startup__startup_welcome_dimensions_40x120.snap @@ -2,9 +2,6 @@ source: tui-pty-e2e/tests/startup.rs expression: normalize_for_input_snapshot(contents) --- -■ Operation 'ListCustomPrompts' is not supported in ACP mode - - › [DEFAULT_PROMPT] 100% context left · ? for shortcuts diff --git a/codex-rs/tui-pty-e2e/tests/streaming.rs b/codex-rs/tui-pty-e2e/tests/streaming.rs index 31186fc5c..6ee4a66f8 100644 --- a/codex-rs/tui-pty-e2e/tests/streaming.rs +++ b/codex-rs/tui-pty-e2e/tests/streaming.rs @@ -60,9 +60,51 @@ fn test_escape_cancels_streaming() { // Press Escape to cancel doesn't work? session.send_key(Key::Escape).unwrap(); std::thread::sleep(TIMEOUT_INPUT); + + std::thread::sleep(TIMEOUT); + // Verify cancellation completed + // (exact behavior depends on TUI implementation) + session + .wait_for_text( + "Conversation interrupted - tell the model what to do differently", + TIMEOUT, + ) + .expect("No interrupt reported"); + + std::thread::sleep(TIMEOUT); + assert_snapshot!( + "cancelled_stream", + normalize_for_input_snapshot(session.screen_contents()) + ) +} + +#[test] +fn test_ctrl_c_cancels_streaming() { + // Use git_init to prevent "Snapshots disabled" from racing with "Working" status + let config = SessionConfig::new().with_stream_until_cancel(); + let mut session = TuiSession::spawn_with_config(24, 80, config).unwrap(); + + // Wait for the prompt to appear (indicated by the chevron character) + session + .wait_for_text("›", TIMEOUT) + .expect("Prompt did not appear"); + std::thread::sleep(TIMEOUT_INPUT); + + // Submit prompt + session.send_str("testing!!!").unwrap(); + 
session.wait_for_text("testing!!!", TIMEOUT).unwrap(); + std::thread::sleep(TIMEOUT_INPUT); + session.send_key(Key::Enter).unwrap(); + std::thread::sleep(TIMEOUT_INPUT); + + // Wait for streaming to start + session + .wait_for_text("Working", TIMEOUT) + .expect("Streaming did not start"); + // Press ctrl-c to cancel doesn't work? - // session.send_key(Key::Ctrl('c')).unwrap(); - // std::thread::sleep(TIMEOUT_INPUT); + session.send_key(Key::Ctrl('c')).unwrap(); + std::thread::sleep(TIMEOUT_INPUT); std::thread::sleep(TIMEOUT); // Verify cancellation completed @@ -74,7 +116,7 @@ fn test_escape_cancels_streaming() { ) .expect("No interrupt reported"); - std::thread::sleep(TIMEOUT_PRESNAPSHOT); + std::thread::sleep(TIMEOUT); assert_snapshot!( "cancelled_stream", normalize_for_input_snapshot(session.screen_contents()) diff --git a/codex-rs/tui/docs.md b/codex-rs/tui/docs.md index e660268b9..c14ba2268 100644 --- a/codex-rs/tui/docs.md +++ b/codex-rs/tui/docs.md @@ -130,6 +130,23 @@ Most event types (exec begin/end, MCP calls, elicitation) are queued during acti - The `InterruptManager` still contains `ExecApproval` and `ApplyPatchApproval` variants for completeness, but these methods are marked `#[allow(dead_code)]` - `on_task_complete()` calls `flush_interrupt_queue()` for any remaining queued items +**Pending ExecCell Tracking:** + +The `PendingExecCellTracker` (`chatwidget/pending_exec_cells.rs`) prevents duplicate ACP tool call messages in the chat history. The problem it solves: + +1. Agent makes a tool call (e.g., `shell`) which creates an ExecCell in `active_cell` +2. Agent streams text *during* the tool call execution +3. Streaming text causes `flush_active_cell()`, which would normally push the incomplete ExecCell to history and clear `active_cell` +4. When `ExecCommandEnd` arrives, `handle_exec_end_now()` would create a *new* ExecCell since `active_cell` is empty +5. 
Result: duplicate entries for the same tool call + +The tracker intercepts this by: +- `save_pending()`: Called during flush if the ExecCell has pending (incomplete) call_ids - saves the cell keyed by call_id instead of pushing to history +- `retrieve()`: Called in `handle_exec_end_now()` - retrieves and removes the saved cell, restoring it to `active_cell` for completion +- `drain_failed()`: Called in `on_task_complete()` - marks any uncompleted pending cells as failed and returns them for insertion into history + +This follows the same encapsulation pattern as `InterruptManager`: self-contained state in its own module file with typed public methods instead of exposing raw data structures. + **ACP File Tracing:** - The TUI calls `codex_acp::init_file_tracing()` at startup (`tui/src/lib.rs`) to write `.codex-acp.log` in the current directory. Every mock agent logs `ACP agent spawned (pid: ...)` there, which makes the agent-switching tests in `tui-pty-e2e` deterministic and ensures developers can inspect agent subprocess lifecycles during debugging. diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 2daafcc9e..8ea7f7421 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -109,6 +109,8 @@ use crate::text_formatting::truncate_text; use crate::tui::FrameRequester; mod interrupts; use self::interrupts::InterruptManager; +mod pending_exec_cells; +use self::pending_exec_cells::PendingExecCellTracker; mod agent; use self::agent::spawn_agent; use self::agent::spawn_agent_from_existing; @@ -322,6 +324,8 @@ pub(crate) struct ChatWidget { feedback: codex_feedback::CodexFeedback, // Current session rollout path (if known) current_rollout_path: Option, + // Tracks incomplete ExecCells that were flushed before completion. + pending_exec_cells: PendingExecCellTracker, // Pending agent selection for next prompt submission pending_agent: Option, // Expected model name for agent switch synchronization. 
@@ -520,6 +524,14 @@ impl ChatWidget {
         // during streaming. This is necessary for ACP mode which doesn't send a
         // separate AgentMessage event to trigger handle_stream_finished().
         self.flush_interrupt_queue();
+
+        // Flush any pending ExecCells that weren't completed (e.g., due to interruption).
+        for pending_cell in self.pending_exec_cells.drain_failed() {
+            self.needs_final_message_separator = true;
+            self.app_event_tx
+                .send(AppEvent::InsertHistoryCell(pending_cell));
+        }
+
         // Mark task stopped and request redraw now that all content is in history.
         self.bottom_pane.set_task_running(false);
         self.running_commands.clear();
@@ -1005,21 +1017,29 @@ impl ChatWidget {
         let is_unified_exec_interaction =
             matches!(source, ExecCommandSource::UnifiedExecInteraction);
 
-        let needs_new = self
-            .active_cell
-            .as_ref()
-            .map(|cell| cell.as_any().downcast_ref::<ExecCell>().is_none())
-            .unwrap_or(true);
-        if needs_new {
-            self.flush_active_cell();
-            self.active_cell = Some(Box::new(new_active_exec_command(
-                ev.call_id.clone(),
-                command,
-                parsed,
-                source,
-                None,
-                self.config.animations,
-            )));
+        // Reuse the ExecCell set aside for this call_id when streaming flushed it early.
+        if let Some(pending_cell) = self.pending_exec_cells.retrieve(&ev.call_id) {
+            // Flush whatever became active meanwhile; overwriting would drop it.
+            self.flush_active_cell();
+            self.active_cell = Some(pending_cell);
+        } else {
+            // Normal flow: check if active_cell is an ExecCell
+            let needs_new = self
+                .active_cell
+                .as_ref()
+                .map(|cell| cell.as_any().downcast_ref::<ExecCell>().is_none())
+                .unwrap_or(true);
+            if needs_new {
+                self.flush_active_cell();
+                self.active_cell = Some(Box::new(new_active_exec_command(
+                    ev.call_id.clone(),
+                    command,
+                    parsed,
+                    source,
+                    None,
+                    self.config.animations,
+                )));
+            }
         }
 
         if let Some(cell) = self
@@ -1284,6 +1304,7 @@ impl ChatWidget {
             last_rendered_width: std::cell::Cell::new(None),
             feedback,
             current_rollout_path: None,
+            pending_exec_cells: 
PendingExecCellTracker::new(),
             pending_agent: None,
             expected_model,
             session_configured_received: false,
@@ -1365,6 +1386,7 @@ impl ChatWidget {
             last_rendered_width: std::cell::Cell::new(None),
             feedback,
             current_rollout_path: None,
+            pending_exec_cells: PendingExecCellTracker::new(),
             pending_agent: None,
             expected_model,
             // For existing conversations, we've already received SessionConfigured
@@ -1638,6 +1660,22 @@ impl ChatWidget {
     fn flush_active_cell(&mut self) {
         if let Some(active) = self.active_cell.take() {
+            // An ExecCell that still has calls in flight is set aside in the pending
+            // tracker instead of being pushed to history; otherwise a second cell
+            // would be created when its ExecCommandEnd arrives, duplicating the entry.
+            if let Some(exec_cell) = active.as_any().downcast_ref::<ExecCell>()
+                && exec_cell.is_active()
+            {
+                // Collect the ids up front: `exec_cell` borrows `active`, which is
+                // moved into the tracker below.
+                let pending_ids = exec_cell.pending_call_ids();
+                // Key the saved cell by its first pending call_id.
+                if let Some(key) = pending_ids.into_iter().next() {
+                    self.pending_exec_cells.save_pending(key, active);
+                    return;
+                }
+            }
+            // Normal flush path - cell is complete or not an ExecCell
             self.needs_final_message_separator = true;
             self.app_event_tx.send(AppEvent::InsertHistoryCell(active));
         }
     }
diff --git a/codex-rs/tui/src/chatwidget/pending_exec_cells.rs b/codex-rs/tui/src/chatwidget/pending_exec_cells.rs
new file mode 100644
index 000000000..99f9cbf69
--- /dev/null
+++ b/codex-rs/tui/src/chatwidget/pending_exec_cells.rs
@@ -0,0 +1,155 @@
+//! Tracks incomplete ExecCells that were flushed before completion.
+//!
+//! When agent text streams during an ACP tool call execution, the incomplete
+//! ExecCell gets flushed from `active_cell`. This tracker saves those cells
+//! by `call_id` so they can be retrieved and completed when `ExecCommandEnd`
+//! arrives, preventing duplicate entries in history.
+
+use std::collections::HashMap;
+
+use crate::exec_cell::ExecCell;
+use crate::history_cell::HistoryCell;
+
+/// Manages incomplete ExecCells that were flushed before their tool calls completed.
+///
+/// This prevents duplicate history entries when streaming text causes an incomplete
+/// ExecCell to be flushed, and then a new one would be created when the tool call ends.
+#[derive(Default)]
+pub(crate) struct PendingExecCellTracker {
+    /// Incomplete cells keyed by call_id for later retrieval.
+    pending: HashMap<String, Box<dyn HistoryCell>>,
+}
+
+impl PendingExecCellTracker {
+    /// Creates a new empty tracker.
+    pub(crate) fn new() -> Self {
+        Self {
+            pending: HashMap::new(),
+        }
+    }
+
+    /// Saves a pending cell by its call_id.
+    ///
+    /// Called when an incomplete ExecCell is flushed from `active_cell` during streaming.
+    pub(crate) fn save_pending(&mut self, call_id: String, cell: Box<dyn HistoryCell>) {
+        self.pending.insert(call_id, cell);
+    }
+
+    /// Retrieves and removes the pending cell that owns `call_id`.
+    ///
+    /// Called when `ExecCommandEnd` arrives to check if there's an incomplete cell
+    /// that should be completed instead of creating a new one. A cell is stored
+    /// under a single key, so when no entry is keyed by `call_id` this also looks
+    /// for an ExecCell that still lists `call_id` among its pending calls.
+    pub(crate) fn retrieve(&mut self, call_id: &str) -> Option<Box<dyn HistoryCell>> {
+        if let Some(cell) = self.pending.remove(call_id) {
+            return Some(cell);
+        }
+        let key = self.pending.iter().find_map(|(key, cell)| {
+            let exec = cell.as_any().downcast_ref::<ExecCell>()?;
+            exec.pending_call_ids()
+                .iter()
+                .any(|id| id.as_str() == call_id)
+                .then(|| key.clone())
+        })?;
+        self.pending.remove(&key)
+    }
+
+    /// Drains all pending cells, marking them as failed.
+    ///
+    /// Called on task completion to clean up any cells that weren't completed
+    /// (e.g., due to interruption). Returns the cells for insertion into history.
+    pub(crate) fn drain_failed(&mut self) -> Vec<Box<dyn HistoryCell>> {
+        self.pending
+            .drain()
+            .map(|(_, mut cell)| {
+                if let Some(exec) = cell.as_any_mut().downcast_mut::<ExecCell>() {
+                    exec.mark_failed();
+                }
+                cell
+            })
+            .collect()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::exec_cell::new_active_exec_command;
+    use codex_core::protocol::ExecCommandSource;
+
+    fn make_test_exec_cell(call_id: &str) -> Box<dyn HistoryCell> {
+        Box::new(new_active_exec_command(
+            call_id.to_string(),
+            vec!["echo".to_string(), "test".to_string()],
+            vec![],
+            ExecCommandSource::Agent,
+            None,
+            false, // animations disabled
+        ))
+    }
+
+    #[test]
+    fn save_and_retrieve_returns_cell() {
+        let mut tracker = PendingExecCellTracker::new();
+        let call_id = "test-call-001";
+
+        tracker.save_pending(call_id.to_string(), make_test_exec_cell(call_id));
+
+        let retrieved = tracker.retrieve(call_id);
+        assert!(retrieved.is_some(), "Should retrieve the saved cell");
+
+        // Second retrieve should return None (cell was removed)
+        let second = tracker.retrieve(call_id);
+        assert!(second.is_none(), "Cell should be removed after retrieval");
+    }
+
+    #[test]
+    fn retrieve_nonexistent_returns_none() {
+        let mut tracker = PendingExecCellTracker::new();
+
+        let result = tracker.retrieve("nonexistent-call");
+        assert!(result.is_none(), "Should return None for unknown call_id");
+    }
+
+    #[test]
+    fn drain_failed_returns_all_cells_and_empties_tracker() {
+        let mut tracker = PendingExecCellTracker::new();
+
+        tracker.save_pending("call-1".to_string(), make_test_exec_cell("call-1"));
+        tracker.save_pending("call-2".to_string(), make_test_exec_cell("call-2"));
+
+        let drained = tracker.drain_failed();
+        assert_eq!(drained.len(), 2, "Should drain all pending cells");
+
+        // Tracker should be empty now
+        assert!(
+            tracker.retrieve("call-1").is_none(),
+            "Tracker should be empty after drain"
+        );
+        assert!(
+            tracker.retrieve("call-2").is_none(),
+            "Tracker should be empty after drain"
+        );
+    }
+
+    #[test]
+    fn drain_failed_marks_exec_cells_as_failed() {
+        let mut tracker = PendingExecCellTracker::new();
+        tracker.save_pending("call-1".to_string(), make_test_exec_cell("call-1"));
+
+        let drained = tracker.drain_failed();
+        assert_eq!(drained.len(), 1);
+
+        // The cell should no longer be active (mark_failed sets output on all calls)
+        let cell = &drained[0];
+        if let Some(exec) = cell.as_any().downcast_ref::<ExecCell>() {
+            assert!(
+                !exec.is_active(),
+                "ExecCell should be marked as failed (not active)"
+            );
+        } else {
+            panic!("Expected ExecCell");
+        }
+    }
+}
diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs
index 8dc00f93f..203cd2573 100644
--- a/codex-rs/tui/src/chatwidget/tests.rs
+++ b/codex-rs/tui/src/chatwidget/tests.rs
@@ -378,7 +378,10 @@ fn make_chatwidget_manual() -> (
         last_rendered_width: std::cell::Cell::new(None),
         feedback: codex_feedback::CodexFeedback::new(),
         current_rollout_path: None,
+        pending_exec_cells: PendingExecCellTracker::new(),
         pending_agent: None,
+        expected_model: None,
+        session_configured_received: false,
     };
     (widget, rx, op_rx)
 }
diff --git a/codex-rs/tui/src/exec_cell/model.rs b/codex-rs/tui/src/exec_cell/model.rs
index 76316968c..d4104af42 100644
--- a/codex-rs/tui/src/exec_cell/model.rs
+++ b/codex-rs/tui/src/exec_cell/model.rs
@@ -110,6 +110,15 @@ impl ExecCell {
         self.calls.iter().any(|c| c.output.is_none())
     }
 
+    /// Returns the call_ids of calls that have not yet completed (no output).
+    pub(crate) fn pending_call_ids(&self) -> Vec<String> {
+        self.calls
+            .iter()
+            .filter(|c| c.output.is_none())
+            .map(|c| c.call_id.clone())
+            .collect()
+    }
+
     pub(crate) fn active_start_time(&self) -> Option<Instant> {
         self.calls
             .iter()