Skip to content

Commit a16d402

Browse files
refactor(Echo): Improve code clarity, documentation, and API consistency
Refactored the Echo scheduler crate to enhance readability, maintainability, and consistency with Land's architectural principles. Key changes include: - Renamed `SchedulerBuilder::Count` to `WithWorkerCount` and `SchedulerBuilder::Queue` to `WithQueue` for clearer fluent API semantics - Simplified work-stealing logic in `Queue::StealingQueue` with improved error handling and reduced allocations - Updated module and function documentation throughout to better explain Echo's role as Mountain's execution engine - Revised README and Deep Dive docs to reflect architectural relationships with Mountain's ActionEffect system - Renamed internal fields for clarity (e.g., `Handle` → `WorkerHandles`, `Running` → `IsRunning`) These changes ensure Echo remains a robust foundation for Mountain's structured concurrency model, where it executes ActionEffect futures via Tauri's async runtime. The improved documentation clarifies Echo's integration points with Mountain's Track dispatcher and environment services. Refs #291 #292 #290
1 parent eda4e8a commit a16d402

14 files changed

Lines changed: 264 additions & 335 deletions

File tree

README.md

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
Welcome to **Echo**! This crate provides a powerful, structured concurrency
2020
runtime for Rust applications, built on a high-performance **work-stealing
21-
scheduler**. It is designed to be the core execution engine for the `Mountain`
22-
backend, integrating seamlessly with the declarative `ActionEffect` system
23-
defined in the `Common` crate. **Echo** moves beyond simple task spawning
21+
scheduler**. It is designed to be the core execution engine for application
22+
backends like `Mountain`, integrating seamlessly with declarative systems like
23+
the `ActionEffect` pattern. **Echo** moves beyond simple task spawning
2424
(`tokio::spawn`) to provide a robust framework for managing, prioritizing, and
2525
executing complex asynchronous workflows with resilience and efficiency.
2626

@@ -58,13 +58,13 @@ executing complex asynchronous workflows with resilience and efficiency.
5858

5959
## Core Architecture Principles 🏗️
6060

61-
| Principle | Description | Key Components Involved |
62-
| :------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | :-------------------------------------------------------------------- |
63-
| **Performance** | Use lock-free data structures (`crossbeam-deque`) and a high-performance work-stealing algorithm to achieve maximum throughput and low-latency task execution. | `Queue::StealingQueue`, `Scheduler::Worker` |
64-
| **Structured Concurrency** | Manage all asynchronous operations within a supervised pool of workers, providing graceful startup and shutdown, unlike fire-and-forget `tokio::spawn`. | `Scheduler::Scheduler`, `Scheduler::SchedulerBuilder` |
65-
| **Decoupling** | Separate the generic **Queueing Logic** from the application-specific **Scheduler Implementation**. The scheduler uses the queue to run its tasks. | `Queue::StealingQueue<T>`, `Scheduler::Scheduler`, `Task::Task::Task` |
66-
| **Resilience** | The scheduler's design is inherently resilient; the failure of one task (if it panics) is contained within its `tokio` task and does not crash the worker pool. | `Scheduler::Worker::Run` |
67-
| **Composability** | Provide a simple `Submit` API that accepts any `Future<Output = ()>`, making it easy to integrate with any asynchronous Rust code. | `Task::Task::Task`, `Scheduler::Scheduler::Submit` |
61+
| Principle | Description | Key Components Involved |
62+
| :------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------ |
63+
| **Performance** | Use lock-free data structures (`crossbeam-deque`) and a high-performance work-stealing algorithm to achieve maximum throughput and low-latency task execution. | `Queue::StealingQueue`, `Scheduler::Worker` |
64+
| **Structured Concurrency** | Manage all asynchronous operations within a supervised pool of workers, providing graceful startup and shutdown, unlike fire-and-forget `tokio::spawn`. | `Scheduler::Scheduler`, `Scheduler::SchedulerBuilder` |
65+
| **Decoupling** | Separate the generic **Queueing Logic** from the application-specific **Scheduler Implementation**. The scheduler uses the queue to run its tasks. | `Queue::StealingQueue<TTask>`, `Scheduler::Scheduler`, `Task::Task` |
66+
| **Resilience** | The scheduler's design is inherently resilient; the failure of one task (if it panics) is contained within its `tokio` task and does not crash the worker pool. | `Scheduler::Worker::Run` |
67+
| **Composability** | Provide a simple `Submit` API that accepts any `Future<Output = ()>`, making it easy to integrate with any asynchronous Rust code. | `Task::Task`, `Scheduler::Scheduler::Submit` |
6868

6969
---
7070

@@ -94,8 +94,8 @@ graph LR
9494
end
9595
9696
subgraph "Mountain (Application Logic)"
97-
AppRuntime["Mountain AppRuntime"]:::mountain
98-
MountainEnv["MountainEnvironment (Service Impls)"]:::mountain
97+
ApplicationRunTime["Mountain ApplicationRunTime"]:::mountain
98+
MountainEnvironment["MountainEnvironment (Service Impls)"]:::mountain
9999
Track["Track (Request Dispatcher)"]:::mountain
100100
end
101101
@@ -109,10 +109,10 @@ graph LR
109109
WorkerPool -- Pull tasks from --> WorkStealingQueue;
110110
end
111111
112-
Track -- Dispatches --> ActionEffect;
113-
ActionEffect -- Is run by --> AppRuntime;
114-
AppRuntime -- Submits Future to --> Scheduler;
115-
WorkerPool -- Executes Future using --> MountainEnv;
112+
Track -- Dispatches to --> ApplicationRunTime;
113+
ApplicationRunTime -- Creates Future from --> ActionEffect;
114+
ApplicationRunTime -- Submits Future to --> Scheduler;
115+
WorkerPool -- Executes Future using --> MountainEnvironment;
116116
```
117117

118118
---
@@ -126,9 +126,9 @@ separation of concerns:
126126
Echo/
127127
└── Source/
128128
├── Library.rs # Crate root, declares all modules.
129-
├── Scheduler/ # The main public API: Scheduler and Builder. Consumes the Queue.
129+
├── Scheduler/ # The main public API: Scheduler and SchedulerBuilder.
130130
├── Queue/ # The generic, high-performance work-stealing queue library.
131-
└── Task/ # The application-specific definition of a Task and its Priority.
131+
└── Task/ # The concrete definition of a Task and its Priority.
132132
```
133133

134134
---
@@ -157,28 +157,21 @@ Echo = { git = "https://github.com/CodeEditorLand/Echo.git", branch = "Current"
157157
`Echo` is designed to be integrated into an application's main entry point and
158158
used throughout the application, often via a shared context or runtime.
159159

160-
1. **Define the Public API:** In your library's root (`lib.rs` or `main.rs`),
161-
re-export the primary components for easy access.
162-
163-
```rust
164-
// In your application's lib.rs
165-
pub use Echo::Scheduler::{Scheduler, SchedulerBuilder};
166-
pub use Echo::Task::Priority::Priority;
167-
```
168-
169-
2. **Initialize the Scheduler:** Create and start the scheduler when your
160+
1. **Initialize the Scheduler:** Create and start the scheduler when your
170161
application starts. It is typically wrapped in an `Arc` to be shared safely
171162
across your application.
172163

173164
```rust
174165
// In your application's main function
175166
use std::sync::Arc;
167+
use Echo::Scheduler::SchedulerBuilder;
168+
use Echo::Task::Priority;
176169

177170
// Use the fluent builder to configure and build the scheduler
178-
let Scheduler = Arc::new(SchedulerBuilder::Create().Count(8).Build());
171+
let Scheduler = Arc::new(SchedulerBuilder::Create().WithWorkerCount(8).Build());
179172
```
180173

181-
3. **Submit Tasks:** Use the `Scheduler` instance to submit asynchronous work
174+
2. **Submit Tasks:** Use the `Scheduler` instance to submit asynchronous work
182175
from anywhere in your application.
183176

184177
```rust
@@ -195,13 +188,13 @@ used throughout the application, often via a shared context or runtime.
195188
Scheduler.Submit(async { /* critical work */ }, Priority::High);
196189
```
197190

198-
4. **Graceful Shutdown:** Before your application exits, ensure a clean
191+
3. **Graceful Shutdown:** Before your application exits, ensure a clean
199192
shutdown of all worker threads.
200193

201194
```rust
202195
// In your application's shutdown sequence
203-
// Note: Arc::get_mut requires the Arc to have only one strong reference.
204-
if let Some(mut Scheduler) = Arc::get_mut(&mut Scheduler) {
196+
// Note: Arc::try_unwrap requires the Arc to have only one strong reference.
197+
if let Ok(mut Scheduler) = Arc::try_unwrap(Scheduler) {
205198
Scheduler.Stop().await;
206199
}
207200
```

Source/Library.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
//! A Resilient, High-Performance Task Scheduler.
1+
//! # Echo: A Resilient, High-Performance Task Scheduler
22
//!
33
//! Provides a structured concurrency runtime for Rust applications, built on a
44
//! high-performance, priority-aware, work-stealing scheduler. It is designed
5-
//! to be a robust and efficient core execution engine.
5+
//! to be a robust and efficient core execution engine for demanding,
6+
//! concurrent workloads.
67
78
#![allow(non_snake_case, non_camel_case_types)]
89

910
// --- Crate Modules ---
1011
// Declares the main modules that constitute the library.
1112
pub mod Queue;
12-
1313
pub mod Scheduler;
14-
1514
pub mod Task;

Source/Queue/StealingQueue.rs

Lines changed: 58 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
//! A generic, priority-aware, work-stealing queue implementation.
1+
//! # StealingQueue
22
//!
3-
//! This module is self-contained and can be used by any scheduler or
4-
//! application to manage and distribute tasks of any type that can be
5-
//! prioritized.
3+
//! A generic, priority-aware, work-stealing queue implementation. This module
4+
//! is self-contained and can be used by any scheduler or application to manage
5+
//! and distribute tasks of any type that can be prioritized.
66
77
#![allow(non_snake_case, non_camel_case_types)]
88

@@ -20,152 +20,138 @@ pub trait Prioritized {
2020
fn Rank(&self) -> Self::Kind;
2121
}
2222

23-
/// Defines the internal priority levels used by the generic queue.
23+
/// Defines the internal priority levels used by the generic queue. These map
24+
/// directly to the different deques managed by the system.
2425
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2526
pub enum Priority {
2627
High,
27-
2828
Normal,
29-
3029
Low,
3130
}
3231

3332
/// Holds the queue components that are safe to share across all threads.
3433
///
35-
/// This includes global injectors for submitting new tasks and stealers for
36-
/// taking tasks from other workers, organized by priority level.
37-
pub struct Share<T> {
38-
/// Global, multi-producer queues for each priority.
39-
pub Injector:(Injector<T>, Injector<T>, Injector<T>),
40-
41-
/// Share handles for stealing tasks from each worker's queue.
42-
pub Stealer:(Vec<Stealer<T>>, Vec<Stealer<T>>, Vec<Stealer<T>>),
34+
/// This includes global injectors for submitting new tasks from any context and
35+
/// stealers for taking tasks from other workers' deques, organized by priority
36+
/// level.
37+
pub struct Share<TTask> {
38+
/// Global, multi-producer queues for each priority level.
39+
pub Injector:(Injector<TTask>, Injector<TTask>, Injector<TTask>),
40+
41+
/// Shared handles for stealing tasks from each worker's local queue.
42+
pub Stealer:(Vec<Stealer<TTask>>, Vec<Stealer<TTask>>, Vec<Stealer<TTask>>),
4343
}
4444

4545
/// A generic, priority-aware, work-stealing queue.
4646
///
4747
/// This is the public-facing entry point for submitting tasks. It is generic
48-
/// over any task type `T` that implements the `Prioritized` trait.
49-
pub struct StealingQueue<T:Prioritized<Kind = Priority>> {
48+
/// over any task type `TTask` that implements the `Prioritized` trait.
49+
pub struct StealingQueue<TTask:Prioritized<Kind = Priority>> {
5050
/// A shared, thread-safe pointer to the queue's shared components.
51-
Share:Arc<Share<T>>,
51+
Share:Arc<Share<TTask>>,
5252
}
5353

5454
/// Contains all necessary components for a single worker thread to operate.
5555
///
5656
/// This includes the thread-local `Worker` deques, which are not safe to share,
57-
/// making this context object the sole owner of a worker's private queues.
58-
pub struct Context<T> {
57+
/// making this `Context` object the sole owner of a worker's private queues.
58+
pub struct Context<TTask> {
5959
/// A unique identifier for the worker, used to avoid self-stealing.
6060
pub Identifier:usize,
6161

6262
/// Thread-local work queues for each priority level.
63-
pub Local:(Worker<T>, Worker<T>, Worker<T>),
63+
pub Local:(Worker<TTask>, Worker<TTask>, Worker<TTask>),
6464

6565
/// A reference to the shared components of the entire queue system.
66-
pub Share:Arc<Share<T>>,
66+
pub Share:Arc<Share<TTask>>,
6767
}
6868

69-
impl<T:Prioritized<Kind = Priority>> StealingQueue<T> {
69+
impl<TTask:Prioritized<Kind = Priority>> StealingQueue<TTask> {
7070
/// Creates a complete work-stealing queue system.
7171
///
7272
/// This function initializes all the necessary queues, both shared and
7373
/// thread-local, for a given number of workers.
7474
///
75-
/// Returns a tuple containing:
75+
/// # Returns
76+
///
77+
/// A tuple containing:
7678
/// 1. The public-facing `StealingQueue` for submitting new tasks.
7779
/// 2. A `Vec` of `Context` objects, one for each worker thread to own.
78-
pub fn Create(Count:usize) -> (Self, Vec<Context<T>>) {
79-
let mut High:Vec<Worker<T>> = Vec::with_capacity(Count);
80-
81-
let mut Normal:Vec<Worker<T>> = Vec::with_capacity(Count);
82-
83-
let mut Low:Vec<Worker<T>> = Vec::with_capacity(Count);
80+
pub fn Create(Count:usize) -> (Self, Vec<Context<TTask>>) {
81+
let mut High:Vec<Worker<TTask>> = Vec::with_capacity(Count);
82+
let mut Normal:Vec<Worker<TTask>> = Vec::with_capacity(Count);
83+
let mut Low:Vec<Worker<TTask>> = Vec::with_capacity(Count);
8484

8585
// For each priority level, create a thread-local worker queue and its
8686
// corresponding shared stealer.
87-
let StealerHigh:Vec<Stealer<T>> = (0..Count)
87+
let StealerHigh:Vec<Stealer<TTask>> = (0..Count)
8888
.map(|_| {
8989
let Worker = Worker::new_fifo();
90-
9190
let Stealer = Worker.stealer();
92-
9391
High.push(Worker);
94-
9592
Stealer
9693
})
9794
.collect();
9895

99-
let StealerNormal:Vec<Stealer<T>> = (0..Count)
96+
let StealerNormal:Vec<Stealer<TTask>> = (0..Count)
10097
.map(|_| {
10198
let Worker = Worker::new_fifo();
102-
10399
let Stealer = Worker.stealer();
104-
105100
Normal.push(Worker);
106-
107101
Stealer
108102
})
109103
.collect();
110104

111-
let StealerLow:Vec<Stealer<T>> = (0..Count)
105+
let StealerLow:Vec<Stealer<TTask>> = (0..Count)
112106
.map(|_| {
113107
let Worker = Worker::new_fifo();
114-
115108
let Stealer = Worker.stealer();
116-
117109
Low.push(Worker);
118-
119110
Stealer
120111
})
121112
.collect();
122113

123114
// Bundle all shared components into an Arc for safe sharing.
124115
let Share = Arc::new(Share {
125116
Injector:(Injector::new(), Injector::new(), Injector::new()),
126-
127117
Stealer:(StealerHigh, StealerNormal, StealerLow),
128118
});
129119

130120
// Create a unique context for each worker, giving it ownership of its
131121
// local queues and a reference to the shared components.
132-
let mut Context = Vec::with_capacity(Count);
133-
122+
let mut Contexts = Vec::with_capacity(Count);
134123
for Identifier in 0..Count {
135-
Context.push(Context {
124+
Contexts.push(Context {
136125
Identifier,
137-
138126
Local:(High.remove(0), Normal.remove(0), Low.remove(0)),
139-
140127
Share:Share.clone(),
141128
});
142129
}
143130

144131
let Queue = Self { Share };
145-
146-
(Queue, Context)
132+
(Queue, Contexts)
147133
}
148134

149135
/// Submits a new task to the appropriate global queue based on its
150136
/// priority. This method is thread-safe and can be called from any
151137
/// context.
152-
pub fn Submit(&self, Task:T) {
138+
pub fn Submit(&self, Task:TTask) {
153139
match Task.Rank() {
154140
Priority::High => self.Share.Injector.0.push(Task),
155-
156141
Priority::Normal => self.Share.Injector.1.push(Task),
157-
158142
Priority::Low => self.Share.Injector.2.push(Task),
159143
}
160144
}
161145
}
162146

163-
impl<T> Context<T> {
147+
impl<TTask> Context<TTask> {
164148
/// Finds the next available task for the worker to execute.
165-
// This method implements the complete work-finding logic:
166-
/// 1. Check local queue (high to low priority).
167-
/// 2. Steal from the system (high to low priority).
168-
pub fn Next(&self) -> Option<T> {
149+
///
150+
/// This method implements the complete work-finding logic:
151+
/// 1. Check local deques (from high to low priority).
152+
/// 2. If local deques are empty, attempt to steal from the system (from
153+
/// high to low priority).
154+
pub fn Next(&self) -> Option<TTask> {
169155
self.Local
170156
.0
171157
.pop()
@@ -178,10 +164,15 @@ impl<T> Context<T> {
178164

179165
/// Attempts to steal a task from a specific priority set.
180166
///
181-
/// It first tries to steal a batch from the global injector queue. If that
182-
/// fails, it attempts to steal from a randomly chosen peer worker to ensure
183-
/// fair distribution and avoid contention hotspots.
184-
pub fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealer:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
167+
/// It first tries to steal a batch from the global injector queue for that
168+
/// priority. If that fails, it attempts to steal from a randomly chosen
169+
/// peer worker to ensure fair distribution and avoid contention hotspots.
170+
pub fn Steal<'a>(
171+
&self,
172+
Injector:&'a Injector<TTask>,
173+
Stealers:&'a [Stealer<TTask>],
174+
Local:&'a Worker<TTask>,
175+
) -> Option<TTask> {
185176
// First, try to steal a batch from the global injector.
186177
// `steal_batch_and_pop` is efficient: it moves a batch into our local
187178
// queue and returns one task immediately if successful.
@@ -190,14 +181,15 @@ impl<T> Context<T> {
190181
}
191182

192183
// If the global queue is empty, try stealing from peers.
193-
let Count = Stealer.len();
194-
184+
let Count = Stealers.len();
195185
if Count <= 1 {
186+
// Cannot steal if there are no other workers.
196187
return None;
197188
}
198189

199190
// Allocation-free random iteration: pick a random starting point.
200-
let Start = rand::rng().random_range(0..Count);
191+
let mut Rng = rand::rng();
192+
let Start = Rng.random_range(0..Count);
201193

202194
// Iterate through all peers starting from the random offset.
203195
for i in 0..Count {
@@ -208,7 +200,7 @@ impl<T> Context<T> {
208200
continue;
209201
}
210202

211-
if let Steal::Success(Task) = Stealer[Index].steal_batch_and_pop(Local) {
203+
if let Steal::Success(Task) = Stealers[Index].steal_batch_and_pop(Local) {
212204
return Some(Task);
213205
}
214206
}

0 commit comments

Comments
 (0)