Skip to content

Commit a8cc431

Browse files
committed
feat(acp-nats): wire cancel/prompt/session handlers to Bridge infrastructure
Remove dead_code annotations and wire cancel, prompt, new_session, load_session handlers to use CancelledSessions and spawn_session_ready. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 9084dc8 commit a8cc431

File tree

6 files changed

+29
-104
lines changed

6 files changed

+29
-104
lines changed

rsworkspace/crates/acp-nats/src/agent/cancel.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use super::Bridge;
22
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
33
use crate::session_id::AcpSessionId;
4-
use agent_client_protocol::{CancelNotification, Error, ErrorCode, Result};
4+
use agent_client_protocol::{
5+
CancelNotification, Error, ErrorCode, PromptResponse, Result, StopReason,
6+
};
57
use tracing::{info, instrument, warn};
68
use trogon_std::time::GetElapsed;
79

@@ -57,6 +59,16 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
5759
.record_error("cancel", "cancel_publish_failed");
5860
}
5961

62+
bridge
63+
.cancelled_sessions
64+
.mark_cancelled(args.session_id.clone(), &bridge.clock);
65+
bridge
66+
.pending_session_prompt_responses
67+
.cancel_waiter_for_session(
68+
&args.session_id,
69+
Ok(PromptResponse::new(StopReason::Cancelled)),
70+
);
71+
6072
bridge.metrics.record_request(
6173
"cancel",
6274
bridge.clock.elapsed(start).as_secs_f64(),

rsworkspace/crates/acp-nats/src/agent/load_session.rs

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
use super::Bridge;
2-
use crate::acp_prefix::AcpPrefix;
3-
use crate::config::SESSION_READY_DELAY;
42
use crate::error::AGENT_UNAVAILABLE;
5-
use crate::nats::{
6-
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
7-
RetryPolicy, agent,
8-
};
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
94
use crate::session_id::AcpSessionId;
10-
use crate::telemetry::metrics::Metrics;
115
use agent_client_protocol::{Error, ErrorCode, LoadSessionRequest, LoadSessionResponse, Result};
126
use tracing::{info, instrument, warn};
137
use trogon_nats::NatsError;
@@ -88,14 +82,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
8882
.map_err(map_load_session_error);
8983

9084
if result.is_ok() {
91-
let nats = bridge.nats.clone();
92-
let prefix = bridge.config.acp_prefix.clone();
93-
let session_id = session_id.clone();
94-
let metrics = bridge.metrics.clone();
95-
// TODO: track the JoinHandle so we can drain in-flight publishes on graceful shutdown.
96-
tokio::spawn(async move {
97-
publish_session_ready(&nats, &prefix, &session_id, &metrics).await;
98-
});
85+
bridge.spawn_session_ready(&args.session_id);
9986
}
10087

10188
bridge.metrics.record_request(
@@ -107,38 +94,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
10794
result
10895
}
10996

110-
async fn publish_session_ready<N: PublishClient + FlushClient>(
111-
nats: &N,
112-
prefix: &AcpPrefix,
113-
session_id: &AcpSessionId,
114-
metrics: &Metrics,
115-
) {
116-
tokio::time::sleep(SESSION_READY_DELAY).await;
117-
118-
let subject = agent::ext_session_ready(prefix.as_str(), session_id.as_str());
119-
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
120-
121-
let message = ExtSessionReady::new(agent_client_protocol::SessionId::from(
122-
session_id.to_string(),
123-
));
124-
125-
let options = PublishOptions::builder()
126-
.publish_retry_policy(RetryPolicy::standard())
127-
.flush_policy(FlushPolicy::standard())
128-
.build();
129-
130-
if let Err(e) = nats::publish(nats, &subject, &message, options).await {
131-
warn!(
132-
error = %e,
133-
session_id = %session_id,
134-
"Failed to publish session.ready"
135-
);
136-
metrics.record_error("session_ready", "session_ready_publish_failed");
137-
} else {
138-
info!(session_id = %session_id, "Published session.ready");
139-
}
140-
}
141-
14297
#[cfg(test)]
14398
mod tests {
14499
use super::{Bridge, map_load_session_error};

rsworkspace/crates/acp-nats/src/agent/mod.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,14 @@ use std::time::Duration;
3030
use tracing::{info, warn};
3131
use trogon_std::time::GetElapsed;
3232

33-
#[allow(dead_code)]
3433
const CANCELLED_SESSION_TTL: Duration = Duration::from_secs(300);
35-
#[allow(dead_code)]
3634
const CLEANUP_EVERY: usize = 16;
3735

38-
#[allow(dead_code)]
3936
pub(crate) struct CancelledSessions<I: Copy> {
4037
map: Mutex<HashMap<SessionId, I>>,
4138
cleanup_counter: std::sync::atomic::AtomicUsize,
4239
}
4340

44-
#[allow(dead_code)]
4541
impl<I: Copy> CancelledSessions<I> {
4642
pub fn new() -> Self {
4743
Self {
@@ -85,7 +81,6 @@ pub struct Bridge<N: RequestClient + PublishClient + FlushClient, C: GetElapsed>
8581
pub(crate) nats: N,
8682
pub(crate) clock: C,
8783
pub(crate) metrics: Metrics,
88-
#[allow(dead_code)]
8984
pub(crate) cancelled_sessions: CancelledSessions<C::Instant>,
9085
pub(crate) pending_session_prompt_responses: PendingSessionPromptResponseWaiters<C::Instant>,
9186
pub(crate) prompt_slot_counter: PromptSlotCounter,
@@ -112,7 +107,6 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C>
112107
&self.nats
113108
}
114109

115-
#[allow(dead_code)]
116110
pub(crate) fn register_session_ready_task(&self, task: tokio::task::JoinHandle<()>) {
117111
let mut tasks = self.session_ready_publish_tasks.lock().unwrap();
118112
tasks.retain(|task| !task.is_finished());
@@ -134,7 +128,6 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C>
134128
}
135129
}
136130

137-
#[allow(dead_code)]
138131
pub(crate) fn spawn_session_ready(&self, session_id: &SessionId) {
139132
let nats_clone = self.nats.clone();
140133
let prefix = self.config.acp_prefix().to_string();

rsworkspace/crates/acp-nats/src/agent/new_session.rs

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
use super::Bridge;
2-
use crate::config::SESSION_READY_DELAY;
32
use crate::error::AGENT_UNAVAILABLE;
4-
use crate::nats::{
5-
self, ExtSessionReady, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient,
6-
RetryPolicy, agent,
7-
};
8-
use crate::telemetry::metrics::Metrics;
9-
use agent_client_protocol::{
10-
Error, ErrorCode, NewSessionRequest, NewSessionResponse, Result, SessionId,
11-
};
3+
use crate::nats::{self, FlushClient, PublishClient, RequestClient, agent};
4+
use agent_client_protocol::{Error, ErrorCode, NewSessionRequest, NewSessionResponse, Result};
125
use tracing::{Span, info, instrument, warn};
136
use trogon_nats::NatsError;
147
use trogon_std::time::GetElapsed;
@@ -83,15 +76,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
8376
info!(session_id = %response.session_id, "Session created");
8477

8578
bridge.metrics.record_session_created();
86-
87-
let nats = bridge.nats.clone();
88-
let prefix = bridge.config.acp_prefix.clone();
89-
let session_id = response.session_id.clone();
90-
let metrics = bridge.metrics.clone();
91-
// TODO: track the JoinHandle so we can drain in-flight publishes on graceful shutdown.
92-
tokio::spawn(async move {
93-
publish_session_ready(&nats, prefix.as_str(), &session_id, &metrics).await;
94-
});
79+
bridge.spawn_session_ready(&response.session_id);
9580
}
9681

9782
bridge.metrics.record_request(
@@ -103,36 +88,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
10388
result
10489
}
10590

106-
async fn publish_session_ready<N: PublishClient + FlushClient>(
107-
nats: &N,
108-
prefix: &str,
109-
session_id: &SessionId,
110-
metrics: &Metrics,
111-
) {
112-
tokio::time::sleep(SESSION_READY_DELAY).await;
113-
114-
let subject = agent::ext_session_ready(prefix, &session_id.to_string());
115-
info!(session_id = %session_id, subject = %subject, "Publishing session.ready");
116-
117-
let message = ExtSessionReady::new(session_id.clone());
118-
119-
let options = PublishOptions::builder()
120-
.publish_retry_policy(RetryPolicy::standard())
121-
.flush_policy(FlushPolicy::standard())
122-
.build();
123-
124-
if let Err(e) = nats::publish(nats, &subject, &message, options).await {
125-
warn!(
126-
error = %e,
127-
session_id = %session_id,
128-
"Failed to publish session.ready"
129-
);
130-
metrics.record_error("session_ready", "session_ready_publish_failed");
131-
} else {
132-
info!(session_id = %session_id, "Published session.ready");
133-
}
134-
}
135-
13691
#[cfg(test)]
13792
mod tests {
13893
use super::{Bridge, map_new_session_error};

rsworkspace/crates/acp-nats/src/agent/prompt.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
8484
Err(error) => return Err(invalid_session_id_error(bridge, error)),
8585
};
8686

87+
if bridge
88+
.cancelled_sessions
89+
.take_if_cancelled(&args.session_id, &bridge.clock)
90+
.is_some()
91+
{
92+
info!(session_id = %args.session_id, "Prompt pre-flight: session already cancelled");
93+
return Ok(PromptResponse::new(
94+
agent_client_protocol::StopReason::Cancelled,
95+
));
96+
}
97+
8798
let nats = bridge.nats();
8899
let subject = agent::session_prompt(bridge.config.acp_prefix(), session_id.as_str());
89100

rsworkspace/crates/acp-nats/src/pending_prompt_waiters.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ impl<I: Copy> PendingSessionPromptResponseWaiters<I> {
165165

166166
/// Cancels any pending waiter for the session with a Cancelled response.
167167
/// Used when a cancel notification arrives; we don't have a prompt_token.
168-
#[allow(dead_code)]
169168
pub fn cancel_waiter_for_session(
170169
&self,
171170
session_id: &SessionId,

0 commit comments

Comments
 (0)