11use super :: Bridge ;
2- use crate :: config:: SESSION_READY_DELAY ;
32use crate :: error:: AGENT_UNAVAILABLE ;
4- use crate :: nats:: {
5- self , ExtSessionReady , FlushClient , FlushPolicy , PublishClient , PublishOptions , RequestClient ,
6- RetryPolicy , agent,
7- } ;
8- use crate :: telemetry:: metrics:: Metrics ;
9- use agent_client_protocol:: {
10- Error , ErrorCode , NewSessionRequest , NewSessionResponse , Result , SessionId ,
11- } ;
3+ use crate :: nats:: { self , FlushClient , PublishClient , RequestClient , agent} ;
4+ use agent_client_protocol:: { Error , ErrorCode , NewSessionRequest , NewSessionResponse , Result } ;
125use tracing:: { Span , info, instrument, warn} ;
136use trogon_nats:: NatsError ;
147use trogon_std:: time:: GetElapsed ;
@@ -83,15 +76,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
8376 info ! ( session_id = %response. session_id, "Session created" ) ;
8477
8578 bridge. metrics . record_session_created ( ) ;
86-
87- let nats = bridge. nats . clone ( ) ;
88- let prefix = bridge. config . acp_prefix . clone ( ) ;
89- let session_id = response. session_id . clone ( ) ;
90- let metrics = bridge. metrics . clone ( ) ;
91- // TODO: track the JoinHandle so we can drain in-flight publishes on graceful shutdown.
92- tokio:: spawn ( async move {
93- publish_session_ready ( & nats, prefix. as_str ( ) , & session_id, & metrics) . await ;
94- } ) ;
79+ bridge. spawn_session_ready ( & response. session_id ) ;
9580 }
9681
9782 bridge. metrics . record_request (
@@ -103,36 +88,6 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
10388 result
10489}
10590
106- async fn publish_session_ready < N : PublishClient + FlushClient > (
107- nats : & N ,
108- prefix : & str ,
109- session_id : & SessionId ,
110- metrics : & Metrics ,
111- ) {
112- tokio:: time:: sleep ( SESSION_READY_DELAY ) . await ;
113-
114- let subject = agent:: ext_session_ready ( prefix, & session_id. to_string ( ) ) ;
115- info ! ( session_id = %session_id, subject = %subject, "Publishing session.ready" ) ;
116-
117- let message = ExtSessionReady :: new ( session_id. clone ( ) ) ;
118-
119- let options = PublishOptions :: builder ( )
120- . publish_retry_policy ( RetryPolicy :: standard ( ) )
121- . flush_policy ( FlushPolicy :: standard ( ) )
122- . build ( ) ;
123-
124- if let Err ( e) = nats:: publish ( nats, & subject, & message, options) . await {
125- warn ! (
126- error = %e,
127- session_id = %session_id,
128- "Failed to publish session.ready"
129- ) ;
130- metrics. record_error ( "session_ready" , "session_ready_publish_failed" ) ;
131- } else {
132- info ! ( session_id = %session_id, "Published session.ready" ) ;
133- }
134- }
135-
13691#[ cfg( test) ]
13792mod tests {
13893 use super :: { Bridge , map_new_session_error} ;
0 commit comments