Commit 6d265a0

refactor(Echo/Scheduler): decouple queue from scheduler and optimize work-stealing

- Restructured `StealingQueue` and `Scheduler` modules to separate generic work-stealing logic from application-specific task handling, aligning with Land's architecture principle of decoupling components.
- Implemented a more efficient work-stealing algorithm using random peer iteration to reduce contention and improve task distribution efficiency.
- Updated architectural documentation in README and Deep Dive to reflect the decoupled design and priority-aware scheduling crucial for Mountain's `ActionEffect` execution.
- Revised code examples and API documentation to match the current `SchedulerBuilder` usage, ensuring consistency across the codebase.

This refactoring enhances the scheduler's performance and maintainability, directly supporting Mountain's need for reliable and efficient execution of declarative `ActionEffect` workflows in the Land editor backend.

1 parent e853ec4 commit 6d265a0

13 files changed

Lines changed: 212 additions & 65 deletions

README.md

Lines changed: 22 additions & 20 deletions
@@ -41,31 +41,30 @@ executing complex asynchronous workflows with resilience and efficiency.

 ## Key Features 🔐

-- **Work-Stealing Scheduler:** Implements a modern work-stealing algorithm to
-  efficiently distribute tasks across a pool of worker threads, preventing
-  bottlenecks and maximizing parallelism.
+- **Work-Stealing Scheduler:** Implements a modern, priority-aware work-stealing
+  algorithm to efficiently distribute tasks across a pool of worker threads.
 - **Task Prioritization:** Supports submitting tasks with `High`, `Normal`, or
   `Low` priority, ensuring that latency-sensitive operations are handled
   immediately.
-- **Fluent Builder API:** A clean `Builder` allows for easy configuration of the
-  worker pool and other scheduler parameters.
+- **Fluent Builder API:** A clean `SchedulerBuilder` allows for easy
+  configuration of the worker pool size.
 - **Graceful Shutdown:** Provides a `Stop()` method to ensure all worker threads
   complete their current tasks and exit cleanly, preventing orphaned threads.
-- **Built for `ActionEffect`:** Serves as the ideal backend for effect systems,
-  providing the runtime engine that executes declarative, asynchronous workflows
-  defined in other parts of the application.
+- **Decoupled Architecture:** A generic `Queue` module provides the core
+  work-stealing logic, which is consumed by the application-specific
+  `Scheduler`.

 ---

 ## Core Architecture Principles 🏗️

-| Principle | Description | Key Components Involved |
-| :------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------ | :---------------------------------------------------- |
-| **Performance** | Use lock-free data structures (`crossbeam-deque`) and a work-stealing algorithm to achieve maximum throughput and low-latency task execution. | `Queue::StealingQueue`, `Scheduler::Worker` |
-| **Structured Concurrency** | Manage all asynchronous operations within a supervised pool of workers, providing graceful startup and shutdown, unlike fire-and-forget `tokio::spawn`. | `Scheduler::Scheduler`, `Scheduler::SchedulerBuilder` |
-| **Decoupling** | Separate the _submission_ of a task from its _execution_. The application submits work, and the `Scheduler` handles how, when, and where it runs. | `Scheduler::Scheduler::Submit`, `Task::Task::Task` |
-| **Resilience** | The scheduler's design is inherently resilient, as the failure of one task (if it panics) does not bring down the entire worker pool. | `Scheduler::Worker::Run` (execution loop) |
-| **Composability** | Provide a simple, generic `Submit` API that accepts any `Future<Output = ()>`, making it easy to integrate with any asynchronous Rust code. | `Task::Task::Task`, `Scheduler::Scheduler::Submit` |
+| Principle | Description | Key Components Involved |
+| :------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | :-------------------------------------------------------------------- |
+| **Performance** | Use lock-free data structures (`crossbeam-deque`) and a high-performance work-stealing algorithm to achieve maximum throughput and low-latency task execution. | `Queue::StealingQueue`, `Scheduler::Worker` |
+| **Structured Concurrency** | Manage all asynchronous operations within a supervised pool of workers, providing graceful startup and shutdown, unlike fire-and-forget `tokio::spawn`. | `Scheduler::Scheduler`, `Scheduler::SchedulerBuilder` |
+| **Decoupling** | Separate the generic **Queueing Logic** from the application-specific **Scheduler Implementation**. The scheduler uses the queue to run its tasks. | `Queue::StealingQueue<T>`, `Scheduler::Scheduler`, `Task::Task::Task` |
+| **Resilience** | The scheduler's design is inherently resilient; the failure of one task (if it panics) is contained within its `tokio` task and does not crash the worker pool. | `Scheduler::Worker::Run` |
+| **Composability** | Provide a simple `Submit` API that accepts any `Future<Output = ()>`, making it easy to integrate with any asynchronous Rust code. | `Task::Task::Task`, `Scheduler::Scheduler::Submit` |

 ---

@@ -120,14 +119,15 @@ graph LR

 ## Project Structure Overview 🗺️

-The `Echo` repository is organized into a few core modules:
+The `Echo` repository is organized into a few core modules with a clear
+separation of concerns:

 ```
 Echo/
 └── Source/
     ├── Library.rs # Crate root, declares all modules.
-    ├── Scheduler/ # The main public API: Scheduler and Builder.
-    ├── Queue/ # The generic, high-performance work-stealing queue.
+    ├── Scheduler/ # The main public API: Scheduler and Builder. Consumes the Queue.
+    ├── Queue/ # The generic, high-performance work-stealing queue library.
     └── Task/ # The application-specific definition of a Task and its Priority.
 ```

@@ -162,7 +162,8 @@ used throughout the application, often via a shared context or runtime.

    ```rust
    // In your application's lib.rs
-   pub use Echo::Scheduler::{Scheduler, SchedulerBuilder as Builder, Priority};
+   pub use Echo::Scheduler::{Scheduler, SchedulerBuilder};
+   pub use Echo::Task::Priority::Priority;
    ```

 2. **Initialize the Scheduler:** Create and start the scheduler when your
@@ -174,7 +175,7 @@ used throughout the application, often via a shared context or runtime.
    use std::sync::Arc;

    // Use the fluent builder to configure and build the scheduler
-   let Scheduler = Arc::new(Builder::Create().Count(8).Build());
+   let Scheduler = Arc::new(SchedulerBuilder::Create().Count(8).Build());
    ```

 3. **Submit Tasks:** Use the `Scheduler` instance to submit asynchronous work
@@ -199,6 +200,7 @@ used throughout the application, often via a shared context or runtime.

    ```rust
    // In your application's shutdown sequence
+   // Note: Arc::get_mut requires the Arc to have only one strong reference.
    if let Some(mut Scheduler) = Arc::get_mut(&mut Scheduler) {
        Scheduler.Stop().await;
    }
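
Review note on the hunk above: `Arc::get_mut` returns `None` whenever any other `Arc` or `Weak` pointer to the same value exists, so the README's shutdown snippet skips `Stop()` silently if a clone of the scheduler is still alive. The following is a minimal sketch of that behaviour, not Echo's code: `FakeScheduler` and `try_stop` are hypothetical stand-ins, written in conventional Rust casing.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the real scheduler; it only records whether
/// `stop` was called.
struct FakeScheduler {
    stopped: bool,
}

impl FakeScheduler {
    fn stop(&mut self) {
        self.stopped = true;
    }
}

/// Applies the README's shutdown pattern and reports whether `stop` ran.
fn try_stop(scheduler: &mut Arc<FakeScheduler>) -> bool {
    match Arc::get_mut(scheduler) {
        Some(inner) => {
            inner.stop();
            true
        }
        // Another pointer to the value is still alive, so exclusive
        // access is refused and nothing is stopped.
        None => false,
    }
}
```

A caller that needs a guaranteed shutdown would have to handle the `false` case explicitly, for example by logging it or by dropping the remaining clones first.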

Source/Library.rs

Lines changed: 2 additions & 0 deletions
@@ -9,5 +9,7 @@
 // --- Crate Modules ---
 // Declares the main modules that constitute the library.
 pub mod Queue;
+
 pub mod Scheduler;
+
 pub mod Task;

Source/Queue/StealingQueue.rs

Lines changed: 53 additions & 13 deletions
@@ -8,13 +8,14 @@

 use std::sync::Arc;

-use crossbeam_deque::{Injector, Stealer, Worker};
-use rand::seq::SliceRandom;
+use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+use rand::Rng;

 /// Defines a contract for types that can be prioritized by the queue.
 pub trait Prioritized {
     /// The type of the priority value used by the implementor.
     type Kind: PartialEq + Eq + Copy;
+
     /// A method to retrieve the priority of the item.
     fn Rank(&self) -> Self::Kind;
 }
@@ -23,7 +24,9 @@ pub trait Prioritized {
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum Priority {
     High,
+
     Normal,
+
     Low,
 }

@@ -34,6 +37,7 @@ pub enum Priority {
 pub struct Share<T> {
     /// Global, multi-producer queues for each priority.
     pub Injector:(Injector<T>, Injector<T>, Injector<T>),
+
     /// Share handles for stealing tasks from each worker's queue.
     pub Stealer:(Vec<Stealer<T>>, Vec<Stealer<T>>, Vec<Stealer<T>>),
 }
@@ -54,8 +58,10 @@ pub struct StealingQueue<T:Prioritized<Kind = Priority>> {
 pub struct Context<T> {
     /// A unique identifier for the worker, used to avoid self-stealing.
     pub Identifier:usize,
+
     /// Thread-local work queues for each priority level.
     pub Local:(Worker<T>, Worker<T>, Worker<T>),
+
     /// A reference to the shared components of the entire queue system.
     pub Share:Arc<Share<T>>,
 }
@@ -71,57 +77,73 @@ impl<T:Prioritized<Kind = Priority>> StealingQueue<T> {
     /// 2. A `Vec` of `Context` objects, one for each worker thread to own.
     pub fn Create(Count:usize) -> (Self, Vec<Context<T>>) {
         let mut High:Vec<Worker<T>> = Vec::with_capacity(Count);
+
         let mut Normal:Vec<Worker<T>> = Vec::with_capacity(Count);
+
         let mut Low:Vec<Worker<T>> = Vec::with_capacity(Count);

         // For each priority level, create a thread-local worker queue and its
         // corresponding shared stealer.
         let StealerHigh:Vec<Stealer<T>> = (0..Count)
             .map(|_| {
                 let Worker = Worker::new_fifo();
+
                 let Stealer = Worker.stealer();
+
                 High.push(Worker);
+
                 Stealer
             })
             .collect();

         let StealerNormal:Vec<Stealer<T>> = (0..Count)
             .map(|_| {
                 let Worker = Worker::new_fifo();
+
                 let Stealer = Worker.stealer();
+
                 Normal.push(Worker);
+
                 Stealer
             })
             .collect();

         let StealerLow:Vec<Stealer<T>> = (0..Count)
             .map(|_| {
                 let Worker = Worker::new_fifo();
+
                 let Stealer = Worker.stealer();
+
                 Low.push(Worker);
+
                 Stealer
             })
             .collect();

         // Bundle all shared components into an Arc for safe sharing.
         let Share = Arc::new(Share {
             Injector:(Injector::new(), Injector::new(), Injector::new()),
+
             Stealer:(StealerHigh, StealerNormal, StealerLow),
         });

         // Create a unique context for each worker, giving it ownership of its
         // local queues and a reference to the shared components.
-        let mut Contexts = Vec::with_capacity(Count);
+        let mut Context = Vec::with_capacity(Count);
+
         for Identifier in 0..Count {
-            Contexts.push(Context {
+            Context.push(Context {
                 Identifier,
+
                 Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
+
                 Share:Share.clone(),
             });
         }

         let Queue = Self { Share };
-        (Queue, Contexts)
+
+        (Queue, Context)
     }

     /// Submits a new task to the appropriate global queue based on its
@@ -130,7 +152,9 @@ impl<T:Prioritized<Kind = Priority>> StealingQueue<T> {
     pub fn Submit(&self, Task:T) {
         match Task.Rank() {
             Priority::High => self.Share.Injector.0.push(Task),
+
             Priority::Normal => self.Share.Injector.1.push(Task),
+
             Priority::Low => self.Share.Injector.2.push(Task),
         }
     }
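
Review note on `Submit`: the routing above depends only on the `Prioritized` contract, which is what keeps the queue generic. A minimal, std-only sketch of the same idea follows. It swaps crossbeam's `Injector` for a plain `VecDeque`, so it is single-threaded and illustrative only. `SimpleQueue`, `Job`, and the snake_case method names are hypothetical, and the "highest priority first" pop order is an assumption about how a priority-aware consumer would drain the lanes.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Priority {
    High,
    Normal,
    Low,
}

/// Contract mirroring the diff's `Prioritized` trait (simplified: the
/// associated `Kind` type is fixed to `Priority` for this sketch).
trait Prioritized {
    fn rank(&self) -> Priority;
}

/// Hypothetical generic queue: one FIFO lane per priority level.
struct SimpleQueue<T: Prioritized> {
    lanes: [VecDeque<T>; 3],
}

impl<T: Prioritized> SimpleQueue<T> {
    fn new() -> Self {
        Self { lanes: [VecDeque::new(), VecDeque::new(), VecDeque::new()] }
    }

    /// Routes the item to the lane matching its priority.
    fn submit(&mut self, item: T) {
        let lane = match item.rank() {
            Priority::High => 0,
            Priority::Normal => 1,
            Priority::Low => 2,
        };
        self.lanes[lane].push_back(item);
    }

    /// Pops from the highest-priority non-empty lane (assumed drain order).
    fn next(&mut self) -> Option<T> {
        self.lanes.iter_mut().find_map(|lane| lane.pop_front())
    }
}

/// Hypothetical application-side task type.
struct Job {
    name: &'static str,
    priority: Priority,
}

impl Prioritized for Job {
    fn rank(&self) -> Priority {
        self.priority
    }
}
```

Because `SimpleQueue` never names `Job`, any type implementing `Prioritized` can be queued, which is the decoupling the commit message describes.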
@@ -157,22 +181,38 @@ impl<T> Context<T> {
     /// It first tries to steal a batch from the global injector queue. If that
     /// fails, it attempts to steal from a randomly chosen peer worker to ensure
     /// fair distribution and avoid contention hotspots.
-    pub fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealers:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
-        if Injector.steal_batch_and_pop(Local).is_success() {
-            return Local.pop();
+    pub fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealer:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
+        // First, try to steal a batch from the global injector.
+        // `steal_batch_and_pop` is efficient: it moves a batch into our local
+        // queue and returns one task immediately if successful.
+        if let Steal::Success(Task) = Injector.steal_batch_and_pop(Local) {
+            return Some(Task);
+        }
+
+        // If the global queue is empty, try stealing from peers.
+        let Count = Stealer.len();
+
+        if Count <= 1 {
+            return None;
         }

-        let mut Indices:Vec<usize> = (0..Stealers.len()).collect();
-        Indices.shuffle(&mut rand::rng());
+        // Allocation-free random iteration: pick a random starting point.
+        let Start = rand::rng().random_range(0..Count);

-        for Index in Indices {
+        // Iterate through all peers starting from the random offset.
+        for i in 0..Count {
+            let Index = (Start + i) % Count;
+
+            // Don't steal from ourselves.
             if Index == self.Identifier {
                 continue;
             }
-            if Stealers[Index].steal_batch_and_pop(Local).is_success() {
-                return Local.pop();
+
+            if let Steal::Success(Task) = Stealer[Index].steal_batch_and_pop(Local) {
+                return Some(Task);
             }
         }
+
         None
     }
 }
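
Review note on the peer scan: the probing order used by the new `Steal` can be isolated as a pure function, which makes it easy to reason about. The sketch below assumes nothing beyond the standard library. It takes the starting offset as a parameter (the diff draws it from `rand`), and `probe_order` is a hypothetical helper, not part of Echo.

```rust
/// Returns the peer indices a worker would probe: a rotation of `0..count`
/// beginning at `start`, with the worker's own index skipped.
/// The iterator is lazy and performs no heap allocation.
fn probe_order(count: usize, start: usize, own: usize) -> impl Iterator<Item = usize> {
    (0..count)
        // Wrap around so every index is visited exactly once.
        .map(move |i| (start + i) % count)
        // Never probe our own queue.
        .filter(move |&index| index != own)
}
```

Compared with the removed shuffle, a rotation does not produce a uniformly random permutation, but it still randomizes which peer is tried first and avoids building an index `Vec` on every steal attempt.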

Source/Scheduler/Scheduler.rs

Lines changed: 9 additions & 0 deletions
@@ -25,8 +25,10 @@ use crate::{
 pub struct Scheduler {
     /// The underlying work-stealing queue system used for task submission.
     Queue:StealingQueue<Task>,
+
     /// Handles to the spawned worker threads, used for graceful shutdown.
     Handle:Vec<JoinHandle<()>>,
+
     /// An atomic flag to signal all workers to shut down.
     Running:Arc<AtomicBool>,
 }
@@ -38,15 +40,18 @@ impl Scheduler {
     /// `SchedulerBuilder`'s `Build` method.
     pub fn Create(Count:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
         info!("[Scheduler] Create with {} workers.", Count);
+
         let Running = Arc::new(AtomicBool::new(true));

         // Create the entire queue system and retrieve the contexts for each worker.
         let (Queue, Contexts) = StealingQueue::<Task>::Create(Count);

         let mut Handle = Vec::with_capacity(Count);
+
         // Spawn an asynchronous task for each worker.
         for Context in Contexts.into_iter() {
             let Running = Running.clone();
+
             Handle.push(tokio::spawn(async move {
                 // Each task creates and runs a worker, consuming its context.
                 Worker::Create(Context, Running).Run().await;
@@ -73,15 +78,18 @@ impl Scheduler {
     pub async fn Stop(&mut self) {
         if !self.Running.swap(false, Ordering::Relaxed) {
             info!("[Scheduler] Stop already initiated.");
+
             return;
         }

         info!("[Scheduler] Stopping worker threads...");
+
         for Handle in self.Handle.drain(..) {
             if let Err(Error) = Handle.await {
                 error!("[Scheduler] Error joining worker: {}", Error);
             }
         }
+
         info!("[Scheduler] All workers stopped.");
     }
 }
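
Review note on `Stop`: the `swap(false, ...)` call returns the previous flag value, so only the first caller proceeds to join the workers and later calls return early. Below is a std-only sketch of that guard. It uses OS threads in place of tokio tasks, an assumption made purely to keep the example dependency-free, and `Pool` with its methods is hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical worker pool holding join handles and a shared run flag.
struct Pool {
    handles: Vec<JoinHandle<()>>,
    running: Arc<AtomicBool>,
}

impl Pool {
    fn start(count: usize) -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let handles = (0..count)
            .map(|_| {
                let running = Arc::clone(&running);
                thread::spawn(move || {
                    // Worker loop: poll the flag until shutdown is signalled.
                    while running.load(Ordering::Relaxed) {
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();
        Self { handles, running }
    }

    /// Returns `true` if this call performed the shutdown, `false` if an
    /// earlier call already did.
    fn stop(&mut self) -> bool {
        // `swap` yields the old value, so only the first caller sees `true`.
        if !self.running.swap(false, Ordering::Relaxed) {
            return false;
        }
        for handle in self.handles.drain(..) {
            // A join error means the worker panicked; ignored in this sketch.
            let _ = handle.join();
        }
        true
    }
}
```

Calling `stop` twice is therefore safe: the second call finds the flag already cleared and the handle list already drained.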
@@ -94,6 +102,7 @@ impl Drop for Scheduler {
     fn drop(&mut self) {
         if self.Running.load(Ordering::Relaxed) {
             warn!("[Scheduler] Dropped without explicit stop. Signaling workers.");
+
             self.Running.store(false, Ordering::Relaxed);
         }
     }

Source/Scheduler/SchedulerBuilder.rs

Lines changed: 6 additions & 0 deletions
@@ -13,6 +13,7 @@ use crate::Scheduler::Scheduler::Scheduler;
 pub enum Concurrency {
     /// Specifies a maximum number of concurrent tasks for a queue.
     Limit(usize),
+
     /// Allows an unlimited number of concurrent tasks for a queue.
     Unlimited,
 }
@@ -25,6 +26,7 @@ pub enum Concurrency {
 pub struct SchedulerBuilder {
     /// The number of worker threads to be spawned in the scheduler's pool.
     Count:usize,
+
     /// Configuration for named queues with concurrency limits. (For future use)
     Configuration:HashMap<String, Concurrency>,
 }
@@ -36,6 +38,7 @@ impl SchedulerBuilder {
     /// system, with a minimum of two workers to ensure work-stealing is viable.
     pub fn Create() -> Self {
         let Default = num_cpus::get().max(2);
+
         Self { Count:Default, Configuration:HashMap::new() }
     }

@@ -45,17 +48,20 @@ impl SchedulerBuilder {
     pub fn Count(mut self, Count:usize) -> Self {
         if Count == 0 {
             warn!("[Builder] Worker count of 0 is invalid. Defaulting to logical CPUs.");
+
             self.Count = num_cpus::get().max(2);
         } else {
             self.Count = Count;
         }
+
         self
     }

     /// Configures a named queue with a specific concurrency limit. (For future
     /// use)
     pub fn Queue(mut self, Name:&str, Limit:Concurrency) -> Self {
         self.Configuration.insert(Name.to_string(), Limit);
+
         self
     }
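
Review note on the builder: the zero-count fallback in `Count` can be sketched without external crates. The example below substitutes `std::thread::available_parallelism` for `num_cpus::get()`, which is a swap made for self-containment and not what the diff uses. `PoolBuilder`, `default_count`, and `build` returning a bare `usize` are hypothetical simplifications.

```rust
use std::thread;

/// Default worker count: available parallelism, but never fewer than two.
fn default_count() -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .max(2)
}

/// Hypothetical fluent builder that only tracks the worker count.
struct PoolBuilder {
    count: usize,
}

impl PoolBuilder {
    fn create() -> Self {
        Self { count: default_count() }
    }

    /// Sets the worker count; zero is rejected in favour of the default.
    fn count(mut self, count: usize) -> Self {
        self.count = if count == 0 { default_count() } else { count };
        self
    }

    /// Stand-in for `Build`: returns the resolved worker count.
    fn build(self) -> usize {
        self.count
    }
}
```

As in the diff, an explicit count of 1 is accepted as-is. With a single worker, the peer-stealing path in `Steal` returns `None` immediately because of its `Count <= 1` check, so only the global injector supplies work.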