@@ -18,7 +18,7 @@ use std::collections::BTreeMap;
1818use std:: hash:: { Hash , Hasher } ;
1919use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
2020use std:: sync:: Arc ;
21- use std:: time:: { Instant , SystemTime } ;
21+ use std:: time:: { Instant , SystemTime , UNIX_EPOCH } ;
2222
2323use async_trait:: async_trait;
2424use futures:: Future ;
@@ -57,6 +57,10 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
5757/// If this changes, remember to change the documentation in the config.
5858const DEFAULT_MAX_JOB_RETRIES : usize = 3 ;
5959
60+ /// Default timeout for actions without any listeners
61+ /// If this changes, remember to change the documentation in the config.
62+ const DEFAULT_DISCONNECT_TIMEOUT_S : u64 = 60 ;
63+
6064/// An action that is being awaited on and last known state.
6165struct AwaitedAction {
6266 action_info : Arc < ActionInfo > ,
@@ -68,14 +72,36 @@ struct AwaitedAction {
6872 /// Possible last error set by the worker. If empty and attempts is set, it may be due to
6973 /// something like a worker timeout.
7074 last_error : Option < Error > ,
75+
76+ /// Updated on every client connect and periodically while it has listeners.
77+ last_update_timestamp : Mutex < u64 >
7178}
7279
80+ impl AwaitedAction {
81+ pub fn set_last_update_timestamp ( & self , timestamp : u64 ) {
82+ let mut guard = self . last_update_timestamp . lock ( ) ;
83+ * guard = timestamp;
84+ }
85+ pub fn get_last_update_timestamp ( & self ) -> u64 {
86+ let guard = self . last_update_timestamp . lock ( ) ;
87+ * guard
88+ }
89+ }
7390/// Holds the relationship of a worker that is executing a specific action.
7491struct RunningAction {
7592 worker_id : WorkerId ,
7693 action : AwaitedAction ,
7794}
7895
96+ impl RunningAction {
97+ pub fn set_last_update_timestamp ( & self , timestamp : u64 ) {
98+ self . action . set_last_update_timestamp ( timestamp) ;
99+ }
100+ pub fn get_last_update_timestamp ( & self ) -> u64 {
101+ self . action . get_last_update_timestamp ( )
102+ }
103+ }
104+
79105struct Workers {
80106 workers : LruCache < WorkerId , Worker > ,
81107 /// The allocation strategy for workers.
@@ -230,6 +256,8 @@ struct SimpleSchedulerImpl {
230256 /// Notify task<->worker matching engine that work needs to be done.
231257 tasks_or_workers_change_notify : Arc < Notify > ,
232258 metrics : Arc < Metrics > ,
259+ /// How long the server will wait for a client to reconnect before removing the action from the queue.
260+ disconnect_timeout_s : u64 ,
233261}
234262
235263impl SimpleSchedulerImpl {
@@ -307,6 +335,7 @@ impl SimpleSchedulerImpl {
307335 notify_channel : tx,
308336 attempts : 0 ,
309337 last_error : None ,
338+ last_update_timestamp : Mutex :: new ( 0 )
310339 } ,
311340 ) ;
312341
@@ -428,6 +457,7 @@ impl SimpleSchedulerImpl {
428457 Ok ( ( ) )
429458 }
430459
460+
431461 // TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we can create a map
432462 // of capabilities of each worker and then try and match the actions to the worker using
433463 // the map lookup (ie. map reduce).
@@ -440,6 +470,15 @@ impl SimpleSchedulerImpl {
440470 let action_infos: Vec < Arc < ActionInfo > > =
441471 self . queued_actions . keys ( ) . rev ( ) . cloned ( ) . collect ( ) ;
442472 for action_info in action_infos {
473+ // add update to queued action update timestamp here
474+ let action = self . queued_actions . get_mut ( & action_info) . unwrap ( ) ;
475+ let now = SystemTime :: now ( ) . duration_since ( UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) ;
476+ if action. notify_channel . receiver_count ( ) > 0 {
477+ action. set_last_update_timestamp ( now)
478+ } else if action. get_last_update_timestamp ( ) + self . disconnect_timeout_s < now {
479+ self . queued_actions_set . remove ( & action_info) ;
480+ self . queued_actions . remove ( & action_info) ;
481+ }
443482 let Some ( awaited_action) = self . queued_actions . get ( action_info. as_ref ( ) ) else {
444483 error ! (
445484 "queued_actions out of sync with itself for action {}" ,
@@ -501,6 +540,17 @@ impl SimpleSchedulerImpl {
501540 } ,
502541 ) ;
503542 }
543+
544+ let mut remove_actions = Vec :: new ( ) ;
545+ let running_actions = & mut self . active_actions . values ( ) . collect :: < Vec < _ > > ( ) ;
546+ for running_action in running_actions {
547+ if running_action. action . notify_channel . receiver_count ( ) > 0 {
548+ running_action. set_last_update_timestamp ( SystemTime :: now ( ) . duration_since ( UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) ) ;
549+ } else if running_action. get_last_update_timestamp ( ) + self . disconnect_timeout_s < SystemTime :: now ( ) . duration_since ( UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) {
550+ remove_actions. push ( running_action. action . action_info . clone ( ) )
551+ }
552+ }
553+ self . active_actions . retain ( |x, _| { !remove_actions. contains ( x) } ) ;
504554 }
505555
506556 fn update_action_with_internal_error (
@@ -694,6 +744,11 @@ impl SimpleScheduler {
694744 max_job_retries = DEFAULT_MAX_JOB_RETRIES ;
695745 }
696746
747+ let mut disconnect_timeout_s = scheduler_cfg. disconnect_timeout_s ;
748+ if disconnect_timeout_s == 0 {
749+ disconnect_timeout_s = DEFAULT_DISCONNECT_TIMEOUT_S ;
750+ }
751+
697752 let tasks_or_workers_change_notify = Arc :: new ( Notify :: new ( ) ) ;
698753
699754 let metrics = Arc :: new ( Metrics :: default ( ) ) ;
@@ -709,6 +764,7 @@ impl SimpleScheduler {
709764 max_job_retries,
710765 tasks_or_workers_change_notify : tasks_or_workers_change_notify. clone ( ) ,
711766 metrics : metrics. clone ( ) ,
767+ disconnect_timeout_s
712768 } ) ) ;
713769 let weak_inner = Arc :: downgrade ( & inner) ;
714770 Self {
0 commit comments