@@ -16,7 +16,7 @@ use crate::in_flight_slot_guard::InFlightSlotGuard;
1616use crate :: jsonrpc:: extract_request_id;
1717use crate :: nats:: {
1818 ClientMethod , FlushClient , PublishClient , RequestClient , SubscribeClient , client,
19- headers_with_trace_context , parse_client_subject,
19+ parse_client_subject,
2020} ;
2121use agent_client_protocol:: { Client , ErrorCode } ;
2222use async_nats:: Message ;
@@ -28,30 +28,6 @@ use tracing::{Span, error, info, instrument, warn};
2828use trogon_std:: JsonSerialize ;
2929use trogon_std:: time:: GetElapsed ;
3030
31- fn jsonrpc_error_response (
32- request_id : agent_client_protocol:: RequestId ,
33- code : ErrorCode ,
34- message : & str ,
35- ) -> Bytes {
36- let id_value = serde_json:: to_value ( & request_id) . unwrap_or ( serde_json:: Value :: Null ) ;
37- let response = serde_json:: json!( {
38- "jsonrpc" : "2.0" ,
39- "id" : id_value,
40- "error" : {
41- "code" : i32 :: from( code) ,
42- "message" : message
43- }
44- } ) ;
45- serde_json:: to_vec ( & response) . unwrap_or_else ( |e| {
46- format ! (
47- "{{\" jsonrpc\" :\" 2.0\" ,\" id\" :null,\" error\" :{{\" code\" :{},\" message\" :\" failed to serialize error response: {}\" }}}}" ,
48- i32 :: from( code) ,
49- e
50- )
51- . into_bytes ( )
52- } )
53- . into ( )
54- }
5531async fn publish_backpressure_error_reply < N : PublishClient + FlushClient , S : JsonSerialize > (
5632 nats : & N ,
5733 payload : & [ u8 ] ,
@@ -207,11 +183,9 @@ async fn dispatch_client_method<
207183 reply : Option < String > ,
208184 ctx : & DispatchContext < ' _ , N , Cl , C , S > ,
209185) {
210- let request_id = extract_request_id ( & payload) ;
211-
212186 Span :: current ( ) . record ( "session_id" , parsed. session_id . as_str ( ) ) ;
213187
214- let response_bytes : Option < Result < Vec < u8 > , Box < dyn std :: error :: Error + Send + Sync > > > = match parsed. method {
188+ match parsed. method {
215189 ClientMethod :: FsReadTextFile => {
216190 fs_read_text_file:: handle (
217191 & payload,
@@ -222,7 +196,6 @@ async fn dispatch_client_method<
222196 ctx. serializer ,
223197 )
224198 . await ;
225- None
226199 }
227200 ClientMethod :: FsWriteTextFile => {
228201 fs_write_text_file:: handle (
@@ -234,7 +207,6 @@ async fn dispatch_client_method<
234207 ctx. serializer ,
235208 )
236209 . await ;
237- None
238210 }
239211 ClientMethod :: SessionRequestPermission => {
240212 request_permission:: handle (
@@ -246,18 +218,9 @@ async fn dispatch_client_method<
246218 ctx. serializer ,
247219 )
248220 . await ;
249- None
250221 }
251222 ClientMethod :: SessionUpdate => {
252- if reply. is_some ( ) {
253- warn ! (
254- session_id = %parsed. session_id,
255- method = ?parsed. method,
256- "Unexpected reply subject on notification request"
257- ) ;
258- }
259- session_update:: handle ( & payload, ctx. client ) . await ;
260- None
223+ session_update:: handle ( & payload, ctx. client , reply. is_some ( ) ) . await ;
261224 }
262225 ClientMethod :: ExtSessionPromptResponse => {
263226 ext_session_prompt_response:: handle (
@@ -267,7 +230,6 @@ async fn dispatch_client_method<
267230 ctx. bridge ,
268231 )
269232 . await ;
270- None
271233 }
272234 ClientMethod :: TerminalCreate => {
273235 terminal_create:: handle (
@@ -279,7 +241,6 @@ async fn dispatch_client_method<
279241 ctx. serializer ,
280242 )
281243 . await ;
282- None
283244 }
284245 ClientMethod :: TerminalKill => {
285246 terminal_kill:: handle (
@@ -291,12 +252,17 @@ async fn dispatch_client_method<
291252 ctx. serializer ,
292253 )
293254 . await ;
294- None
295255 }
296256 ClientMethod :: TerminalOutput => {
297- Some (
298- terminal_output:: handle ( & payload, ctx. client , parsed. session_id . as_str ( ) ) . await ,
257+ terminal_output:: handle (
258+ & payload,
259+ ctx. client ,
260+ reply. as_deref ( ) ,
261+ ctx. nats ,
262+ parsed. session_id . as_str ( ) ,
263+ ctx. serializer ,
299264 )
265+ . await ;
300266 }
301267 ClientMethod :: TerminalRelease => {
302268 terminal_release:: handle (
@@ -308,7 +274,6 @@ async fn dispatch_client_method<
308274 ctx. serializer ,
309275 )
310276 . await ;
311- None
312277 }
313278 ClientMethod :: TerminalWaitForExit => {
314279 terminal_wait_for_exit:: handle (
@@ -321,51 +286,6 @@ async fn dispatch_client_method<
321286 ctx. serializer ,
322287 )
323288 . await ;
324- None
325- }
326- } ;
327-
328- if let ( Some ( reply_to) , Some ( result) ) = ( reply, response_bytes) {
329- match result {
330- Ok ( bytes) => {
331- let mut headers = headers_with_trace_context ( ) ;
332- headers. insert ( "Content-Type" , rpc_reply:: CONTENT_TYPE_JSON ) ;
333- if let Err ( e) = ctx
334- . nats
335- . publish_with_headers ( reply_to, headers, bytes. into ( ) )
336- . await
337- {
338- error ! ( error = %e, "Failed to publish reply" ) ;
339- }
340- if let Err ( e) = ctx. nats . flush ( ) . await {
341- warn ! ( error = %e, "Failed to flush response" ) ;
342- }
343- }
344- Err ( e) => {
345- error ! (
346- error = %e,
347- method = ?parsed. method,
348- session_id = %parsed. session_id,
349- "Failed to handle client request"
350- ) ;
351- let bytes = jsonrpc_error_response (
352- request_id,
353- ErrorCode :: InternalError ,
354- "Internal error while handling client request" ,
355- ) ;
356- let mut headers = headers_with_trace_context ( ) ;
357- headers. insert ( "Content-Type" , rpc_reply:: CONTENT_TYPE_JSON ) ;
358- if let Err ( e) = ctx
359- . nats
360- . publish_with_headers ( reply_to, headers, bytes)
361- . await
362- {
363- error ! ( error = %e, "Failed to publish error reply" ) ;
364- }
365- if let Err ( e) = ctx. nats . flush ( ) . await {
366- warn ! ( error = %e, "Failed to flush error reply" ) ;
367- }
368- }
369289 }
370290 }
371291}
0 commit comments