1+ use acp_nats:: jetstream:: consumers:: commands_observer;
2+ use acp_nats:: jetstream:: streams:: commands_stream_name;
3+ use acp_nats:: nats:: agent:: wildcards:: GlobalAllSubject ;
4+ use acp_nats:: nats:: session:: wildcards:: { AllAgentExtSubject , AllAgentSubject } ;
15use acp_nats:: nats:: {
26 GlobalAgentMethod , ParsedAgentSubject , SessionAgentMethod , parse_agent_subject,
37} ;
4- use acp_nats:: { AcpPrefix , AcpSessionId , NatsClientProxy } ;
8+ use acp_nats:: {
9+ AcpPrefix , AcpSessionId , NatsClientProxy , PromptResponseSubject , ReqId , ResponseSubject ,
10+ } ;
511use agent_client_protocol:: {
612 Agent , AuthenticateRequest , CancelNotification , CloseSessionRequest , ExtNotification ,
713 ExtRequest , ForkSessionRequest , InitializeRequest , ListSessionsRequest , LoadSessionRequest ,
@@ -158,8 +164,8 @@ where
158164 N : SubscribeClient + PublishClient + FlushClient + Clone + ' static ,
159165 A : Agent + ' static ,
160166{
161- let global_wildcard = acp_nats :: nats :: agent :: wildcards :: GlobalAllSubject :: new ( prefix) ;
162- let session_wildcard = acp_nats :: nats :: session :: wildcards :: AllAgentSubject :: new ( prefix) ;
167+ let global_wildcard = GlobalAllSubject :: new ( prefix) ;
168+ let session_wildcard = AllAgentSubject :: new ( prefix) ;
163169
164170 info ! (
165171 global = %global_wildcard,
@@ -204,8 +210,8 @@ where
204210 N : SubscribeClient + PublishClient + FlushClient + Clone + ' static ,
205211 A : Agent + ' static ,
206212{
207- let global_wildcard = acp_nats :: nats :: agent :: wildcards :: GlobalAllSubject :: new ( prefix) ;
208- let ext_wildcard = acp_nats :: nats :: session :: wildcards :: AllAgentExtSubject :: new ( prefix) ;
213+ let global_wildcard = GlobalAllSubject :: new ( prefix) ;
214+ let ext_wildcard = AllAgentExtSubject :: new ( prefix) ;
209215
210216 info ! (
211217 global = %global_wildcard,
@@ -469,9 +475,8 @@ where
469475 } ;
470476 }
471477 _ = keepalive. tick( ) => {
472- if let Err ( e) = js_msg. ack_with( AckKind :: Progress ) . await {
473- warn!( error = %e, "Failed to send in_progress keepalive" ) ;
474- }
478+ let _ = js_msg. ack_with( AckKind :: Progress ) . await
479+ . inspect_err( |e| warn!( error = %e, "Failed to send in_progress keepalive" ) ) ;
475480 }
476481 }
477482 }
@@ -490,8 +495,8 @@ where
490495 trogon_nats:: jetstream:: JsMessageOf < J > : JsDispatchMessage ,
491496 A : Agent + ' static ,
492497{
493- let stream_name = acp_nats :: jetstream :: streams :: commands_stream_name ( prefix) ;
494- let config = acp_nats :: jetstream :: consumers :: commands_observer ( ) ;
498+ let stream_name = commands_stream_name ( prefix) ;
499+ let config = commands_observer ( ) ;
495500
496501 info ! ( stream = %stream_name, "Starting JetStream consumer for COMMANDS stream" ) ;
497502
@@ -567,18 +572,14 @@ async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent, M: JsDisp
567572 . headers
568573 . as_ref ( )
569574 . and_then ( |h| h. get ( trogon_nats:: REQ_ID_HEADER ) )
570- . map ( |v| acp_nats :: ReqId :: from_header ( v. as_str ( ) ) ) ;
575+ . map ( |v| ReqId :: from_header ( v. as_str ( ) ) ) ;
571576
572577 let reply_subject: Option < String > = match ( & req_id, & method) {
573- ( Some ( rid) , SessionAgentMethod :: Prompt ) => Some (
574- acp_nats:: nats:: session:: agent:: PromptResponseSubject :: new ( prefix, & session_id, rid)
575- . to_string ( ) ,
576- ) ,
578+ ( Some ( rid) , SessionAgentMethod :: Prompt ) => {
579+ Some ( PromptResponseSubject :: new ( prefix, & session_id, rid) . to_string ( ) )
580+ }
577581 ( _, SessionAgentMethod :: Cancel ) => None ,
578- ( Some ( rid) , _) => Some (
579- acp_nats:: nats:: session:: agent:: ResponseSubject :: new ( prefix, & session_id, rid)
580- . to_string ( ) ,
581- ) ,
582+ ( Some ( rid) , _) => Some ( ResponseSubject :: new ( prefix, & session_id, rid) . to_string ( ) ) ,
582583 ( None , _) => {
583584 warn ! ( subject, "JetStream message missing X-Req-Id header" ) ;
584585 None
@@ -673,20 +674,20 @@ async fn dispatch_js_message<N: PublishClient + FlushClient, A: Agent, M: JsDisp
673674 }
674675 }
675676 Err ( DispatchError :: NotificationHandler ( _) ) => {
676- if let Err ( e ) = js_msg. ack ( ) . await {
677- warn ! ( subject, error = %e, "Failed to ack after notification handler error" ) ;
678- }
677+ let _ = js_msg. ack ( ) . await . inspect_err (
678+ |e| warn ! ( subject, error = %e, "Failed to ack after notification handler error" ) ,
679+ ) ;
679680 }
680681 }
681682
682- if let Err ( e ) = result {
683+ let _ = result. inspect_err ( |e| {
683684 warn ! (
684685 subject,
685686 session_id = session_id. as_str( ) ,
686687 error = %e,
687688 "Error handling JetStream request"
688689 ) ;
689- }
690+ } ) ;
690691}
691692
692693#[ cfg( test) ]
@@ -702,13 +703,23 @@ mod tests {
702703 struct MockAgent {
703704 initialized : RefCell < bool > ,
704705 cancelled : RefCell < Vec < String > > ,
706+ fail_cancel : bool ,
705707 }
706708
707709 impl MockAgent {
708710 fn new ( ) -> Self {
709711 Self {
710712 initialized : RefCell :: new ( false ) ,
711713 cancelled : RefCell :: new ( Vec :: new ( ) ) ,
714+ fail_cancel : false ,
715+ }
716+ }
717+
718+ fn failing_cancel ( ) -> Self {
719+ Self {
720+ initialized : RefCell :: new ( false ) ,
721+ cancelled : RefCell :: new ( Vec :: new ( ) ) ,
722+ fail_cancel : true ,
712723 }
713724 }
714725 }
@@ -754,6 +765,9 @@ mod tests {
754765 }
755766
756767 async fn cancel ( & self , args : CancelNotification ) -> agent_client_protocol:: Result < ( ) > {
768+ if self . fail_cancel {
769+ return Err ( AcpError :: internal_error ( ) ) ;
770+ }
757771 self . cancelled
758772 . borrow_mut ( )
759773 . push ( args. session_id . to_string ( ) ) ;
@@ -1844,6 +1858,32 @@ mod tests {
18441858 dispatch_js_message ( js_msg, & agent, & nats, & test_prefix ( ) ) . await ;
18451859 }
18461860
1861+ #[ tokio:: test]
1862+ async fn dispatch_js_message_cancel_notification_handler_error_ack_failure ( ) {
1863+ use tracing_subscriber:: util:: SubscriberInitExt ;
1864+ let _guard = tracing_subscriber:: fmt ( ) . with_test_writer ( ) . set_default ( ) ;
1865+
1866+ let nats = MockNatsClient :: new ( ) ;
1867+ let agent = MockAgent :: failing_cancel ( ) ;
1868+ let payload = serialize ( & CancelNotification :: new ( "s1" ) ) ;
1869+ let js_msg = MockJsMessage :: with_failing_signals ( async_nats:: Message {
1870+ subject : "acp.session.s1.agent.cancel" . into ( ) ,
1871+ reply : None ,
1872+ payload : Bytes :: copy_from_slice ( & payload) ,
1873+ headers : None ,
1874+ status : None ,
1875+ description : None ,
1876+ length : payload. len ( ) ,
1877+ } ) ;
1878+ dispatch_js_message ( js_msg, & agent, & nats, & test_prefix ( ) ) . await ;
1879+ }
1880+
1881+ fn init_handler_error (
1882+ _: InitializeRequest ,
1883+ ) -> std:: future:: Ready < agent_client_protocol:: Result < InitializeResponse > > {
1884+ std:: future:: ready ( Err ( AcpError :: internal_error ( ) ) )
1885+ }
1886+
18471887 #[ tokio:: test]
18481888 async fn handle_request_with_keepalive_completes_fast ( ) {
18491889 let nats = MockNatsClient :: new ( ) ;
@@ -1871,12 +1911,7 @@ mod tests {
18711911 ) ) ;
18721912 let msg = make_nats_message ( "acp.agent.initialize" , & payload, None ) ;
18731913 let js_msg = make_js_msg ( "acp.agent.initialize" , & payload, None ) ;
1874-
1875- let result =
1876- handle_request_with_keepalive ( & msg, & nats, & js_msg, |_: InitializeRequest | async {
1877- Err :: < InitializeResponse , _ > ( agent_client_protocol:: Error :: new ( -1 , "not called" ) )
1878- } )
1879- . await ;
1914+ let result = handle_request_with_keepalive ( & msg, & nats, & js_msg, init_handler_error) . await ;
18801915 assert ! ( result. is_err( ) ) ;
18811916 }
18821917
@@ -1885,15 +1920,23 @@ mod tests {
18851920 let nats = MockNatsClient :: new ( ) ;
18861921 let msg = make_nats_message ( "acp.agent.initialize" , b"not json" , Some ( "_INBOX.1" ) ) ;
18871922 let js_msg = make_js_msg ( "acp.agent.initialize" , b"not json" , Some ( "_INBOX.1" ) ) ;
1888-
1889- let result =
1890- handle_request_with_keepalive ( & msg, & nats, & js_msg, |_: InitializeRequest | async {
1891- Err :: < InitializeResponse , _ > ( agent_client_protocol:: Error :: new ( -1 , "not called" ) )
1892- } )
1893- . await ;
1923+ let result = handle_request_with_keepalive ( & msg, & nats, & js_msg, init_handler_error) . await ;
18941924 assert ! ( result. is_err( ) ) ;
18951925 }
18961926
1927+ #[ tokio:: test]
1928+ async fn handle_request_with_keepalive_handler_returns_error ( ) {
1929+ let nats = MockNatsClient :: new ( ) ;
1930+ let payload = serialize ( & InitializeRequest :: new (
1931+ agent_client_protocol:: ProtocolVersion :: V0 ,
1932+ ) ) ;
1933+ let msg = make_nats_message ( "acp.agent.initialize" , & payload, Some ( "_INBOX.1" ) ) ;
1934+ let js_msg = make_js_msg ( "acp.agent.initialize" , & payload, Some ( "_INBOX.1" ) ) ;
1935+ let result = handle_request_with_keepalive ( & msg, & nats, & js_msg, init_handler_error) . await ;
1936+ assert ! ( result. is_ok( ) ) ;
1937+ assert ! ( !nats. published_messages( ) . is_empty( ) ) ;
1938+ }
1939+
18971940 #[ tokio:: test( start_paused = true ) ]
18981941 async fn handle_request_with_keepalive_progress_ack_failure ( ) {
18991942 use tracing_subscriber:: util:: SubscriberInitExt ;
0 commit comments