Skip to content

Commit d3a979e

Browse files
committed
feat(acp-nats): add Bridge infrastructure for session cancel and ready
- Add CancelledSessions with TTL eviction for pre-flight cancel tracking - Add spawn_session_ready and session-ready task management to Bridge - Consolidate ClientMethod/ParsedClientSubject into parsing.rs - Add agent/client wildcard subjects - Add cancel_waiter_for_session to PendingPromptWaiters - Fix config min-concurrency clamping, add with_prefix alias - Remove unused subject capacity validation (tracked in FOLLOWUP.md) Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent cbd2cf8 commit d3a979e

8 files changed

Lines changed: 343 additions & 60 deletions

File tree

rsworkspace/crates/acp-nats/src/agent/mod.rs

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// Safety: `CancelledSessions` uses `Mutex` for interior mutability so the `Bridge` remains
2+
// `Send`/`Sync`. The fire-and-forget publish in `spawn_session_ready` uses `tokio::spawn`
3+
// and captures only cloned, `Send` values — it never touches shared state from the closure.
4+
15
mod authenticate;
26
mod cancel;
37
mod ext_method;
@@ -8,27 +12,85 @@ mod new_session;
812
mod prompt;
913
mod set_session_mode;
1014

11-
use crate::config::Config;
12-
use crate::nats::{FlushClient, PublishClient, RequestClient};
15+
use crate::config::{Config, SESSION_READY_DELAY};
16+
use crate::nats::{self, ExtSessionReady, FlushClient, PublishClient, RequestClient, agent};
1317
use crate::pending_prompt_waiters::PendingSessionPromptResponseWaiters;
1418
use crate::prompt_slot_counter::PromptSlotCounter;
1519
use crate::telemetry::metrics::Metrics;
1620
use agent_client_protocol::{
1721
Agent, AuthenticateRequest, AuthenticateResponse, CancelNotification, ExtNotification,
1822
ExtRequest, ExtResponse, InitializeRequest, InitializeResponse, LoadSessionRequest,
1923
LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse,
20-
Result, SetSessionModeRequest, SetSessionModeResponse,
24+
Result, SessionId, SetSessionModeRequest, SetSessionModeResponse,
2125
};
2226
use opentelemetry::metrics::Meter;
27+
use std::collections::HashMap;
28+
use std::sync::Mutex;
29+
use std::time::Duration;
30+
use tracing::{info, warn};
2331
use trogon_std::time::GetElapsed;
2432

33+
#[allow(dead_code)]
34+
const CANCELLED_SESSION_TTL: Duration = Duration::from_secs(300);
35+
#[allow(dead_code)]
36+
const CLEANUP_EVERY: usize = 16;
37+
38+
#[allow(dead_code)]
39+
pub(crate) struct CancelledSessions<I: Copy> {
40+
map: Mutex<HashMap<SessionId, I>>,
41+
cleanup_counter: std::sync::atomic::AtomicUsize,
42+
}
43+
44+
#[allow(dead_code)]
45+
impl<I: Copy> CancelledSessions<I> {
46+
pub fn new() -> Self {
47+
Self {
48+
map: Mutex::new(HashMap::new()),
49+
cleanup_counter: std::sync::atomic::AtomicUsize::new(0),
50+
}
51+
}
52+
53+
pub fn mark_cancelled<C: GetElapsed<Instant = I>>(&self, session_id: SessionId, clock: &C) {
54+
let mut map = self.map.lock().unwrap();
55+
map.insert(session_id, clock.now());
56+
let count = self
57+
.cleanup_counter
58+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
59+
if count.is_multiple_of(CLEANUP_EVERY) {
60+
map.retain(|_, ts| clock.elapsed(*ts) < CANCELLED_SESSION_TTL);
61+
}
62+
}
63+
64+
pub fn take_if_cancelled<C: GetElapsed<Instant = I>>(
65+
&self,
66+
session_id: &SessionId,
67+
clock: &C,
68+
) -> Option<()> {
69+
let mut map = self.map.lock().unwrap();
70+
let is_valid = map
71+
.get(session_id)
72+
.is_some_and(|ts| clock.elapsed(*ts) < CANCELLED_SESSION_TTL);
73+
74+
if is_valid {
75+
map.remove(session_id);
76+
Some(())
77+
} else {
78+
map.remove(session_id);
79+
None
80+
}
81+
}
82+
}
83+
2584
pub struct Bridge<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> {
2685
pub(crate) nats: N,
2786
pub(crate) clock: C,
2887
pub(crate) metrics: Metrics,
88+
#[allow(dead_code)]
89+
pub(crate) cancelled_sessions: CancelledSessions<C::Instant>,
2990
pub(crate) pending_session_prompt_responses: PendingSessionPromptResponseWaiters<C::Instant>,
3091
pub(crate) prompt_slot_counter: PromptSlotCounter,
3192
pub(crate) config: Config,
93+
pub(crate) session_ready_publish_tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
3294
}
3395

3496
impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C> {
@@ -39,14 +101,73 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C>
39101
clock,
40102
config,
41103
metrics: Metrics::new(meter),
104+
cancelled_sessions: CancelledSessions::new(),
42105
pending_session_prompt_responses: PendingSessionPromptResponseWaiters::new(),
43106
prompt_slot_counter: PromptSlotCounter::new(max_concurrent),
107+
session_ready_publish_tasks: Mutex::new(Vec::new()),
44108
}
45109
}
46110

47111
pub(crate) fn nats(&self) -> &N {
48112
&self.nats
49113
}
114+
115+
#[allow(dead_code)]
116+
pub(crate) fn register_session_ready_task(&self, task: tokio::task::JoinHandle<()>) {
117+
let mut tasks = self.session_ready_publish_tasks.lock().unwrap();
118+
tasks.retain(|task| !task.is_finished());
119+
tasks.push(task);
120+
}
121+
122+
pub fn has_pending_session_ready_tasks(&self) -> bool {
123+
let mut tasks = self.session_ready_publish_tasks.lock().unwrap();
124+
tasks.retain(|task| !task.is_finished());
125+
!tasks.is_empty()
126+
}
127+
128+
pub async fn await_session_ready_tasks(&self) {
129+
let tasks = std::mem::take(&mut *self.session_ready_publish_tasks.lock().unwrap());
130+
for task in tasks {
131+
if let Err(e) = task.await {
132+
warn!(error = %e, "session_ready task panicked");
133+
}
134+
}
135+
}
136+
137+
#[allow(dead_code)]
138+
pub(crate) fn spawn_session_ready(&self, session_id: &SessionId) {
139+
let nats_clone = self.nats.clone();
140+
let prefix = self.config.acp_prefix().to_string();
141+
let session_id = session_id.clone();
142+
let metrics = self.metrics.clone();
143+
let session_ready_task = tokio::spawn(async move {
144+
tokio::time::sleep(SESSION_READY_DELAY).await;
145+
146+
let ready_subject = agent::ext_session_ready(&prefix, &session_id.to_string());
147+
info!(session_id = %session_id, subject = %ready_subject, "Publishing session.ready");
148+
149+
let ready_message = ExtSessionReady::new(session_id.clone());
150+
151+
let options = nats::PublishOptions::builder()
152+
.publish_retry_policy(nats::RetryPolicy::standard())
153+
.flush_policy(nats::FlushPolicy::standard())
154+
.build();
155+
156+
if let Err(e) =
157+
nats::publish(&nats_clone, &ready_subject, &ready_message, options).await
158+
{
159+
warn!(
160+
error = %e,
161+
session_id = %session_id,
162+
"Failed to publish session.ready"
163+
);
164+
metrics.record_error("session_ready", "session_ready_publish_failed");
165+
} else {
166+
info!(session_id = %session_id, "Published session.ready");
167+
}
168+
});
169+
self.register_session_ready_task(session_ready_task);
170+
}
50171
}
51172

52173
#[async_trait::async_trait(?Send)]

rsworkspace/crates/acp-nats/src/config.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::acp_prefix::AcpPrefix;
66
const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
77
const DEFAULT_PROMPT_TIMEOUT: Duration = Duration::from_secs(7200);
88
const DEFAULT_MAX_CONCURRENT_CLIENT_TASKS: usize = 256;
9+
910
/// Above this value, prompt timeout errors are rendered in seconds instead of milliseconds.
1011
pub(crate) const PROMPT_TIMEOUT_MESSAGE_SECS_THRESHOLD: Duration = Duration::from_secs(60);
1112
/// Suppresses duplicate timeout-related warnings for a short late-response window.
@@ -36,6 +37,10 @@ impl Config {
3637
}
3738
}
3839

40+
pub fn with_prefix(acp_prefix: AcpPrefix, nats: NatsConfig) -> Self {
41+
Self::new(acp_prefix, nats)
42+
}
43+
3944
pub fn with_operation_timeout(mut self, timeout: Duration) -> Self {
4045
self.operation_timeout = timeout;
4146
self
@@ -47,7 +52,7 @@ impl Config {
4752
}
4853

4954
pub fn with_max_concurrent_client_tasks(mut self, max: usize) -> Self {
50-
self.max_concurrent_client_tasks = max;
55+
self.max_concurrent_client_tasks = max.max(1);
5156
self
5257
}
5358

@@ -72,7 +77,7 @@ impl Config {
7277
///
7378
/// A minimum of 1 avoids misconfiguration that would permanently reject all client messages.
7479
pub fn max_concurrent_client_tasks(&self) -> usize {
75-
self.max_concurrent_client_tasks.max(1)
80+
self.max_concurrent_client_tasks
7681
}
7782

7883
#[cfg(test)]
@@ -81,7 +86,7 @@ impl Config {
8186
servers: vec!["localhost:4222".to_string()],
8287
auth: trogon_nats::NatsAuth::None,
8388
};
84-
Self::new(AcpPrefix::new(acp_prefix).unwrap(), nats)
89+
Self::new(AcpPrefix::new(acp_prefix.to_string()).unwrap(), nats)
8590
}
8691
}
8792

@@ -132,9 +137,8 @@ mod tests {
132137
}
133138

134139
#[test]
135-
fn config_max_concurrent_client_tasks_zero_is_clamped_to_one() {
136-
let config = Config::new(AcpPrefix::new("acp").unwrap(), default_nats())
137-
.with_max_concurrent_client_tasks(0);
140+
fn config_with_max_concurrent_client_tasks_enforces_minimum() {
141+
let config = Config::for_test("acp").with_max_concurrent_client_tasks(0);
138142
assert_eq!(config.max_concurrent_client_tasks(), 1);
139143
}
140144

@@ -144,4 +148,11 @@ mod tests {
144148
assert_eq!(config.nats().servers.len(), 1);
145149
assert_eq!(config.nats().servers[0], "localhost:4222");
146150
}
151+
152+
#[test]
153+
fn config_with_prefix_accepts_validated_prefix() {
154+
let config = Config::with_prefix(AcpPrefix::new("acp").unwrap(), default_nats());
155+
assert_eq!(config.acp_prefix(), "acp");
156+
}
157+
147158
}

rsworkspace/crates/acp-nats/src/nats/client_method.rs

Lines changed: 0 additions & 31 deletions
This file was deleted.
Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
1-
mod client_method;
21
mod extensions;
3-
mod parsed_client_subject;
42
mod parsing;
53
mod subjects;
64
pub(crate) mod token;
75

8-
pub use client_method::ClientMethod;
9-
pub use extensions::ExtSessionReady;
10-
pub use parsed_client_subject::ParsedClientSubject;
11-
pub use parsing::parse_client_subject;
12-
pub use subjects::{agent, client};
136
pub use trogon_nats::{
147
FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient, RetryPolicy,
158
SubscribeClient, connect, headers_with_trace_context, publish, request_with_timeout,
169
};
10+
11+
pub use extensions::ExtSessionReady;
12+
pub use parsing::{ClientMethod, ParsedClientSubject, parse_client_subject};
13+
pub use subjects::{agent, client};

rsworkspace/crates/acp-nats/src/nats/parsed_client_subject.rs

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)