Skip to content

Commit b3930da

Browse files
1 parent 2e2fa42 commit b3930da

7 files changed

Lines changed: 46 additions & 42 deletions

File tree

Example/Sequence.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6666
// Wait for a moment to allow actions to complete
6767
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
6868

69-
// Shutdown the sequence
70-
Sequence.Shutdown().await;
69+
// ShutDown the sequence
70+
Sequence.ShutDown().await;
7171

7272
println!("Sequence completed");
7373

Example/Tauri.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9696
}
9797
}
9898

99-
// Shutdown the sequence
100-
Sequence.Shutdown().await;
99+
// ShutDown the sequence
100+
Sequence.ShutDown().await;
101101

102102
println!("Application completed");
103103

Example/WorkSteal.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
105105
let Running = Arc::new(Mutex::new(true));
106106

107107
// Spawn worker tasks
108-
let Handles: Vec<_> = Workers
108+
let Handle: Vec<_> = Workers
109109
.iter()
110110
.map(|Worker| {
111111
let Worker = Worker.clone();
@@ -139,7 +139,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
139139
*Running.lock().await = false;
140140

141141
// Wait for all worker tasks to complete
142-
for Handle in Handles {
142+
for Handle in Handle {
143143
Handle.await?;
144144
}
145145

Source/Queue/StealingQueue.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@
44
use crossbeam_deque::{Injector, Stealer, Worker};
55
use rand::seq::SliceRandom;
66

7-
use crate::Task::{Priority::Priority, Task::Task};
7+
use crate::Task::{Priority::Enum, Task::Struct};
88

99
/// A container for a set of queues for a single priority level.
1010
struct PriorityQueueSet {
11-
GlobalInjector:Injector<Task>,
12-
WorkerQueue:Vec<Worker<Task>>,
13-
Stealer:Vec<Stealer<Task>>,
11+
GlobalInjector:Injector<Struct>,
12+
WorkerQueue:Vec<Worker<Struct>>,
13+
Stealer:Vec<Stealer<Struct>>,
1414
}
1515

1616
impl PriorityQueueSet {
1717
/// Creates a new set of queues for a given number of workers.
1818
fn New(NumberOfWorker:usize) -> Self {
19-
let WorkerQueue:Vec<Worker<Task>> = (0..NumberOfWorker).map(|_| Worker::new_fifo()).collect();
19+
let WorkerQueue:Vec<Worker<Struct>> = (0..NumberOfWorker).map(|_| Worker::new_fifo()).collect();
2020
Self {
2121
GlobalInjector:Injector::new(),
2222
Stealer:WorkerQueue.iter().map(|w| w.stealer()).collect(),
@@ -49,33 +49,33 @@ impl StealingQueue {
4949

5050
/// Submits a task to the appropriate global injection queue based on its
5151
/// priority.
52-
pub fn Push(&self, Task:Task) {
52+
pub fn Push(&self, Task:Struct) {
5353
match Task.Priority {
54-
Priority::High => self.High.GlobalInjector.push(Task),
55-
Priority::Normal => self.Normal.GlobalInjector.push(Task),
56-
Priority::Low => self.Low.GlobalInjector.push(Task),
54+
Enum::High => self.High.GlobalInjector.push(Task),
55+
Enum::Normal => self.Normal.GlobalInjector.push(Task),
56+
Enum::Low => self.Low.GlobalInjector.push(Task),
5757
}
5858
}
5959

6060
/// Attempts to find a task for a given worker, always prioritizing
6161
/// `High` > `Normal` > `Low`.
62-
pub fn StealForWorker(&self, WorkerId:usize) -> Option<Task> {
62+
pub fn StealForWorker(&self, WorkerId:usize) -> Option<Struct> {
6363
self.FindTaskInSet(&self.High, WorkerId)
6464
.or_else(|| self.FindTaskInSet(&self.Normal, WorkerId))
6565
.or_else(|| self.FindTaskInSet(&self.Low, WorkerId))
6666
}
6767

6868
/// Implements the core work-finding logic for a specific priority level.
6969
/// It first checks the worker's local queue, then attempts to steal.
70-
fn FindTaskInSet(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Task> {
70+
fn FindTaskInSet(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Struct> {
7171
Set.WorkerQueue[WorkerId]
7272
.pop()
7373
.or_else(|| self.StealFromSetGlobalOrPeer(Set, WorkerId))
7474
}
7575

7676
/// Attempts to steal work for a specific priority level, first from the
7777
/// global queue, then from peer workers.
78-
fn StealFromSetGlobalOrPeer(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Task> {
78+
fn StealFromSetGlobalOrPeer(&self, Set:&PriorityQueueSet, WorkerId:usize) -> Option<Struct> {
7979
// Try stealing from the global injector for this priority set.
8080
if Set.GlobalInjector.steal_batch_and_pop(&Set.WorkerQueue[WorkerId]).is_success() {
8181
return Set.WorkerQueue[WorkerId].pop();

Source/Scheduler/Scheduler.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::Worker::Worker;
1717
use crate::{
1818
Queue::StealingQueue::StealingQueue,
1919
Scheduler::SchedulerBuilder::Concurrency,
20-
Task::{Priority::Priority, Task::Task},
20+
Task::{Priority::Enum, Task::Struct},
2121
};
2222

2323
/// Manages a pool of worker threads and a work-stealing queue to execute tasks
@@ -36,21 +36,25 @@ impl Scheduler {
3636
/// This is a crate-private function, intended to be called only by the
3737
/// `SchedulerBuilder`.
3838
///
39-
/// @param NumberOfWorkers - The number of worker threads to spawn.
40-
/// @param QueueConfigs - Configuration for named queues with concurrency
39+
/// @param number_of_workers - The number of worker threads to spawn.
40+
/// @param _queue_configs - Configuration for named queues with concurrency
4141
/// limits (future use).
42-
pub(crate) fn Start(NumberOfWorkers:usize, _QueueConfigs:HashMap<String, Concurrency>) -> Self {
43-
info!("[Scheduler] Starting scheduler with {} worker threads.", NumberOfWorkers);
42+
pub(crate) fn Start(number_of_workers:usize, _queue_configs:HashMap<String, Concurrency>) -> Self {
43+
info!("[Scheduler] Starting scheduler with {} worker threads.", number_of_workers);
4444
let IsRunning = Arc::new(AtomicBool::new(true));
45-
let Queue = Arc::new(StealingQueue::New(NumberOfWorkers));
45+
let Queue = Arc::new(StealingQueue::New(number_of_workers));
4646

47-
let mut WorkerHandles = Vec::with_capacity(NumberOfWorkers);
47+
let mut WorkerHandles = Vec::with_capacity(number_of_workers);
48+
49+
for WorkerIdentifier in 0..number_of_workers {
50+
let CloneQueue = Queue.clone();
51+
let CloneIsRunning = IsRunning.clone();
4852

49-
for WorkerId in 0..NumberOfWorkers {
50-
let WorkerInstance = Worker::New(WorkerId, Queue.clone(), IsRunning.clone());
5153
let WorkerHandle = tokio::spawn(async move {
54+
let WorkerInstance = Worker::New(WorkerIdentifier, CloneQueue, CloneIsRunning);
5255
WorkerInstance.Run().await;
5356
});
57+
5458
WorkerHandles.push(WorkerHandle);
5559
}
5660

@@ -60,28 +64,28 @@ impl Scheduler {
6064
/// Submits a new task (as a `Future`) to the scheduler's global queue.
6165
/// The task will be picked up by the next available worker.
6266
///
63-
/// @param FutureInstance - The async block or function to execute.
64-
/// @param TaskPriority - The priority of the task.
65-
pub fn Submit<F>(&self, FutureInstance:F, TaskPriority:Priority)
67+
/// @param future_instance - The async block or function to execute.
68+
/// @param task_priority - The priority of the task.
69+
pub fn submit<F>(&self, future_instance:F, task_priority:Enum)
6670
where
6771
F: Future<Output = ()> + Send + 'static, {
68-
let NewTask = Task::New(FutureInstance, TaskPriority);
69-
self.Queue.Push(NewTask);
72+
let new_task = Struct::New(future_instance, task_priority);
73+
self.Queue.Push(new_task);
7074
}
7175

7276
/// Asynchronously shuts down the scheduler.
7377
///
7478
/// This signals all worker threads to stop their loops and then waits for
7579
/// them to complete their current tasks and exit gracefully.
76-
pub async fn Shutdown(&mut self) {
80+
pub async fn ShutDown(&mut self) {
7781
if !self.IsRunning.swap(false, Ordering::Relaxed) {
78-
info!("[Scheduler] Shutdown already initiated.");
82+
info!("[Scheduler] ShutDown already initiated.");
7983
return;
8084
}
8185

8286
info!("[Scheduler] Shutting down worker threads...");
83-
for Handle in self.WorkerHandles.drain(..) {
84-
if let Err(e) = Handle.await {
87+
for handle in self.WorkerHandles.drain(..) {
88+
if let Err(e) = handle.await {
8589
error!("[Scheduler] Error joining worker task during shutdown: {}", e);
8690
}
8791
}

Source/Task/Priority.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/// Schedulers and workers can use this to ensure that high-priority,
99
/// user-facing tasks are executed before long-running background tasks.
1010
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
11-
pub enum Priority {
11+
pub enum Enum {
1212
/// For background tasks that are not time-sensitive, such as logging,
1313
/// telemetry, or non-critical file indexing.
1414
Low,

Source/Task/Task.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{future::Future, pin::Pin};
33
/// @module Task
44
/// @description Defines the `Task` struct, which is the internal unit of work
55
/// for the Echo scheduler.
6-
use super::Priority::Priority;
6+
use super::Priority::Enum;
77

88
/// A type alias for a boxed, send-able, pinned future that returns no value.
99
/// This is the standard way to handle dynamic, async operations in Rust.
@@ -14,20 +14,20 @@ type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
1414
/// It encapsulates an asynchronous operation (`Future`) along with metadata,
1515
/// such as its `Priority`, that the scheduler can use to determine execution
1616
/// order.
17-
pub struct Task {
17+
pub struct Struct {
1818
/// The asynchronous operation to be executed by a worker.
1919
pub Future:BoxedFuture,
2020
/// The priority level of this task, used by the scheduler's queue.
21-
pub Priority:Priority,
21+
pub Priority:Enum,
2222
}
2323

24-
impl Task {
24+
impl Struct {
2525
/// Creates a new `Task` from a given future and priority level.
2626
///
2727
/// @param FutureInstance - Any `Future` that is `Send` and has a `'static`
2828
/// lifetime. The future is automatically boxed and pinned.
2929
/// @param PriorityValue - The `Priority` of the task.
30-
pub fn New<F>(FutureInstance:F, PriorityValue:Priority) -> Self
30+
pub fn New<F>(FutureInstance:F, PriorityValue:Enum) -> Self
3131
where
3232
F: Future<Output = ()> + Send + 'static, {
3333
Self { Future:Box::pin(FutureInstance), Priority:PriorityValue }

0 commit comments

Comments
 (0)