1+ pub ( crate ) mod fs_read_text_file;
12pub ( crate ) mod session_update;
23
34use crate :: agent:: Bridge ;
5+ use crate :: error:: AGENT_UNAVAILABLE ;
46use crate :: in_flight_slot_guard:: InFlightSlotGuard ;
7+ use crate :: jsonrpc:: extract_request_id;
58use crate :: nats:: {
69 ClientMethod , FlushClient , PublishClient , RequestClient , SubscribeClient , client,
7- parse_client_subject,
10+ headers_with_trace_context , parse_client_subject,
811} ;
9- use agent_client_protocol:: Client ;
12+ use agent_client_protocol:: { Client , Error , ErrorCode , RequestId , Response } ;
1013use async_nats:: Message ;
1114use bytes:: Bytes ;
1215use futures:: StreamExt ;
@@ -84,6 +87,9 @@ async fn process_message<
8487 }
8588 } ;
8689
90+ let payload = msg. payload . clone ( ) ;
91+ let reply = msg. reply . as_ref ( ) . map ( |r| r. to_string ( ) ) ;
92+
8793 let current_in_flight = in_flight. get ( ) ;
8894 if current_in_flight >= max_concurrent {
8995 warn ! (
@@ -96,10 +102,42 @@ async fn process_message<
96102 . metrics
97103 . record_error ( "client" , "client_backpressure_rejected" ) ;
98104
105+ if let Some ( reply_to) = & reply {
106+ let request_id = extract_request_id ( & payload) ;
107+ let bytes = serde_json:: to_vec ( & Response :: < ( ) > :: Error {
108+ id : request_id,
109+ error : Error :: new (
110+ i32:: from ( ErrorCode :: Other ( AGENT_UNAVAILABLE ) ) ,
111+ "Client proxy overloaded; retry with backoff" ,
112+ ) ,
113+ } )
114+ . unwrap_or_else ( |e| {
115+ serde_json:: to_vec ( & Response :: < ( ) > :: Error {
116+ id : RequestId :: Null ,
117+ error : Error :: new (
118+ i32:: from ( ErrorCode :: Other ( AGENT_UNAVAILABLE ) ) ,
119+ format ! (
120+ "Client proxy overloaded; retry with backoff (serialization failed: {})" ,
121+ e
122+ ) ,
123+ ) ,
124+ } )
125+ . unwrap ( )
126+ } )
127+ . into ( ) ;
128+ let headers = headers_with_trace_context ( ) ;
129+ if let Err ( e) = nats
130+ . publish_with_headers ( reply_to. clone ( ) , headers, bytes)
131+ . await
132+ {
133+ error ! ( error = %e, "Failed to publish backpressure error reply" ) ;
134+ }
135+ if let Err ( e) = nats. flush ( ) . await {
136+ warn ! ( error = %e, "Failed to flush backpressure error reply" ) ;
137+ }
138+ }
99139 return ;
100140 }
101-
102- let payload = msg. payload . clone ( ) ;
103141 let nats = nats. clone ( ) ;
104142
105143 let bridge_clone = bridge. clone ( ) ;
@@ -110,6 +148,7 @@ async fn process_message<
110148 & subject,
111149 parsed,
112150 payload,
151+ reply,
113152 & nats,
114153 client. as_ref ( ) ,
115154 bridge_clone. as_ref ( ) ,
@@ -118,7 +157,7 @@ async fn process_message<
118157 } ) ;
119158}
120159
121- #[ instrument( skip( payload, _nats , client, _bridge) , fields( subject = %subject, session_id = tracing:: field:: Empty ) ) ]
160+ #[ instrument( skip( payload, nats , client, _bridge) , fields( subject = %subject, session_id = tracing:: field:: Empty ) ) ]
122161async fn dispatch_client_method <
123162 N : SubscribeClient + RequestClient + PublishClient + FlushClient ,
124163 Cl : Client ,
@@ -127,13 +166,76 @@ async fn dispatch_client_method<
127166 subject : & str ,
128167 parsed : crate :: nats:: ParsedClientSubject ,
129168 payload : Bytes ,
130- _nats : & N ,
169+ reply : Option < String > ,
170+ nats : & N ,
131171 client : & Cl ,
132172 _bridge : & Bridge < N , C > ,
133173) {
134174 Span :: current ( ) . record ( "session_id" , parsed. session_id . as_str ( ) ) ;
135175
136176 match parsed. method {
177+ ClientMethod :: FsReadTextFile => {
178+ let request_id = extract_request_id ( & payload) ;
179+ match fs_read_text_file:: handle ( & payload, client) . await {
180+ Ok ( bytes) => {
181+ if let Some ( reply_to) = & reply {
182+ let result =
183+ serde_json:: from_slice ( & bytes) . unwrap_or ( serde_json:: Value :: Null ) ;
184+ let response_bytes = serde_json:: to_vec ( & Response :: Result {
185+ id : request_id,
186+ result,
187+ } )
188+ . unwrap ( )
189+ . into ( ) ;
190+ let headers = headers_with_trace_context ( ) ;
191+ if let Err ( e) = nats
192+ . publish_with_headers ( reply_to. clone ( ) , headers, response_bytes)
193+ . await
194+ {
195+ error ! ( error = %e, "Failed to publish fs_read_text_file reply" ) ;
196+ }
197+ if let Err ( e) = nats. flush ( ) . await {
198+ warn ! ( error = %e, "Failed to flush fs_read_text_file reply" ) ;
199+ }
200+ }
201+ }
202+ Err ( e) => {
203+ let ( code, message) = fs_read_text_file:: error_code_and_message ( & e) ;
204+ warn ! (
205+ error = %e,
206+ session_id = %parsed. session_id,
207+ "Failed to handle fs_read_text_file"
208+ ) ;
209+ if let Some ( reply_to) = & reply {
210+ let bytes = serde_json:: to_vec ( & Response :: < ( ) > :: Error {
211+ id : request_id,
212+ error : Error :: new ( i32:: from ( code) , message. as_str ( ) ) ,
213+ } )
214+ . unwrap_or_else ( |e| {
215+ serde_json:: to_vec ( & Response :: < ( ) > :: Error {
216+ id : RequestId :: Null ,
217+ error : Error :: new (
218+ i32:: from ( code) ,
219+ format ! ( "{} (serialization failed: {})" , message, e) ,
220+ ) ,
221+ } )
222+ . unwrap ( )
223+ } )
224+ . into ( ) ;
225+ let headers = headers_with_trace_context ( ) ;
226+ if let Err ( e) = nats
227+ . publish_with_headers ( reply_to. clone ( ) , headers, bytes)
228+ . await
229+ {
230+ error ! ( error = %e, "Failed to publish fs_read_text_file error reply" ) ;
231+ }
232+ if let Err ( e) = nats. flush ( ) . await {
233+ warn ! ( error = %e, "Failed to flush fs_read_text_file error reply" ) ;
234+ }
235+ }
236+ }
237+ }
238+ }
137239 ClientMethod :: SessionUpdate => {
138240 session_update:: handle ( & payload, client, & parsed. session_id ) . await ;
139241 }
@@ -145,8 +247,8 @@ mod tests {
145247 use super :: * ;
146248 use crate :: session_id:: AcpSessionId ;
147249 use agent_client_protocol:: {
148- ContentBlock , ContentChunk , RequestPermissionRequest , RequestPermissionResponse ,
149- SessionNotification , SessionUpdate ,
250+ ContentBlock , ContentChunk , ReadTextFileRequest , Request , RequestId ,
251+ RequestPermissionRequest , RequestPermissionResponse , SessionNotification , SessionUpdate ,
150252 } ;
151253 use async_trait:: async_trait;
152254 use std:: cell:: RefCell ;
@@ -286,6 +388,7 @@ mod tests {
286388 "acp.sess-1.client.session.update" ,
287389 parsed,
288390 payload,
391+ None ,
289392 & nats,
290393 & client,
291394 & bridge,
@@ -334,6 +437,32 @@ mod tests {
334437 assert ! ( nats. published_messages( ) . is_empty( ) ) ;
335438 }
336439
440+ #[ tokio:: test]
441+ async fn process_message_backpressure_with_reply_publishes_error ( ) {
442+ let nats = MockNatsClient :: new ( ) ;
443+ let bridge = make_bridge ( nats. clone ( ) ) ;
444+ let client = Rc :: new ( MockClient :: new ( ) ) ;
445+ let in_flight = Rc :: new ( Cell :: new ( 1usize ) ) ;
446+
447+ let envelope = Request {
448+ id : RequestId :: Number ( 1 ) ,
449+ method : std:: sync:: Arc :: from ( "fs/read_text_file" ) ,
450+ params : Some ( ReadTextFileRequest :: new (
451+ agent_client_protocol:: SessionId :: from ( "sess1" ) ,
452+ "/tmp/foo.txt" . to_string ( ) ,
453+ ) ) ,
454+ } ;
455+ let payload = serde_json:: to_vec ( & envelope) . unwrap ( ) ;
456+ let msg = make_msg (
457+ "acp.sess1.client.fs.read_text_file" ,
458+ & payload,
459+ Some ( "_INBOX.reply" ) ,
460+ ) ;
461+ process_message ( msg, & nats, client, bridge, & in_flight, 1 ) . await ;
462+
463+ assert_eq ! ( nats. published_messages( ) , vec![ "_INBOX.reply" ] ) ;
464+ }
465+
337466 #[ tokio:: test]
338467 async fn process_message_valid_dispatch_spawns_task ( ) {
339468 let local = tokio:: task:: LocalSet :: new ( ) ;
0 commit comments