@@ -165,6 +165,14 @@ pub struct SimpleScheduler {
165165 /// When enabled, actions are collected and matched to workers in a single
166166 /// batch operation, reducing lock contention.
167167 enable_batch_worker_matching : bool ,
168+
169+ /// Maximum interval between batch matching cycles.
170+ /// Even without triggers, matching runs at least this often.
171+ batch_interval : Duration ,
172+
173+ /// Debounce window applied after the first trigger.
174+ /// After a notification, wait this long (but never past the current `batch_interval` deadline) to collect more changes before running.
175+ batch_debounce : Duration ,
168176}
169177
170178impl core:: fmt:: Debug for SimpleScheduler {
@@ -443,7 +451,7 @@ impl SimpleScheduler {
443451 platform_properties_refs. push ( & prepared. action_info . platform_properties ) ;
444452 }
445453
446- // Batch find workers for all actions
454+ // Batch find workers for all actions (single lock acquisition)
447455 let matches = self
448456 . worker_scheduler
449457 . batch_find_workers_for_actions ( & platform_properties_refs, full_worker_logging)
@@ -452,63 +460,74 @@ impl SimpleScheduler {
452460 let matches_count = matches. len ( ) ;
453461 let actions_count = prepared_actions. len ( ) ;
454462
455- // Process the matches
463+ if matches. is_empty ( ) {
464+ return Ok ( ( ) ) ;
465+ }
466+
467+ // Phase 1: Extract operation_ids and assign operations to workers
468+ // Collect successful assignments for batch worker notification
469+ let mut successful_assignments: Vec < ( WorkerId , OperationId , ActionInfoWithProps ) > =
470+ Vec :: with_capacity ( matches_count) ;
456471 let mut result = Ok ( ( ) ) ;
472+
457473 for ( action_idx, worker_id) in matches {
458474 let prepared = & prepared_actions[ action_idx] ;
459475
460- let assign_result = async {
461- // Extract the operation_id from the action_state
462- let operation_id = {
463- let ( action_state, _origin_metadata) = prepared
464- . action_state_result
465- . as_state ( )
466- . await
467- . err_tip ( || "Failed to get action_state in batch mode" ) ?;
468- action_state. client_operation_id . clone ( )
469- } ;
476+ // Extract the operation_id from the action_state
477+ let operation_id = match prepared. action_state_result . as_state ( ) . await {
478+ Ok ( ( action_state, _origin_metadata) ) => action_state. client_operation_id . clone ( ) ,
479+ Err ( err) => {
480+ warn ! ( ?err, "Failed to get action_state in batch mode, skipping" ) ;
481+ continue ;
482+ }
483+ } ;
470484
471- // Tell the matching engine that the operation is being assigned to a worker
472- let assign_result = self
473- . matching_engine_state_manager
474- . assign_operation ( & operation_id, Ok ( & worker_id) )
475- . await
476- . err_tip ( || "Failed to assign operation in do_try_match_batch" ) ;
485+ // Tell the matching engine that the operation is being assigned to a worker
486+ let assign_result = self
487+ . matching_engine_state_manager
488+ . assign_operation ( & operation_id, Ok ( & worker_id) )
489+ . await
490+ . err_tip ( || "Failed to assign operation in do_try_match_batch" ) ;
477491
478- if let Err ( err) = assign_result {
492+ match assign_result {
493+ Ok ( ( ) ) => {
494+ // Assignment successful, queue for batch worker notification
495+ successful_assignments. push ( (
496+ worker_id,
497+ operation_id,
498+ prepared. action_info . clone ( ) ,
499+ ) ) ;
500+ }
501+ Err ( err) => {
479502 if err. code == Code :: Aborted {
480503 // Operation was cancelled, skip it
481- return Ok ( ( ) ) ;
504+ continue ;
482505 }
483- return Err ( err) ;
506+ result = result . merge ( Err ( err) ) ;
484507 }
508+ }
509+ }
485510
486- // Notify the worker to run the action
487- self . worker_scheduler
488- . worker_notify_run_action (
489- worker_id. clone ( ) ,
490- operation_id,
491- prepared. action_info . clone ( ) ,
492- )
493- . await
494- . err_tip ( || "Failed to run worker_notify_run_action in do_try_match_batch" )
495- } ;
496-
497- let ctx = Context :: current_with_baggage ( vec ! [ KeyValue :: new(
498- ENDUSER_ID ,
499- prepared. origin_metadata. identity. clone( ) ,
500- ) ] ) ;
501-
502- result = result. merge (
503- info_span ! ( "do_try_match_batch" )
504- . in_scope ( || assign_result)
505- . with_context ( ctx)
506- . await ,
507- ) ;
511+ // Phase 2: Batch notify workers (single lock acquisition)
512+ if !successful_assignments. is_empty ( ) {
513+ let notify_results = self
514+ . worker_scheduler
515+ . batch_worker_notify_run_action ( successful_assignments)
516+ . await ;
517+
518+ // Merge notification results
519+ for notify_result in notify_results {
520+ result = result. merge (
521+ notify_result
522+ . err_tip ( || "Failed to run batch_worker_notify_run_action in do_try_match_batch" ) ,
523+ ) ;
524+ }
508525 }
509526
510527 let total_elapsed = start. elapsed ( ) ;
511- EXECUTION_METRICS . do_try_match_duration . record ( total_elapsed. as_secs_f64 ( ) , & [ ] ) ;
528+ EXECUTION_METRICS
529+ . do_try_match_duration
530+ . record ( total_elapsed. as_secs_f64 ( ) , & [ ] ) ;
512531 if total_elapsed > Duration :: from_secs ( 5 ) {
513532 warn ! (
514533 total_ms = total_elapsed. as_millis( ) ,
@@ -631,6 +650,11 @@ impl SimpleScheduler {
631650
632651 let worker_scheduler_clone = worker_scheduler. clone ( ) ;
633652
653+ // Capture batch timing parameters for the matching loop
654+ let batch_interval = Duration :: from_millis ( spec. batch_interval_ms ) ;
655+ let batch_debounce = Duration :: from_millis ( spec. batch_debounce_ms ) ;
656+ let enable_batch_worker_matching = spec. enable_batch_worker_matching ;
657+
634658 let action_scheduler = Arc :: new_cyclic ( move |weak_self| -> Self {
635659 let weak_inner = weak_self. clone ( ) ;
636660 let task_worker_matching_spawn =
@@ -639,24 +663,48 @@ impl SimpleScheduler {
639663 let mut worker_match_logging_last: Option <Instant > = None ;
640664 // Break out of the loop only when the inner is dropped.
641665 loop {
642- let task_change_fut = task_change_notify. notified( ) ;
643- let worker_change_fut = worker_change_notify. notified( ) ;
644- tokio:: pin!( task_change_fut) ;
645- tokio:: pin!( worker_change_fut) ;
646- // Wait for either of these futures to be ready.
647- let state_changed = future:: select( task_change_fut, worker_change_fut) ;
648- if last_match_successful {
649- let _ = state_changed. await ;
666+ // Use hybrid timer + debounce approach for batch mode,
667+ // or the original notification-based approach for sequential mode.
668+ if enable_batch_worker_matching {
669+ // Phase 1: Wait for trigger OR batch_interval timeout
670+ let deadline = tokio:: time:: Instant :: now( ) + batch_interval;
671+
672+ let triggered = tokio:: select! {
673+ _ = task_change_notify. notified( ) => true ,
674+ _ = worker_change_notify. notified( ) => true ,
675+ _ = tokio:: time:: sleep_until( deadline) => false ,
676+ } ;
677+
678+ // Phase 2: If triggered, apply debounce window to collect more changes
679+ // But don't exceed the original batch_interval deadline
680+ if triggered && batch_debounce > Duration :: ZERO {
681+ let debounce_until = tokio:: time:: Instant :: now( ) + batch_debounce;
682+ let effective_deadline = debounce_until. min( deadline) ;
683+ tokio:: time:: sleep_until( effective_deadline) . await ;
684+ }
685+
686+ // If last match failed, add extra delay to avoid hard loop
687+ if !last_match_successful {
688+ tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . await ;
689+ }
650690 } else {
651- // If the last match failed, then run again after a short sleep.
652- // This resolves issues where we tried to re-schedule a job to
653- // a disconnected worker. The sleep ensures we don't enter a
654- // hard loop if there's something wrong inside do_try_match.
655- let sleep_fut = tokio:: time:: sleep( Duration :: from_millis( 100 ) ) ;
656- tokio:: pin!( sleep_fut) ;
657- let _ = future:: select( state_changed, sleep_fut) . await ;
691+ // Original notification-based approach for sequential mode
692+ let task_change_fut = task_change_notify. notified( ) ;
693+ let worker_change_fut = worker_change_notify. notified( ) ;
694+ tokio:: pin!( task_change_fut) ;
695+ tokio:: pin!( worker_change_fut) ;
696+ let state_changed = future:: select( task_change_fut, worker_change_fut) ;
697+ if last_match_successful {
698+ let _ = state_changed. await ;
699+ } else {
700+ // If the last match failed, then run again after a short sleep.
701+ let sleep_fut = tokio:: time:: sleep( Duration :: from_millis( 100 ) ) ;
702+ tokio:: pin!( sleep_fut) ;
703+ let _ = future:: select( state_changed, sleep_fut) . await ;
704+ }
658705 }
659706
707+ // Phase 3: Run the matching
660708 let result = match weak_inner. upgrade( ) {
661709 Some ( scheduler) => {
662710 let now = Instant :: now( ) ;
@@ -775,6 +823,8 @@ impl SimpleScheduler {
775823 task_worker_matching_spawn,
776824 worker_match_logging_interval,
777825 enable_batch_worker_matching : spec. enable_batch_worker_matching ,
826+ batch_interval : Duration :: from_millis ( spec. batch_interval_ms ) ,
827+ batch_debounce : Duration :: from_millis ( spec. batch_debounce_ms ) ,
778828 }
779829 } ) ;
780830 ( action_scheduler, worker_scheduler_clone)
0 commit comments