@@ -14,7 +14,7 @@ use crate::in_flight_slot_guard::InFlightSlotGuard;
1414use crate :: jsonrpc:: extract_request_id;
1515use crate :: nats:: {
1616 ClientMethod , FlushClient , PublishClient , RequestClient , SubscribeClient , client,
17- headers_with_trace_context , parse_client_subject,
17+ parse_client_subject,
1818} ;
1919use agent_client_protocol:: { Client , ErrorCode } ;
2020use async_nats:: Message ;
@@ -26,32 +26,6 @@ use tracing::{Span, error, info, instrument, warn};
2626use trogon_std:: JsonSerialize ;
2727use trogon_std:: time:: GetElapsed ;
2828
29- fn jsonrpc_error_response (
30- request_id : agent_client_protocol:: RequestId ,
31- code : ErrorCode ,
32- message : & str ,
33- ) -> Bytes {
34- let id_value = serde_json:: to_value ( & request_id) . unwrap_or ( serde_json:: Value :: Null ) ;
35- let response = serde_json:: json!( {
36- "jsonrpc" : "2.0" ,
37- "id" : id_value,
38- "error" : {
39- "code" : i32 :: from( code) ,
40- "message" : message
41- }
42- } ) ;
43- serde_json:: to_vec ( & response)
44- . unwrap_or_else ( |e| {
45- format ! (
46- "{{\" jsonrpc\" :\" 2.0\" ,\" id\" :null,\" error\" :{{\" code\" :{},\" message\" :\" failed to serialize error response: {}\" }}}}" ,
47- i32 :: from( code) ,
48- e
49- )
50- . into_bytes ( )
51- } )
52- . into ( )
53- }
54-
5529async fn publish_backpressure_error_reply < N : PublishClient + FlushClient , S : JsonSerialize > (
5630 nats : & N ,
5731 payload : & [ u8 ] ,
@@ -281,28 +255,25 @@ async fn dispatch_client_method<
281255 . await ;
282256 }
283257 ClientMethod :: TerminalWaitForExit => {
284- let result = terminal_wait_for_exit:: handle (
285- & payload,
286- ctx. client ,
287- parsed. session_id . as_str ( ) ,
288- ctx. bridge . config . operation_timeout ( ) ,
289- )
290- . await ;
291258 if let Some ( reply_to) = & reply {
259+ let result = terminal_wait_for_exit:: handle (
260+ & payload,
261+ ctx. client ,
262+ parsed. session_id . as_str ( ) ,
263+ ctx. bridge . config . operation_timeout ( ) ,
264+ )
265+ . await ;
292266 let request_id = extract_request_id ( & payload) ;
293267 match result {
294268 Ok ( bytes) => {
295- let headers = headers_with_trace_context ( ) ;
296- if let Err ( e) = ctx
297- . nats
298- . publish_with_headers ( reply_to. clone ( ) , headers, bytes. into ( ) )
299- . await
300- {
301- error ! ( error = %e, "Failed to publish reply" ) ;
302- }
303- if let Err ( e) = ctx. nats . flush ( ) . await {
304- warn ! ( error = %e, "Failed to flush response" ) ;
305- }
269+ rpc_reply:: publish_reply (
270+ ctx. nats ,
271+ reply_to,
272+ bytes. into ( ) ,
273+ rpc_reply:: CONTENT_TYPE_JSON ,
274+ "terminal/wait_for_exit reply" ,
275+ )
276+ . await ;
306277 }
307278 Err ( e) => {
308279 let ( code, message) = terminal_wait_for_exit:: error_code_and_message ( & e) ;
@@ -312,20 +283,27 @@ async fn dispatch_client_method<
312283 session_id = %parsed. session_id,
313284 "Failed to handle terminal/wait_for_exit"
314285 ) ;
315- let bytes = jsonrpc_error_response ( request_id, code, & message) ;
316- let headers = headers_with_trace_context ( ) ;
317- if let Err ( e) = ctx
318- . nats
319- . publish_with_headers ( reply_to. clone ( ) , headers, bytes)
320- . await
321- {
322- error ! ( error = %e, "Failed to publish error reply" ) ;
323- }
324- if let Err ( e) = ctx. nats . flush ( ) . await {
325- warn ! ( error = %e, "Failed to flush error reply" ) ;
326- }
286+ let ( bytes, content_type) = rpc_reply:: error_response_bytes (
287+ ctx. serializer ,
288+ request_id,
289+ code,
290+ & message,
291+ ) ;
292+ rpc_reply:: publish_reply (
293+ ctx. nats ,
294+ reply_to,
295+ bytes,
296+ content_type,
297+ "terminal/wait_for_exit error reply" ,
298+ )
299+ . await ;
327300 }
328301 }
302+ } else {
303+ warn ! (
304+ session_id = %parsed. session_id,
305+ "terminal/wait_for_exit requires reply subject; ignoring message"
306+ ) ;
329307 }
330308 }
331309 }
@@ -840,6 +818,44 @@ mod tests {
840818 ) ;
841819 }
842820
821+ #[ tokio:: test]
822+ async fn dispatch_client_method_terminal_wait_for_exit_reply_none_skips_handler ( ) {
823+ let nats = MockNatsClient :: new ( ) ;
824+ let client = MockClient :: new ( ) ;
825+ let session_id = AcpSessionId :: new ( "sess-1" ) . unwrap ( ) ;
826+
827+ let envelope = Request {
828+ id : RequestId :: Number ( 1 ) ,
829+ method : std:: sync:: Arc :: from ( "terminal/wait_for_exit" ) ,
830+ params : Some ( WaitForTerminalExitRequest :: new ( "sess-1" , "term-001" ) ) ,
831+ } ;
832+ let payload = bytes:: Bytes :: from ( serde_json:: to_vec ( & envelope) . unwrap ( ) ) ;
833+
834+ let parsed = crate :: nats:: ParsedClientSubject {
835+ session_id,
836+ method : ClientMethod :: TerminalWaitForExit ,
837+ } ;
838+
839+ let bridge = make_bridge ( nats. clone ( ) ) ;
840+ let ctx = DispatchContext {
841+ nats : & nats,
842+ client : & client,
843+ bridge : & bridge,
844+ serializer : & StdJsonSerialize ,
845+ } ;
846+ dispatch_client_method (
847+ "acp.sess-1.client.terminal.wait_for_exit" ,
848+ parsed,
849+ payload,
850+ None ,
851+ & ctx,
852+ )
853+ . await ;
854+
855+ assert ! ( nats. published_messages( ) . is_empty( ) ) ;
856+ assert_eq ! ( client. wait_for_terminal_exit_call_count( ) , 0 ) ;
857+ }
858+
843859 #[ tokio:: test]
844860 async fn dispatch_client_method_dispatches_terminal_wait_for_exit ( ) {
845861 let nats = MockNatsClient :: new ( ) ;
0 commit comments