Skip to content

Commit 64fca95

Browse files
Re-Sync
1 parent 2ac32c9 commit 64fca95

10 files changed

Lines changed: 487 additions & 0 deletions

File tree

Source/lib.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
3+
/**
4+
* @module Echo Crate
5+
* @description A high-performance, structured concurrency and task scheduling library for Rust,
6+
* built on top of `tokio` and a work-stealing queue. Echo is designed to
7+
* natively execute `Future`s, providing a robust runtime for complex, asynchronous applications.
8+
*/
9+
10+
#![allow(non_snake_case, non_camel_case_types)]
11+
12+
// --- Public API ---
13+
pub mod scheduler;
14+
pub mod task;
15+
16+
// --- Internal Implementation ---
17+
mod queue;

Source/queue/StealingQueue.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
//! Defines a high-performance, priority-aware, work-stealing deque for
2+
//! distributing tasks among scheduler workers.
3+
4+
use crossbeam_deque::{Injector, Stealer, Worker};
5+
use rand::seq::SliceRandom;
6+
7+
use crate::task::{Priority, Task};
8+
9+
/// A container for a set of queues for a single priority level.
10+
struct PriorityQueueSet {
11+
GlobalInjector:Injector<Task>,
12+
WorkerQueue:Vec<Worker<Task>>,
13+
Stealer:Vec<Stealer<Task>>,
14+
}
15+
16+
impl PriorityQueueSet {
17+
/// Creates a new set of queues for a given number of workers.
18+
fn New(NumberOfWorker:usize) -> Self {
19+
let WorkerQueue:Vec<Worker<Task>> = (0..NumberOfWorker).map(|_| Worker::new_fifo()).collect();
20+
Self {
21+
GlobalInjector:Injector::new(),
22+
Stealer:WorkerQueue.iter().map(|w| w.stealer()).collect(),
23+
WorkerQueue,
24+
}
25+
}
26+
}
27+
28+
/// A collection of worker deques that supports priority-aware work-stealing.
29+
///
30+
/// This struct holds three distinct sets of queues, one for each priority level
31+
/// (`High`, `Normal`, `Low`). When a worker needs a task, it always checks for
32+
/// higher-priority work before considering lower-priority work.
33+
pub(crate) struct StealingQueue {
34+
High:PriorityQueueSet,
35+
Normal:PriorityQueueSet,
36+
Low:PriorityQueueSet,
37+
}
38+
39+
impl StealingQueue {
40+
/// Creates a new `StealingQueue` with a dedicated set of queues for each
41+
/// priority level.
42+
pub fn New(NumberOfWorker:usize) -> Self {
43+
Self {
44+
High:PriorityQueueSet::New(NumberOfWorker),
45+
Normal:PriorityQueueSet::New(NumberOfWorker),
46+
Low:PriorityQueueSet::New(NumberOfWorker),
47+
}
48+
}
49+
50+
/// Submits a task to the appropriate global injection queue based on its
51+
/// priority.
52+
pub fn Push(&self, Task:Task) {
53+
match Task.Priority {
54+
Priority::High => self.High.GlobalInjector.push(Task),
55+
Priority::Normal => self.Normal.GlobalInjector.push(Task),
56+
Priority::Low => self.Low.GlobalInjector.push(Task),
57+
}
58+
}
59+
60+
/// Attempts to find a task for a given worker, always prioritizing
61+
/// `High` > `Normal` > `Low`.
62+
pub fn StealForWorker(&self, WorkerId:usize) -> Option<Task> {
63+
self.FindTaskInSet(&self.High, WorkerId)
64+
.or_else(|| self.FindTaskInSet(&self.Normal, WorkerId))
65+
.or_else(|| self.FindTaskInSet(&self.Low, WorkerId))
66+
}
67+
68+
/// Implements the core work-finding logic for a specific priority level.
69+
/// It first checks the worker's local queue, then attempts to steal.
70+
fn FindTaskInSet(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Task> {
71+
Set.WorkerQueue[WorkerId]
72+
.pop()
73+
.or_else(|| self.StealFromSetGlobalOrPeer(Set, WorkerId))
74+
}
75+
76+
/// Attempts to steal work for a specific priority level, first from the
77+
/// global queue, then from peer workers.
78+
fn StealFromSetGlobalOrPeer(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Task> {
79+
// Try stealing from the global injector for this priority set.
80+
if Set.GlobalInjector.steal_batch_and_pop(&Set.WorkerQueue[WorkerId]).is_success() {
81+
return Set.WorkerQueue[WorkerId].pop();
82+
}
83+
84+
// Try stealing from peers for this priority set. We shuffle the indices
85+
// to ensure fairness and avoid contention hotspots.
86+
let mut ShuffledIndex:Vec<usize> = (0..Set.Stealer.len()).collect();
87+
ShuffledIndex.shuffle(&mut rand::thread_rng());
88+
89+
for Index in ShuffledIndex {
90+
if Index == WorkerId {
91+
continue; // Don't steal from ourselves.
92+
}
93+
if Set.Stealer[Index].steal_batch_and_pop(&Set.WorkerQueue[WorkerId]).is_success() {
94+
return Set.WorkerQueue[WorkerId].pop();
95+
}
96+
}
97+
98+
None
99+
}
100+
}

Source/queue/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
3+
/**
4+
* @module queue
5+
* @description This module provides the high-performance, concurrent queueing
6+
* implementations used by the Echo scheduler. These are internal components
7+
* of the library.
8+
*/
9+
10+
#![allow(non_snake_case, non_camel_case_types)]
11+
12+
mod StealingQueue;
13+
14+
/**
15+
* Re-exports the `StealingQueue` for use within the `Echo` crate, but keeps it
16+
* private from external consumers.
17+
* @see StealingQueue
18+
*/
19+
pub(crate) use self::StealingQueue::StealingQueue;

Source/scheduler/Scheduler.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::{
2+
collections::HashMap,
3+
future::Future,
4+
sync::{
5+
Arc,
6+
atomic::{AtomicBool, Ordering},
7+
},
8+
};
9+
10+
use log::{error, info, warn};
11+
use tokio::task::JoinHandle;
12+
13+
/// @module Scheduler
14+
/// @description Defines the main `Scheduler` that manages the worker pool, task
15+
/// queue, and task execution lifecycle.
16+
use super::Worker::Worker;
17+
use crate::{
18+
queue::StealingQueue,
19+
scheduler::SchedulerBuilder::Concurrency,
20+
task::{Priority, Task},
21+
};
22+
23+
/// Manages a pool of worker threads and a work-stealing queue to execute tasks
24+
/// efficiently. This struct is the public-facing API of the Echo scheduler.
25+
pub struct Scheduler {
26+
/// The underlying work-stealing queue shared by all workers.
27+
Queue:Arc<StealingQueue>,
28+
/// Handles to the spawned worker threads, allowing for graceful shutdown.
29+
WorkerHandles:Vec<JoinHandle<()>>,
30+
/// An atomic flag to signal workers to shut down.
31+
IsRunning:Arc<AtomicBool>,
32+
}
33+
34+
impl Scheduler {
35+
/// Creates and starts a new scheduler with a given configuration.
36+
/// This is a crate-private function, intended to be called only by the
37+
/// `SchedulerBuilder`.
38+
///
39+
/// @param NumberOfWorkers - The number of worker threads to spawn.
40+
/// @param QueueConfigs - Configuration for named queues with concurrency
41+
/// limits (future use).
42+
pub(crate) fn Start(NumberOfWorkers:usize, _QueueConfigs:HashMap<String, Concurrency>) -> Self {
43+
info!("[Scheduler] Starting scheduler with {} worker threads.", NumberOfWorkers);
44+
let IsRunning = Arc::new(AtomicBool::new(true));
45+
let Queue = Arc::new(StealingQueue::New(NumberOfWorkers));
46+
47+
let mut WorkerHandles = Vec::with_capacity(NumberOfWorkers);
48+
49+
for WorkerId in 0..NumberOfWorkers {
50+
let WorkerInstance = Worker::New(WorkerId, Queue.clone(), IsRunning.clone());
51+
let WorkerHandle = tokio::spawn(async move {
52+
WorkerInstance.Run().await;
53+
});
54+
WorkerHandles.push(WorkerHandle);
55+
}
56+
57+
Self { Queue, WorkerHandles, IsRunning }
58+
}
59+
60+
/// Submits a new task (as a `Future`) to the scheduler's global queue.
61+
/// The task will be picked up by the next available worker.
62+
///
63+
/// @param FutureInstance - The async block or function to execute.
64+
/// @param TaskPriority - The priority of the task.
65+
pub fn Submit<F>(&self, FutureInstance:F, TaskPriority:Priority)
66+
where
67+
F: Future<Output = ()> + Send + 'static, {
68+
let NewTask = Task::New(FutureInstance, TaskPriority);
69+
self.Queue.Push(NewTask);
70+
}
71+
72+
/// Asynchronously shuts down the scheduler.
73+
///
74+
/// This signals all worker threads to stop their loops and then waits for
75+
/// them to complete their current tasks and exit gracefully.
76+
pub async fn Shutdown(&mut self) {
77+
if !self.IsRunning.swap(false, Ordering::Relaxed) {
78+
info!("[Scheduler] Shutdown already initiated.");
79+
return;
80+
}
81+
82+
info!("[Scheduler] Shutting down worker threads...");
83+
for Handle in self.WorkerHandles.drain(..) {
84+
if let Err(e) = Handle.await {
85+
error!("[Scheduler] Error joining worker task during shutdown: {}", e);
86+
}
87+
}
88+
info!("[Scheduler] All workers shut down successfully.");
89+
}
90+
}
91+
92+
impl Drop for Scheduler {
93+
/// Ensures that the scheduler is shut down when it goes out of scope,
94+
/// preventing orphaned worker threads.
95+
fn drop(&mut self) {
96+
if self.IsRunning.load(Ordering::Relaxed) {
97+
// If the scheduler is dropped without an explicit async shutdown,
98+
// we must signal the workers to stop. We cannot await the handles
99+
// here, but the threads will eventually terminate.
100+
warn!("[Scheduler] Scheduler dropped without explicit shutdown. Signaling workers to stop.");
101+
self.IsRunning.store(false, Ordering::Relaxed);
102+
}
103+
}
104+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Defines the fluent builder for creating and configuring a `Scheduler`
2+
//! instance.
3+
4+
use std::collections::HashMap;
5+
6+
use log::warn;
7+
8+
use super::Scheduler;
9+
10+
/// An enum to define concurrency limits for named queues.
11+
#[derive(Debug, Clone, Copy)]
12+
pub enum Concurrency {
13+
/// Specifies a maximum number of concurrent tasks for a queue.
14+
Limit(usize),
15+
/// Allows an unlimited number of concurrent tasks for a queue.
16+
Unlimited,
17+
}
18+
19+
/// A fluent builder for creating a `Scheduler`.
20+
///
21+
/// This pattern provides a clear and readable API for configuring complex
22+
/// objects, such as a multi-queue, multi-threaded scheduler, before they are
23+
/// constructed.
24+
pub struct SchedulerBuilder {
25+
WorkerCount:usize,
26+
/// Stores the configuration for named queues.
27+
QueueConfiguration:HashMap<String, Concurrency>,
28+
}
29+
30+
impl SchedulerBuilder {
31+
/// Creates a new `SchedulerBuilder` with default settings.
32+
///
33+
/// By default, the worker count is set to the number of logical CPUs on the
34+
/// system, with a minimum of 2.
35+
pub fn New() -> Self {
36+
let DefaultWorkerCount = num_cpus::get().max(2);
37+
Self { WorkerCount:DefaultWorkerCount, QueueConfiguration:HashMap::new() }
38+
}
39+
40+
/// Sets the total number of worker threads for the scheduler's pool.
41+
///
42+
/// # Arguments
43+
/// * `WorkerCount` - The desired number of workers. If `0`, it defaults
44+
/// to the number of logical CPUs on the system.
45+
pub fn WithWorkerCount(mut self, WorkerCount:usize) -> Self {
46+
if WorkerCount == 0 {
47+
warn!("[SchedulerBuilder] Worker count of 0 is invalid. Defaulting to number of logical CPUs.");
48+
self.WorkerCount = num_cpus::get().max(2);
49+
} else {
50+
self.WorkerCount = WorkerCount;
51+
}
52+
self
53+
}
54+
55+
/// Configures a named queue with a specific concurrency limit.
56+
/// Tasks can later be submitted to this specific queue.
57+
///
58+
/// # Arguments
59+
/// * `QueueName` - The name of the queue (e.g., "DiskIO", "Network").
60+
/// * `ConcurrencyLimit` - The concurrency configuration for this queue.
61+
pub fn WithQueue(mut self, QueueName:&str, ConcurrencyLimit:Concurrency) -> Self {
62+
self.QueueConfiguration.insert(QueueName.to_string(), ConcurrencyLimit);
63+
self
64+
}
65+
66+
/// Builds and starts the `Scheduler` with the specified configuration.
67+
/// This consumes the builder.
68+
///
69+
/// # Returns
70+
/// A new, running `Scheduler` instance.
71+
pub fn Build(self) -> Scheduler {
72+
// The Scheduler's internal `Start` method will receive this
73+
// configuration and set up the corresponding queues and worker logic.
74+
Scheduler::Start(self.WorkerCount, self.QueueConfiguration)
75+
}
76+
}
77+
78+
impl Default for SchedulerBuilder {
79+
/// Provides a default `SchedulerBuilder` instance.
80+
fn default() -> Self { Self::New() }
81+
}

Source/scheduler/Worker.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::sync::{
2+
Arc,
3+
atomic::{AtomicBool, Ordering},
4+
};
5+
6+
use log::trace;
7+
use tokio::time::{Duration, sleep};
8+
9+
/// @module Worker (Scheduler)
10+
/// @description Defines the `Worker` struct, which represents a single
11+
/// execution thread in the scheduler's pool. This is an internal component of
12+
/// the scheduler.
13+
use crate::queue::StealingQueue;
14+
15+
/// Represents a single worker thread that continuously polls the work-stealing
16+
/// queue for tasks to execute.
17+
pub(crate) struct Worker {
18+
Id:usize,
19+
Queue:Arc<StealingQueue>,
20+
IsRunning:Arc<AtomicBool>,
21+
}
22+
23+
impl Worker {
24+
pub fn New(Id:usize, Queue:Arc<StealingQueue>, IsRunning:Arc<AtomicBool>) -> Self { Self { Id, Queue, IsRunning } }
25+
26+
/// The main execution loop for the worker.
27+
///
28+
/// It continuously attempts to find a task from its local queue or by
29+
/// stealing from other workers. When a task is found, its encapsulated
30+
/// `Future` is awaited to completion.
31+
pub async fn Run(&self) {
32+
trace!("[Worker {}] Starting execution loop.", self.Id);
33+
while self.IsRunning.load(Ordering::Relaxed) {
34+
// Attempt to get a task from the shared queue.
35+
let TaskOption = self.Queue.StealForWorker(self.Id);
36+
37+
if let Some(Task) = TaskOption {
38+
trace!("[Worker {}] Found task with priority {:?}. Executing.", self.Id, Task.Priority);
39+
// The future is executed simply by awaiting it.
40+
// Any panics within the future will be caught by tokio's task system
41+
// and can be handled when the JoinHandle is awaited on shutdown.
42+
Task.Future.await;
43+
} else {
44+
// If no work is found anywhere, yield the thread to the OS to prevent
45+
// a tight busy-loop from consuming 100% CPU.
46+
sleep(Duration::from_millis(1)).await;
47+
}
48+
}
49+
trace!("[Worker {}] Execution loop finished.", self.Id);
50+
}
51+
}

0 commit comments

Comments
 (0)