1313// limitations under the License.
1414
1515use core:: ops:: { Deref , DerefMut } ;
16+ use core:: sync:: atomic:: { AtomicU64 , Ordering } ;
1617use core:: time:: Duration ;
1718use std:: sync:: Arc ;
18- use std:: time:: UNIX_EPOCH ;
19+ use std:: time:: { Instant , UNIX_EPOCH } ;
1920
2021use async_lock:: Mutex ;
2122use lru:: LruCache ;
@@ -33,8 +34,34 @@ use tokio::sync::Notify;
3334use tonic:: async_trait;
3435use tracing:: { error, info, trace, warn} ;
3536
/// Counters describing how the worker scheduler is behaving.
///
/// Every field is an [`AtomicU64`], so the counters can be bumped through a
/// shared reference. All values are running totals that start at zero (via
/// `Default`).
#[derive(Debug, Default)]
pub struct SchedulerMetrics {
    /// Running total of workers added to the scheduler.
    pub workers_added: AtomicU64,
    /// Running total of workers removed from the scheduler.
    pub workers_removed: AtomicU64,
    /// Running total of `find_worker_for_action` invocations.
    pub find_worker_calls: AtomicU64,
    /// Running total of `find_worker_for_action` invocations that returned a
    /// worker.
    pub find_worker_hits: AtomicU64,
    /// Running total of `find_worker_for_action` invocations that returned no
    /// worker.
    pub find_worker_misses: AtomicU64,
    /// Accumulated wall-clock time spent in `find_worker_for_action`, in
    /// nanoseconds.
    pub find_worker_time_ns: AtomicU64,
    /// Accumulated number of workers considered by find operations. The
    /// scheduler adds the full worker-pool size on each call, so this is an
    /// upper bound rather than an exact count.
    pub workers_iterated: AtomicU64,
    /// Running total of actions dispatched to workers.
    pub actions_dispatched: AtomicU64,
    /// Running total of worker keep-alive updates.
    pub keep_alive_updates: AtomicU64,
    /// Running total of workers that timed out.
    pub worker_timeouts: AtomicU64,
}

61+
3662use crate :: platform_property_manager:: PlatformPropertyManager ;
3763use crate :: worker:: { ActionInfoWithProps , Worker , WorkerTimestamp , WorkerUpdate } ;
64+ use crate :: worker_capability_index:: WorkerCapabilityIndex ;
3865use crate :: worker_registry:: SharedWorkerRegistry ;
3966use crate :: worker_scheduler:: WorkerScheduler ;
4067
@@ -91,6 +118,11 @@ struct ApiWorkerSchedulerImpl {
91118
92119 /// Whether the worker scheduler is shutting down.
93120 shutting_down : bool ,
121+
122+ /// Index for fast worker capability lookup.
123+ /// Used to accelerate `find_worker_for_action` by filtering candidates
124+ /// based on properties before doing linear scan.
125+ capability_index : WorkerCapabilityIndex ,
94126}
95127
96128impl core:: fmt:: Debug for ApiWorkerSchedulerImpl {
@@ -99,6 +131,10 @@ impl core::fmt::Debug for ApiWorkerSchedulerImpl {
99131 . field ( "workers" , & self . workers )
100132 . field ( "allocation_strategy" , & self . allocation_strategy )
101133 . field ( "worker_change_notify" , & self . worker_change_notify )
134+ . field (
135+ "capability_index_size" ,
136+ & self . capability_index . worker_count ( ) ,
137+ )
102138 . field ( "worker_registry" , & self . worker_registry )
103139 . finish_non_exhaustive ( )
104140 }
@@ -145,8 +181,13 @@ impl ApiWorkerSchedulerImpl {
145181 /// Note: This function will not do any task matching.
146182 fn add_worker ( & mut self , worker : Worker ) -> Result < ( ) , Error > {
147183 let worker_id = worker. id . clone ( ) ;
184+ let platform_properties = worker. platform_properties . clone ( ) ;
148185 self . workers . put ( worker_id. clone ( ) , worker) ;
149186
187+ // Add to capability index for fast matching
188+ self . capability_index
189+ . add_worker ( & worker_id, & platform_properties) ;
190+
150191 // Worker is not cloneable, and we do not want to send the initial connection results until
151192 // we have added it to the map, or we might get some strange race conditions due to the way
152193 // the multi-threaded runtime works.
@@ -169,6 +210,9 @@ impl ApiWorkerSchedulerImpl {
169210 /// Note: The caller is responsible for any rescheduling of any tasks that might be
170211 /// running.
171212 fn remove_worker ( & mut self , worker_id : & WorkerId ) -> Option < Worker > {
213+ // Remove from capability index
214+ self . capability_index . remove_worker ( worker_id) ;
215+
172216 let result = self . workers . pop ( worker_id) ;
173217 self . worker_change_notify . notify_one ( ) ;
174218 result
@@ -189,47 +233,65 @@ impl ApiWorkerSchedulerImpl {
189233 Ok ( ( ) )
190234 }
191235
192- fn inner_worker_checker (
193- ( worker_id , w ) : & ( & WorkerId , & Worker ) ,
236+ fn inner_find_worker_for_action (
237+ & self ,
194238 platform_properties : & PlatformProperties ,
195239 full_worker_logging : bool ,
196- ) -> bool {
197- if !w. can_accept_work ( ) {
240+ ) -> Option < WorkerId > {
241+ // Use capability index to get candidate workers that match STATIC properties
242+ // (Exact, Unknown) and have the required property keys (Priority, Minimum).
243+ // This reduces complexity from O(W × P) to O(P × log(W)) for exact properties.
244+ let candidates = self
245+ . capability_index
246+ . find_matching_workers ( platform_properties) ;
247+
248+ if candidates. is_empty ( ) {
198249 if full_worker_logging {
199- info ! (
200- "Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}" ,
201- w. is_paused, w. is_draining
202- ) ;
250+ info ! ( "No workers in capability index match required properties" ) ;
203251 }
204- false
205- } else if !platform_properties. is_satisfied_by ( & w. platform_properties , full_worker_logging)
206- {
207- if full_worker_logging {
208- info ! ( "Worker {worker_id} properties are insufficient" ) ;
252+ return None ;
253+ }
254+
255+ // Check function for availability AND dynamic Minimum property verification.
256+ // The index only does presence checks for Minimum properties since their
257+ // values change dynamically as jobs are assigned to workers.
258+ let worker_matches = |( worker_id, w) : & ( & WorkerId , & Worker ) | -> bool {
259+ if !w. can_accept_work ( ) {
260+ if full_worker_logging {
261+ info ! (
262+ "Worker {worker_id} cannot accept work: is_paused={}, is_draining={}" ,
263+ w. is_paused, w. is_draining
264+ ) ;
265+ }
266+ return false ;
209267 }
210- false
211- } else {
268+
269+ // Verify Minimum properties at runtime (their values are dynamic)
270+ if !platform_properties. is_satisfied_by ( & w. platform_properties , full_worker_logging) {
271+ return false ;
272+ }
273+
212274 true
213- }
214- }
275+ } ;
215276
216- fn inner_find_worker_for_action (
217- & self ,
218- platform_properties : & PlatformProperties ,
219- full_worker_logging : bool ,
220- ) -> Option < WorkerId > {
221- let mut workers_iter = self . workers . iter ( ) ;
222- let workers_iter = match self . allocation_strategy {
277+ // Now check constraints on filtered candidates.
278+ // Iterate in LRU order based on allocation strategy.
279+ let workers_iter = self . workers . iter ( ) ;
280+
281+ match self . allocation_strategy {
223282 // Use rfind to get the least recently used that satisfies the properties.
224- WorkerAllocationStrategy :: LeastRecentlyUsed => workers_iter. rfind ( |worker| {
225- Self :: inner_worker_checker ( worker, platform_properties, full_worker_logging)
226- } ) ,
283+ WorkerAllocationStrategy :: LeastRecentlyUsed => workers_iter
284+ . rev ( )
285+ . filter ( |( worker_id, _) | candidates. contains ( worker_id) )
286+ . find ( & worker_matches)
287+ . map ( |( _, w) | w. id . clone ( ) ) ,
288+
227289 // Use find to get the most recently used that satisfies the properties.
228- WorkerAllocationStrategy :: MostRecentlyUsed => workers_iter. find ( |worker| {
229- Self :: inner_worker_checker ( worker , platform_properties , full_worker_logging )
230- } ) ,
231- } ;
232- workers_iter . map ( | ( _ , w ) | w . id . clone ( ) )
290+ WorkerAllocationStrategy :: MostRecentlyUsed => workers_iter
291+ . filter ( | ( worker_id , _ ) | candidates . contains ( worker_id ) )
292+ . find ( & worker_matches )
293+ . map ( | ( _ , w ) | w . id . clone ( ) ) ,
294+ }
233295 }
234296
235297 async fn update_action (
@@ -411,6 +473,9 @@ pub struct ApiWorkerScheduler {
411473 worker_timeout_s : u64 ,
412474 /// Shared worker registry for checking worker liveness.
413475 worker_registry : SharedWorkerRegistry ,
476+
477+ /// Performance metrics for observability.
478+ metrics : Arc < SchedulerMetrics > ,
414479}
415480
416481impl ApiWorkerScheduler {
@@ -430,10 +495,12 @@ impl ApiWorkerScheduler {
430495 worker_change_notify,
431496 worker_registry : worker_registry. clone ( ) ,
432497 shutting_down : false ,
498+ capability_index : WorkerCapabilityIndex :: new ( ) ,
433499 } ) ,
434500 platform_property_manager,
435501 worker_timeout_s,
436502 worker_registry,
503+ metrics : Arc :: new ( SchedulerMetrics :: default ( ) ) ,
437504 } )
438505 }
439506
@@ -448,12 +515,21 @@ impl ApiWorkerScheduler {
448515 operation_id : OperationId ,
449516 action_info : ActionInfoWithProps ,
450517 ) -> Result < ( ) , Error > {
518+ self . metrics
519+ . actions_dispatched
520+ . fetch_add ( 1 , Ordering :: Relaxed ) ;
451521 let mut inner = self . inner . lock ( ) . await ;
452522 inner
453523 . worker_notify_run_action ( worker_id, operation_id, action_info)
454524 . await
455525 }
456526
527+ /// Returns the scheduler metrics for observability.
528+ #[ must_use]
529+ pub const fn get_metrics ( & self ) -> & Arc < SchedulerMetrics > {
530+ & self . metrics
531+ }
532+
457533 /// Attempts to find a worker that is capable of running this action.
458534 // TODO(palfrey) This algorithm is not very efficient. Simple testing using a tree-like
459535 // structure showed worse performance on a 10_000 worker * 7 properties * 1000 queued tasks
@@ -463,8 +539,35 @@ impl ApiWorkerScheduler {
463539 platform_properties : & PlatformProperties ,
464540 full_worker_logging : bool ,
465541 ) -> Option < WorkerId > {
542+ let start = Instant :: now ( ) ;
543+ self . metrics
544+ . find_worker_calls
545+ . fetch_add ( 1 , Ordering :: Relaxed ) ;
546+
466547 let inner = self . inner . lock ( ) . await ;
467- inner. inner_find_worker_for_action ( platform_properties, full_worker_logging)
548+ let worker_count = inner. workers . len ( ) as u64 ;
549+ let result = inner. inner_find_worker_for_action ( platform_properties, full_worker_logging) ;
550+
551+ // Track workers iterated (worst case is all workers)
552+ self . metrics
553+ . workers_iterated
554+ . fetch_add ( worker_count, Ordering :: Relaxed ) ;
555+
556+ if result. is_some ( ) {
557+ self . metrics
558+ . find_worker_hits
559+ . fetch_add ( 1 , Ordering :: Relaxed ) ;
560+ } else {
561+ self . metrics
562+ . find_worker_misses
563+ . fetch_add ( 1 , Ordering :: Relaxed ) ;
564+ }
565+
566+ #[ allow( clippy:: cast_possible_truncation) ]
567+ self . metrics
568+ . find_worker_time_ns
569+ . fetch_add ( start. elapsed ( ) . as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
570+ result
468571 }
469572
470573 /// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
@@ -515,6 +618,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
515618 let now = UNIX_EPOCH + Duration :: from_secs ( worker_timestamp) ;
516619 self . worker_registry . register_worker ( & worker_id, now) . await ;
517620
621+ self . metrics . workers_added . fetch_add ( 1 , Ordering :: Relaxed ) ;
518622 Ok ( ( ) )
519623 }
520624
0 commit comments