diff --git a/nori-rs/acp/docs.md b/nori-rs/acp/docs.md index b48cd5f48..c67487134 100644 --- a/nori-rs/acp/docs.md +++ b/nori-rs/acp/docs.md @@ -846,8 +846,6 @@ When `Op::Interrupt` fires, the ACP backend now only submits `InboundEvent::Canc - `SessionPhaseChanged(Idle)` and `PromptCompleted { stop_reason, last_agent_message }` are emitted only when that prompt response is reduced - queued follow-up prompts remain in the reducer-owned outbound queue until an eligible drain point (`stop_reason: end_turn`) -`SacpConnection::prompt()` also carries a small amount of session-local transport state so cancellation tails can be absorbed without widening the public phase model. If the previous prompt ended with `Cancelled`, the next prompt request may receive one or more immediate empty `end_turn` responses before the agent starts working on the user's real follow-up prompt. The connection layer now treats those empty terminal responses as stale cancel-tail cleanup and retries the same ACP prompt request until either streamed updates arrive or a non-stale stop reason is observed. That keeps the reducer contract unchanged: it still only sees the final logical completion for the user-facing prompt turn. - This removes the old synthetic interrupt-abort fast-path that treated cancel as immediate idle. The TUI now renders ACP interrupt state from reducer-owned phase/completion projections instead of inferring prompt ownership from interrupt timing. **Tool Classification System:** diff --git a/nori-rs/acp/src/backend/session_reducer.rs b/nori-rs/acp/src/backend/session_reducer.rs index de5ecc93d..ff769fdf1 100644 --- a/nori-rs/acp/src/backend/session_reducer.rs +++ b/nori-rs/acp/src/backend/session_reducer.rs @@ -18,7 +18,6 @@ use nori_protocol::session_runtime::SessionPhase; use nori_protocol::session_runtime::SessionRuntime; use nori_protocol::session_runtime::TranscriptMessage; use nori_protocol::session_runtime::TranscriptRole; -use tracing::debug; /// Everything that can affect [`SessionRuntime`] state. #[derive(Debug)] @@ -41,32 +40,6 @@ pub enum InboundEvent { LoadSubmit { request_id: String }, } -pub(super) fn inbound_event_kind(event: &InboundEvent) -> &'static str { - match event { - InboundEvent::Notification(update) => crate::connection::session_update_kind(update), - InboundEvent::PromptResponse { .. } => "prompt_response", - InboundEvent::PromptFailed => "prompt_failed", - InboundEvent::LoadResponse => "load_response", - InboundEvent::PermissionRequest { .. } => "permission_request", - InboundEvent::PromptSubmit(_) => "prompt_submit", - InboundEvent::CancelSubmit => "cancel_submit", - InboundEvent::LoadSubmit { .. } => "load_submit", - } -} - -pub(super) fn session_phase_label(phase: &SessionPhase) -> &'static str { - match phase { - SessionPhase::Idle => "idle", - SessionPhase::Loading { .. } => "loading", - SessionPhase::Prompt { - cancelling: true, .. - } => "cancelling", - SessionPhase::Prompt { - cancelling: false, .. - } => "prompt", - } -} - /// Side effects the caller must execute after reduction. #[derive(Debug, PartialEq)] pub enum SideEffect { @@ -143,12 +116,6 @@ fn reduce_prompt_submit( ) { if runtime.phase != SessionPhase::Idle { runtime.queue.push_back(prompt); - debug!( - target: "acp_event_flow", - phase = session_phase_label(&runtime.phase), - queue_len = runtime.queue.len(), - "Queued prompt while another session request is active" - ); out.events.push(ClientEvent::QueueChanged(QueueChanged { prompts: queued_prompt_texts(runtime), })); @@ -159,7 +126,6 @@ fn reduce_prompt_submit( } fn start_prompt(runtime: &mut SessionRuntime, prompt: QueuedPrompt, out: &mut ReduceOutput) { - let phase_before = session_phase_label(&runtime.phase); let request_id = new_request_id(); // Build ACP content blocks from the queued prompt. @@ -189,15 +155,6 @@ fn start_prompt(runtime: &mut SessionRuntime, prompt: QueuedPrompt, out: &mut Re }); } - debug!( - target: "acp_event_flow", - request_id = %request_id, - prompt_kind = ?prompt.kind, - phase_before, - queue_len = runtime.queue.len(), - "Reducer started prompt and emitted session/prompt side effect" - ); - out.events .push(ClientEvent::SessionPhaseChanged(runtime.phase_view())); out.side_effects.push(SideEffect::SendPrompt { @@ -242,20 +199,6 @@ fn reduce_cancel_submit(runtime: &mut SessionRuntime, out: &mut ReduceOutput) { } } - debug!( - target: "acp_event_flow", - request_id = %owner_id, - pending_permission_requests = runtime - .active - .as_ref() - .map_or(0, |active| active.pending_permission_requests.len()), - tool_calls = runtime - .active - .as_ref() - .map_or(0, |active| active.tool_call_ids.len()), - "Reducer marked the active prompt as cancelling" - ); - out.events .push(ClientEvent::SessionPhaseChanged(runtime.phase_view())); out.side_effects.push(SideEffect::SendCancel); @@ -271,22 +214,6 @@ fn reduce_prompt_response( stop_reason: acp::StopReason, out: &mut ReduceOutput, ) { - let active_request_id = runtime - .active - .as_ref() - .map(|active| active.request_id.clone()) - .unwrap_or_else(|| "".to_string()); - let phase_before = session_phase_label(&runtime.phase); - let queue_len_before = runtime.queue.len(); - debug!( - target: "acp_event_flow", - active_request_id, - phase_before, - queue_len_before, - ?stop_reason, - "Reducer received prompt response" - ); - if !matches!(runtime.phase, SessionPhase::Prompt { .. }) { out.events.push(ClientEvent::Warning(WarningInfo { message: "Received prompt response while not in Prompt phase".to_string(), @@ -299,15 +226,6 @@ fn reduce_prompt_response( runtime.phase = SessionPhase::Idle; - debug!( - target: "acp_event_flow", - active_request_id, - ?stop_reason, - should_drain_queue, - queue_len_after_finalize = runtime.queue.len(), - "Reducer finalized prompt response" - ); - out.events .push(ClientEvent::SessionPhaseChanged(runtime.phase_view())); out.events @@ -325,18 +243,6 @@ fn reduce_prompt_response( } fn reduce_prompt_failed(runtime: &mut SessionRuntime, out: &mut ReduceOutput) { - let active_request_id = runtime - .active - .as_ref() - .map(|active| active.request_id.clone()) - .unwrap_or_else(|| "".to_string()); - debug!( - target: "acp_event_flow", - active_request_id, - phase = session_phase_label(&runtime.phase), - "Reducer received prompt failure" - ); - if !matches!(runtime.phase, SessionPhase::Prompt { .. }) { out.events.push(ClientEvent::Warning(WarningInfo { message: "Received prompt failure while not in Prompt phase".to_string(), @@ -404,18 +310,6 @@ fn reduce_notification( normalizer: &mut ClientEventNormalizer, out: &mut ReduceOutput, ) { - debug!( - target: "acp_event_flow", - update_kind = crate::connection::session_update_kind(&update), - phase = session_phase_label(&runtime.phase), - active_request_id = runtime - .active - .as_ref() - .map(|active| active.request_id.as_str()) - .unwrap_or(""), - "Reducer received session/update" - ); - // Session metadata updates are accepted in any phase. if is_session_metadata_update(&update) { reduce_metadata_update(runtime, &update, normalizer, out); diff --git a/nori-rs/acp/src/backend/session_reducer/tests.rs b/nori-rs/acp/src/backend/session_reducer/tests.rs index 4239dac56..777b62088 100644 --- a/nori-rs/acp/src/backend/session_reducer/tests.rs +++ b/nori-rs/acp/src/backend/session_reducer/tests.rs @@ -11,9 +11,7 @@ use pretty_assertions::assert_eq; use super::InboundEvent; use super::SideEffect; -use super::inbound_event_kind; use super::reduce; -use super::session_phase_label; fn new_runtime() -> SessionRuntime { SessionRuntime::new() @@ -127,41 +125,6 @@ fn prompt_response_transitions_to_idle() { ))); } -#[test] -fn inbound_event_kind_labels_prompt_response() { - assert_eq!( - inbound_event_kind(&InboundEvent::PromptResponse { - stop_reason: acp::StopReason::Cancelled, - }), - "prompt_response" - ); -} - -#[test] -fn session_phase_label_labels_known_phases() { - assert_eq!(session_phase_label(&SessionPhase::Idle), "idle"); - assert_eq!( - session_phase_label(&SessionPhase::Prompt { - request_id: "req-1".to_string(), - cancelling: false, - }), - "prompt" - ); - assert_eq!( - session_phase_label(&SessionPhase::Prompt { - request_id: "req-1".to_string(), - cancelling: true, - }), - "cancelling" - ); - assert_eq!( - session_phase_label(&SessionPhase::Loading { - request_id: "req-2".to_string(), - }), - "loading" - ); -} - // ========================================================================= // 2. Cancel semantics // ========================================================================= diff --git a/nori-rs/acp/src/backend/session_runtime_driver.rs b/nori-rs/acp/src/backend/session_runtime_driver.rs index cc038641c..f3e345f89 100644 --- a/nori-rs/acp/src/backend/session_runtime_driver.rs +++ b/nori-rs/acp/src/backend/session_runtime_driver.rs @@ -36,24 +36,6 @@ pub(crate) struct ReducerActions { pub completed_turn: Option, } -fn client_event_kind(event: &ClientEvent) -> &'static str { - match event { - ClientEvent::SessionUpdateInfo(_) => "session_update_info", - ClientEvent::SessionPhaseChanged(_) => "session_phase_changed", - ClientEvent::QueueChanged(_) => "queue_changed", - ClientEvent::MessageDelta(_) => "message_delta", - ClientEvent::PromptCompleted(_) => "prompt_completed", - ClientEvent::ToolSnapshot(_) => "tool_snapshot", - ClientEvent::ApprovalRequest(_) => "approval_request", - ClientEvent::AgentCommandsUpdate(_) => "agent_commands_update", - ClientEvent::PlanSnapshot(_) => "plan_snapshot", - ClientEvent::LoadCompleted => "load_completed", - ClientEvent::ContextCompacted(_) => "context_compacted", - ClientEvent::Warning(_) => "warning", - ClientEvent::ReplayEntry(_) => "replay_entry", - } -} - impl SessionDriver { pub(crate) fn new() -> Self { Self { @@ -100,14 +82,6 @@ impl SessionDriver { .map(|active| active.request_id.clone()) } - pub(crate) fn phase_label(&self) -> &'static str { - session_reducer::session_phase_label(&self.runtime.phase) - } - - pub(crate) fn queue_len(&self) -> usize { - self.runtime.queue.len() - } - pub(crate) fn push_permission_request( &mut self, request: &crate::connection::ApprovalRequest, @@ -148,27 +122,9 @@ impl AcpBackend { event, InboundEvent::PromptResponse { .. } | InboundEvent::PromptFailed ); - let event_kind = session_reducer::inbound_event_kind(&event); let actions = { let mut driver = self.session_driver.lock().await; - let phase_before = driver.phase_label(); - let active_before = driver.active_request_id(); - let queue_len_before = driver.queue_len(); - let actions = driver.apply(event); - debug!( - target: "acp_event_flow", - event_kind, - phase_before, - active_request_id_before = active_before.as_deref().unwrap_or(""), - queue_len_before, - phase_after = driver.phase_label(), - active_request_id_after = driver.active_request_id().as_deref().unwrap_or(""), - queue_len_after = driver.queue_len(), - client_events = actions.events.len(), - side_effects = actions.side_effects.len(), - "Applied reducer event in serialized session runtime" - ); - actions + driver.apply(event) }; self.dispatch_reducer_actions(actions).await; if start_idle_timer { @@ -302,35 +258,6 @@ impl AcpBackend { } async fn forward_and_record_client_event(&self, client_event: ClientEvent) { - match &client_event { - ClientEvent::SessionPhaseChanged(phase) => { - debug!( - target: "acp_event_flow", - client_event = client_event_kind(&client_event), - ?phase, - "Forwarding client event from ACP backend" - ); - } - ClientEvent::PromptCompleted(completed) => { - debug!( - target: "acp_event_flow", - client_event = client_event_kind(&client_event), - stop_reason = ?completed.stop_reason, - has_last_agent_message = completed - .last_agent_message - .as_ref() - .is_some_and(|message| !message.is_empty()), - "Forwarding client event from ACP backend" - ); - } - _ => { - debug!( - target: "acp_event_flow", - client_event = client_event_kind(&client_event), - "Forwarding client event from ACP backend" - ); - } - } emit_client_event( &self.backend_event_tx, self.transcript_recorder.as_ref(), @@ -488,7 +415,7 @@ impl AcpBackend { async fn execute_side_effect(&self, side_effect: SideEffect) { match side_effect { - SideEffect::SendPrompt { request_id, prompt } => { + SideEffect::SendPrompt { prompt, .. } => { if let Some(abort_handle) = self.idle_timer_abort.lock().await.take() { abort_handle.abort(); } @@ -505,39 +432,22 @@ impl AcpBackend { }; let backend = (*self).clone(); let prompt_result_tx = self.prompt_result_tx.clone(); - let request_id_for_task = request_id.clone(); tokio::spawn(async move { let session_id = backend.session_id.read().await.clone(); - let prompt_kind = prompt_kind.unwrap_or(QueuedPromptKind::User); - debug!( - target: "acp_event_flow", - request_id = %request_id_for_task, - session_id = %session_id, - ?prompt_kind, - content_blocks = prompt.len(), - "Sending ACP session/prompt request" - ); let result = backend.connection.prompt(session_id, prompt).await; match result { Ok(stop_reason) => { - debug!( - target: "acp_event_flow", - request_id = %request_id_for_task, - ?stop_reason, - "Prompt task received ACP session/prompt response" - ); let _ = prompt_result_tx .send(InboundEvent::PromptResponse { stop_reason }) .await; } Err(err) => { - warn!( - target: "acp_event_flow", - request_id = %request_id_for_task, - error = %err, - "Prompt task failed before reducer observed a prompt response" - ); - backend.send_prompt_error(prompt_kind, &err).await; + backend + .send_prompt_error( + prompt_kind.unwrap_or(QueuedPromptKind::User), + &err, + ) + .await; let _ = prompt_result_tx.send(InboundEvent::PromptFailed).await; } } @@ -545,11 +455,6 @@ impl AcpBackend { } SideEffect::SendCancel => { let session_id = self.session_id.read().await.clone(); - debug!( - target: "acp_event_flow", - session_id = %session_id, - "Sending ACP session/cancel notification" - ); if let Err(err) = self.connection.cancel(&session_id).await { warn!("Failed to cancel ACP session: {err}"); } diff --git a/nori-rs/acp/src/backend/spawn_and_relay.rs b/nori-rs/acp/src/backend/spawn_and_relay.rs index 26b6d878e..7d19c43e0 100644 --- a/nori-rs/acp/src/backend/spawn_and_relay.rs +++ b/nori-rs/acp/src/backend/spawn_and_relay.rs @@ -265,21 +265,12 @@ impl AcpBackend { approval_policy_rx: watch::Receiver, ) { let approval_policy_rx = approval_policy_rx; - let mut relay_seq = 0_i64; loop { tokio::select! { biased; maybe_event = event_rx.recv() => { match maybe_event { Some(crate::connection::ConnectionEvent::SessionUpdate(update)) => { - relay_seq += 1; - debug!( - target: "acp_event_flow", - relay_seq, - relay_source = "transport_event_rx", - update_kind = crate::connection::session_update_kind(&update), - "Relaying session/update into serialized session runtime" - ); let _ = backend .session_event_tx .send(session_runtime_driver::SessionRuntimeInput::Reducer( @@ -288,15 +279,7 @@ impl AcpBackend { .await; } Some(crate::connection::ConnectionEvent::ApprovalRequest(request)) => { - relay_seq += 1; let current_policy = *approval_policy_rx.borrow(); - debug!( - target: "acp_event_flow", - relay_seq, - relay_source = "transport_event_rx", - call_id = request.event.call_id(), - "Relaying permission request into serialized session runtime" - ); let _ = backend .session_event_tx .send( @@ -316,14 +299,6 @@ impl AcpBackend { maybe_result = prompt_result_rx.recv() => { match maybe_result { Some(result) => { - relay_seq += 1; - debug!( - target: "acp_event_flow", - relay_seq, - relay_source = "prompt_result_rx", - inbound_event = session_reducer::inbound_event_kind(&result), - "Relaying prompt result into serialized session runtime" - ); let _ = backend .session_event_tx .send(session_runtime_driver::SessionRuntimeInput::Reducer(result)) @@ -333,14 +308,6 @@ impl AcpBackend { while let Some(event) = event_rx.recv().await { match event { crate::connection::ConnectionEvent::SessionUpdate(update) => { - relay_seq += 1; - debug!( - target: "acp_event_flow", - relay_seq, - relay_source = "transport_event_rx_drain", - update_kind = crate::connection::session_update_kind(&update), - "Draining session/update after prompt result channel closed" - ); let _ = backend .session_event_tx .send(session_runtime_driver::SessionRuntimeInput::Reducer( @@ -349,15 +316,7 @@ impl AcpBackend { .await; } crate::connection::ConnectionEvent::ApprovalRequest(request) => { - relay_seq += 1; let current_policy = *approval_policy_rx.borrow(); - debug!( - target: "acp_event_flow", - relay_seq, - relay_source = "transport_event_rx_drain", - call_id = request.event.call_id(), - "Draining permission request after prompt result channel closed" - ); let _ = backend .session_event_tx .send( diff --git a/nori-rs/acp/src/backend/user_input.rs b/nori-rs/acp/src/backend/user_input.rs index 30cc6edca..dcc5f388e 100644 --- a/nori-rs/acp/src/backend/user_input.rs +++ b/nori-rs/acp/src/backend/user_input.rs @@ -163,27 +163,6 @@ impl AcpBackend { prompt_with_context }; - let (phase_before_submit, active_request_id_before_submit, queue_len_before_submit) = { - let driver = self.session_driver.lock().await; - ( - driver.phase_label(), - driver.active_request_id(), - driver.queue_len(), - ) - }; - debug!( - target: "acp_event_flow", - event_id = id, - phase_before_submit, - active_request_id_before_submit = active_request_id_before_submit - .as_deref() - .unwrap_or(""), - queue_len_before_submit, - prompt_text_len = final_prompt_text.len(), - image_blocks = image_blocks.len(), - "Accepted user prompt into ACP backend" - ); - let _ = self .session_event_tx .send(session_runtime_driver::SessionRuntimeInput::Reducer( diff --git a/nori-rs/acp/src/config/types/tests.rs b/nori-rs/acp/src/config/types/tests.rs index e8e286b62..5c764d06d 100644 --- a/nori-rs/acp/src/config/types/tests.rs +++ b/nori-rs/acp/src/config/types/tests.rs @@ -1023,7 +1023,7 @@ name = "Claude Code" slug = "claude-code" [agents.distribution.npx] -package = "@agentclientprotocol/claude-agent-acp" +package = "@zed-industries/claude-agent-acp@0.23.1" "#, ) .unwrap(); @@ -1033,7 +1033,7 @@ package = "@agentclientprotocol/claude-agent-acp" assert!(config.agents[0].distribution.npx.is_some()); assert_eq!( config.agents[0].distribution.npx.as_ref().unwrap().package, - "@agentclientprotocol/claude-agent-acp" + "@zed-industries/claude-agent-acp@0.23.1" ); } @@ -1130,7 +1130,7 @@ name = "Claude Code" slug = "claude-code" [agents.distribution.npx] -package = "@agentclientprotocol/claude-agent-acp" +package = "@zed-industries/claude-agent-acp@0.23.1" [[agents]] name = "Kimi" @@ -1217,7 +1217,7 @@ fn test_agent_distribution_resolve_rejects_multiple() { fn test_agent_distribution_resolve_npx() { let dist = AgentDistributionToml { npx: Some(PackageDistribution { - package: "@agentclientprotocol/claude-agent-acp".to_string(), + package: "@zed-industries/claude-agent-acp@0.23.1".to_string(), args: vec![], }), ..Default::default() @@ -1225,7 +1225,7 @@ fn test_agent_distribution_resolve_npx() { let resolved = dist.resolve().unwrap(); assert!(matches!(resolved, ResolvedDistribution::Npx { .. })); if let ResolvedDistribution::Npx { package, args } = resolved { - assert_eq!(package, "@agentclientprotocol/claude-agent-acp"); + assert_eq!(package, "@zed-industries/claude-agent-acp@0.23.1"); assert!(args.is_empty()); } } diff --git a/nori-rs/acp/src/connection/mod.rs b/nori-rs/acp/src/connection/mod.rs index 3c40e9b5b..a19e4a582 100644 --- a/nori-rs/acp/src/connection/mod.rs +++ b/nori-rs/acp/src/connection/mod.rs @@ -23,23 +23,6 @@ pub enum ConnectionEvent { ApprovalRequest(ApprovalRequest), } -pub(crate) fn session_update_kind(update: &acp::SessionUpdate) -> &'static str { - match update { - acp::SessionUpdate::AgentMessageChunk(_) => "agent_message_chunk", - acp::SessionUpdate::AgentThoughtChunk(_) => "agent_thought_chunk", - acp::SessionUpdate::UserMessageChunk(_) => "user_message_chunk", - acp::SessionUpdate::Plan(_) => "plan", - acp::SessionUpdate::ToolCall(_) => "tool_call", - acp::SessionUpdate::ToolCallUpdate(_) => "tool_call_update", - acp::SessionUpdate::AvailableCommandsUpdate(_) => "available_commands_update", - acp::SessionUpdate::CurrentModeUpdate(_) => "current_mode_update", - acp::SessionUpdate::ConfigOptionUpdate(_) => "config_option_update", - acp::SessionUpdate::SessionInfoUpdate(_) => "session_info_update", - acp::SessionUpdate::UsageUpdate(_) => "usage_update", - _ => "other", - } -} - /// The type of approval event to send to the UI. /// /// This enum allows us to use the more appropriate approval UI for different @@ -109,14 +92,3 @@ impl AcpModelState { } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn session_update_kind_labels_usage_update() { - let update = acp::SessionUpdate::UsageUpdate(acp::UsageUpdate::new(12, 100)); - assert_eq!(session_update_kind(&update), "usage_update"); - } -} diff --git a/nori-rs/acp/src/connection/sacp_connection.rs b/nori-rs/acp/src/connection/sacp_connection.rs index a5639a4f2..9e521abe1 100644 --- a/nori-rs/acp/src/connection/sacp_connection.rs +++ b/nori-rs/acp/src/connection/sacp_connection.rs @@ -5,7 +5,6 @@ //! `ConnectionTo` is `Send + Sync`, allowing direct async usage from the main //! tokio runtime without a dedicated thread or `LocalSet`. -use std::collections::HashMap; use std::path::Path; use std::process::Stdio; @@ -46,12 +45,6 @@ use sacp::UntypedMessage; /// Minimum supported ACP protocol version. const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1; -#[derive(Debug, Default)] -struct SessionPromptState { - update_seq: i64, - draining_cancel_tail: bool, -} - /// A thread-safe connection to an ACP agent subprocess using SACP v11. /// /// Unlike the old `AcpConnection`, this does NOT require a dedicated worker thread. @@ -73,10 +66,6 @@ pub struct SacpConnection { /// Ordered inbox of raw ACP events from the transport layer. event_rx: mpsc::Receiver, - /// Per-session prompt boundary state used to absorb stale terminal stop - /// responses after cancellation without widening the public phase model. - prompt_state: std::sync::Arc>>, - /// Thread-safe model state, updated on session creation and model switch. model_state: std::sync::Arc>, @@ -176,9 +165,6 @@ impl SacpConnection { let event_tx_for_notifications = event_tx.clone(); let event_tx_for_write = event_tx.clone(); let event_tx_for_read = event_tx.clone(); - let prompt_state = - std::sync::Arc::new(Mutex::new(HashMap::::new())); - let prompt_state_for_notifications = prompt_state.clone(); let approval_cwd = cwd.to_path_buf(); let write_cwd = cwd.to_path_buf(); let read_cwd = cwd.to_path_buf(); @@ -225,22 +211,7 @@ impl SacpConnection { .on_receive_notification( { let event_tx = event_tx_for_notifications; - let prompt_state = prompt_state_for_notifications; async move |notification: acp::SessionNotification, _connection| { - let session_id = notification.session_id.to_string(); - { - let mut prompt_state = prompt_state.lock().await; - prompt_state - .entry(session_id.clone()) - .or_default() - .update_seq += 1; - } - debug!( - target: "acp_event_flow", - session_id, - update_kind = super::session_update_kind(¬ification.update), - "Transport received ACP session/update notification" - ); if event_tx .send(ConnectionEvent::SessionUpdate(notification.update)) .await @@ -525,7 +496,6 @@ impl SacpConnection { cx, agent_capabilities: capabilities, event_rx, - prompt_state, model_state: std::sync::Arc::new(std::sync::RwLock::new(AcpModelState::new())), connection_task, child, @@ -597,82 +567,12 @@ impl SacpConnection { session_id: acp::SessionId, prompt: Vec, ) -> Result { - let session_key = session_id.to_string(); - let mut attempt = 0_i64; - - loop { - attempt += 1; - let (update_seq_before, draining_cancel_tail) = { - let mut prompt_state = self.prompt_state.lock().await; - let state = prompt_state.entry(session_key.clone()).or_default(); - (state.update_seq, state.draining_cancel_tail) - }; - - debug!( - target: "acp_event_flow", - session_id = %session_id, - content_blocks = prompt.len(), - attempt, - draining_cancel_tail, - update_seq_before, - "Transport sending ACP session/prompt request" - ); - let response = self - .cx - .send_request(acp::PromptRequest::new(session_id.clone(), prompt.clone())) - .block_task() - .await - .context("ACP prompt failed"); - - match response { - Ok(response) => { - let absorb_cancel_tail_end_turn = { - let mut prompt_state = self.prompt_state.lock().await; - let state = prompt_state.entry(session_key.clone()).or_default(); - let saw_updates = state.update_seq > update_seq_before; - let absorb = state.draining_cancel_tail - && !saw_updates - && response.stop_reason == acp::StopReason::EndTurn; - - if response.stop_reason == acp::StopReason::Cancelled { - state.draining_cancel_tail = true; - } else if !absorb { - state.draining_cancel_tail = false; - } - - debug!( - target: "acp_event_flow", - session_id = %session_id, - attempt, - stop_reason = ?response.stop_reason, - saw_updates, - absorb_cancel_tail_end_turn = absorb, - draining_cancel_tail_after = state.draining_cancel_tail, - update_seq_after = state.update_seq, - "Transport received ACP session/prompt response" - ); - - absorb - }; - - if absorb_cancel_tail_end_turn { - continue; - } - - return Ok(response.stop_reason); - } - Err(err) => { - warn!( - target: "acp_event_flow", - error = %err, - session_id = %session_id, - attempt, - "Transport session/prompt request failed" - ); - return Err(err); - } - } - } + self.cx + .send_request(acp::PromptRequest::new(session_id, prompt)) + .block_task() + .await + .context("ACP prompt failed") + .map(|r| r.stop_reason) } /// Cancel an ongoing prompt. diff --git a/nori-rs/acp/src/connection/sacp_connection_tests.rs b/nori-rs/acp/src/connection/sacp_connection_tests.rs index 048834b20..4cf2851b3 100644 --- a/nori-rs/acp/src/connection/sacp_connection_tests.rs +++ b/nori-rs/acp/src/connection/sacp_connection_tests.rs @@ -786,121 +786,3 @@ async fn test_sequential_prompt_after_cancel_receives_response() { .expect("Prompt 2 should not error after cancel"); assert_eq!(stop_reason_2, acp::StopReason::Cancelled); } - -/// Test that an immediate empty end_turn after a cancelled prompt does not -/// consume the next logical prompt turn. The connection should absorb that -/// stale terminal response and keep working until the user's follow-up prompt -/// receives real streamed content. -#[tokio::test] -#[serial] -async fn test_prompt_after_cancel_absorbs_empty_end_turn_tail() { - let Some(mut config) = mock_agent_config() else { - return; - }; - config.env.insert( - "MOCK_AGENT_STREAM_UNTIL_CANCEL".to_string(), - "1".to_string(), - ); - config.env.insert( - "MOCK_AGENT_CANCEL_TAIL_EMPTY_END_TURNS".to_string(), - "2".to_string(), - ); - config.env.insert( - "MOCK_AGENT_CANCEL_TAIL_FOLLOW_UP_RESPONSE".to_string(), - "Recovered after cancel tail".to_string(), - ); - - let temp_dir = tempdir().expect("temp dir"); - - let mut conn = SacpConnection::spawn( - &config, - temp_dir.path(), - crate::config::AcpProxyConfig::disabled(), - ) - .await - .expect("spawn"); - let mut event_rx = conn.take_event_receiver(); - - let session_id = conn - .create_session(temp_dir.path(), vec![]) - .await - .expect("create session"); - let conn = Arc::new(conn); - - let prompt1 = vec![acp::ContentBlock::Text(acp::TextContent::new("hello"))]; - let conn_for_prompt1 = Arc::clone(&conn); - let session_id_for_prompt1 = session_id.clone(); - let prompt1_task = tokio::spawn(async move { - conn_for_prompt1 - .prompt(session_id_for_prompt1, prompt1) - .await - }); - - let first_update = tokio::time::timeout(std::time::Duration::from_secs(5), event_rx.recv()) - .await - .expect("Prompt 1 should start streaming within 5s") - .expect("Event channel should stay open"); - assert!( - matches!( - first_update, - ConnectionEvent::SessionUpdate(acp::SessionUpdate::AgentMessageChunk(_)) - ), - "Prompt 1 should receive a streamed agent message before cancel" - ); - - conn.cancel(&session_id) - .await - .expect("prompt 1 cancel should succeed"); - - let stop_reason_1 = tokio::time::timeout(std::time::Duration::from_secs(5), prompt1_task) - .await - .expect("Prompt 1 should complete within 5s after cancel") - .expect("Prompt 1 task should not panic") - .expect("Prompt 1 should not error after cancel"); - assert_eq!(stop_reason_1, acp::StopReason::Cancelled); - - while tokio::time::timeout(std::time::Duration::from_millis(100), event_rx.recv()) - .await - .is_ok() - {} - - let prompt2 = vec![acp::ContentBlock::Text(acp::TextContent::new( - "what have you finished?", - ))]; - let conn_for_prompt2 = Arc::clone(&conn); - let session_id_for_prompt2 = session_id.clone(); - let prompt2_task = tokio::spawn(async move { - conn_for_prompt2 - .prompt(session_id_for_prompt2, prompt2) - .await - }); - - let second_update = tokio::time::timeout(std::time::Duration::from_secs(5), async { - loop { - let event = event_rx - .recv() - .await - .expect("Event channel should stay open"); - if let ConnectionEvent::SessionUpdate(acp::SessionUpdate::AgentMessageChunk(chunk)) = - event - && let acp::ContentBlock::Text(text) = chunk.content - { - return text.text; - } - } - }) - .await - .expect("Prompt 2 should receive streamed text after the stale end_turn tail is absorbed"); - - assert!( - second_update.contains("Recovered after cancel tail"), - "Prompt 2 should receive its real response after the stale cancel tail, got: {second_update:?}" - ); - - let stop_reason_2 = tokio::time::timeout(std::time::Duration::from_secs(5), prompt2_task) - .await - .expect("Prompt 2 should complete within 5s") - .expect("Prompt 2 task should not panic") - .expect("Prompt 2 should not error"); - assert_eq!(stop_reason_2, acp::StopReason::EndTurn); -} diff --git a/nori-rs/acp/src/registry.rs b/nori-rs/acp/src/registry.rs index 3ea0ef2ca..5494852cd 100644 --- a/nori-rs/acp/src/registry.rs +++ b/nori-rs/acp/src/registry.rs @@ -97,9 +97,7 @@ impl AgentKind { /// Get the ACP adapter package name for launching this agent pub fn acp_package(&self) -> &'static str { match self { - // @latest forces bunx to resolve the new scope instead of a stale - // @zed-industries cache entry with the same unscoped package name. - AgentKind::ClaudeCode => "@agentclientprotocol/claude-agent-acp@latest", + AgentKind::ClaudeCode => "@zed-industries/claude-agent-acp@0.23.1", // Codex uses Zed's ACP adapter AgentKind::Codex => "@zed-industries/codex-acp", // Gemini has native ACP support @@ -1020,7 +1018,7 @@ mod tests { assert!( config .args - .contains(&"@agentclientprotocol/claude-agent-acp@latest".to_string()) + .contains(&"@zed-industries/claude-agent-acp@0.23.1".to_string()) ); assert_eq!(config.provider_info.name, "Claude Code ACP"); } diff --git a/nori-rs/deny.toml b/nori-rs/deny.toml index cc7920b41..08e2048a4 100644 --- a/nori-rs/deny.toml +++ b/nori-rs/deny.toml @@ -74,7 +74,6 @@ unmaintained = "workspace" # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ - { id = "RUSTSEC-2026-0002", reason = "lru 0.12.5 is pinned by the ratatui patch branch in [patch.crates-io]; track removal once ratatui can move to a fixed lru release" }, { id = "RUSTSEC-2026-0097", reason = "rand 0.8.x pinned by oauth2/zbus/secret-service; no custom logger uses ThreadRng in this codebase" }, ] # If this is true, then cargo deny will use the git executable to fetch advisory database. diff --git a/nori-rs/mock-acp-agent/docs.md b/nori-rs/mock-acp-agent/docs.md index c0cb9e446..a0384fdbd 100644 --- a/nori-rs/mock-acp-agent/docs.md +++ b/nori-rs/mock-acp-agent/docs.md @@ -30,8 +30,6 @@ Used by `@/nori-rs/tui-pty-e2e/` for end-to-end integration testing. The mock ag **Prompt Echo**: The `MOCK_AGENT_ECHO_PROMPT` env var causes the mock agent's `prompt()` handler to echo back the full prompt text it received. Used by session context tests in `@/nori-rs/acp/src/backend/tests/part5.rs` to verify that `AcpBackendConfig.session_context` is correctly prepended to the first user prompt and consumed after that. -**Cancel Tail Ordering**: The `MOCK_AGENT_CANCEL_TAIL_EMPTY_END_TURNS` env var reproduces the Claude-style cancel tail that motivated the ACP cancellation-ordering fix. When a streaming prompt is cancelled, the mock agent queues N immediate empty `end_turn` responses for the next prompt attempts before finally allowing the real follow-up prompt to complete. `MOCK_AGENT_CANCEL_TAIL_FOLLOW_UP_RESPONSE` overrides the text returned by that eventual real follow-up turn. These knobs are used by `@/nori-rs/acp/src/connection/sacp_connection_tests.rs` and `@/nori-rs/tui-pty-e2e/tests/streaming.rs` to verify that Nori absorbs repeated stale terminal responses without admitting a new logical prompt turn too early. - **Stuck Tool Calls (No Completion)**: The `MOCK_AGENT_STUCK_TOOL_CALLS` env var triggers a scenario where 3 Read tool calls are sent with `Pending` status but never receive completion updates. After a short delay the agent sends its final text response and ends the turn. This reproduces the frozen-display bug where incomplete ExecCells fill the viewport and block `insert_history_lines()` from rendering the agent's text. The fix under test is `finalize_active_cell_as_failed()` in `@/nori-rs/tui/src/chatwidget.rs`. **Runaway Search Snapshot Amplification**: The `MOCK_AGENT_RUNAWAY_SEARCH` env var triggers a deterministic Search tool-call stream that repeatedly emits `InProgress` updates for the **same** `call_id` while the text artifact grows cumulatively on every update. Tunables: @@ -74,7 +72,7 @@ This simulates the real-world race condition that the `InterruptManager.flush_co - Uses the same ACP protocol as real agents for realistic testing - Simulates streaming with configurable chunk delays - Supports permission options (accept, deny, skip) -- Session state is tracked per-session ID, including cancel-tail replay state for ordering regressions +- Session state is tracked per-session ID - Sleep durations between mock events are tuned to create reliable timing in E2E tests Created and maintained by Nori. diff --git a/nori-rs/mock-acp-agent/src/main.rs b/nori-rs/mock-acp-agent/src/main.rs index bd5ec3525..4e8dacf68 100644 --- a/nori-rs/mock-acp-agent/src/main.rs +++ b/nori-rs/mock-acp-agent/src/main.rs @@ -42,8 +42,6 @@ struct MockAgent { client_request_tx: mpsc::UnboundedSender, next_session_id: Cell, cancel_requested: Cell, - pending_cancel_tail_empty_end_turns: Cell, - follow_up_after_cancel_tail: Cell, } impl MockAgent { @@ -56,8 +54,6 @@ impl MockAgent { next_session_id: Cell::new(0), client_request_tx, cancel_requested: Cell::new(false), - pending_cancel_tail_empty_end_turns: Cell::new(0), - follow_up_after_cancel_tail: Cell::new(false), } } @@ -289,14 +285,6 @@ impl acp::Agent for MockAgent { eprintln!("Mock agent: prompt"); self.cancel_requested.set(false); let session_id = arguments.session_id.clone(); - let pending_cancel_tail_empty_end_turns = self.pending_cancel_tail_empty_end_turns.get(); - if pending_cancel_tail_empty_end_turns > 0 { - self.pending_cancel_tail_empty_end_turns - .set(pending_cancel_tail_empty_end_turns - 1); - eprintln!("Mock agent: emitting empty end_turn from cancel tail"); - return Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)); - } - let complete_after_default_response = self.follow_up_after_cancel_tail.replace(false); // Support configurable stderr output for testing stderr capture if let Ok(count_str) = std::env::var("MOCK_AGENT_STDERR_COUNT") @@ -761,14 +749,6 @@ impl acp::Agent for MockAgent { return Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)); } - if complete_after_default_response { - let response = std::env::var("MOCK_AGENT_CANCEL_TAIL_FOLLOW_UP_RESPONSE") - .unwrap_or_else(|_| "Recovered after cancel tail".to_string()); - self.send_text_chunk(session_id.clone(), &response).await?; - eprintln!("Mock agent: completing follow-up prompt after cancel tail"); - return Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)); - } - // Support custom response text for TUI testing if let Ok(response) = std::env::var("MOCK_AGENT_RESPONSE") { self.send_text_chunk(session_id.clone(), &response).await?; @@ -1024,21 +1004,6 @@ impl acp::Agent for MockAgent { sleep(Duration::from_millis(10)).await; } - let cancel_tail_empty_end_turns = - std::env::var("MOCK_AGENT_CANCEL_TAIL_EMPTY_END_TURNS") - .ok() - .and_then(|count| count.parse::().ok()) - .unwrap_or(0); - if self.cancel_requested.get() && cancel_tail_empty_end_turns > 0 { - self.pending_cancel_tail_empty_end_turns - .set(cancel_tail_empty_end_turns); - self.follow_up_after_cancel_tail.set(true); - eprintln!( - "Mock agent: queued {} empty end_turn responses after cancel", - cancel_tail_empty_end_turns - ); - } - return Ok(acp::PromptResponse::new(if self.cancel_requested.get() { acp::StopReason::Cancelled } else { diff --git a/nori-rs/tui-pty-e2e/tests/streaming.rs b/nori-rs/tui-pty-e2e/tests/streaming.rs index fd640b536..01b8ad276 100644 --- a/nori-rs/tui-pty-e2e/tests/streaming.rs +++ b/nori-rs/tui-pty-e2e/tests/streaming.rs @@ -210,56 +210,3 @@ fn test_prompt_still_streams_after_interrupt() { ) .expect("Second prompt should also be interruptible"); } - -#[test] -#[cfg(target_os = "linux")] -fn test_prompt_after_interrupt_absorbs_empty_end_turn_tail() { - let config = SessionConfig::new() - .with_stream_until_cancel() - .with_agent_env("MOCK_AGENT_CANCEL_TAIL_EMPTY_END_TURNS", "2") - .with_agent_env( - "MOCK_AGENT_CANCEL_TAIL_FOLLOW_UP_RESPONSE", - "Recovered after cancel tail", - ); - let mut session = TuiSession::spawn_with_config(24, 80, config).unwrap(); - - session - .wait_for_text("›", TIMEOUT) - .expect("Prompt did not appear"); - std::thread::sleep(TIMEOUT_INPUT); - - session.send_str("first try").unwrap(); - std::thread::sleep(TIMEOUT_INPUT); - session.send_key(Key::Enter).unwrap(); - session - .wait_for_text("esc to interrupt", TIMEOUT) - .expect("First prompt should become interruptible"); - - session.send_key(Key::Escape).unwrap(); - session - .wait_for_text( - "Conversation interrupted - tell the model what to do differently", - TIMEOUT, - ) - .expect("First prompt should report interrupt"); - session - .wait_for_text("›", TIMEOUT) - .expect("Prompt should return after first interrupt"); - - session.send_str("what have you finished?").unwrap(); - std::thread::sleep(TIMEOUT_INPUT); - session.send_key(Key::Enter).unwrap(); - - session - .wait_for_text("Recovered after cancel tail", TIMEOUT) - .expect("The follow-up prompt should still produce a real response after the stale end_turn tail"); - session - .wait_for( - |screen| { - screen.contains("Recovered after cancel tail") - && !screen.contains("esc to interrupt") - }, - TIMEOUT, - ) - .expect("The follow-up prompt should settle instead of getting stuck in another interruptible turn"); -} diff --git a/spec/acp-cancel-turn-boundary-investigation/BUG_REPORT.md b/spec/acp-cancel-turn-boundary-investigation/BUG_REPORT.md deleted file mode 100644 index f0952ea08..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/BUG_REPORT.md +++ /dev/null @@ -1,312 +0,0 @@ -# ACP Cancel Turn Boundary Bug Report - -## Summary - -Nori releases the ACP session back to the UI too early after a cancelled prompt turn. -In the reproduced Claude ACP session, Nori emits `SessionPhaseChanged(Idle)` and -`PromptCompleted(stop_reason=Cancelled)` immediately after the cancelled prompt -response, then accepts the user's next prompt as a fresh turn. That follow-up -prompt is then immediately completed with `stopReason=end_turn` and no content. - -This appears to be a Nori turn-boundary handling bug, not an ACP adapter bug. -The reproduced wire stream is permissible ACP behavior and matches the shape -that other ACP clients are expected to tolerate. - -This document records the investigation only. It does not propose a fix. - -## Impact - -- User presses `Ctrl-C` during an ACP agent turn. -- Nori shows the interrupted state and returns the prompt. -- The next user prompt is accepted immediately. -- That prompt is then completed immediately with an empty `end_turn`. -- The UI appears to consume a stale stop/completion signal instead of treating - the post-cancel tail as part of the cancelled turn's completion lifecycle. - -The captured TUI state is backed up in: - -- `evidence/tui-capture-after-cancel.txt` -- `evidence/tui-capture-after-followup.txt` - -## Reproduction - -### Environment - -- Branch: `debug-acp-cancel-ordering-trace` -- Binary: `codex-rs/target/debug/nori` -- Agent: `claude-debug-acp` -- Logging: - - `RUST_LOG=acp_event_flow=debug,sacp::jsonrpc::handlers=debug,nori_tui=info` -- Runtime home: - - `.tmp/claude-debug-repro-home-2` - -### Steps - -1. Start `nori --agent claude-debug-acp --skip-trust-directory`. -2. Submit: - - `I'm testing something. Just run a foreground sleep 30 task, then say 'all done!'` -3. Wait until the tool call shows `sleep 30`. -4. Press `Ctrl-C`. -5. Submit: - - `what have you finished` - -### Observed Result - -- The cancelled turn ends. -- Nori returns to idle and accepts the follow-up prompt. -- The follow-up prompt immediately receives `stopReason=end_turn` with zero usage. -- The TUI jumps to a fresh prompt with no assistant answer. - -## Evidence Directory - -All backed-up artifacts for this investigation live in: - -- `spec/acp-cancel-turn-boundary-investigation/evidence/` - -Artifacts: - -- `wire-session.log` -- `nori-acp-trace.log` -- `nori-tui.log` -- `tui-capture-after-cancel.txt` -- `tui-capture-after-followup.txt` -- `acp-cancellation-spec.txt` -- `acp-session-update-note.txt` -- `sacp-ordering-excerpt.txt` -- `sacp-session-excerpt.txt` -- `toad-conversation-excerpt.py` -- `toad-agent-excerpt.py` -- `instrumentation.diff` - -## Raw Wire Evidence - -The reproduced wire sequence for the main session is: - -1. Client sends `session/cancel`. -2. Agent sends `session/update` with `usage_update`. -3. Agent responds to the original prompt request with `stopReason=cancelled`. -4. Client sends the next `session/prompt`. -5. Agent immediately responds with `stopReason=end_turn` and zero usage. - -See `evidence/wire-session.log`: - -- line 22: `session/cancel` -- line 23: post-cancel `usage_update` -- line 24: cancelled prompt response -- line 25: next `session/prompt` -- line 26: immediate empty `end_turn` - -This ordering is also visible in the original log copy: - -- [codex-rs/debug-acp-claude.log](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/debug-acp-claude.log:22) - -## ACP Specification References - -The ACP reference explicitly allows final `session/update` traffic after cancel, -as long as those updates are sent before the cancelled prompt response: - -- `acp-cancellation-spec.txt` -- Source reference: - - `/home/clifford/Documents/source/nori/docs/references/acp-llms-full.txt:3048` - - `/home/clifford/Documents/source/nori/docs/references/acp-llms-full.txt:3078` - -Key points from the reference: - -- The client may cancel with `session/cancel`. -- The agent must eventually respond to the original `session/prompt` with - `stopReason=cancelled`. -- The agent may still send `session/update` notifications after receiving - `session/cancel`, but before responding to `session/prompt`. -- The client should still accept those updates. - -There is also a specific note in the `session/update` section that clients -should continue accepting updates after cancel: - -- `acp-session-update-note.txt` -- Source reference: - - `/home/clifford/Documents/source/nori/docs/references/acp-llms-full.txt:3895` - -Nothing in the reproduced wire stream violates these requirements. - -## Investigation Log - -### 1. Initial hypothesis check - -The first investigation pass considered whether Nori was locally reordering -`session/update` notifications and prompt results while merging: - -- transport notifications from `event_rx` -- prompt results from `prompt_result_rx` - -Instrumentation was added at: - -- transport ingress: - - [codex-rs/acp/src/connection/sacp_connection.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/connection/sacp_connection.rs:168) -- relay merge point: - - [codex-rs/acp/src/backend/spawn_and_relay.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/backend/spawn_and_relay.rs:258) -- reducer: - - [codex-rs/acp/src/backend/session_reducer.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/backend/session_reducer.rs:44) -- runtime driver: - - [codex-rs/acp/src/backend/session_runtime_driver.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/backend/session_runtime_driver.rs:127) - -That tracing established that Nori did see the post-cancel `usage_update` before -the cancelled prompt response in the reproduced run. - -### 2. Added turn-boundary tracing - -To determine exactly when Nori released control back to the UI, two more -tracing points were added: - -- prompt admission into the ACP backend: - - [codex-rs/acp/src/backend/user_input.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/backend/user_input.rs:164) -- emitted client events, especially: - - `SessionPhaseChanged` - - `PromptCompleted` - - [codex-rs/acp/src/backend/session_runtime_driver.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/backend/session_runtime_driver.rs:304) - -These extra traces are what closed the loop. - -### 3. Reproduced with boundary tracing enabled - -The decisive sequence in `evidence/nori-acp-trace.log` is: - -1. Nori marks the active prompt as cancelling. - - line 54 -2. Nori forwards `SessionPhaseChanged(Cancelling)`. - - line 56 -3. Nori sends `session/cancel`. - - line 57 -4. Nori receives the cancelled prompt response. - - lines 64-66 -5. Nori finalizes that prompt response. - - line 68 -6. Nori forwards `SessionPhaseChanged(Idle)`. - - line 70 -7. Nori forwards `PromptCompleted(stop_reason=Cancelled)`. - - line 71 -8. Only after that, Nori accepts the user's follow-up prompt as a new turn. - - line 72 -9. That new turn immediately receives `stopReason=EndTurn`. - - lines 78-85 - -This sequence is the key investigation result. - -## What The Trace Proves - -The new trace proves all of the following in the reproduced session: - -### A. Nori can receive and parse both stop reasons off the wire - -Nori successfully parses: - -- the cancelled response for the interrupted turn -- the subsequent `end_turn` response that arrives after the next `session/prompt` - -Evidence: - -- `wire-session.log` lines 24 and 26 -- `nori-acp-trace.log` lines 64-66 and 78-80 - -So this is not a failure to deserialize or understand the wire payloads. - -### B. Nori releases the cancelled turn before the next logical stop boundary is fully resolved - -Nori explicitly emits: - -- `SessionPhaseChanged(Idle)` at line 70 -- `PromptCompleted(Cancelled)` at line 71 - -and then admits the follow-up prompt at line 72. - -That is the precise point where control returns to the UI and a new user turn -becomes possible. - -### C. The follow-up prompt is treated as a brand new turn - -The follow-up prompt is not queued behind additional cancel-tail processing. -It is accepted from idle: - -- `phase_before_submit="idle"` -- `active_request_id_before_submit=""` - -Evidence: - -- `nori-acp-trace.log` line 72 - -### D. The empty `end_turn` disrupts the following prompt turn, not the cancelled one - -The prompt request for `what have you finished` is started with a new request id: - -- line 73 - -That request then immediately gets `EndTurn`: - -- lines 78-85 - -This means the disruptive `end_turn` is observed as the response to the new -prompt turn after Nori has already returned to idle. - -## Comparison With SACP Ordering APIs - -Nori's current prompt path uses `block_task()`: - -- [codex-rs/acp/src/connection/sacp_connection.rs](/home/clifford/Documents/source/nori/cli/.worktrees/debug-acp-cancel-ordering-trace/codex-rs/acp/src/connection/sacp_connection.rs:527) - -SACP documents that `block_task()` acknowledges the response immediately: - -- `sacp-ordering-excerpt.txt` -- source: - - `/home/clifford/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sacp-10.1.0/src/jsonrpc.rs:2747` - -By contrast, `on_receiving_result()` keeps ordering until the callback completes: - -- `sacp-ordering-excerpt.txt` -- source: - - `/home/clifford/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sacp-10.1.0/src/jsonrpc.rs:2915` - -The session-oriented helper in SACP uses `on_receiving_result()` for prompt -completion and then keeps reading updates until a stop reason is drained: - -- `sacp-session-excerpt.txt` -- source: - - `/home/clifford/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sacp-10.1.0/src/session.rs:554` - - `/home/clifford/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/sacp-10.1.0/src/session.rs:588` - -This comparison does not by itself prove the root cause, but it is relevant -because Nori's current prompt handling takes the "ack immediately and process -later" path rather than a session-scoped "keep consuming until the turn is -actually over" path. - -## Comparison With Toad - -The example ACP client `toad` is also useful as a reference point. - -Its conversation layer waits for `agent.send_prompt(prompt)` and only then -calls `agent_turn_over(stop_reason)`: - -- `toad-conversation-excerpt.py` -- source: - - `/home/clifford/Documents/source/nori/cli/.worktrees/plan-session-update-support/toad/src/toad/widgets/conversation.py:822` - -Its ACP agent layer waits on the ACP prompt request and returns the stop reason: - -- `toad-agent-excerpt.py` -- source: - - `/home/clifford/Documents/source/nori/cli/.worktrees/plan-session-update-support/toad/src/toad/acp/agent.py:739` - -Again, this report is not claiming a fix from Toad's implementation. The -comparison is included because the same adapter is known to work in other ACP -clients, so Nori's turn-boundary handling is the thing under investigation. - -## Conclusion - -The evidence gathered here supports the following bug statement: - -> Nori completes and releases a cancelled ACP turn too early. It forwards -> `PromptCompleted(Cancelled)` and `SessionPhaseChanged(Idle)` immediately after -> the cancelled prompt response, then admits the next user prompt as a fresh -> turn. In the reproduced session, that fresh turn is immediately completed by -> an empty `end_turn`, producing the visible off-by-one stop-boundary bug. - -This report intentionally stops at attribution and evidence. It does not -recommend or document a fix. diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/acp-cancellation-spec.txt b/spec/acp-cancel-turn-boundary-investigation/evidence/acp-cancellation-spec.txt deleted file mode 100644 index a12c28d44..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/acp-cancellation-spec.txt +++ /dev/null @@ -1,35 +0,0 @@ -Clients **MAY** cancel an ongoing prompt turn at any time by sending a `session/cancel` notification: - -```json theme={null} -{ - "jsonrpc": "2.0", - "method": "session/cancel", - "params": { - "sessionId": "sess_abc123def456" - } -} -``` - -The Client **SHOULD** preemptively mark all non-finished tool calls pertaining to the current turn as `cancelled` as soon as it sends the `session/cancel` notification. - -The Client **MUST** respond to all pending `session/request_permission` requests with the `cancelled` outcome. - -When the Agent receives this notification, it **SHOULD** stop all language model requests and all tool call invocations as soon as possible. - -After all ongoing operations have been successfully aborted and pending updates have been sent, the Agent **MUST** respond to the original `session/prompt` request with the `cancelled` [stop reason](#stop-reasons). - - - API client libraries and tools often throw an exception when their operation is aborted, which may propagate as an error response to `session/prompt`. - - Clients often display unrecognized errors from the Agent to the user, which would be undesirable for cancellations as they aren't considered errors. - - Agents **MUST** catch these errors and return the semantically meaningful `cancelled` stop reason, so that Clients can reliably confirm the cancellation. - - -The Agent **MAY** send `session/update` notifications with content or tool call updates after receiving the `session/cancel` notification, but it **MUST** ensure that it does so before responding to the `session/prompt` request. - -The Client **SHOULD** still accept tool call updates received after sending `session/cancel`. - -*** - -Once a prompt turn completes, the Client may send another `session/prompt` to continue the conversation, building on the context established in previous turns. diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/acp-session-update-note.txt b/spec/acp-cancel-turn-boundary-investigation/evidence/acp-session-update-note.txt deleted file mode 100644 index 3d194f034..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/acp-session-update-note.txt +++ /dev/null @@ -1,15 +0,0 @@ - -Handles session update notifications from the agent. - -This is a notification endpoint (no response expected) that receives -real-time updates about session progress, including message chunks, -tool calls, and execution plans. - -Note: Clients SHOULD continue accepting tool call updates even after -sending a `session/cancel` notification, as the agent may send final -updates before responding with the cancelled stop reason. - -See protocol docs: [Agent Reports Output](https://agentclientprotocol.com/protocol/prompt-turn#3-agent-reports-output) - -#### SessionNotification - diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/instrumentation.diff b/spec/acp-cancel-turn-boundary-investigation/evidence/instrumentation.diff deleted file mode 100644 index 5d8b87d64..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/instrumentation.diff +++ /dev/null @@ -1,630 +0,0 @@ -diff --git a/codex-rs/acp/src/backend/session_reducer.rs b/codex-rs/acp/src/backend/session_reducer.rs -index b691e409..7574022d 100644 ---- a/codex-rs/acp/src/backend/session_reducer.rs -+++ b/codex-rs/acp/src/backend/session_reducer.rs -@@ -18,6 +18,7 @@ use nori_protocol::session_runtime::SessionPhase; - use nori_protocol::session_runtime::SessionRuntime; - use nori_protocol::session_runtime::TranscriptMessage; - use nori_protocol::session_runtime::TranscriptRole; -+use tracing::debug; - - /// Everything that can affect [`SessionRuntime`] state. - #[derive(Debug)] -@@ -40,6 +41,32 @@ pub enum InboundEvent { - LoadSubmit { request_id: String }, - } - -+pub(super) fn inbound_event_kind(event: &InboundEvent) -> &'static str { -+ match event { -+ InboundEvent::Notification(update) => crate::connection::session_update_kind(update), -+ InboundEvent::PromptResponse { .. } => "prompt_response", -+ InboundEvent::PromptFailed => "prompt_failed", -+ InboundEvent::LoadResponse => "load_response", -+ InboundEvent::PermissionRequest { .. } => "permission_request", -+ InboundEvent::PromptSubmit(_) => "prompt_submit", -+ InboundEvent::CancelSubmit => "cancel_submit", -+ InboundEvent::LoadSubmit { .. } => "load_submit", -+ } -+} -+ -+pub(super) fn session_phase_label(phase: &SessionPhase) -> &'static str { -+ match phase { -+ SessionPhase::Idle => "idle", -+ SessionPhase::Loading { .. } => "loading", -+ SessionPhase::Prompt { -+ cancelling: true, .. -+ } => "cancelling", -+ SessionPhase::Prompt { -+ cancelling: false, .. -+ } => "prompt", -+ } -+} -+ - /// Side effects the caller must execute after reduction. - #[derive(Debug, PartialEq)] - pub enum SideEffect { -@@ -116,6 +143,12 @@ fn reduce_prompt_submit( - ) { - if runtime.phase != SessionPhase::Idle { - runtime.queue.push_back(prompt); -+ debug!( -+ target: "acp_event_flow", -+ phase = session_phase_label(&runtime.phase), -+ queue_len = runtime.queue.len(), -+ "Queued prompt while another session request is active" -+ ); - out.events.push(ClientEvent::QueueChanged(QueueChanged { - prompts: queued_prompt_texts(runtime), - })); -@@ -126,6 +159,7 @@ fn reduce_prompt_submit( - } - - fn start_prompt(runtime: &mut SessionRuntime, prompt: QueuedPrompt, out: &mut ReduceOutput) { -+ let phase_before = session_phase_label(&runtime.phase); - let request_id = new_request_id(); - - // Build ACP content blocks from the queued prompt. -@@ -154,6 +188,15 @@ fn start_prompt(runtime: &mut SessionRuntime, prompt: QueuedPrompt, out: &mut Re - }); - } - -+ debug!( -+ target: "acp_event_flow", -+ request_id = %request_id, -+ prompt_kind = ?prompt.kind, -+ phase_before, -+ queue_len = runtime.queue.len(), -+ "Reducer started prompt and emitted session/prompt side effect" -+ ); -+ - out.events - .push(ClientEvent::SessionPhaseChanged(runtime.phase_view())); - out.side_effects.push(SideEffect::SendPrompt { -@@ -198,6 +241,20 @@ fn reduce_cancel_submit(runtime: &mut SessionRuntime, out: &mut ReduceOutput) { - } - } - -+ debug!( -+ target: "acp_event_flow", -+ request_id = %owner_id, -+ pending_permission_requests = runtime -+ .active -+ .as_ref() -+ .map_or(0, |active| active.pending_permission_requests.len()), -+ tool_calls = runtime -+ .active -+ .as_ref() -+ .map_or(0, |active| active.tool_call_ids.len()), -+ "Reducer marked the active prompt as cancelling" -+ ); -+ - out.events - .push(ClientEvent::SessionPhaseChanged(runtime.phase_view())); - out.side_effects.push(SideEffect::SendCancel); -@@ -213,6 +270,22 @@ fn reduce_prompt_response( - stop_reason: acp::StopReason, - out: &mut ReduceOutput, - ) { -+ let active_request_id = runtime -+ .active -+ .as_ref() -+ .map(|active| active.request_id.clone()) -+ .unwrap_or_else(|| "".to_string()); -+ let phase_before = session_phase_label(&runtime.phase); -+ let queue_len_before = runtime.queue.len(); -+ debug!( -+ target: "acp_event_flow", -+ active_request_id, -+ phase_before, -+ queue_len_before, -+ ?stop_reason, -+ "Reducer received prompt response" -+ ); -+ - if !matches!(runtime.phase, SessionPhase::Prompt { .. }) { - out.events.push(ClientEvent::Warning(WarningInfo { - message: "Received prompt response while not in Prompt phase".to_string(), -@@ -225,6 +298,15 @@ fn reduce_prompt_response( - - runtime.phase = SessionPhase::Idle; - -+ debug!( -+ target: "acp_event_flow", -+ active_request_id, -+ ?stop_reason, -+ should_drain_queue, -+ queue_len_after_finalize = runtime.queue.len(), -+ "Reducer finalized prompt response" -+ ); -+ - out.events - .push(ClientEvent::SessionPhaseChanged(runtime.phase_view())); - out.events -@@ -242,6 +324,18 @@ fn reduce_prompt_response( - } - - fn reduce_prompt_failed(runtime: &mut SessionRuntime, out: &mut ReduceOutput) { -+ let active_request_id = runtime -+ .active -+ .as_ref() -+ .map(|active| active.request_id.clone()) -+ .unwrap_or_else(|| "".to_string()); -+ debug!( -+ target: "acp_event_flow", -+ active_request_id, -+ phase = session_phase_label(&runtime.phase), -+ "Reducer received prompt failure" -+ ); -+ - if !matches!(runtime.phase, SessionPhase::Prompt { .. }) { - out.events.push(ClientEvent::Warning(WarningInfo { - message: "Received prompt failure while not in Prompt phase".to_string(), -@@ -308,6 +402,18 @@ fn reduce_notification( - normalizer: &mut ClientEventNormalizer, - out: &mut ReduceOutput, - ) { -+ debug!( -+ target: "acp_event_flow", -+ update_kind = crate::connection::session_update_kind(&update), -+ phase = session_phase_label(&runtime.phase), -+ active_request_id = runtime -+ .active -+ .as_ref() -+ .map(|active| active.request_id.as_str()) -+ .unwrap_or(""), -+ "Reducer received session/update" -+ ); -+ - // Session metadata updates are accepted in any phase. - if is_session_metadata_update(&update) { - reduce_metadata_update(runtime, &update, normalizer, out); -diff --git a/codex-rs/acp/src/backend/session_reducer/tests.rs b/codex-rs/acp/src/backend/session_reducer/tests.rs -index b03d44e4..27e75593 100644 ---- a/codex-rs/acp/src/backend/session_reducer/tests.rs -+++ b/codex-rs/acp/src/backend/session_reducer/tests.rs -@@ -11,7 +11,9 @@ use pretty_assertions::assert_eq; - - use super::InboundEvent; - use super::SideEffect; -+use super::inbound_event_kind; - use super::reduce; -+use super::session_phase_label; - - fn new_runtime() -> SessionRuntime { - SessionRuntime::new() -@@ -125,6 +127,41 @@ fn prompt_response_transitions_to_idle() { - ))); - } - -+#[test] -+fn inbound_event_kind_labels_prompt_response() { -+ assert_eq!( -+ inbound_event_kind(&InboundEvent::PromptResponse { -+ stop_reason: acp::StopReason::Cancelled, -+ }), -+ "prompt_response" -+ ); -+} -+ -+#[test] -+fn session_phase_label_labels_known_phases() { -+ assert_eq!(session_phase_label(&SessionPhase::Idle), "idle"); -+ assert_eq!( -+ session_phase_label(&SessionPhase::Prompt { -+ request_id: "req-1".to_string(), -+ cancelling: false, -+ }), -+ "prompt" -+ ); -+ assert_eq!( -+ session_phase_label(&SessionPhase::Prompt { -+ request_id: "req-1".to_string(), -+ cancelling: true, -+ }), -+ "cancelling" -+ ); -+ assert_eq!( -+ session_phase_label(&SessionPhase::Loading { -+ request_id: "req-2".to_string(), -+ }), -+ "loading" -+ ); -+} -+ - // ========================================================================= - // 2. Cancel semantics - // ========================================================================= -diff --git a/codex-rs/acp/src/backend/session_runtime_driver.rs b/codex-rs/acp/src/backend/session_runtime_driver.rs -index f3e345f8..cc038641 100644 ---- a/codex-rs/acp/src/backend/session_runtime_driver.rs -+++ b/codex-rs/acp/src/backend/session_runtime_driver.rs -@@ -36,6 +36,24 @@ pub(crate) struct ReducerActions { - pub completed_turn: Option, - } - -+fn client_event_kind(event: &ClientEvent) -> &'static str { -+ match event { -+ ClientEvent::SessionUpdateInfo(_) => "session_update_info", -+ ClientEvent::SessionPhaseChanged(_) => "session_phase_changed", -+ ClientEvent::QueueChanged(_) => "queue_changed", -+ ClientEvent::MessageDelta(_) => "message_delta", -+ ClientEvent::PromptCompleted(_) => "prompt_completed", -+ ClientEvent::ToolSnapshot(_) => "tool_snapshot", -+ ClientEvent::ApprovalRequest(_) => "approval_request", -+ ClientEvent::AgentCommandsUpdate(_) => "agent_commands_update", -+ ClientEvent::PlanSnapshot(_) => "plan_snapshot", -+ ClientEvent::LoadCompleted => "load_completed", -+ ClientEvent::ContextCompacted(_) => "context_compacted", -+ ClientEvent::Warning(_) => "warning", -+ ClientEvent::ReplayEntry(_) => "replay_entry", -+ } -+} -+ - impl SessionDriver { - pub(crate) fn new() -> Self { - Self { -@@ -82,6 +100,14 @@ impl SessionDriver { - .map(|active| active.request_id.clone()) - } - -+ pub(crate) fn phase_label(&self) -> &'static str { -+ session_reducer::session_phase_label(&self.runtime.phase) -+ } -+ -+ pub(crate) fn queue_len(&self) -> usize { -+ self.runtime.queue.len() -+ } -+ - pub(crate) fn push_permission_request( - &mut self, - request: &crate::connection::ApprovalRequest, -@@ -122,9 +148,27 @@ impl AcpBackend { - event, - InboundEvent::PromptResponse { .. } | InboundEvent::PromptFailed - ); -+ let event_kind = session_reducer::inbound_event_kind(&event); - let actions = { - let mut driver = self.session_driver.lock().await; -- driver.apply(event) -+ let phase_before = driver.phase_label(); -+ let active_before = driver.active_request_id(); -+ let queue_len_before = driver.queue_len(); -+ let actions = driver.apply(event); -+ debug!( -+ target: "acp_event_flow", -+ event_kind, -+ phase_before, -+ active_request_id_before = active_before.as_deref().unwrap_or(""), -+ queue_len_before, -+ phase_after = driver.phase_label(), -+ active_request_id_after = driver.active_request_id().as_deref().unwrap_or(""), -+ queue_len_after = driver.queue_len(), -+ client_events = actions.events.len(), -+ side_effects = actions.side_effects.len(), -+ "Applied reducer event in serialized session runtime" -+ ); -+ actions - }; - self.dispatch_reducer_actions(actions).await; - if start_idle_timer { -@@ -258,6 +302,35 @@ impl AcpBackend { - } - - async fn forward_and_record_client_event(&self, client_event: ClientEvent) { -+ match &client_event { -+ ClientEvent::SessionPhaseChanged(phase) => { -+ debug!( -+ target: "acp_event_flow", -+ client_event = client_event_kind(&client_event), -+ ?phase, -+ "Forwarding client event from ACP backend" -+ ); -+ } -+ ClientEvent::PromptCompleted(completed) => { -+ debug!( -+ target: "acp_event_flow", -+ client_event = client_event_kind(&client_event), -+ stop_reason = ?completed.stop_reason, -+ has_last_agent_message = completed -+ .last_agent_message -+ .as_ref() -+ .is_some_and(|message| !message.is_empty()), -+ "Forwarding client event from ACP backend" -+ ); -+ } -+ _ => { -+ debug!( -+ target: "acp_event_flow", -+ client_event = client_event_kind(&client_event), -+ "Forwarding client event from ACP backend" -+ ); -+ } -+ } - emit_client_event( - &self.backend_event_tx, - self.transcript_recorder.as_ref(), -@@ -415,7 +488,7 @@ impl AcpBackend { - - async fn execute_side_effect(&self, side_effect: SideEffect) { - match side_effect { -- SideEffect::SendPrompt { prompt, .. } => { -+ SideEffect::SendPrompt { request_id, prompt } => { - if let Some(abort_handle) = self.idle_timer_abort.lock().await.take() { - abort_handle.abort(); - } -@@ -432,22 +505,39 @@ impl AcpBackend { - }; - let backend = (*self).clone(); - let prompt_result_tx = self.prompt_result_tx.clone(); -+ let request_id_for_task = request_id.clone(); - tokio::spawn(async move { - let session_id = backend.session_id.read().await.clone(); -+ let prompt_kind = prompt_kind.unwrap_or(QueuedPromptKind::User); -+ debug!( -+ target: "acp_event_flow", -+ request_id = %request_id_for_task, -+ session_id = %session_id, -+ ?prompt_kind, -+ content_blocks = prompt.len(), -+ "Sending ACP session/prompt request" -+ ); - let result = backend.connection.prompt(session_id, prompt).await; - match result { - Ok(stop_reason) => { -+ debug!( -+ target: "acp_event_flow", -+ request_id = %request_id_for_task, -+ ?stop_reason, -+ "Prompt task received ACP session/prompt response" -+ ); - let _ = prompt_result_tx - .send(InboundEvent::PromptResponse { stop_reason }) - .await; - } - Err(err) => { -- backend -- .send_prompt_error( -- prompt_kind.unwrap_or(QueuedPromptKind::User), -- &err, -- ) -- .await; -+ warn!( -+ target: "acp_event_flow", -+ request_id = %request_id_for_task, -+ error = %err, -+ "Prompt task failed before reducer observed a prompt response" -+ ); -+ backend.send_prompt_error(prompt_kind, &err).await; - let _ = prompt_result_tx.send(InboundEvent::PromptFailed).await; - } - } -@@ -455,6 +545,11 @@ impl AcpBackend { - } - SideEffect::SendCancel => { - let session_id = self.session_id.read().await.clone(); -+ debug!( -+ target: "acp_event_flow", -+ session_id = %session_id, -+ "Sending ACP session/cancel notification" -+ ); - if let Err(err) = self.connection.cancel(&session_id).await { - warn!("Failed to cancel ACP session: {err}"); - } -diff --git a/codex-rs/acp/src/backend/spawn_and_relay.rs b/codex-rs/acp/src/backend/spawn_and_relay.rs -index 4f15b443..1184a60a 100644 ---- a/codex-rs/acp/src/backend/spawn_and_relay.rs -+++ b/codex-rs/acp/src/backend/spawn_and_relay.rs -@@ -262,12 +262,21 @@ impl AcpBackend { - approval_policy_rx: watch::Receiver, - ) { - let approval_policy_rx = approval_policy_rx; -+ let mut relay_seq = 0_i64; - loop { - tokio::select! { - biased; - maybe_event = event_rx.recv() => { - match maybe_event { - Some(crate::connection::ConnectionEvent::SessionUpdate(update)) => { -+ relay_seq += 1; -+ debug!( -+ target: "acp_event_flow", -+ relay_seq, -+ relay_source = "transport_event_rx", -+ update_kind = crate::connection::session_update_kind(&update), -+ "Relaying session/update into serialized session runtime" -+ ); - let _ = backend - .session_event_tx - .send(session_runtime_driver::SessionRuntimeInput::Reducer( -@@ -276,7 +285,15 @@ impl AcpBackend { - .await; - } - Some(crate::connection::ConnectionEvent::ApprovalRequest(request)) => { -+ relay_seq += 1; - let current_policy = *approval_policy_rx.borrow(); -+ debug!( -+ target: "acp_event_flow", -+ relay_seq, -+ relay_source = "transport_event_rx", -+ call_id = request.event.call_id(), -+ "Relaying permission request into serialized session runtime" -+ ); - let _ = backend - .session_event_tx - .send( -@@ -296,6 +313,14 @@ impl AcpBackend { - maybe_result = prompt_result_rx.recv() => { - match maybe_result { - Some(result) => { -+ relay_seq += 1; -+ debug!( -+ target: "acp_event_flow", -+ relay_seq, -+ relay_source = "prompt_result_rx", -+ inbound_event = session_reducer::inbound_event_kind(&result), -+ "Relaying prompt result into serialized session runtime" -+ ); - let _ = backend - .session_event_tx - .send(session_runtime_driver::SessionRuntimeInput::Reducer(result)) -@@ -305,6 +330,14 @@ impl AcpBackend { - while let Some(event) = event_rx.recv().await { - match event { - crate::connection::ConnectionEvent::SessionUpdate(update) => { -+ relay_seq += 1; -+ debug!( -+ target: "acp_event_flow", -+ relay_seq, -+ relay_source = "transport_event_rx_drain", -+ update_kind = crate::connection::session_update_kind(&update), -+ "Draining session/update after prompt result channel closed" -+ ); - let _ = backend - .session_event_tx - .send(session_runtime_driver::SessionRuntimeInput::Reducer( -@@ -313,7 +346,15 @@ impl AcpBackend { - .await; - } - crate::connection::ConnectionEvent::ApprovalRequest(request) => { -+ relay_seq += 1; - let current_policy = *approval_policy_rx.borrow(); -+ debug!( -+ target: "acp_event_flow", -+ relay_seq, -+ relay_source = "transport_event_rx_drain", -+ call_id = request.event.call_id(), -+ "Draining permission request after prompt result channel closed" -+ ); - let _ = backend - .session_event_tx - .send( -diff --git a/codex-rs/acp/src/backend/user_input.rs b/codex-rs/acp/src/backend/user_input.rs -index bce9d6d4..33737d7c 100644 ---- a/codex-rs/acp/src/backend/user_input.rs -+++ b/codex-rs/acp/src/backend/user_input.rs -@@ -161,6 +161,27 @@ impl AcpBackend { - prompt_with_context - }; - -+ let (phase_before_submit, active_request_id_before_submit, queue_len_before_submit) = { -+ let driver = self.session_driver.lock().await; -+ ( -+ driver.phase_label(), -+ driver.active_request_id(), -+ driver.queue_len(), -+ ) -+ }; -+ debug!( -+ target: "acp_event_flow", -+ event_id = id, -+ phase_before_submit, -+ active_request_id_before_submit = active_request_id_before_submit -+ .as_deref() -+ .unwrap_or(""), -+ queue_len_before_submit, -+ prompt_text_len = final_prompt_text.len(), -+ image_blocks = image_blocks.len(), -+ "Accepted user prompt into ACP backend" -+ ); -+ - let _ = self - .session_event_tx - .send(session_runtime_driver::SessionRuntimeInput::Reducer( -diff --git a/codex-rs/acp/src/connection/mod.rs b/codex-rs/acp/src/connection/mod.rs -index b5186336..6a75b354 100644 ---- a/codex-rs/acp/src/connection/mod.rs -+++ b/codex-rs/acp/src/connection/mod.rs -@@ -22,6 +22,23 @@ pub enum ConnectionEvent { - ApprovalRequest(ApprovalRequest), - } - -+pub(crate) fn session_update_kind(update: &acp::SessionUpdate) -> &'static str { -+ match update { -+ acp::SessionUpdate::AgentMessageChunk(_) => "agent_message_chunk", -+ acp::SessionUpdate::AgentThoughtChunk(_) => "agent_thought_chunk", -+ acp::SessionUpdate::UserMessageChunk(_) => "user_message_chunk", -+ acp::SessionUpdate::Plan(_) => "plan", -+ acp::SessionUpdate::ToolCall(_) => "tool_call", -+ acp::SessionUpdate::ToolCallUpdate(_) => "tool_call_update", -+ acp::SessionUpdate::AvailableCommandsUpdate(_) => "available_commands_update", -+ acp::SessionUpdate::CurrentModeUpdate(_) => "current_mode_update", -+ acp::SessionUpdate::ConfigOptionUpdate(_) => "config_option_update", -+ acp::SessionUpdate::SessionInfoUpdate(_) => "session_info_update", -+ acp::SessionUpdate::UsageUpdate(_) => "usage_update", -+ _ => "other", -+ } -+} -+ - /// The type of approval event to send to the UI. - /// - /// This enum allows us to use the more appropriate approval UI for different -@@ -91,3 +108,14 @@ impl AcpModelState { - } - } - } -+ -+#[cfg(test)] -+mod tests { -+ use super::*; -+ -+ #[test] -+ fn session_update_kind_labels_usage_update() { -+ let update = acp::SessionUpdate::UsageUpdate(acp::UsageUpdate::new(12, 100)); -+ assert_eq!(session_update_kind(&update), "usage_update"); -+ } -+} -diff --git a/codex-rs/acp/src/connection/sacp_connection.rs b/codex-rs/acp/src/connection/sacp_connection.rs -index 7e4f54fc..edb669b4 100644 ---- a/codex-rs/acp/src/connection/sacp_connection.rs -+++ b/codex-rs/acp/src/connection/sacp_connection.rs -@@ -169,6 +169,11 @@ impl SacpConnection { - { - let event_tx = event_tx_for_notifications; - async move |notification: acp::SessionNotification, _connection| { -+ debug!( -+ target: "acp_event_flow", -+ update_kind = super::session_update_kind(¬ification.update), -+ "Transport received ACP session/update notification" -+ ); - if event_tx - .send(ConnectionEvent::SessionUpdate(notification.update)) - .await -@@ -524,12 +529,35 @@ impl SacpConnection { - session_id: acp::SessionId, - prompt: Vec, - ) -> Result { -- self.cx -+ debug!( -+ target: "acp_event_flow", -+ session_id = %session_id, -+ content_blocks = prompt.len(), -+ "Transport sending ACP session/prompt request" -+ ); -+ let response = self -+ .cx - .send_request(acp::PromptRequest::new(session_id, prompt)) - .block_task() - .await -- .context("ACP prompt failed") -- .map(|r| r.stop_reason) -+ .context("ACP prompt failed"); -+ match &response { -+ Ok(response) => { -+ debug!( -+ target: "acp_event_flow", -+ stop_reason = ?response.stop_reason, -+ "Transport received ACP session/prompt response" -+ ); -+ } -+ Err(err) => { -+ warn!( -+ target: "acp_event_flow", -+ error = %err, -+ "Transport session/prompt request failed" -+ ); -+ } -+ } -+ response.map(|response| response.stop_reason) - } - - /// Cancel an ongoing prompt. diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/sacp-ordering-excerpt.txt b/spec/acp-cancel-turn-boundary-investigation/evidence/sacp-ordering-excerpt.txt deleted file mode 100644 index f83b92b42..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/sacp-ordering-excerpt.txt +++ /dev/null @@ -1,46 +0,0 @@ - pub async fn block_task(self) -> Result - where - T: Send, - { - match self.response_rx.await { - Ok(ResponsePayload { - result: Ok(json_value), - ack_tx, - }) => { - // Ack immediately - we're in a spawned task, so the dispatch loop - // can continue while we process the value. - if let Some(tx) = ack_tx { - let _ = tx.send(()); - } - #[track_caller] - pub fn on_receiving_result( - self, - task: impl FnOnce(Result) -> F + 'static + Send, - ) -> Result<(), crate::Error> - where - F: Future> + 'static + Send, - T: Send, - { - let task_tx = self.task_tx.clone(); - let method = self.method; - let response_rx = self.response_rx; - let to_result = self.to_result; - let location = Location::caller(); - - Task::new(location, async move { - match response_rx.await { - Ok(ResponsePayload { result, ack_tx }) => { - // Convert the result using to_result for Ok values - let typed_result = match result { - Ok(json_value) => to_result(json_value), - Err(err) => Err(err), - }; - - // Run the user's callback - let outcome = task(typed_result).await; - - // Ack AFTER the callback completes - this is the key difference - // from block_task. The dispatch loop waits for this ack. - if let Some(tx) = ack_tx { - let _ = tx.send(()); - } diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/sacp-session-excerpt.txt b/spec/acp-cancel-turn-boundary-investigation/evidence/sacp-session-excerpt.txt deleted file mode 100644 index c0bf83d38..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/sacp-session-excerpt.txt +++ /dev/null @@ -1,47 +0,0 @@ - /// Send a prompt to the agent. You can then read messages sent in response. - pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> { - let update_tx = self.update_tx.clone(); - self.connection - .send_request_to( - AgentPeer, - PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]), - ) - .on_receiving_result(async move |result| { - let PromptResponse { - stop_reason, - meta: _, - .. - } = result?; - - update_tx - .unbounded_send(SessionMessage::StopReason(stop_reason)) - .map_err(crate::util::internal_error)?; - - Ok(()) - }) - } - /// Read all updates until the end of the turn and create a string. - /// Ignores non-text updates. - pub async fn read_to_string(&mut self) -> Result { - let mut output = String::new(); - loop { - let update = self.read_update().await?; - tracing::trace!(?update, "read_to_string update"); - match update { - SessionMessage::SessionMessage(message_cx) => MatchMessage::new(message_cx) - .if_notification(async |notif: SessionNotification| match notif.update { - SessionUpdate::AgentMessageChunk(ContentChunk { - content: ContentBlock::Text(text), - meta: _, - .. - }) => { - output.push_str(&text.text); - Ok(()) - } - _ => Ok(()), - }) - .await - .otherwise_ignore()?, - SessionMessage::StopReason(_stop_reason) => break, - } - } diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/toad-agent-excerpt.py b/spec/acp-cancel-turn-boundary-investigation/evidence/toad-agent-excerpt.py deleted file mode 100644 index 587386a86..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/toad-agent-excerpt.py +++ /dev/null @@ -1,41 +0,0 @@ - async def acp_session_prompt( - self, prompt: list[protocol.ContentBlock] - ) -> str | None: - """Send the prompt to the agent. - - Returns: - The stop reason. - - """ - with self.request(): - session_prompt = api.session_prompt(prompt, self.session_id) - try: - result = await session_prompt.wait() - except jsonrpc.APIError as error: - details = "" - match error.data: - case {"details": details}: - pass - - self.post_message( - AgentFail( - "Failed to send prompt" or error.message, - ( - str(details) - if details - else f"{self._agent_data['name']} returned an error" - ), - ) - ) - return None - except jsonrpc.JSONRPCError as error: - self.post_message( - AgentFail( - "Failed to send prompt" or error.message, - (error.message or f"{self._agent_data['name']} returned an error"), - ) - ) - return None - - assert result is not None - return result.get("stopReason") diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/toad-conversation-excerpt.py b/spec/acp-cancel-turn-boundary-investigation/evidence/toad-conversation-excerpt.py deleted file mode 100644 index 81fb5ef96..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/toad-conversation-excerpt.py +++ /dev/null @@ -1,36 +0,0 @@ - @work - async def send_prompt_to_agent(self, prompt: str) -> None: - if self.agent is not None: - stop_reason: str | None = None - self.busy_count += 1 - try: - self.turn = "agent" - stop_reason = await self.agent.send_prompt(prompt) - except jsonrpc.APIError as error: - from toad.widgets.markdown_note import MarkdownNote - - self.turn = "client" - - message = error.message or "no details were provided" - - await self.post( - MarkdownNote( - INTERNAL_EROR.replace("$ERROR", message), - classes="-stop-reason", - ) - ) - finally: - self.busy_count -= 1 - self.call_later(self.agent_turn_over, stop_reason) - - async def agent_turn_over(self, stop_reason: str | None) -> None: - """Called when the agent's turn is over. - - Args: - stop_reason: The stop reason returned from the Agent, or `None`. - """ - self.turn = "client" - if self._agent_thought is not None and self._agent_thought.loading: - await self._agent_thought.remove() - if self._loading is not None: - await self._loading.remove() diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/tui-capture-after-cancel.txt b/spec/acp-cancel-turn-boundary-investigation/evidence/tui-capture-after-cancel.txt deleted file mode 100644 index 3811772e4..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/tui-capture-after-cancel.txt +++ /dev/null @@ -1,24 +0,0 @@ -╭──────────────────────────────────────────────────────────────╮ -│ Nori CLI v0.0.0 │ -│ │ -│ directory: ~/…/debug-acp-cancel-ordering-trace/codex-rs │ -│ agent: claude-debug-acp │ -│ skillset: clifford │ -│ │ -│ Instruction Files │ -│ ~/.claude/CLAUDE.md │ -│ ~/Documents/…/debug-acp-cancel-ordering-trace/CLAUDE.md │ -│ total ~4,809 tokens │ -╰──────────────────────────────────────────────────────────────╯ - - -› I'm testing something. Just run a foreground sleep 30 task, then say 'all done!' - - -• Running sleep 30 - -• Session usage: 24539 / 1000000 tokens, cost 0.16 USD - -■ Conversation interrupted - tell the model what to do differently. Something went wrong? Report the issue at https:// -github.com/tilework-tech/nori-cli/issues - diff --git a/spec/acp-cancel-turn-boundary-investigation/evidence/tui-capture-after-followup.txt b/spec/acp-cancel-turn-boundary-investigation/evidence/tui-capture-after-followup.txt deleted file mode 100644 index 287ecbd55..000000000 --- a/spec/acp-cancel-turn-boundary-investigation/evidence/tui-capture-after-followup.txt +++ /dev/null @@ -1,34 +0,0 @@ -╭──────────────────────────────────────────────────────────────╮ -│ Nori CLI v0.0.0 │ -│ │ -│ directory: ~/…/debug-acp-cancel-ordering-trace/codex-rs │ -│ agent: claude-debug-acp │ -│ skillset: clifford │ -│ │ -│ Instruction Files │ -│ ~/.claude/CLAUDE.md │ -│ ~/Documents/…/debug-acp-cancel-ordering-trace/CLAUDE.md │ -│ total ~4,809 tokens │ -╰──────────────────────────────────────────────────────────────╯ - - -› I'm testing something. Just run a foreground sleep 30 task, then say 'all done!' - - -• Running sleep 30 - -• Session usage: 24539 / 1000000 tokens, cost 0.16 USD - -■ Conversation interrupted - tell the model what to do differently. Something went wrong? Report the issue at https:// -github.com/tilework-tech/nori-cli/issues - - -› what have you finished - - - -› Explain this codebase - - ⎇ debug-acp-cancel-ordering-t... - +936 -13 - ? for shortcuts