Skip to content

Commit 785b813

Browse files
1 parent b3930da commit 785b813

3 files changed

Lines changed: 199 additions & 107 deletions

File tree

Source/Queue/StealingQueue.rs

Lines changed: 158 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,183 @@
1-
// Defines a high-performance, priority-aware, work-stealing deque for
2-
// distributing tasks among scheduler workers.
1+
// @module StealingQueue
2+
// @description A generic, priority-aware, work-stealing queue implementation.
3+
// This module is self-contained and can be used by any scheduler or application
4+
// to manage and distribute tasks of any type `T`.
35

4-
use crossbeam_deque::{Injector, Stealer, Worker};
6+
#![allow(non_snake_case, non_camel_case_types)]
7+
8+
use std::sync::Arc;
9+
10+
use crossbeam_deque::{Injector, Stealer, Worker as WorkerDeque};
511
use rand::seq::SliceRandom;
612

7-
use crate::Task::{Priority::Enum, Task::Struct};
13+
// The task must have a way to specify its priority. We define a trait for this.
14+
pub trait Prioritized {
15+
type P: PartialEq + Eq + Copy;
816

9-
/// A container for a set of queues for a single priority level.
10-
struct PriorityQueueSet {
11-
GlobalInjector:Injector<Struct>,
12-
WorkerQueue:Vec<Worker<Struct>>,
13-
Stealer:Vec<Stealer<Struct>>,
17+
fn GetPriority(&self) -> Self::P;
1418
}
1519

16-
impl PriorityQueueSet {
17-
/// Creates a new set of queues for a given number of workers.
18-
fn New(NumberOfWorker:usize) -> Self {
19-
let WorkerQueue:Vec<Worker<Struct>> = (0..NumberOfWorker).map(|_| Worker::new_fifo()).collect();
20-
Self {
21-
GlobalInjector:Injector::new(),
22-
Stealer:WorkerQueue.iter().map(|w| w.stealer()).collect(),
23-
WorkerQueue,
24-
}
25-
}
20+
// A simple enum for the library to use. The consumer's task must map to this.
21+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22+
pub enum Priority {
23+
High,
24+
25+
Normal,
26+
27+
Low,
2628
}
2729

28-
/// A collection of worker deques that supports priority-aware work-stealing.
29-
///
30-
/// This struct holds three distinct sets of queues, one for each priority level
31-
/// (`High`, `Normal`, `Low`). When a worker needs a task, it always checks for
32-
/// higher-priority work before considering lower-priority work.
33-
pub(crate) struct StealingQueue {
34-
High:PriorityQueueSet,
35-
Normal:PriorityQueueSet,
36-
Low:PriorityQueueSet,
30+
// The parts of the queue that can be safely shared across all threads.
31+
struct Shared<T> {
32+
// High, Normal, Low
33+
Injector:(Injector<T>, Injector<T>, Injector<T>),
34+
35+
// High, Normal, Low
36+
Stealer:(Vec<Stealer<T>>, Vec<Stealer<T>>, Vec<Stealer<T>>),
3737
}
3838

39-
impl StealingQueue {
40-
/// Creates a new `StealingQueue` with a dedicated set of queues for each
41-
/// priority level.
42-
pub fn New(NumberOfWorker:usize) -> Self {
43-
Self {
44-
High:PriorityQueueSet::New(NumberOfWorker),
45-
Normal:PriorityQueueSet::New(NumberOfWorker),
46-
Low:PriorityQueueSet::New(NumberOfWorker),
47-
}
48-
}
39+
/// The public-facing work-stealing queue. It is generic over the task type `T`.
40+
/// This object is held by the task submitter.
41+
pub struct StealingQueue<T:Prioritized<P = Priority>> {
42+
Shared:Arc<Shared<T>>,
43+
}
44+
45+
/// A context object that contains everything a single worker thread needs to
46+
/// operate. This includes its own private, thread-local deques.
47+
pub struct WorkerContext<T> {
48+
Id:usize,
49+
50+
// High, Normal, Low
51+
Local:(WorkerDeque<T>, WorkerDeque<T>, WorkerDeque<T>),
52+
53+
Shared:Arc<Shared<T>>,
54+
}
55+
56+
impl<T:Prioritized<P = Priority>> StealingQueue<T> {
57+
/// Creates a new work-stealing queue system.
58+
///
59+
/// Returns a tuple containing:
60+
/// 1. The `StealingQueue` for submitting tasks.
61+
/// 2. A `Vec` of `WorkerContext`s, one for each worker to be spawned.
62+
pub fn New(Count:usize) -> (Self, Vec<WorkerContext<T>>) {
63+
let mut High:Vec<WorkerDeque<T>> = Vec::with_capacity(Count);
64+
65+
let mut Normal:Vec<WorkerDeque<T>> = Vec::with_capacity(Count);
66+
67+
let mut Low:Vec<WorkerDeque<T>> = Vec::with_capacity(Count);
68+
69+
// --- FIX: Use the documented API for creating Worker/Stealer pairs ---
70+
let StealerHigh:Vec<Stealer<T>> = (0..Count)
71+
.map(|_| {
72+
// 1. Create the Worker.
73+
let Worker = WorkerDeque::new_fifo();
74+
75+
// 2. Get its Stealer.
76+
let Stealer = Worker.stealer();
77+
78+
// 3. Store the Worker part.
79+
High.push(Worker);
80+
81+
// 4. Return the Stealer part.
82+
Stealer
83+
})
84+
.collect();
4985

50-
/// Submits a task to the appropriate global injection queue based on its
51-
/// priority.
52-
pub fn Push(&self, Task:Struct) {
53-
match Task.Priority {
54-
Enum::High => self.High.GlobalInjector.push(Task),
55-
Enum::Normal => self.Normal.GlobalInjector.push(Task),
56-
Enum::Low => self.Low.GlobalInjector.push(Task),
86+
let StealerNormal:Vec<Stealer<T>> = (0..Count)
87+
.map(|_| {
88+
let Worker = WorkerDeque::new_fifo();
89+
90+
let Stealer = Worker.stealer();
91+
92+
Normal.push(Worker);
93+
94+
Stealer
95+
})
96+
.collect();
97+
98+
let StealerLow:Vec<Stealer<T>> = (0..Count)
99+
.map(|_| {
100+
let Worker = WorkerDeque::new_fifo();
101+
102+
let Stealer = Worker.stealer();
103+
104+
Low.push(Worker);
105+
106+
Stealer
107+
})
108+
.collect();
109+
110+
let Shared = Arc::new(Shared {
111+
Injector:(Injector::new(), Injector::new(), Injector::new()),
112+
113+
Stealer:(StealerHigh, StealerNormal, StealerLow),
114+
});
115+
116+
let mut Context = Vec::with_capacity(Count);
117+
118+
for Id in 0..Count {
119+
// We use remove(0) because we built the Vecs in order and need to consume them.
120+
Context.push(WorkerContext {
121+
Id,
122+
123+
Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
124+
125+
Shared:Shared.clone(),
126+
});
57127
}
128+
129+
let Queue = Self { Shared };
130+
131+
(Queue, Context)
58132
}
59133

60-
/// Attempts to find a task for a given worker, always prioritizing
61-
/// `High` > `Normal` > `Low`.
62-
pub fn StealForWorker(&self, WorkerId:usize) -> Option<Struct> {
63-
self.FindTaskInSet(&self.High, WorkerId)
64-
.or_else(|| self.FindTaskInSet(&self.Normal, WorkerId))
65-
.or_else(|| self.FindTaskInSet(&self.Low, WorkerId))
134+
/// Submits a new task to the queue.
135+
/// This is thread-safe and can be called from anywhere.
136+
pub fn Submit(&self, Task:T) {
137+
match Task.GetPriority() {
138+
Priority::High => self.Shared.Injector.0.push(Task),
139+
140+
Priority::Normal => self.Shared.Injector.1.push(Task),
141+
142+
Priority::Low => self.Shared.Injector.2.push(Task),
143+
}
66144
}
145+
}
67146

68-
/// Implements the core work-finding logic for a specific priority level.
69-
/// It first checks the worker's local queue, then attempts to steal.
70-
fn FindTaskInSet(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Struct> {
71-
Set.WorkerQueue[WorkerId]
72-
.pop()
73-
.or_else(|| self.StealFromSetGlobalOrPeer(Set, WorkerId))
147+
impl<T> WorkerContext<T> {
148+
/// Finds the next available task for this worker.
149+
/// Implements the full priority-aware, work-stealing logic.
150+
pub fn NextTask(&self) -> Option<T> {
151+
// Pop from local High
152+
self.Local.0.pop()
153+
// Pop from local Normal
154+
.or_else(|| self.Local.1.pop())
155+
// Pop from local Low
156+
.or_else(|| self.Local.2.pop())
157+
// Steal High
158+
.or_else(|| self.Steal(&self.Shared.Injector.0, &self.Shared.Stealer.0, &self.Local.0))
159+
// Steal Normal
160+
.or_else(|| self.Steal(&self.Shared.Injector.1, &self.Shared.Stealer.1, &self.Local.1))
161+
// Steal Low
162+
.or_else(|| self.Steal(&self.Shared.Injector.2, &self.Shared.Stealer.2, &self.Local.2))
74163
}
75164

76-
/// Attempts to steal work for a specific priority level, first from the
77-
/// global queue, then from peer workers.
78-
fn StealFromSetGlobalOrPeer(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Struct> {
79-
// Try stealing from the global injector for this priority set.
80-
if Set.GlobalInjector.steal_batch_and_pop(&Set.WorkerQueue[WorkerId]).is_success() {
81-
return Set.WorkerQueue[WorkerId].pop();
165+
fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealer:&'a [Stealer<T>], Local:&'a WorkerDeque<T>) -> Option<T> {
166+
if Injector.steal_batch_and_pop(Local).is_success() {
167+
return Local.pop();
82168
}
83169

84-
// Try stealing from peers for this priority set. We shuffle the indices
85-
// to ensure fairness and avoid contention hotspots.
86-
let mut ShuffledIndex:Vec<usize> = (0..Set.Stealer.len()).collect();
87-
ShuffledIndex.shuffle(&mut rand::rng());
170+
let mut Index:Vec<usize> = (0..Stealer.len()).collect();
88171

89-
for Index in ShuffledIndex {
90-
if Index == WorkerId {
91-
continue; // Don't steal from ourselves.
172+
Index.shuffle(&mut rand::rng());
173+
174+
for i in Index {
175+
if i == self.Id {
176+
continue;
92177
}
93-
if Set.Stealer[Index].steal_batch_and_pop(&Set.WorkerQueue[WorkerId]).is_success() {
94-
return Set.WorkerQueue[WorkerId].pop();
178+
179+
if Stealer[i].steal_batch_and_pop(Local).is_success() {
180+
return Local.pop();
95181
}
96182
}
97183

Source/Scheduler/Scheduler.rs

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,54 +5,57 @@ use std::{
55
Arc,
66
atomic::{AtomicBool, Ordering},
77
},
8+
time::Duration,
89
};
910

10-
use log::{error, info, warn};
11+
use log::{error, info, trace, warn};
1112
use tokio::task::JoinHandle;
1213

13-
/// @module Scheduler
14-
/// @description Defines the main `Scheduler` that manages the worker pool, task
15-
/// queue, and task execution lifecycle.
16-
use super::Worker::Worker;
14+
// CHANGED: Use the new reusable library.
15+
use crate::Queue::StealingQueue::StealingQueue;
1716
use crate::{
18-
Queue::StealingQueue::StealingQueue,
1917
Scheduler::SchedulerBuilder::Concurrency,
2018
Task::{Priority::Enum, Task::Struct},
2119
};
2220

2321
/// Manages a pool of worker threads and a work-stealing queue to execute tasks
2422
/// efficiently. This struct is the public-facing API of the Echo scheduler.
2523
pub struct Scheduler {
26-
/// The underlying work-stealing queue shared by all workers.
27-
Queue:Arc<StealingQueue>,
24+
// CHANGED: The Scheduler now holds an instance of our generic queue.
25+
Queue:StealingQueue<Struct>,
2826
/// Handles to the spawned worker threads, allowing for graceful shutdown.
2927
WorkerHandles:Vec<JoinHandle<()>>,
3028
/// An atomic flag to signal workers to shut down.
3129
IsRunning:Arc<AtomicBool>,
3230
}
3331

3432
impl Scheduler {
35-
/// Creates and starts a new scheduler with a given configuration.
36-
/// This is a crate-private function, intended to be called only by the
37-
/// `SchedulerBuilder`.
38-
///
39-
/// @param number_of_workers - The number of worker threads to spawn.
40-
/// @param _queue_configs - Configuration for named queues with concurrency
41-
/// limits (future use).
4233
pub(crate) fn Start(number_of_workers:usize, _queue_configs:HashMap<String, Concurrency>) -> Self {
4334
info!("[Scheduler] Starting scheduler with {} worker threads.", number_of_workers);
4435
let IsRunning = Arc::new(AtomicBool::new(true));
45-
let Queue = Arc::new(StealingQueue::New(number_of_workers));
36+
37+
// 1. Create the queue system. This is now a single, clean line.
38+
let (Queue, WorkerContexts) = StealingQueue::New(number_of_workers);
4639

4740
let mut WorkerHandles = Vec::with_capacity(number_of_workers);
4841

49-
for WorkerIdentifier in 0..number_of_workers {
50-
let CloneQueue = Queue.clone();
42+
// 2. Iterate over the contexts, giving one to each new thread.
43+
for Context in WorkerContexts.into_iter() {
5144
let CloneIsRunning = IsRunning.clone();
5245

5346
let WorkerHandle = tokio::spawn(async move {
54-
let WorkerInstance = Worker::New(WorkerIdentifier, CloneQueue, CloneIsRunning);
55-
WorkerInstance.Run().await;
47+
// The worker logic is now simple and lives directly inside the spawned task.
48+
trace!("[Worker] Starting execution loop.");
49+
while CloneIsRunning.load(Ordering::Relaxed) {
50+
// Use the context to find the next task.
51+
if let Some(Task) = Context.NextTask() {
52+
trace!("[Worker] Found task with priority {:?}. Executing.", Task.Priority);
53+
Task.Future.await
54+
} else {
55+
tokio::time::sleep(Duration::from_millis(1)).await;
56+
}
57+
}
58+
trace!("[Worker] Execution loop finished.");
5659
});
5760

5861
WorkerHandles.push(WorkerHandle);
@@ -61,22 +64,14 @@ impl Scheduler {
6164
Self { Queue, WorkerHandles, IsRunning }
6265
}
6366

64-
/// Submits a new task (as a `Future`) to the scheduler's global queue.
65-
/// The task will be picked up by the next available worker.
66-
///
67-
/// @param future_instance - The async block or function to execute.
68-
/// @param task_priority - The priority of the task.
6967
pub fn submit<F>(&self, future_instance:F, task_priority:Enum)
7068
where
7169
F: Future<Output = ()> + Send + 'static, {
7270
let new_task = Struct::New(future_instance, task_priority);
73-
self.Queue.Push(new_task);
71+
// 3. Use the generic queue's submit method.
72+
self.Queue.Submit(new_task);
7473
}
7574

76-
/// Asynchronously shuts down the scheduler.
77-
///
78-
/// This signals all worker threads to stop their loops and then waits for
79-
/// them to complete their current tasks and exit gracefully.
8075
pub async fn ShutDown(&mut self) {
8176
if !self.IsRunning.swap(false, Ordering::Relaxed) {
8277
info!("[Scheduler] ShutDown already initiated.");
@@ -94,13 +89,8 @@ impl Scheduler {
9489
}
9590

9691
impl Drop for Scheduler {
97-
/// Ensures that the scheduler is shut down when it goes out of scope,
98-
/// preventing orphaned worker threads.
9992
fn drop(&mut self) {
10093
if self.IsRunning.load(Ordering::Relaxed) {
101-
// If the scheduler is dropped without an explicit async shutdown,
102-
// we must signal the workers to stop. We cannot await the handles
103-
// here, but the threads will eventually terminate.
10494
warn!("[Scheduler] Scheduler dropped without explicit shutdown. Signaling workers to stop.");
10595
self.IsRunning.store(false, Ordering::Relaxed);
10696
}

Source/Task/Task.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::{future::Future, pin::Pin};
44
/// @description Defines the `Task` struct, which is the internal unit of work
55
/// for the Echo scheduler.
66
use super::Priority::Enum;
7+
// NEW: Import traits and types from our new reusable library.
8+
use crate::Queue::StealingQueue::{Prioritized, Priority as ReusablePriority};
79

810
/// A type alias for a boxed, send-able, pinned future that returns no value.
911
/// This is the standard way to handle dynamic, async operations in Rust.
@@ -33,3 +35,17 @@ impl Struct {
3335
Self { Future:Box::pin(FutureInstance), Priority:PriorityValue }
3436
}
3537
}
38+
39+
// NEW: Implement the `Prioritized` trait to make our `Task` compatible
40+
// with the generic `ReusableQueue`.
41+
impl Prioritized for Struct {
42+
type P = ReusablePriority;
43+
44+
fn GetPriority(&self) -> Self::P {
45+
match self.Priority {
46+
Enum::High => ReusablePriority::High,
47+
Enum::Normal => ReusablePriority::Normal,
48+
Enum::Low => ReusablePriority::Low,
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)