@@ -82,7 +82,7 @@ pub async fn handle<N: RequestClient + PublishClient + FlushClient, C: GetElapse
8282mod tests {
8383 use super :: Bridge ;
8484 use crate :: config:: Config ;
85- use agent_client_protocol:: { Agent , CancelNotification , ErrorCode } ;
85+ use agent_client_protocol:: { Agent , CancelNotification , ErrorCode , SessionId , StopReason } ;
8686 use opentelemetry:: Value ;
8787 use opentelemetry:: metrics:: MeterProvider ;
8888 use opentelemetry_sdk:: metrics:: data:: { AggregatedMetrics , MetricData } ;
@@ -91,6 +91,7 @@ mod tests {
9191 } ;
9292 use std:: time:: Duration ;
9393 use trogon_nats:: AdvancedMockNatsClient ;
94+ use trogon_std:: time:: MockClock ;
9495
9596 fn mock_bridge ( ) -> (
9697 AdvancedMockNatsClient ,
@@ -129,6 +130,22 @@ mod tests {
129130 ( mock, bridge, exporter, provider)
130131 }
131132
133+ fn mock_bridge_with_clock ( ) -> (
134+ AdvancedMockNatsClient ,
135+ MockClock ,
136+ Bridge < AdvancedMockNatsClient , MockClock > ,
137+ ) {
138+ let mock = AdvancedMockNatsClient :: new ( ) ;
139+ let clock = MockClock :: new ( ) ;
140+ let bridge = Bridge :: new (
141+ mock. clone ( ) ,
142+ clock. clone ( ) ,
143+ & opentelemetry:: global:: meter ( "acp-nats-test" ) ,
144+ Config :: for_test ( "acp" ) ,
145+ ) ;
146+ ( mock, clock, bridge)
147+ }
148+
132149 fn has_request_metric (
133150 finished_metrics : & [ opentelemetry_sdk:: metrics:: data:: ResourceMetrics ] ,
134151 method : & str ,
@@ -319,4 +336,102 @@ mod tests {
319336 ) ) ;
320337 provider. shutdown ( ) . unwrap ( ) ;
321338 }
339+
340+ #[ tokio:: test]
341+ async fn cancel_marks_session_as_cancelled ( ) {
342+ let ( _mock, _clock, bridge) = mock_bridge_with_clock ( ) ;
343+ let session_id = "cancel-session-001" ;
344+
345+ assert ! (
346+ bridge
347+ . cancelled_sessions
348+ . take_if_cancelled( & session_id. into( ) , & bridge. clock)
349+ . is_none( )
350+ ) ;
351+
352+ let notification = CancelNotification :: new ( session_id) ;
353+ bridge. cancel ( notification) . await . unwrap ( ) ;
354+
355+ assert ! (
356+ bridge
357+ . cancelled_sessions
358+ . take_if_cancelled( & session_id. into( ) , & bridge. clock)
359+ . is_some( )
360+ ) ;
361+ }
362+
363+ #[ tokio:: test]
364+ async fn cancel_resolves_pending_prompt_waiter_with_cancelled ( ) {
365+ let ( _mock, _clock, bridge) = mock_bridge_with_clock ( ) ;
366+ let session_id: SessionId = "cancel-session-002" . into ( ) ;
367+
368+ let ( rx, _guard, _token) = bridge
369+ . pending_session_prompt_responses
370+ . register_waiter ( session_id. clone ( ) )
371+ . unwrap ( ) ;
372+
373+ let notification = CancelNotification :: new ( session_id. clone ( ) ) ;
374+ bridge. cancel ( notification) . await . unwrap ( ) ;
375+
376+ let response = rx
377+ . await
378+ . expect ( "Should receive cancelled response" )
379+ . expect ( "Prompt waiter should receive success response" ) ;
380+ assert_eq ! ( response. stop_reason, StopReason :: Cancelled ) ;
381+ }
382+
383+ #[ tokio:: test]
384+ async fn cancel_publishes_to_nats ( ) {
385+ let ( mock, _clock, bridge) = mock_bridge_with_clock ( ) ;
386+ let session_id = "cancel-session-003" ;
387+
388+ let notification = CancelNotification :: new ( session_id) ;
389+ bridge. cancel ( notification) . await . unwrap ( ) ;
390+
391+ let published = mock. published_messages ( ) ;
392+ assert ! (
393+ published. iter( ) . any( |s| s. contains( "session.cancel" ) ) ,
394+ "Expected cancel publish, got: {:?}" ,
395+ published
396+ ) ;
397+ }
398+
399+ #[ tokio:: test]
400+ async fn cancel_session_evicts_expired_on_mark ( ) {
401+ let ( _mock, clock, bridge) = mock_bridge_with_clock ( ) ;
402+
403+ let session_old: SessionId = "old-session" . into ( ) ;
404+ let session_new: SessionId = "new-session" . into ( ) ;
405+
406+ bridge
407+ . cancelled_sessions
408+ . mark_cancelled ( session_old. clone ( ) , & bridge. clock ) ;
409+
410+ clock. advance ( Duration :: from_secs ( 301 ) ) ;
411+
412+ for idx in 0 ..15 {
413+ let filler_session: SessionId = format ! ( "filler-{idx}" ) . into ( ) ;
414+ bridge
415+ . cancelled_sessions
416+ . mark_cancelled ( filler_session, & bridge. clock ) ;
417+ }
418+
419+ bridge
420+ . cancelled_sessions
421+ . mark_cancelled ( session_new. clone ( ) , & bridge. clock ) ;
422+
423+ assert ! (
424+ bridge
425+ . cancelled_sessions
426+ . take_if_cancelled( & session_old, & bridge. clock)
427+ . is_none( )
428+ ) ;
429+
430+ assert ! (
431+ bridge
432+ . cancelled_sessions
433+ . take_if_cancelled( & session_new, & bridge. clock)
434+ . is_some( )
435+ ) ;
436+ }
322437}
0 commit comments