1- // @module StealingQueue
2- // @description A generic, priority-aware, work-stealing queue implementation.
3- // This module is self-contained and can be used by any scheduler or application
4- // to manage and distribute tasks of any type `T`.
5-
61#![ allow( non_snake_case, non_camel_case_types) ]
72
83use std:: sync:: Arc ;
94
105use crossbeam_deque:: { Injector , Stealer , Worker } ;
116use rand:: seq:: SliceRandom ;
127
13- // The task must have a way to specify its priority. We define a trait for this.
148pub trait Prioritized {
159 type P : PartialEq + Eq + Copy ;
1610
1711 fn GetPriority ( & self ) -> Self :: P ;
1812}
1913
20- // A simple enum for the library to use. The consumer's task must map to this.
2114#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
2215pub enum Priority {
2316 High ,
@@ -27,58 +20,40 @@ pub enum Priority {
2720 Low ,
2821}
2922
30- // The parts of the queue that can be safely shared across all threads.
3123struct Share < T > {
32- // High, Normal, Low
3324 Injector : ( Injector < T > , Injector < T > , Injector < T > ) ,
3425
35- // High, Normal, Low
3626 Stealer : ( Vec < Stealer < T > > , Vec < Stealer < T > > , Vec < Stealer < T > > ) ,
3727}
3828
39- /// The public-facing work-stealing queue. It is generic over the task type `T`.
40- /// This object is held by the task submitter.
4129pub struct StealingQueue < T : Prioritized < P = Priority > > {
4230 Share : Arc < Share < T > > ,
4331}
4432
45- /// A context object that contains everything a single worker thread needs to
46- /// operate. This includes its own private, thread-local deques.
4733pub struct Context < T > {
4834 pub Identifier : usize ,
4935
50- // High, Normal, Low
5136 Local : ( Worker < T > , Worker < T > , Worker < T > ) ,
5237
5338 Share : Arc < Share < T > > ,
5439}
5540
5641impl < T : Prioritized < P = Priority > > StealingQueue < T > {
57- /// Creates a new work-stealing queue system.
58- ///
59- /// Returns a tuple containing:
60- /// 1. The `StealingQueue` for submitting tasks.
61- /// 2. A `Vec` of `WorkerContext`s, one for each worker to be spawned.
6242 pub fn New ( Count : usize ) -> ( Self , Vec < Context < T > > ) {
6343 let mut High : Vec < Worker < T > > = Vec :: with_capacity ( Count ) ;
6444
6545 let mut Normal : Vec < Worker < T > > = Vec :: with_capacity ( Count ) ;
6646
6747 let mut Low : Vec < Worker < T > > = Vec :: with_capacity ( Count ) ;
6848
69- // --- FIX: Use the documented API for creating Worker/Stealer pairs ---
7049 let StealerHigh : Vec < Stealer < T > > = ( 0 ..Count )
7150 . map ( |_| {
72- // 1. Create the Worker.
7351 let Worker = Worker :: new_fifo ( ) ;
7452
75- // 2. Get its Stealer.
7653 let Stealer = Worker . stealer ( ) ;
7754
78- // 3. Store the Worker part.
7955 High . push ( Worker ) ;
8056
81- // 4. Return the Stealer part.
8257 Stealer
8358 } )
8459 . collect ( ) ;
@@ -116,7 +91,6 @@ impl<T:Prioritized<P = Priority>> StealingQueue<T> {
11691 let mut Context = Vec :: with_capacity ( Count ) ;
11792
11893 for Id in 0 ..Count {
119- // We use remove(0) because we built the Vecs in order and need to consume them.
12094 Context . push ( Context {
12195 Identifier : Id ,
12296
@@ -131,8 +105,6 @@ impl<T:Prioritized<P = Priority>> StealingQueue<T> {
131105 ( Queue , Context )
132106 }
133107
134- /// Submits a new task to the queue.
135- /// This is thread-safe and can be called from anywhere.
136108 pub fn Submit ( & self , Task : T ) {
137109 match Task . GetPriority ( ) {
138110 Priority :: High => self . Share . Injector . 0 . push ( Task ) ,
@@ -145,20 +117,14 @@ impl<T:Prioritized<P = Priority>> StealingQueue<T> {
145117}
146118
147119impl < T > Context < T > {
148- /// Finds the next available task for this worker.
149- /// Implements the full priority-aware, work-stealing logic.
150120 pub fn NextTask ( & self ) -> Option < T > {
151- // Pop from local High
152- self . Local . 0 . pop ( )
153- // Pop from local Normal
121+ self . Local
122+ . 0
123+ . pop ( )
154124 . or_else ( || self . Local . 1 . pop ( ) )
155- // Pop from local Low
156125 . or_else ( || self . Local . 2 . pop ( ) )
157- // Steal High
158126 . or_else ( || self . Steal ( & self . Share . Injector . 0 , & self . Share . Stealer . 0 , & self . Local . 0 ) )
159- // Steal Normal
160127 . or_else ( || self . Steal ( & self . Share . Injector . 1 , & self . Share . Stealer . 1 , & self . Local . 1 ) )
161- // Steal Low
162128 . or_else ( || self . Steal ( & self . Share . Injector . 2 , & self . Share . Stealer . 2 , & self . Local . 2 ) )
163129 }
164130
0 commit comments