@@ -17,7 +17,7 @@ use std::sync::Arc;
1717use std:: time:: { Instant , SystemTime } ;
1818
1919use async_trait:: async_trait;
20- use futures:: { Future , StreamExt , future } ;
20+ use futures:: { future , Future , StreamExt } ;
2121use nativelink_config:: schedulers:: SimpleSpec ;
2222use nativelink_error:: { Code , Error , ResultExt } ;
2323use nativelink_metric:: { MetricsComponent , RootMetricsComponent } ;
@@ -30,14 +30,15 @@ use nativelink_util::operation_state_manager::{
3030 OperationFilter , OperationStageFlags , OrderDirection , UpdateOperationType ,
3131} ;
3232use nativelink_util:: origin_event:: OriginMetadata ;
33+ use nativelink_util:: platform_properties:: PlatformProperties ;
3334use nativelink_util:: shutdown_guard:: ShutdownGuard ;
3435use nativelink_util:: spawn;
3536use nativelink_util:: task:: JoinHandleDropGuard ;
36- use opentelemetry:: KeyValue ;
3737use opentelemetry:: baggage:: BaggageExt ;
3838use opentelemetry:: context:: { Context , FutureExt as OtelFutureExt } ;
39+ use opentelemetry:: KeyValue ;
3940use opentelemetry_semantic_conventions:: attribute:: ENDUSER_ID ;
40- use tokio:: sync:: { Notify , mpsc } ;
41+ use tokio:: sync:: { mpsc , Notify } ;
4142use tokio:: time:: Duration ;
4243use tracing:: { error, info, info_span, warn} ;
4344
@@ -48,7 +49,9 @@ use crate::simple_scheduler_state_manager::{SchedulerStateManager, SimpleSchedul
4849use crate :: worker:: { ActionInfoWithProps , ActionsState , Worker , WorkerState , WorkerTimestamp } ;
4950use crate :: worker_registry:: WorkerRegistry ;
5051use crate :: worker_scheduler:: WorkerScheduler ;
use nativelink_util::metrics::{EXECUTION_METRICS, StoreType::Metrics};
use serde::Serialize;
5255
5356/// Default timeout for workers in seconds.
5457/// If this changes, remember to change the documentation in the config.
@@ -157,6 +160,11 @@ pub struct SimpleScheduler {
157160 /// e.g. "worker busy", "can't find any worker"
158161 /// Set to None to disable. This is quite noisy, so we limit it
159162 worker_match_logging_interval : Option < Duration > ,
163+
164+ /// Whether to use batch worker matching optimization.
165+ /// When enabled, actions are collected and matched to workers in a single
166+ /// batch operation, reducing lock contention.
167+ enable_batch_worker_matching : bool ,
160168}
161169
162170impl core:: fmt:: Debug for SimpleScheduler {
@@ -344,6 +352,7 @@ impl SimpleScheduler {
344352 }
345353
346354 let total_elapsed = start. elapsed ( ) ;
355+ EXECUTION_METRICS . do_try_match_duration . record ( total_elapsed. as_secs_f64 ( ) , & [ ] ) ;
347356 if total_elapsed > Duration :: from_secs ( 5 ) {
348357 warn ! (
349358 total_ms = total_elapsed. as_millis( ) ,
@@ -354,6 +363,173 @@ impl SimpleScheduler {
354363
355364 result
356365 }
366+
367+ /// Batch version of `do_try_match` that collects all queued actions and matches
368+ /// them to workers in a single batch operation. This reduces lock contention
369+ /// compared to the sequential version.
370+ async fn do_try_match_batch ( & self , full_worker_logging : bool ) -> Result < ( ) , Error > {
371+ let start = Instant :: now ( ) ;
372+
373+ // Collect all queued actions
374+ let stream = self
375+ . get_queued_operations ( )
376+ . await
377+ . err_tip ( || "Failed to get queued operations in do_try_match_batch" ) ?;
378+
379+ let query_elapsed = start. elapsed ( ) ;
380+ if query_elapsed > Duration :: from_secs ( 1 ) {
381+ warn ! (
382+ elapsed_ms = query_elapsed. as_millis( ) ,
383+ "Slow get_queued_operations query in batch mode"
384+ ) ;
385+ }
386+
387+ // Collect all action state results and compute their platform properties
388+ let action_state_results: Vec < _ > = stream. collect ( ) . await ;
389+
390+ if action_state_results. is_empty ( ) {
391+ return Ok ( ( ) ) ;
392+ }
393+
394+ // Prepare actions with their platform properties for batch matching
395+ struct PreparedAction {
396+ action_state_result : Box < dyn ActionStateResult > ,
397+ action_info : ActionInfoWithProps ,
398+ origin_metadata : OriginMetadata ,
399+ }
400+
401+ let mut prepared_actions: Vec < PreparedAction > = Vec :: with_capacity ( action_state_results. len ( ) ) ;
402+ let mut platform_properties_refs: Vec < & PlatformProperties > = Vec :: with_capacity ( action_state_results. len ( ) ) ;
403+
404+ for action_state_result in action_state_results {
405+ let ( action_info, maybe_origin_metadata) = match action_state_result
406+ . as_action_info ( )
407+ . await
408+ {
409+ Ok ( result) => result,
410+ Err ( err) => {
411+ warn ! ( ?err, "Failed to get action_info in batch mode, skipping" ) ;
412+ continue ;
413+ }
414+ } ;
415+
416+ // TODO(palfrey) We should not compute this every time and instead store
417+ // it with the ActionInfo when we receive it.
418+ let platform_properties = match self
419+ . platform_property_manager
420+ . make_platform_properties ( action_info. platform_properties . clone ( ) )
421+ {
422+ Ok ( props) => props,
423+ Err ( err) => {
424+ warn ! ( ?err, "Failed to make platform properties in batch mode, skipping" ) ;
425+ continue ;
426+ }
427+ } ;
428+
429+ let action_info_with_props = ActionInfoWithProps {
430+ inner : action_info,
431+ platform_properties,
432+ } ;
433+
434+ prepared_actions. push ( PreparedAction {
435+ action_state_result,
436+ action_info : action_info_with_props,
437+ origin_metadata : maybe_origin_metadata. unwrap_or_default ( ) ,
438+ } ) ;
439+ }
440+
441+ // Collect platform properties references for batch matching
442+ for prepared in & prepared_actions {
443+ platform_properties_refs. push ( & prepared. action_info . platform_properties ) ;
444+ }
445+
446+ // Batch find workers for all actions
447+ let matches = self
448+ . worker_scheduler
449+ . batch_find_workers_for_actions ( & platform_properties_refs, full_worker_logging)
450+ . await ;
451+
452+ let matches_count = matches. len ( ) ;
453+ let actions_count = prepared_actions. len ( ) ;
454+
455+ // Process the matches
456+ let mut result = Ok ( ( ) ) ;
457+ for ( action_idx, worker_id) in matches {
458+ let prepared = & prepared_actions[ action_idx] ;
459+
460+ let assign_result = async {
461+ // Extract the operation_id from the action_state
462+ let operation_id = {
463+ let ( action_state, _origin_metadata) = prepared
464+ . action_state_result
465+ . as_state ( )
466+ . await
467+ . err_tip ( || "Failed to get action_state in batch mode" ) ?;
468+ action_state. client_operation_id . clone ( )
469+ } ;
470+
471+ // Tell the matching engine that the operation is being assigned to a worker
472+ let assign_result = self
473+ . matching_engine_state_manager
474+ . assign_operation ( & operation_id, Ok ( & worker_id) )
475+ . await
476+ . err_tip ( || "Failed to assign operation in do_try_match_batch" ) ;
477+
478+ if let Err ( err) = assign_result {
479+ if err. code == Code :: Aborted {
480+ // Operation was cancelled, skip it
481+ return Ok ( ( ) ) ;
482+ }
483+ return Err ( err) ;
484+ }
485+
486+ // Notify the worker to run the action
487+ self . worker_scheduler
488+ . worker_notify_run_action (
489+ worker_id. clone ( ) ,
490+ operation_id,
491+ prepared. action_info . clone ( ) ,
492+ )
493+ . await
494+ . err_tip ( || "Failed to run worker_notify_run_action in do_try_match_batch" )
495+ } ;
496+
497+ let ctx = Context :: current_with_baggage ( vec ! [ KeyValue :: new(
498+ ENDUSER_ID ,
499+ prepared. origin_metadata. identity. clone( ) ,
500+ ) ] ) ;
501+
502+ result = result. merge (
503+ info_span ! ( "do_try_match_batch" )
504+ . in_scope ( || assign_result)
505+ . with_context ( ctx)
506+ . await ,
507+ ) ;
508+ }
509+
510+ let total_elapsed = start. elapsed ( ) ;
511+ EXECUTION_METRICS . do_try_match_duration . record ( total_elapsed. as_secs_f64 ( ) , & [ ] ) ;
512+ if total_elapsed > Duration :: from_secs ( 5 ) {
513+ warn ! (
514+ total_ms = total_elapsed. as_millis( ) ,
515+ query_ms = query_elapsed. as_millis( ) ,
516+ actions_processed = actions_count,
517+ matches_found = matches_count,
518+ "Slow do_try_match_batch cycle"
519+ ) ;
520+ }
521+
522+ result
523+ }
524+
525+ /// Internal method that dispatches to either batch or sequential matching.
526+ async fn do_try_match_internal ( & self , full_worker_logging : bool ) -> Result < ( ) , Error > {
527+ if self . enable_batch_worker_matching {
528+ self . do_try_match_batch ( full_worker_logging) . await
529+ } else {
530+ self . do_try_match ( full_worker_logging) . await
531+ }
532+ }
357533}
358534
359535impl SimpleScheduler {
@@ -494,7 +670,7 @@ impl SimpleScheduler {
494670 }
495671 } ;
496672
497- let res = scheduler. do_try_match ( full_worker_logging) . await ;
673+ let res = scheduler. do_try_match_internal ( full_worker_logging) . await ;
498674 if full_worker_logging {
499675 let operations_stream = scheduler
500676 . matching_engine_state_manager
@@ -598,6 +774,7 @@ impl SimpleScheduler {
598774 maybe_origin_event_tx,
599775 task_worker_matching_spawn,
600776 worker_match_logging_interval,
777+ enable_batch_worker_matching : spec. enable_batch_worker_matching ,
601778 }
602779 } ) ;
603780 ( action_scheduler, worker_scheduler_clone)
0 commit comments