Skip to content

Commit 64d8b64

Browse files
refactor(Echo/Scheduler): expose scheduler APIs and optimize worker task polling
- Made `Share` struct and `Steal` method public in `StealingQueue` to enable cross-component integration with `Track` dispatcher - Changed `Scheduler::Create` visibility to public for proper builder pattern usage in `Mountain` initialization - Refactored `Worker::Run` to implement prioritized local polling and system stealing strategies, improving task distribution efficiency - Fixed module import paths to align with finalized `Element/Echo` crate structure - Updated task imports to use absolute paths ensuring consistency across scheduler components These changes strengthen the foundation for `Mountain`'s task execution system, directly supporting Land's requirement for low-latency command processing between Tauri frontend and native backend. The worker optimization particularly enhances performance during concurrent operations like extension host communication and file I/O handling.
1 parent 8a45494 commit 64d8b64

7 files changed

Lines changed: 50 additions & 27 deletions

File tree

Source/Queue/StealingQueue.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ pub enum Priority {
3131
///
3232
/// This includes global injectors for submitting new tasks and stealers for
3333
/// taking tasks from other workers, organized by priority level.
34-
struct Share<T> {
34+
pub struct Share<T> {
3535
/// Global, multi-producer queues for each priority.
36-
Injector:(Injector<T>, Injector<T>, Injector<T>),
36+
pub Injector:(Injector<T>, Injector<T>, Injector<T>),
3737
/// Share handles for stealing tasks from each worker's queue.
38-
Stealer:(Vec<Stealer<T>>, Vec<Stealer<T>>, Vec<Stealer<T>>),
38+
pub Stealer:(Vec<Stealer<T>>, Vec<Stealer<T>>, Vec<Stealer<T>>),
3939
}
4040

4141
/// A generic, priority-aware, work-stealing queue.
@@ -55,9 +55,9 @@ pub struct Context<T> {
5555
/// A unique identifier for the worker, used to avoid self-stealing.
5656
pub Identifier:usize,
5757
/// Thread-local work queues for each priority level.
58-
Local:(Worker<T>, Worker<T>, Worker<T>),
58+
pub Local:(Worker<T>, Worker<T>, Worker<T>),
5959
/// A reference to the shared components of the entire queue system.
60-
Share:Arc<Share<T>>,
60+
pub Share:Arc<Share<T>>,
6161
}
6262

6363
impl<T:Prioritized<Kind = Priority>> StealingQueue<T> {
@@ -157,7 +157,7 @@ impl<T> Context<T> {
157157
/// It first tries to steal a batch from the global injector queue. If that
158158
/// fails, it attempts to steal from a randomly chosen peer worker to ensure
159159
/// fair distribution and avoid contention hotspots.
160-
fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealers:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
160+
pub fn Steal<'a>(&self, Injector:&'a Injector<T>, Stealers:&'a [Stealer<T>], Local:&'a Worker<T>) -> Option<T> {
161161
if Injector.steal_batch_and_pop(Local).is_success() {
162162
return Local.pop();
163163
}

Source/Scheduler/Scheduler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl Scheduler {
3636
///
3737
/// This is a crate-private function, intended to be called only by the
3838
/// `SchedulerBuilder`'s `Build` method.
39-
pub(crate) fn Create(Count:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
39+
pub fn Create(Count:usize, _Configuration:HashMap<String, Concurrency>) -> Self {
4040
info!("[Scheduler] Create with {} workers.", Count);
4141
let Running = Arc::new(AtomicBool::new(true));
4242

Source/Scheduler/SchedulerBuilder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::collections::HashMap;
66

77
use log::warn;
88

9-
use super::Scheduler::Scheduler;
9+
use crate::Scheduler::Scheduler::Scheduler;
1010

1111
/// Defines concurrency limits for named queues. (For future use)
1212
#[derive(Debug, Clone, Copy)]

Source/Scheduler/Worker.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ use tokio::time::{Duration, sleep};
1313
use crate::{Queue::StealingQueue::Context, Task::Task::Task};
1414

1515
/// Represents a worker that executes tasks from its assigned context.
16-
///
17-
/// Each worker runs in a dedicated asynchronous task, continuously polling for
18-
/// work from its local queues or stealing from its peers.
1916
pub struct Worker {
2017
/// The worker's execution context, which contains its private queues
2118
/// and a reference to the shared queue system.
@@ -30,26 +27,50 @@ impl Worker {
3027
pub fn Create(Context:Context<Task>, Running:Arc<AtomicBool>) -> Self { Self { Context, Running } }
3128

3229
/// The main execution loop for the worker.
33-
///
34-
/// This method consumes the `Worker` instance, taking full ownership of its
35-
/// fields. This design is critical for making the resulting `Future` safe
36-
/// to send across threads as required by `tokio::spawn`.
3730
pub async fn Run(self) {
3831
trace!("[Worker {}] Starting.", self.Context.Identifier);
3932

4033
while self.Running.load(Ordering::Relaxed) {
41-
// Attempt to find a task from any available source.
42-
if let Some(Task) = self.Context.Next() {
43-
trace!("[Worker {}] Execute: {:?}.", self.Context.Identifier, Task.Priority);
44-
// Execute the task's future to completion.
34+
// First, try to get a task from the local queues.
35+
let TaskOption = self.PopLocal();
36+
37+
if let Some(Task) = TaskOption {
38+
trace!("[Worker {}] Execute Local: {:?}.", self.Context.Identifier, Task.Priority);
39+
Task.Operation.await;
40+
continue; // Immediately loop back to check for more local work.
41+
}
42+
43+
// If no local work, try to steal from the system.
44+
// This attempts to get a batch and executes the first task.
45+
// The rest of the batch now populates the local queue for the next loops.
46+
let TaskOption = self.StealFromSystem();
47+
48+
if let Some(Task) = TaskOption {
49+
trace!("[Worker {}] Execute Stolen: {:?}.", self.Context.Identifier, Task.Priority);
4550
Task.Operation.await;
4651
} else {
47-
// If no work is found, yield to the OS to prevent busy-waiting
48-
// and conserve CPU resources.
52+
// If there's truly no work anywhere, yield.
4953
sleep(Duration::from_millis(1)).await;
5054
}
5155
}
5256

5357
trace!("[Worker {}] Finished.", self.Context.Identifier);
5458
}
59+
60+
/// Attempts to pop a single task from the local deques, honoring priority.
61+
fn PopLocal(&self) -> Option<Task> {
62+
self.Context.Local.0.pop() // High
63+
.or_else(|| self.Context.Local.1.pop()) // Normal
64+
.or_else(|| self.Context.Local.2.pop()) // Low
65+
}
66+
67+
/// Attempts to steal a batch of work from the system.
68+
///
69+
/// It steals from the highest-priority queue that has work, populating its
70+
/// own local queue and returning the first task immediately.
71+
fn StealFromSystem(&self) -> Option<Task> {
72+
self.Context.Steal(&self.Context.Share.Injector.0, &self.Context.Share.Stealer.0, &self.Context.Local.0) // Steal High
73+
.or_else(|| self.Context.Steal(&self.Context.Share.Injector.1, &self.Context.Share.Stealer.1, &self.Context.Local.1)) // Steal Normal
74+
.or_else(|| self.Context.Steal(&self.Context.Share.Injector.2, &self.Context.Share.Stealer.2, &self.Context.Local.2)) // Steal Low
75+
}
5576
}

Source/Task/Task.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
use std::{future::Future, pin::Pin};
66

7-
use super::Priority::Priority;
8-
use crate::Queue::StealingQueue::{Prioritized, Priority as QueuePriority};
7+
use crate::{
8+
Queue::StealingQueue::{Prioritized, Priority as QueuePriority},
9+
Task::Priority::Priority,
10+
};
911

1012
/// Defines a dynamic, asynchronous operation that can be sent between threads.
1113
pub type Operation = Pin<Box<dyn Future<Output = ()> + Send>>;

Target/debug/libEcho.rlib

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:cf6fed20c4874669f6eb11c2a43493b19f8d32d80eb5d47900d91aa846b2a499
3-
size 12084896
2+
oid sha256:05c2fdc8ee692f6e433b6886988245653b49d59ff361e2c4867ebb58fc7beaff
3+
size 12134460

Target/release/libEcho.rlib

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:6c5a8c60c99ff4ac42395e90d999a62af63bc125f614110c47199b6a230478f2
3-
size 271132
2+
oid sha256:98c54fa2c4be0f49dca74c1faf8412cb56ff1598cb3b6cbf3c024b80410eee02
3+
size 286990

0 commit comments

Comments
 (0)