Skip to content

Commit 84288b3

Browse files
theahuranori-agent
andcommitted
fix(acp): prevent stale prompt uninstall from wiping new prompt's update channel
When a cancelled prompt returns AFTER a new prompt has installed its update sender in the shared active_update_tx slot, the old prompt's unconditional uninstall (setting to None) wipes the new prompt's channel. This causes the new prompt's response events to be routed to the persistent channel instead of the update handler, making the response invisible in the TUI. Fix: add a generation counter to the active_update_tx slot. Each prompt() and load_session() call gets a unique generation and only uninstalls if its generation still matches the slot. This prevents stale prompts from interfering with newer ones. 🤖 Generated with [Nori](https://noriagentic.com) Co-Authored-By: Nori <contact@tilework.tech>
1 parent d63cb91 commit 84288b3

1 file changed

Lines changed: 41 additions & 18 deletions

File tree

codex-rs/acp/src/connection/sacp_connection.rs

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ const MINIMUM_SUPPORTED_VERSION: ProtocolVersion = ProtocolVersion::V1;
7777
/// - The `JrConnectionCx` is cloned out and used for all subsequent requests.
7878
/// - Session notifications and approval requests are forwarded via channels.
7979
/// - The session update channel is swapped for each prompt via an `Arc<Mutex<...>>`.
80+
/// Shared slot for the active session update sender, paired with a generation
81+
/// counter to prevent stale uninstalls from wiping a newer sender.
82+
type ActiveUpdateSlot = std::sync::Arc<Mutex<Option<(u64, mpsc::Sender<SessionUpdate>)>>>;
83+
8084
pub struct SacpConnection {
8185
/// Connection context for sending requests to the agent.
8286
cx: JrConnectionCx<ClientToAgent>,
@@ -95,9 +99,16 @@ pub struct SacpConnection {
9599

96100
/// Shared session update sender. The notification handler routes updates
97101
/// to whoever currently holds the active sender. During a prompt, this
98-
/// contains the caller's `update_tx`. Between turns, it is `None` and
99-
/// notifications fall through to the persistent channel.
100-
active_update_tx: std::sync::Arc<Mutex<Option<mpsc::Sender<SessionUpdate>>>>,
102+
/// contains the caller's `update_tx` paired with a generation counter.
103+
/// Between turns, it is `None` and notifications fall through to the
104+
/// persistent channel. The generation counter ensures that when an old
105+
/// prompt finishes AFTER a new prompt has installed its sender, the old
106+
/// prompt's uninstall does not wipe the new prompt's channel.
107+
active_update_tx: ActiveUpdateSlot,
108+
/// Monotonic generation counter for `active_update_tx`. Each
109+
/// `prompt()`/`load_session_with_updates()` call gets a unique
110+
/// generation so it only uninstalls its own sender.
111+
update_generation: std::sync::atomic::AtomicU64,
101112

102113
/// Handle to the background task driving the SACP connection.
103114
connection_task: tokio::task::JoinHandle<()>,
@@ -181,8 +192,7 @@ impl SacpConnection {
181192
// --- Set up channels ---
182193
let (approval_tx, approval_rx) = mpsc::channel::<ApprovalRequest>(16);
183194
let (persistent_tx, persistent_rx) = mpsc::channel::<SessionUpdate>(64);
184-
let active_update_tx: std::sync::Arc<Mutex<Option<mpsc::Sender<SessionUpdate>>>> =
185-
std::sync::Arc::new(Mutex::new(None));
195+
let active_update_tx: ActiveUpdateSlot = std::sync::Arc::new(Mutex::new(None));
186196

187197
// --- Build SACP connection ---
188198
let transport = ByteStreams::new(stdin.compat_write(), stdout.compat());
@@ -212,7 +222,7 @@ impl SacpConnection {
212222
async move |notification: SessionNotification, _cx| {
213223
let update = notification.update;
214224
let guard = update_tx.lock().await;
215-
if let Some(tx) = guard.as_ref() {
225+
if let Some((_, tx)) = guard.as_ref() {
216226
let _ = tx.try_send(update);
217227
} else {
218228
let _ = persistent_tx.try_send(update);
@@ -343,7 +353,7 @@ impl SacpConnection {
343353
.status(ToolCallStatus::Pending);
344354
{
345355
let guard = update_tx.lock().await;
346-
if let Some(tx) = guard.as_ref() {
356+
if let Some((_, tx)) = guard.as_ref() {
347357
let _ =
348358
tx.try_send(SessionUpdate::ToolCall(tool_call));
349359
} else {
@@ -446,7 +456,7 @@ impl SacpConnection {
446456
.status(ToolCallStatus::Pending);
447457
{
448458
let guard = update_tx.lock().await;
449-
if let Some(tx) = guard.as_ref() {
459+
if let Some((_, tx)) = guard.as_ref() {
450460
let _ =
451461
tx.try_send(SessionUpdate::ToolCall(tool_call));
452462
} else {
@@ -546,6 +556,7 @@ impl SacpConnection {
546556
persistent_rx,
547557
model_state: std::sync::Arc::new(std::sync::RwLock::new(AcpModelState::new())),
548558
active_update_tx,
559+
update_generation: std::sync::atomic::AtomicU64::new(0),
549560
connection_task,
550561
child,
551562
stderr_task,
@@ -595,10 +606,13 @@ impl SacpConnection {
595606
cwd: &Path,
596607
update_tx: mpsc::Sender<SessionUpdate>,
597608
) -> Result<SessionId> {
598-
// Install the update channel for replay events.
609+
let my_gen = self
610+
.update_generation
611+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
612+
+ 1;
599613
{
600614
let mut guard = self.active_update_tx.lock().await;
601-
*guard = Some(update_tx);
615+
*guard = Some((my_gen, update_tx));
602616
}
603617

604618
let result = self
@@ -608,10 +622,13 @@ impl SacpConnection {
608622
.await
609623
.context("Failed to load ACP session");
610624

611-
// Uninstall so replay events stop flowing to the caller's channel.
625+
// Only uninstall if we are still the active sender. A newer
626+
// prompt/load call may have replaced us while we were blocked.
612627
{
613628
let mut guard = self.active_update_tx.lock().await;
614-
*guard = None;
629+
if matches!(guard.as_ref(), Some((g, _)) if *g == my_gen) {
630+
*guard = None;
631+
}
615632
}
616633

617634
let response = result?;
@@ -623,8 +640,6 @@ impl SacpConnection {
623640
*state = AcpModelState::from_session_model_state(models);
624641
}
625642

626-
// The session ID from the request is reused since the response
627-
// doesn't contain one.
628643
Ok(SessionId::from(session_id.to_string()))
629644
}
630645

@@ -635,10 +650,13 @@ impl SacpConnection {
635650
prompt: Vec<ContentBlock>,
636651
update_tx: mpsc::Sender<SessionUpdate>,
637652
) -> Result<StopReason> {
638-
// Install the update channel.
653+
let my_gen = self
654+
.update_generation
655+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
656+
+ 1;
639657
{
640658
let mut guard = self.active_update_tx.lock().await;
641-
*guard = Some(update_tx);
659+
*guard = Some((my_gen, update_tx));
642660
}
643661

644662
let result = self
@@ -648,10 +666,15 @@ impl SacpConnection {
648666
.await
649667
.context("ACP prompt failed");
650668

651-
// Uninstall so inter-turn notifications go to persistent.
669+
// Only uninstall if we are still the active sender. When an
670+
// interrupt causes a new prompt to start before this one returns,
671+
// the new prompt will have installed its own sender — clearing
672+
// unconditionally would wipe the new prompt's channel.
652673
{
653674
let mut guard = self.active_update_tx.lock().await;
654-
*guard = None;
675+
if matches!(guard.as_ref(), Some((g, _)) if *g == my_gen) {
676+
*guard = None;
677+
}
655678
}
656679

657680
result.map(|r| r.stop_reason)

0 commit comments

Comments
 (0)