Skip to content

Commit 0c9d67c

Browse files
1 parent c20dc45 commit 0c9d67c

10 files changed

Lines changed: 69 additions & 8 deletions

File tree

README.md

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,38 +162,46 @@ used throughout the application, often via a shared context or runtime.
162162
across your application.
163163

164164
```rust
165-
// In your application's main function
165+
// In your application's main function
166+
166167
use std::sync::Arc;
167168
use Echo::Scheduler::SchedulerBuilder;
168169
use Echo::Task::Priority;
169170

170-
// Use the fluent builder to configure and build the scheduler
171+
// Use the fluent builder to configure and build the scheduler
172+
171173
let Scheduler = Arc::new(SchedulerBuilder::Create().WithWorkerCount(8).Build());
172174
```
173175

174176
2. **Submit Tasks:** Use the `Scheduler` instance to submit asynchronous work
175177
from anywhere in your application.
176178

177179
```rust
178-
// An example async block to be run by the scheduler
180+
// An example async block to be run by the scheduler
181+
179182
let MyTask = async {
180183
println!("This is running on an Echo worker thread!");
181-
// ... perform some work ...
184+
// ... perform some work ...
185+
182186
};
183187

184-
// Submit the task with a desired priority
188+
// Submit the task with a desired priority
189+
185190
Scheduler.Submit(MyTask, Priority::Normal);
186191

187-
// Another example with high priority
192+
// Another example with high priority
193+
188194
Scheduler.Submit(async { /* critical work */ }, Priority::High);
189195
```
190196

191197
3. **Graceful Shutdown:** Before your application exits, ensure a clean
192198
shutdown of all worker threads.
193199

194200
```rust
195-
// In your application's shutdown sequence
196-
// Note: Arc::try_unwrap requires the Arc to have only one strong reference.
201+
// In your application's shutdown sequence
202+
203+
// Note: Arc::try_unwrap requires the Arc to have only one strong reference.
204+
197205
if let Ok(mut Scheduler) = Arc::try_unwrap(Scheduler) {
198206
Scheduler.Stop().await;
199207
}

Source/Library.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
//! Provides a structured concurrency runtime for Rust applications, built on a
44
//! high-performance, priority-aware, work-stealing scheduler. It is designed
55
//! to be a robust and efficient core execution engine for demanding,
6+
67
//! concurrent workloads.
78
89
#![allow(non_snake_case, non_camel_case_types)]
910

1011
// --- Crate Modules ---
1112
// Declares the main modules that constitute the library.
1213
pub mod Queue;
14+
1315
pub mod Scheduler;
16+
1417
pub mod Task;

Source/Queue/StealingQueue.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ pub trait Prioritized {
2525
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2626
pub enum Priority {
2727
High,
28+
2829
Normal,
30+
2931
Low,
3032
}
3133

@@ -54,6 +56,7 @@ pub struct StealingQueue<TTask:Prioritized<Kind = Priority>> {
5456
/// Contains all necessary components for a single worker thread to operate.
5557
///
5658
/// This includes the thread-local `Worker` deques, which are not safe to share,
59+
5760
/// making this `Context` object the sole owner of a worker's private queues.
5861
pub struct Context<TTask> {
5962
/// A unique identifier for the worker, used to avoid self-stealing.
@@ -79,56 +82,72 @@ impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
7982
/// 2. A `Vec` of `Context` objects, one for each worker thread to own.
8083
pub fn Create(Count:usize) -> (Self, Vec<Context<TTask>>) {
8184
let mut High:Vec<Worker<TTask>> = Vec::with_capacity(Count);
85+
8286
let mut Normal:Vec<Worker<TTask>> = Vec::with_capacity(Count);
87+
8388
let mut Low:Vec<Worker<TTask>> = Vec::with_capacity(Count);
8489

8590
// For each priority level, create a thread-local worker queue and its
8691
// corresponding shared stealer.
8792
let StealerHigh:Vec<Stealer<TTask>> = (0..Count)
8893
.map(|_| {
8994
let Worker = Worker::new_fifo();
95+
9096
let Stealer = Worker.stealer();
97+
9198
High.push(Worker);
99+
92100
Stealer
93101
})
94102
.collect();
95103

96104
let StealerNormal:Vec<Stealer<TTask>> = (0..Count)
97105
.map(|_| {
98106
let Worker = Worker::new_fifo();
107+
99108
let Stealer = Worker.stealer();
109+
100110
Normal.push(Worker);
111+
101112
Stealer
102113
})
103114
.collect();
104115

105116
let StealerLow:Vec<Stealer<TTask>> = (0..Count)
106117
.map(|_| {
107118
let Worker = Worker::new_fifo();
119+
108120
let Stealer = Worker.stealer();
121+
109122
Low.push(Worker);
123+
110124
Stealer
111125
})
112126
.collect();
113127

114128
// Bundle all shared components into an Arc for safe sharing.
115129
let Share = Arc::new(Share {
116130
Injector:(Injector::new(), Injector::new(), Injector::new()),
131+
117132
Stealer:(StealerHigh, StealerNormal, StealerLow),
118133
});
119134

120135
// Create a unique context for each worker, giving it ownership of its
121136
// local queues and a reference to the shared components.
122137
let mut Contexts = Vec::with_capacity(Count);
138+
123139
for Identifier in 0..Count {
124140
Contexts.push(Context {
125141
Identifier,
142+
126143
Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
144+
127145
Share:Share.clone(),
128146
});
129147
}
130148

131149
let Queue = Self { Share };
150+
132151
(Queue, Contexts)
133152
}
134153

@@ -138,7 +157,9 @@ impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
138157
pub fn Submit(&self, Task:TTask) {
139158
match Task.Rank() {
140159
Priority::High => self.Share.Injector.0.push(Task),
160+
141161
Priority::Normal => self.Share.Injector.1.push(Task),
162+
142163
Priority::Low => self.Share.Injector.2.push(Task),
143164
}
144165
}
@@ -169,8 +190,11 @@ impl<TTask> Context<TTask> {
169190
/// peer worker to ensure fair distribution and avoid contention hotspots.
170191
pub fn Steal<'a>(
171192
&self,
193+
172194
Injector:&'a Injector<TTask>,
195+
173196
Stealers:&'a [Stealer<TTask>],
197+
174198
Local:&'a Worker<TTask>,
175199
) -> Option<TTask> {
176200
// First, try to steal a batch from the global injector.
@@ -182,13 +206,15 @@ impl<TTask> Context<TTask> {
182206

183207
// If the global queue is empty, try stealing from peers.
184208
let Count = Stealers.len();
209+
185210
if Count <= 1 {
186211
// Cannot steal if there are no other workers.
187212
return None;
188213
}
189214

190215
// Allocation-free random iteration: pick a random starting point.
191216
let mut Rng = rand::rng();
217+
192218
let Start = Rng.random_range(0..Count);
193219

194220
// Iterate through all peers starting from the random offset.

Source/Queue/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! # Queue Module
22
//!
33
//! This module encapsulates all logic for the generic, priority-aware,
4+
45
//! work-stealing queue system, which is the foundational data structure of the
56
//! `Echo` scheduler.
67

Source/Scheduler/Scheduler.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ use crate::{
2828
pub struct Scheduler {
2929
/// The underlying work-stealing queue system used for task submission.
3030
Queue:StealingQueue<Task>,
31+
3132
/// Handles to the spawned worker threads, used for graceful shutdown.
3233
WorkerHandles:Vec<JoinHandle<()>>,
34+
3335
/// An atomic flag to signal all workers to shut down.
3436
IsRunning:Arc<AtomicBool>,
3537
}
@@ -52,10 +54,12 @@ impl Scheduler {
5254
// Spawn an asynchronous task for each worker.
5355
for Context in Contexts.into_iter() {
5456
let IsRunning = IsRunning.clone();
57+
5558
let WorkerHandle = tokio::spawn(async move {
5659
// Each task creates and runs a worker, consuming its context.
5760
Worker::Create(Context, IsRunning).Run().await;
5861
});
62+
5963
WorkerHandles.push(WorkerHandle);
6064
}
6165

@@ -79,15 +83,18 @@ impl Scheduler {
7983
pub async fn Stop(&mut self) {
8084
if !self.IsRunning.swap(false, Ordering::Relaxed) {
8185
info!("[Scheduler] Stop already initiated.");
86+
8287
return;
8388
}
8489

8590
info!("[Scheduler] Stopping worker threads...");
91+
8692
for Handle in self.WorkerHandles.drain(..) {
8793
if let Err(Error) = Handle.await {
8894
error!("[Scheduler] Error joining worker handle: {}", Error);
8995
}
9096
}
97+
9198
info!("[Scheduler] All workers stopped successfully.");
9299
}
93100
}
@@ -101,6 +108,7 @@ impl Drop for Scheduler {
101108
fn drop(&mut self) {
102109
if self.IsRunning.load(Ordering::Relaxed) {
103110
warn!("[Scheduler] Dropped without explicit stop. Signaling workers to terminate.");
111+
104112
self.IsRunning.store(false, Ordering::Relaxed);
105113
}
106114
}

Source/Scheduler/SchedulerBuilder.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::Scheduler::Scheduler::Scheduler;
1515
pub enum Concurrency {
1616
/// Specifies a maximum number of concurrent tasks for a queue.
1717
Limit(usize),
18+
1819
/// Allows an unlimited number of concurrent tasks for a queue.
1920
Unlimited,
2021
}
@@ -27,6 +28,7 @@ pub enum Concurrency {
2728
pub struct SchedulerBuilder {
2829
/// The number of worker threads to be spawned in the scheduler's pool.
2930
Count:usize,
31+
3032
/// Configuration for named queues with concurrency limits (for future
3133
/// use).
3234
Configuration:HashMap<String, Concurrency>,
@@ -40,6 +42,7 @@ impl SchedulerBuilder {
4042
/// viable.
4143
pub fn Create() -> Self {
4244
let Default = num_cpus::get().max(2);
45+
4346
Self { Count:Default, Configuration:HashMap::new() }
4447
}
4548

@@ -49,17 +52,20 @@ impl SchedulerBuilder {
4952
pub fn WithWorkerCount(mut self, Count:usize) -> Self {
5053
if Count == 0 {
5154
warn!("[SchedulerBuilder] Worker count of 0 is invalid. Defaulting to logical CPUs.");
55+
5256
self.Count = num_cpus::get().max(2);
5357
} else {
5458
self.Count = Count;
5559
}
60+
5661
self
5762
}
5863

5964
/// Configures a named queue with a specific concurrency limit (for future
6065
/// use).
6166
pub fn WithQueue(mut self, Name:&str, Limit:Concurrency) -> Self {
6267
self.Configuration.insert(Name.to_string(), Limit);
68+
6369
self
6470
}
6571

Source/Scheduler/Worker.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct Worker {
1919
/// The worker's execution context, which contains its private deques and a
2020
/// reference to the shared queue system.
2121
Context:Context<Task>,
22+
2223
/// An atomic flag, shared by all workers, to signal a shutdown request.
2324
IsRunning:Arc<AtomicBool>,
2425
}
@@ -46,7 +47,9 @@ impl Worker {
4647
"[Worker {}] Executing local task with priority: {:?}.",
4748
self.Context.Identifier, Task.Priority
4849
);
50+
4951
Task.Operation.await;
52+
5053
continue;
5154
}
5255

@@ -58,6 +61,7 @@ impl Worker {
5861
"[Worker {}] Executing stolen task with priority: {:?}.",
5962
self.Context.Identifier, Task.Priority
6063
);
64+
6165
Task.Operation.await;
6266
} else {
6367
// If there's truly no work anywhere, yield to the OS.

Source/Scheduler/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
// --- Public API ---
1111
pub mod Scheduler;
12+
1213
pub mod SchedulerBuilder;
1314

1415
// --- Internal Implementation ---

Source/Task/Priority.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
/// Represents the priority of a task to be executed by the scheduler.
99
///
1010
/// This enumeration allows the scheduler to ensure that high-priority,
11+
1112
/// user-facing operations (e.g., responding to UI input) are executed before
1213
/// lower-priority, long-running background tasks (e.g., file indexing).
1314
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]

Source/Task/Task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub type Operation = Pin<Box<dyn Future<Output = ()> + Send>>;
1818
/// Represents a single, schedulable unit of work for the `Echo` scheduler.
1919
///
2020
/// This struct encapsulates an asynchronous operation along with metadata,
21+
2122
/// such as its `Priority`, that the scheduler uses to determine execution
2223
/// order.
2324
pub struct Task {
@@ -51,7 +52,9 @@ impl Prioritized for Task {
5152
fn Rank(&self) -> Self::Kind {
5253
match self.Priority {
5354
Priority::High => QueuePriority::High,
55+
5456
Priority::Normal => QueuePriority::Normal,
57+
5558
Priority::Low => QueuePriority::Low,
5659
}
5760
}

0 commit comments

Comments
 (0)