1+ // Safety: `CancelledSessions` uses `Mutex` for interior mutability so the `Bridge` remains
2+ // `Send`/`Sync`. The fire-and-forget publish in `spawn_session_ready` uses `tokio::spawn`
3+ // and captures only cloned, `Send` values — it never touches shared state from the closure.
4+
15mod authenticate;
26mod cancel;
37mod ext_method;
@@ -8,27 +12,78 @@ mod new_session;
812mod prompt;
913mod set_session_mode;
1014
11- use crate :: config:: Config ;
12- use crate :: nats:: { FlushClient , PublishClient , RequestClient } ;
15+ use crate :: config:: { Config , SESSION_READY_DELAY } ;
16+ use crate :: nats:: { self , ExtSessionReady , FlushClient , PublishClient , RequestClient , agent } ;
1317use crate :: pending_prompt_waiters:: PendingSessionPromptResponseWaiters ;
1418use crate :: prompt_slot_counter:: PromptSlotCounter ;
1519use crate :: telemetry:: metrics:: Metrics ;
1620use agent_client_protocol:: {
1721 Agent , AuthenticateRequest , AuthenticateResponse , CancelNotification , ExtNotification ,
1822 ExtRequest , ExtResponse , InitializeRequest , InitializeResponse , LoadSessionRequest ,
1923 LoadSessionResponse , NewSessionRequest , NewSessionResponse , PromptRequest , PromptResponse ,
20- Result , SetSessionModeRequest , SetSessionModeResponse ,
24+ Result , SessionId , SetSessionModeRequest , SetSessionModeResponse ,
2125} ;
2226use opentelemetry:: metrics:: Meter ;
27+ use std:: collections:: HashMap ;
28+ use std:: sync:: Mutex ;
29+ use std:: time:: Duration ;
30+ use tracing:: { info, warn} ;
2331use trogon_std:: time:: GetElapsed ;
2432
33+ const CANCELLED_SESSION_TTL : Duration = Duration :: from_secs ( 300 ) ;
34+ const CLEANUP_EVERY : usize = 16 ;
35+
36+ pub ( crate ) struct CancelledSessions < I : Copy > {
37+ map : Mutex < HashMap < SessionId , I > > ,
38+ cleanup_counter : std:: sync:: atomic:: AtomicUsize ,
39+ }
40+
41+ impl < I : Copy > CancelledSessions < I > {
42+ pub fn new ( ) -> Self {
43+ Self {
44+ map : Mutex :: new ( HashMap :: new ( ) ) ,
45+ cleanup_counter : std:: sync:: atomic:: AtomicUsize :: new ( 0 ) ,
46+ }
47+ }
48+
49+ pub fn mark_cancelled < C : GetElapsed < Instant = I > > ( & self , session_id : SessionId , clock : & C ) {
50+ let mut map = self . map . lock ( ) . unwrap ( ) ;
51+ map. insert ( session_id, clock. now ( ) ) ;
52+ let count = self . cleanup_counter . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
53+ if count. is_multiple_of ( CLEANUP_EVERY ) {
54+ map. retain ( |_, ts| clock. elapsed ( * ts) < CANCELLED_SESSION_TTL ) ;
55+ }
56+ }
57+
58+ pub fn take_if_cancelled < C : GetElapsed < Instant = I > > (
59+ & self ,
60+ session_id : & SessionId ,
61+ clock : & C ,
62+ ) -> Option < ( ) > {
63+ let mut map = self . map . lock ( ) . unwrap ( ) ;
64+ let is_valid = map
65+ . get ( session_id)
66+ . is_some_and ( |ts| clock. elapsed ( * ts) < CANCELLED_SESSION_TTL ) ;
67+
68+ if is_valid {
69+ map. remove ( session_id) ;
70+ Some ( ( ) )
71+ } else {
72+ map. remove ( session_id) ;
73+ None
74+ }
75+ }
76+ }
77+
2578pub struct Bridge < N : RequestClient + PublishClient + FlushClient , C : GetElapsed > {
2679 pub ( crate ) nats : N ,
2780 pub ( crate ) clock : C ,
2881 pub ( crate ) metrics : Metrics ,
82+ pub ( crate ) cancelled_sessions : CancelledSessions < C :: Instant > ,
2983 pub ( crate ) pending_session_prompt_responses : PendingSessionPromptResponseWaiters < C :: Instant > ,
3084 pub ( crate ) prompt_slot_counter : PromptSlotCounter ,
3185 pub ( crate ) config : Config ,
86+ pub ( crate ) session_ready_publish_tasks : Mutex < Vec < tokio:: task:: JoinHandle < ( ) > > > ,
3287}
3388
3489impl < N : RequestClient + PublishClient + FlushClient , C : GetElapsed > Bridge < N , C > {
@@ -39,14 +94,69 @@ impl<N: RequestClient + PublishClient + FlushClient, C: GetElapsed> Bridge<N, C>
3994 clock,
4095 config,
4196 metrics : Metrics :: new ( meter) ,
97+ cancelled_sessions : CancelledSessions :: new ( ) ,
4298 pending_session_prompt_responses : PendingSessionPromptResponseWaiters :: new ( ) ,
4399 prompt_slot_counter : PromptSlotCounter :: new ( max_concurrent) ,
100+ session_ready_publish_tasks : Mutex :: new ( Vec :: new ( ) ) ,
44101 }
45102 }
46103
47104 pub ( crate ) fn nats ( & self ) -> & N {
48105 & self . nats
49106 }
107+
108+ pub ( crate ) fn register_session_ready_task ( & self , task : tokio:: task:: JoinHandle < ( ) > ) {
109+ let mut tasks = self . session_ready_publish_tasks . lock ( ) . unwrap ( ) ;
110+ tasks. retain ( |task| !task. is_finished ( ) ) ;
111+ tasks. push ( task) ;
112+ }
113+
114+ pub fn has_pending_session_ready_tasks ( & self ) -> bool {
115+ !self . session_ready_publish_tasks . lock ( ) . unwrap ( ) . is_empty ( )
116+ }
117+
118+ pub async fn await_session_ready_tasks ( & self ) {
119+ let tasks = std:: mem:: take ( & mut * self . session_ready_publish_tasks . lock ( ) . unwrap ( ) ) ;
120+ for task in tasks {
121+ if let Err ( e) = task. await {
122+ warn ! ( error = %e, "session_ready task panicked" ) ;
123+ }
124+ }
125+ }
126+
127+ pub ( crate ) fn spawn_session_ready ( & self , session_id : & SessionId ) {
128+ let nats_clone = self . nats . clone ( ) ;
129+ let prefix = self . config . acp_prefix ( ) . to_string ( ) ;
130+ let session_id = session_id. clone ( ) ;
131+ let metrics = self . metrics . clone ( ) ;
132+ let session_ready_task = tokio:: spawn ( async move {
133+ tokio:: time:: sleep ( SESSION_READY_DELAY ) . await ;
134+
135+ let ready_subject = agent:: ext_session_ready ( & prefix, & session_id. to_string ( ) ) ;
136+ info ! ( session_id = %session_id, subject = %ready_subject, "Publishing session.ready" ) ;
137+
138+ let ready_message = ExtSessionReady :: new ( session_id. clone ( ) ) ;
139+
140+ let options = nats:: PublishOptions :: builder ( )
141+ . publish_retry_policy ( nats:: RetryPolicy :: standard ( ) )
142+ . flush_policy ( nats:: FlushPolicy :: standard ( ) )
143+ . build ( ) ;
144+
145+ if let Err ( e) =
146+ nats:: publish ( & nats_clone, & ready_subject, & ready_message, options) . await
147+ {
148+ warn ! (
149+ error = %e,
150+ session_id = %session_id,
151+ "Failed to publish session.ready"
152+ ) ;
153+ metrics. record_error ( "session_ready" , "session_ready_publish_failed" ) ;
154+ } else {
155+ info ! ( session_id = %session_id, "Published session.ready" ) ;
156+ }
157+ } ) ;
158+ self . register_session_ready_task ( session_ready_task) ;
159+ }
50160}
51161
52162#[ async_trait:: async_trait( ?Send ) ]
0 commit comments