1+ pub ( crate ) mod fs_read_text_file;
12pub ( crate ) mod session_update;
23
34use crate :: agent:: Bridge ;
5+ use crate :: error:: AGENT_UNAVAILABLE ;
46use crate :: in_flight_slot_guard:: InFlightSlotGuard ;
7+ use crate :: jsonrpc:: { ErrorResponse , ResultResponse , extract_request_id} ;
58use crate :: nats:: {
69 ClientMethod , FlushClient , PublishClient , RequestClient , SubscribeClient , client,
7- parse_client_subject,
10+ headers_with_trace_context , parse_client_subject,
811} ;
912use agent_client_protocol:: Client ;
13+ use agent_client_protocol:: ErrorCode ;
1014use async_nats:: Message ;
1115use bytes:: Bytes ;
1216use futures:: StreamExt ;
@@ -84,6 +88,9 @@ async fn process_message<
8488 }
8589 } ;
8690
91+ let payload = msg. payload . clone ( ) ;
92+ let reply = msg. reply . as_ref ( ) . map ( |r| r. to_string ( ) ) ;
93+
8794 let current_in_flight = in_flight. get ( ) ;
8895 if current_in_flight >= max_concurrent {
8996 warn ! (
@@ -96,10 +103,39 @@ async fn process_message<
96103 . metrics
97104 . record_error ( "client" , "client_backpressure_rejected" ) ;
98105
106+ if let Some ( reply_to) = & reply {
107+ let request_id = extract_request_id ( & payload) ;
108+ let bytes = ErrorResponse :: new (
109+ request_id,
110+ ErrorCode :: Other ( AGENT_UNAVAILABLE ) ,
111+ "Client proxy overloaded; retry with backoff" ,
112+ )
113+ . to_bytes ( )
114+ . unwrap_or_else ( |e| {
115+ ErrorResponse :: new (
116+ serde_json:: Value :: Null ,
117+ ErrorCode :: Other ( AGENT_UNAVAILABLE ) ,
118+ format ! (
119+ "Client proxy overloaded; retry with backoff (serialization failed: {})" ,
120+ e
121+ ) ,
122+ )
123+ . to_bytes ( )
124+ . unwrap ( )
125+ } ) ;
126+ let headers = headers_with_trace_context ( ) ;
127+ if let Err ( e) = nats
128+ . publish_with_headers ( reply_to. clone ( ) , headers, bytes)
129+ . await
130+ {
131+ error ! ( error = %e, "Failed to publish backpressure error reply" ) ;
132+ }
133+ if let Err ( e) = nats. flush ( ) . await {
134+ warn ! ( error = %e, "Failed to flush backpressure error reply" ) ;
135+ }
136+ }
99137 return ;
100138 }
101-
102- let payload = msg. payload . clone ( ) ;
103139 let nats = nats. clone ( ) ;
104140
105141 let bridge_clone = bridge. clone ( ) ;
@@ -110,6 +146,7 @@ async fn process_message<
110146 & subject,
111147 parsed,
112148 payload,
149+ reply,
113150 & nats,
114151 client. as_ref ( ) ,
115152 bridge_clone. as_ref ( ) ,
@@ -118,7 +155,7 @@ async fn process_message<
118155 } ) ;
119156}
120157
121- #[ instrument( skip( payload, _nats , client, _bridge) , fields( subject = %subject, session_id = tracing:: field:: Empty ) ) ]
158+ #[ instrument( skip( payload, nats , client, _bridge) , fields( subject = %subject, session_id = tracing:: field:: Empty ) ) ]
122159async fn dispatch_client_method <
123160 N : SubscribeClient + RequestClient + PublishClient + FlushClient ,
124161 Cl : Client ,
@@ -127,13 +164,68 @@ async fn dispatch_client_method<
127164 subject : & str ,
128165 parsed : crate :: nats:: ParsedClientSubject ,
129166 payload : Bytes ,
130- _nats : & N ,
167+ reply : Option < String > ,
168+ nats : & N ,
131169 client : & Cl ,
132170 _bridge : & Bridge < N , C > ,
133171) {
134172 Span :: current ( ) . record ( "session_id" , parsed. session_id . as_str ( ) ) ;
135173
136174 match parsed. method {
175+ ClientMethod :: FsReadTextFile => {
176+ let request_id = extract_request_id ( & payload) ;
177+ match fs_read_text_file:: handle ( & payload, client) . await {
178+ Ok ( bytes) => {
179+ if let Some ( reply_to) = & reply {
180+ let result =
181+ serde_json:: from_slice ( & bytes) . unwrap_or ( serde_json:: Value :: Null ) ;
182+ let response_bytes =
183+ ResultResponse :: new ( request_id, result) . to_bytes ( ) . unwrap ( ) ;
184+ let headers = headers_with_trace_context ( ) ;
185+ if let Err ( e) = nats
186+ . publish_with_headers ( reply_to. clone ( ) , headers, response_bytes)
187+ . await
188+ {
189+ error ! ( error = %e, "Failed to publish fs_read_text_file reply" ) ;
190+ }
191+ if let Err ( e) = nats. flush ( ) . await {
192+ warn ! ( error = %e, "Failed to flush fs_read_text_file reply" ) ;
193+ }
194+ }
195+ }
196+ Err ( e) => {
197+ let ( code, message) = fs_read_text_file:: error_code_and_message ( & e) ;
198+ warn ! (
199+ error = %e,
200+ session_id = %parsed. session_id,
201+ "Failed to handle fs_read_text_file"
202+ ) ;
203+ if let Some ( reply_to) = & reply {
204+ let bytes = ErrorResponse :: new ( request_id, code, message. as_str ( ) )
205+ . to_bytes ( )
206+ . unwrap_or_else ( |e| {
207+ ErrorResponse :: new (
208+ serde_json:: Value :: Null ,
209+ code,
210+ format ! ( "{} (serialization failed: {})" , message, e) ,
211+ )
212+ . to_bytes ( )
213+ . unwrap ( )
214+ } ) ;
215+ let headers = headers_with_trace_context ( ) ;
216+ if let Err ( e) = nats
217+ . publish_with_headers ( reply_to. clone ( ) , headers, bytes)
218+ . await
219+ {
220+ error ! ( error = %e, "Failed to publish fs_read_text_file error reply" ) ;
221+ }
222+ if let Err ( e) = nats. flush ( ) . await {
223+ warn ! ( error = %e, "Failed to flush fs_read_text_file error reply" ) ;
224+ }
225+ }
226+ }
227+ }
228+ }
137229 ClientMethod :: SessionUpdate => {
138230 session_update:: handle ( & payload, client, & parsed. session_id ) . await ;
139231 }
@@ -143,10 +235,11 @@ async fn dispatch_client_method<
143235#[ cfg( test) ]
144236mod tests {
145237 use super :: * ;
238+ use crate :: jsonrpc:: JsonRpcRequest ;
146239 use crate :: session_id:: AcpSessionId ;
147240 use agent_client_protocol:: {
148- ContentBlock , ContentChunk , RequestPermissionRequest , RequestPermissionResponse ,
149- SessionNotification , SessionUpdate ,
241+ ContentBlock , ContentChunk , ReadTextFileRequest , RequestPermissionRequest ,
242+ RequestPermissionResponse , SessionNotification , SessionUpdate ,
150243 } ;
151244 use async_trait:: async_trait;
152245 use std:: cell:: RefCell ;
@@ -286,6 +379,7 @@ mod tests {
286379 "acp.sess-1.client.session.update" ,
287380 parsed,
288381 payload,
382+ None ,
289383 & nats,
290384 & client,
291385 & bridge,
@@ -334,6 +428,33 @@ mod tests {
334428 assert ! ( nats. published_messages( ) . is_empty( ) ) ;
335429 }
336430
431+ #[ tokio:: test]
432+ async fn process_message_backpressure_with_reply_publishes_error ( ) {
433+ let nats = MockNatsClient :: new ( ) ;
434+ let bridge = make_bridge ( nats. clone ( ) ) ;
435+ let client = Rc :: new ( MockClient :: new ( ) ) ;
436+ let in_flight = Rc :: new ( Cell :: new ( 1usize ) ) ;
437+
438+ let envelope = JsonRpcRequest {
439+ jsonrpc : "2.0" . to_string ( ) ,
440+ id : serde_json:: json!( 1 ) ,
441+ method : Some ( "fs/read_text_file" . to_string ( ) ) ,
442+ params : Some ( ReadTextFileRequest :: new (
443+ agent_client_protocol:: SessionId :: from ( "sess1" ) ,
444+ "/tmp/foo.txt" . to_string ( ) ,
445+ ) ) ,
446+ } ;
447+ let payload = serde_json:: to_vec ( & envelope) . unwrap ( ) ;
448+ let msg = make_msg (
449+ "acp.sess1.client.fs.read_text_file" ,
450+ & payload,
451+ Some ( "_INBOX.reply" ) ,
452+ ) ;
453+ process_message ( msg, & nats, client, bridge, & in_flight, 1 ) . await ;
454+
455+ assert_eq ! ( nats. published_messages( ) , vec![ "_INBOX.reply" ] ) ;
456+ }
457+
337458 #[ tokio:: test]
338459 async fn process_message_valid_dispatch_spawns_task ( ) {
339460 let local = tokio:: task:: LocalSet :: new ( ) ;
0 commit comments