Skip to content

Commit ccdd32d

Browse files
1 parent b555aa6 commit ccdd32d

11 files changed

Lines changed: 269 additions & 145 deletions

File tree

Source/Queue/StealingQueue.rs

Lines changed: 74 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,147 @@
1+
//! A generic, priority-aware, work-stealing queue implementation.
2+
//!
3+
//! This module is self-contained and can be used by any scheduler or
4+
//! application to manage and distribute tasks of any type that can be
5+
//! prioritized.
6+
17
#![allow(non_snake_case, non_camel_case_types)]
28

39
use std::sync::Arc;
410

511
use crossbeam_deque::{Injector, Stealer, Worker};
612
use rand::seq::SliceRandom;
713

14+
/// Defines a contract for types that can be prioritized by the queue.
815
pub trait Prioritized {
9-
type P: PartialEq + Eq + Copy;
10-
11-
fn GetPriority(&self) -> Self::P;
16+
/// The type of the priority value used by the implementor.
17+
type Kind: PartialEq + Eq + Copy;
18+
/// A method to retrieve the priority of the item.
19+
fn Rank(&self) -> Self::Kind;
1220
}
1321

22+
/// Defines the internal priority levels used by the generic queue.
1423
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1524
pub enum Priority {
1625
High,
17-
1826
Normal,
19-
2027
Low,
2128
}
2229

30+
/// Holds the queue components that are safe to share across all threads.
31+
///
32+
/// This includes global injectors for submitting new tasks and stealers for
33+
/// taking tasks from other workers, organized by priority level.
2334
struct Share<T> {
35+
/// Global, multi-producer queues for each priority.
2436
Injector:(Injector<T>, Injector<T>, Injector<T>),
25-
37+
/// Share handles for stealing tasks from each worker's queue.
2638
Stealer:(Vec<Stealer<T>>, Vec<Stealer<T>>, Vec<Stealer<T>>),
2739
}
2840

29-
pub struct StealingQueue<T:Prioritized<P = Priority>> {
41+
/// A generic, priority-aware, work-stealing queue.
42+
///
43+
/// This is the public-facing entry point for submitting tasks. It is generic
44+
/// over any task type `T` that implements the `Prioritized` trait.
45+
pub struct StealingQueue<T:Prioritized<Kind = Priority>> {
46+
/// A shared, thread-safe pointer to the queue's shared components.
3047
Share:Arc<Share<T>>,
3148
}
3249

50+
/// Contains all necessary components for a single worker thread to operate.
51+
///
52+
/// This includes the thread-local `Worker` deques, which are not safe to share,
53+
/// making this context object the sole owner of a worker's private queues.
3354
pub struct Context<T> {
55+
/// A unique identifier for the worker, used to avoid self-stealing.
3456
pub Identifier:usize,
35-
57+
/// Thread-local work queues for each priority level.
3658
Local:(Worker<T>, Worker<T>, Worker<T>),
37-
59+
/// A reference to the shared components of the entire queue system.
3860
Share:Arc<Share<T>>,
3961
}
4062

41-
impl<T:Prioritized<P = Priority>> StealingQueue<T> {
42-
pub fn New(Count:usize) -> (Self, Vec<Context<T>>) {
63+
impl<T:Prioritized<Kind = Priority>> StealingQueue<T> {
64+
/// Creates a complete work-stealing queue system.
65+
///
66+
/// This function initializes all the necessary queues, both shared and
67+
/// thread-local, for a given number of workers.
68+
///
69+
/// Returns a tuple containing:
70+
/// 1. The public-facing `StealingQueue` for submitting new tasks.
71+
/// 2. A `Vec` of `Context` objects, one for each worker thread to own.
72+
pub fn Create(Count:usize) -> (Self, Vec<Context<T>>) {
4373
let mut High:Vec<Worker<T>> = Vec::with_capacity(Count);
44-
4574
let mut Normal:Vec<Worker<T>> = Vec::with_capacity(Count);
46-
4775
let mut Low:Vec<Worker<T>> = Vec::with_capacity(Count);
4876

77+
// For each priority level, create a thread-local worker queue and its
78+
// corresponding shared stealer.
4979
let StealerHigh:Vec<Stealer<T>> = (0..Count)
5080
.map(|_| {
5181
let Worker = Worker::new_fifo();
52-
5382
let Stealer = Worker.stealer();
54-
5583
High.push(Worker);
56-
5784
Stealer
5885
})
5986
.collect();
6087

6188
let StealerNormal:Vec<Stealer<T>> = (0..Count)
6289
.map(|_| {
6390
let Worker = Worker::new_fifo();
64-
6591
let Stealer = Worker.stealer();
66-
6792
Normal.push(Worker);
68-
6993
Stealer
7094
})
7195
.collect();
7296

7397
let StealerLow:Vec<Stealer<T>> = (0..Count)
7498
.map(|_| {
7599
let Worker = Worker::new_fifo();
76-
77100
let Stealer = Worker.stealer();
78-
79101
Low.push(Worker);
80-
81102
Stealer
82103
})
83104
.collect();
84105

85-
let Shared = Arc::new(Share {
106+
// Bundle all shared components into an Arc for safe sharing.
107+
let Share = Arc::new(Share {
86108
Injector:(Injector::new(), Injector::new(), Injector::new()),
87-
88109
Stealer:(StealerHigh, StealerNormal, StealerLow),
89110
});
90111

91-
let mut Context = Vec::with_capacity(Count);
92-
93-
for Id in 0..Count {
94-
Context.push(Context {
95-
Identifier:Id,
96-
112+
// Create a unique context for each worker, giving it ownership of its
113+
// local queues and a reference to the shared components.
114+
let mut Contexts = Vec::with_capacity(Count);
115+
for Identifier in 0..Count {
116+
Contexts.push(Context {
117+
Identifier,
97118
Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
98-
99-
Share:Shared.clone(),
119+
Share:Share.clone(),
100120
});
101121
}
102122

103-
let Queue = Self { Share:Shared };
104-
105-
(Queue, Context)
123+
let Queue = Self { Share };
124+
(Queue, Contexts)
106125
}
107126

127+
/// Submits a new task to the appropriate global queue based on its
128+
/// priority. This method is thread-safe and can be called from any
129+
/// context.
108130
pub fn Submit(&self, Task:T) {
109-
match Task.GetPriority() {
131+
match Task.Rank() {
110132
Priority::High => self.Share.Injector.0.push(Task),
111-
112133
Priority::Normal => self.Share.Injector.1.push(Task),
113-
114134
Priority::Low => self.Share.Injector.2.push(Task),
115135
}
116136
}
117137
}
118138

119139
impl<T> Context<T> {
120-
pub fn NextTask(&self) -> Option<T> {
140+
/// Finds the next available task for the worker to execute.
141+
// This method implements the complete work-finding logic:
142+
/// 1. Check local queue (high to low priority).
143+
/// 2. Steal from the system (high to low priority).
144+
pub fn Next(&self) -> Option<T> {
121145
self.Local
122146
.0
123147
.pop()
@@ -128,25 +152,27 @@ impl<T> Context<T> {
128152
.or_else(|| self.Steal(&self.Share.Injector.2, &self.Share.Stealer.2, &self.Local.2))
129153
}
130154

131-
fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealer:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
155+
/// Attempts to steal a task from a specific priority set.
156+
///
157+
/// It first tries to steal a batch from the global injector queue. If that
158+
/// fails, it attempts to steal from a randomly chosen peer worker to ensure
159+
/// fair distribution and avoid contention hotspots.
160+
fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealers:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
132161
if Injector.steal_batch_and_pop(Local).is_success() {
133162
return Local.pop();
134163
}
135164

136-
let mut Index:Vec<usize> = (0..Stealer.len()).collect();
165+
let mut Indices:Vec<usize> = (0..Stealers.len()).collect();
166+
Indices.shuffle(&mut rand::rng());
137167

138-
Index.shuffle(&mut rand::rng());
139-
140-
for i in Index {
141-
if i == self.Identifier {
168+
for Index in Indices {
169+
if Index == self.Identifier {
142170
continue;
143171
}
144-
145-
if Stealer[i].steal_batch_and_pop(Local).is_success() {
172+
if Stealers[Index].steal_batch_and_pop(Local).is_success() {
146173
return Local.pop();
147174
}
148175
}
149-
150176
None
151177
}
152178
}

Source/Queue/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
//! Declares the `StealingQueue` module.
2+
//!
3+
//! This module encapsulates all logic for the generic, priority-aware,
4+
//! work-stealing queue system, which is the foundational component of the
5+
//! scheduler.
6+
17
#![allow(non_snake_case, non_camel_case_types)]
28

9+
/// Provides the generic, priority-aware, work-stealing queue implementation.
310
pub mod StealingQueue;

Source/Scheduler/Scheduler.rs

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
//! Manages the pool of workers and the task queue system.
2+
3+
#![allow(non_snake_case, non_camel_case_types)]
4+
15
use std::{
26
collections::HashMap,
37
future::Future,
@@ -10,74 +14,86 @@ use std::{
1014
use log::{error, info, warn};
1115
use tokio::task::JoinHandle;
1216

13-
use super::Worker::Worker;
17+
use super::{SchedulerBuilder::Concurrency, Worker::Worker};
1418
use crate::{
15-
Queue::StealingQueue::StealingQueue as StealingQueueStruct,
16-
Scheduler::SchedulerBuilder::Concurrency as ConcurrencyEnum,
17-
Task::{Priority::Enum as PriorityEnum, Task::Struct as TaskStruct},
19+
Queue::StealingQueue::StealingQueue,
20+
Task::{Priority::Priority, Task::Task},
1821
};
1922

23+
/// Manages a pool of worker threads and a work-stealing queue to execute
24+
/// tasks efficiently. This is the primary public-facing struct of the library.
2025
pub struct Scheduler {
21-
Queue:StealingQueueStruct<TaskStruct>,
22-
26+
/// The underlying work-stealing queue system used for task submission.
27+
Queue:StealingQueue<Task>,
28+
/// Handles to the spawned worker threads, used for graceful shutdown.
2329
Handle:Vec<JoinHandle<()>>,
24-
30+
/// An atomic flag to signal all workers to shut down.
2531
Running:Arc<AtomicBool>,
2632
}
2733

2834
impl Scheduler {
29-
pub fn Start(number_of_workers:usize, _queue_configs:HashMap<String, ConcurrencyEnum>) -> Self {
30-
info!("[Scheduler] Starting scheduler with {} worker threads.", number_of_workers);
31-
35+
/// Creates and starts a new scheduler with a given configuration.
36+
///
37+
/// This is a crate-private function, intended to be called only by the
38+
/// `SchedulerBuilder`'s `Build` method.
39+
pub(crate) fn Create(Count:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
40+
info!("[Scheduler] Create with {} workers.", Count);
3241
let Running = Arc::new(AtomicBool::new(true));
3342

34-
let (Queue, WorkerContexts) = StealingQueueStruct::<TaskStruct>::New(number_of_workers);
35-
36-
let mut Handle = Vec::with_capacity(number_of_workers);
43+
// Create the entire queue system and retrieve the contexts for each worker.
44+
let (Queue, Contexts) = StealingQueue::<Task>::Create(Count);
3745

38-
for Context in WorkerContexts.into_iter() {
46+
let mut Handle = Vec::with_capacity(Count);
47+
// Spawn an asynchronous task for each worker.
48+
for Context in Contexts.into_iter() {
3949
let Running = Running.clone();
40-
4150
Handle.push(tokio::spawn(async move {
42-
let WorkerInstance = Worker::New(Context, Running);
43-
44-
WorkerInstance.Run().await;
51+
// Each task creates and runs a worker, consuming its context.
52+
Worker::Create(Context, Running).Run().await;
4553
}));
4654
}
4755

4856
Self { Queue, Handle, Running }
4957
}
5058

51-
pub fn Submit<F>(&self, future_instance:F, task_priority:PriorityEnum)
59+
/// Submits a new task to the scheduler's global queue.
60+
///
61+
/// The task will be picked up by the next available worker according to its
62+
/// priority and the work-stealing logic.
63+
pub fn Submit<F>(&self, Operation:F, Priority:Priority)
5264
where
5365
F: Future<Output = ()> + Send + 'static, {
54-
self.Queue.Submit(TaskStruct::New(future_instance, task_priority));
66+
self.Queue.Submit(Task::Create(Operation, Priority));
5567
}
5668

57-
pub async fn ShutDown(&mut self) {
69+
/// Asynchronously shuts down the scheduler.
70+
///
71+
/// This method signals all worker threads to stop their loops and then
72+
/// waits for each one to complete its current task and exit gracefully.
73+
pub async fn Stop(&mut self) {
5874
if !self.Running.swap(false, Ordering::Relaxed) {
59-
info!("[Scheduler] ShutDown already initiated.");
60-
75+
info!("[Scheduler] Stop already initiated.");
6176
return;
6277
}
6378

64-
info!("[Scheduler] Shutting down worker threads...");
65-
66-
for handle in self.Handle.drain(..) {
67-
if let Err(e) = handle.await {
68-
error!("[Scheduler] Error joining worker task during shutdown: {}", e);
79+
info!("[Scheduler] Stopping worker threads...");
80+
for Handle in self.Handle.drain(..) {
81+
if let Err(Error) = Handle.await {
82+
error!("[Scheduler] Error joining worker: {}", Error);
6983
}
7084
}
71-
72-
info!("[Scheduler] All workers shut down successfully.");
85+
info!("[Scheduler] All workers stopped.");
7386
}
7487
}
7588

7689
impl Drop for Scheduler {
90+
/// Ensures workers are signaled to stop if the `Scheduler` is dropped.
91+
///
92+
/// This prevents orphaned worker threads if the user forgets to call the
93+
/// explicit `Stop` method.
7794
fn drop(&mut self) {
7895
if self.Running.load(Ordering::Relaxed) {
79-
warn!("[Scheduler] Scheduler dropped without explicit shutdown. Signaling workers to stop.");
80-
96+
warn!("[Scheduler] Dropped without explicit stop. Signaling workers.");
8197
self.Running.store(false, Ordering::Relaxed);
8298
}
8399
}

0 commit comments

Comments
 (0)