Skip to content

Commit ff7cdac

Browse files
committed
feat(acp-nats): add new_session handler with session.ready lifecycle
- Add new_session handler aligned with initialize.rs patterns (handler-specific error mapping, single-line generics, local subject variable) - Add spawn_session_ready to publish ext.session.ready after successful session creation - Add SessionReady extension type for bridge-backend coordination - Add session_new and ext_session_ready NATS subject functions - Add record_session_created() and record_error() to Metrics - Add Bridge task tracking for session_ready publish tasks - Add comprehensive test suite mirroring initialize handler tests Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 2f05030 commit ff7cdac

File tree

6 files changed

+423
-15
lines changed

6 files changed

+423
-15
lines changed

rsworkspace/crates/acp-nats/src/agent/mod.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,42 @@
11
mod initialize;
2+
mod new_session;
23

34
use crate::config::Config;
4-
use crate::nats::{FlushClient, PublishClient, RequestClient};
5+
use crate::nats::{
6+
self, FlushClient, FlushPolicy, PublishClient, PublishOptions, RequestClient, RetryPolicy,
7+
SessionReady, agent,
8+
};
59
use crate::telemetry::metrics::Metrics;
610
use agent_client_protocol::ErrorCode;
711
use agent_client_protocol::{
812
Agent, AuthenticateRequest, AuthenticateResponse, CancelNotification, Error, ExtNotification,
913
ExtRequest, ExtResponse, InitializeRequest, InitializeResponse, LoadSessionRequest,
1014
LoadSessionResponse, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse,
11-
Result, SetSessionModeRequest, SetSessionModeResponse,
15+
Result, SessionId, SetSessionModeRequest, SetSessionModeResponse,
1216
};
1317
use opentelemetry::metrics::Meter;
18+
use std::cell::RefCell;
19+
use std::marker::PhantomData;
20+
use tracing::{info, warn};
1421
use trogon_std::time::GetElapsed;
1522

23+
/// NATS-backed implementation of the Agent Client Protocol.
24+
///
25+
/// # Thread Safety
26+
///
27+
/// `session_ready_publish_tasks` uses `RefCell` for interior mutability. The `Bridge`
28+
/// must be driven from a **single task** (or a single-threaded `LocalSet`). A
29+
/// `PhantomData<Rc<()>>` marker prevents accidental `Send`/`Sync` usage.
30+
///
31+
/// Background work (`spawn_session_ready`) is dispatched via `tokio::spawn`
32+
/// and captures only cloned, `Send` values -- it never touches the `RefCell`.
1633
pub struct Bridge<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> {
1734
pub(crate) nats: N,
1835
pub(crate) clock: C,
1936
pub(crate) config: Config,
2037
pub(crate) metrics: Metrics,
38+
pub(crate) session_ready_publish_tasks: RefCell<Vec<tokio::task::JoinHandle<()>>>,
39+
_not_send_sync: PhantomData<std::rc::Rc<()>>,
2140
}
2241

2342
impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C> {
@@ -27,12 +46,51 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C>
2746
clock,
2847
config,
2948
metrics: Metrics::new(meter),
49+
session_ready_publish_tasks: RefCell::new(Vec::new()),
50+
_not_send_sync: PhantomData,
3051
}
3152
}
3253

3354
pub(crate) fn nats(&self) -> &N {
3455
&self.nats
3556
}
57+
58+
pub(crate) fn spawn_session_ready(&self, session_id: &SessionId) {
59+
let nats_clone = self.nats.clone();
60+
let prefix = self.config.acp_prefix().to_owned();
61+
let session_id = session_id.clone();
62+
let metrics = self.metrics.clone();
63+
64+
let task = tokio::spawn(async move {
65+
let ready_subject =
66+
agent::ext_session_ready(&prefix, &session_id.to_string());
67+
info!(session_id = %session_id, subject = %ready_subject, "Publishing session.ready");
68+
69+
let ready_message = SessionReady::new(session_id.clone());
70+
71+
let options = PublishOptions::builder()
72+
.publish_retry_policy(RetryPolicy::standard())
73+
.flush_policy(FlushPolicy::standard())
74+
.build();
75+
76+
if let Err(e) =
77+
nats::publish(&nats_clone, &ready_subject, &ready_message, options).await
78+
{
79+
warn!(
80+
error = %e,
81+
session_id = %session_id,
82+
"Failed to publish session.ready"
83+
);
84+
metrics.record_error("session_ready", "session_ready_publish_failed");
85+
} else {
86+
info!(session_id = %session_id, "Published session.ready");
87+
}
88+
});
89+
90+
let mut tasks = self.session_ready_publish_tasks.borrow_mut();
91+
tasks.retain(|t| !t.is_finished());
92+
tasks.push(task);
93+
}
3694
}
3795

3896
#[async_trait::async_trait(?Send)]
@@ -48,11 +106,8 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Agent for Br
48106
))
49107
}
50108

51-
async fn new_session(&self, _args: NewSessionRequest) -> Result<NewSessionResponse> {
52-
Err(Error::new(
53-
ErrorCode::InternalError.into(),
54-
"not yet implemented",
55-
))
109+
async fn new_session(&self, args: NewSessionRequest) -> Result<NewSessionResponse> {
110+
new_session::handle(self, args).await
56111
}
57112

58113
async fn load_session(&self, _args: LoadSessionRequest) -> Result<LoadSessionResponse> {
@@ -109,7 +164,7 @@ mod tests {
109164
use crate::config::Config;
110165
use agent_client_protocol::{
111166
Agent, AuthenticateRequest, CancelNotification, ExtNotification, ExtRequest,
112-
LoadSessionRequest, NewSessionRequest, PromptRequest, SetSessionModeRequest,
167+
LoadSessionRequest, PromptRequest, SetSessionModeRequest,
113168
};
114169
use trogon_nats::AdvancedMockNatsClient;
115170

@@ -137,12 +192,6 @@ mod tests {
137192
.await
138193
.is_err()
139194
);
140-
assert!(
141-
bridge
142-
.new_session(NewSessionRequest::new("."))
143-
.await
144-
.is_err()
145-
);
146195
assert!(
147196
bridge
148197
.load_session(LoadSessionRequest::new("s1", "."))

0 commit comments

Comments
 (0)