@@ -18,7 +18,7 @@ use std::collections::BTreeMap;
1818use std:: hash:: { Hash , Hasher } ;
1919use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
2020use std:: sync:: Arc ;
21- use std:: time:: { Instant , SystemTime } ;
21+ use std:: time:: { Instant , SystemTime , UNIX_EPOCH } ;
2222
2323use async_trait:: async_trait;
2424use futures:: Future ;
@@ -57,6 +57,10 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
5757/// If this changes, remember to change the documentation in the config.
5858const DEFAULT_MAX_JOB_RETRIES : usize = 3 ;
5959
60+ /// Default timeout for actions without any listeners
61+ /// If this changes, remember to change the documentation in the config.
62+ const DEFAULT_DISCONNECT_TIMEOUT_S : u64 = 60 ;
63+
6064/// An action that is being awaited on and last known state.
6165struct AwaitedAction {
6266 action_info : Arc < ActionInfo > ,
@@ -71,6 +75,19 @@ struct AwaitedAction {
7175
7276 /// Worker that is currently running this action, None if unassigned.
7377 worker_id : Option < WorkerId > ,
78+
79+ /// Updated on every client connect and periodically while it has listeners.
80+ last_update_timestamp : Arc < AtomicU64 > ,
81+ }
82+
83+ impl AwaitedAction {
84+ pub fn set_last_update_timestamp ( & self , timestamp : u64 ) {
85+ self . last_update_timestamp
86+ . store ( timestamp, Ordering :: Relaxed ) ;
87+ }
88+ pub fn get_last_update_timestamp ( & self ) -> u64 {
89+ self . last_update_timestamp . load ( Ordering :: Relaxed )
90+ }
7491}
7592
7693struct Workers {
@@ -227,6 +244,8 @@ struct SimpleSchedulerImpl {
227244 /// Notify task<->worker matching engine that work needs to be done.
228245 tasks_or_workers_change_notify : Arc < Notify > ,
229246 metrics : Arc < Metrics > ,
247+ /// How long the server will wait for a client to reconnect before removing the action from the queue.
248+ disconnect_timeout_s : u64 ,
230249}
231250
232251impl SimpleSchedulerImpl {
@@ -305,6 +324,7 @@ impl SimpleSchedulerImpl {
305324 attempts : 0 ,
306325 last_error : None ,
307326 worker_id : None ,
327+ last_update_timestamp : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
308328 } ,
309329 ) ;
310330
@@ -340,6 +360,20 @@ impl SimpleSchedulerImpl {
340360 . map ( Self :: subscribe_to_channel)
341361 }
342362
363+ fn set_action_last_update_for_test (
364+ & self ,
365+ unique_qualifier : & ActionInfoHashKey ,
366+ timestamp : u64 ,
367+ ) {
368+ let awaited_action = self
369+ . queued_actions_set
370+ . get ( unique_qualifier)
371+ . and_then ( |action_info| self . queued_actions . get ( action_info) )
372+ . or_else ( || self . active_actions . get ( unique_qualifier) )
373+ . expect ( "Could not find action" ) ;
374+ awaited_action. set_last_update_timestamp ( timestamp)
375+ }
376+
343377 fn retry_action ( & mut self , action_info : & Arc < ActionInfo > , worker_id : & WorkerId , err : Error ) {
344378 match self . active_actions . remove ( action_info) {
345379 Some ( running_action) => {
@@ -434,6 +468,22 @@ impl SimpleSchedulerImpl {
434468 let action_infos: Vec < Arc < ActionInfo > > =
435469 self . queued_actions . keys ( ) . rev ( ) . cloned ( ) . collect ( ) ;
436470 for action_info in action_infos {
471+ // add update to queued action update timestamp here
472+ let action = self . queued_actions . get_mut ( & action_info) . unwrap ( ) ;
473+ let now = SystemTime :: now ( )
474+ . duration_since ( UNIX_EPOCH )
475+ . unwrap ( )
476+ . as_secs ( ) ;
477+ if action. notify_channel . receiver_count ( ) > 0 {
478+ action. set_last_update_timestamp ( now) ;
479+ } else if action. get_last_update_timestamp ( ) + self . disconnect_timeout_s < now {
480+ warn ! (
481+ "Client disconnect timeout elapsed - Removing action with digest hash {}" ,
482+ action_info. unique_qualifier. digest. hash_str( )
483+ ) ;
484+ self . queued_actions_set . remove ( & action_info) ;
485+ self . queued_actions . remove ( & action_info) ;
486+ }
437487 let Some ( awaited_action) = self . queued_actions . get ( action_info. as_ref ( ) ) else {
438488 error ! (
439489 "queued_actions out of sync with itself for action {}" ,
@@ -490,6 +540,21 @@ impl SimpleSchedulerImpl {
490540 awaited_action. attempts += 1 ;
491541 self . active_actions . insert ( action_info, awaited_action) ;
492542 }
543+
544+ let mut remove_actions = Vec :: new ( ) ;
545+ for running_action in self . active_actions . values ( ) {
546+ let now = SystemTime :: now ( )
547+ . duration_since ( UNIX_EPOCH )
548+ . unwrap ( )
549+ . as_secs ( ) ;
550+ if running_action. notify_channel . receiver_count ( ) > 0 {
551+ running_action. set_last_update_timestamp ( now) ;
552+ } else if running_action. get_last_update_timestamp ( ) + self . disconnect_timeout_s < now {
553+ remove_actions. push ( running_action. action_info . clone ( ) )
554+ }
555+ }
556+ self . active_actions
557+ . retain ( |x, _| !remove_actions. contains ( x) ) ;
493558 }
494559
495560 fn update_action_with_internal_error (
@@ -688,6 +753,11 @@ impl SimpleScheduler {
688753 max_job_retries = DEFAULT_MAX_JOB_RETRIES ;
689754 }
690755
756+ let mut disconnect_timeout_s = scheduler_cfg. disconnect_timeout_s ;
757+ if disconnect_timeout_s == 0 {
758+ disconnect_timeout_s = DEFAULT_DISCONNECT_TIMEOUT_S ;
759+ }
760+
691761 let tasks_or_workers_change_notify = Arc :: new ( Notify :: new ( ) ) ;
692762
693763 let metrics = Arc :: new ( Metrics :: default ( ) ) ;
@@ -703,6 +773,7 @@ impl SimpleScheduler {
703773 max_job_retries,
704774 tasks_or_workers_change_notify : tasks_or_workers_change_notify. clone ( ) ,
705775 metrics : metrics. clone ( ) ,
776+ disconnect_timeout_s,
706777 } ) ) ;
707778 let weak_inner = Arc :: downgrade ( & inner) ;
708779 Self {
@@ -735,6 +806,15 @@ impl SimpleScheduler {
735806 }
736807 }
737808
809+ pub fn set_action_last_update_for_test (
810+ & self ,
811+ unique_qualifier : & ActionInfoHashKey ,
812+ timestamp : u64 ,
813+ ) {
814+ let inner = self . get_inner_lock ( ) ;
815+ inner. set_action_last_update_for_test ( unique_qualifier, timestamp)
816+ }
817+
738818 /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
739819 #[ must_use]
740820 pub fn contains_worker_for_test ( & self , worker_id : & WorkerId ) -> bool {
0 commit comments