@@ -39,16 +39,34 @@ class Scheduler:
3939 concurrently and an 8-core, 16-thread CPU will run 17 processes
4040 concurrently.
4141
42- If CPU usage is found to be below the threshold, the number of
43- simultaneous processes will be increased. CPU usage is checked
44- every 5 seconds.
42+ If `dynamic` is enabled, Scheduler will also check CPU usage and increase
43+ the number of processes if it is below the threshold.
4544 """
4645
4746 def __init__ (
48- self ,
49- progress_callback : Callable [[int , int ], None ] = None ,
50- delay_seconds : float = 0.05 ,
47+ self ,
48+ progress_callback : Callable [[int , int ], None ] = None ,
49+ update_interval : float = 0.05 ,
50+ dynamic : bool = True ,
51+ cpu_threshold : float = 95 ,
52+ cpu_update_interval : float = 5 ,
5153 ):
54+ """
55+ :param progress_callback: a function taking the number of finished tasks and the total number of tasks, which is
56+ called when a task finishes
57+
58+ :param update_interval: the time between consecutive updates (when tasks are checked and new tasks are scheduled)
59+
60+ :param dynamic: whether to dynamically increase the number of processes based on the CPU usage
61+
62+ :param cpu_threshold: the minimum target CPU usage, in percent; if `dynamic` is enabled and CPU usage is found
63+ to be below the threshold, the number of simultaneous tasks will be increased
64+
65+ :param cpu_update_interval: the time, in seconds, between consecutive CPU usage checks when `dynamic` is enabled
66+ """
67+ self .dynamic = dynamic
68+ self .update_interval = update_interval
69+
5270 self .tasks : List [Task ] = []
5371 self .output : List [tuple ] = []
5472
@@ -61,29 +79,24 @@ def __init__(
6179 # List of currently running tasks.
6280 self .running_tasks : List [Task ] = []
6381
64- # The delay between each consecutive check for finished tasks.
65- self .delay_seconds = delay_seconds
66- self .delay_millis = int (delay_seconds * 1000 )
67-
6882 self .time_start : float = 0
6983 self .started = False
70- self .stopped = False
7184 self .terminated = False
7285
7386 # Most recent time at which CPU utilisation was checked.
7487 self .time_cpu_checked : float = 0
7588
7689 # Number of seconds between CPU utilisation checks.
77- self .time_between_cpu_checks : float = 5
90+ self .time_between_cpu_checks : float = cpu_update_interval
7891
7992 # If the CPU utilisation in percent is below the threshold, more tasks will be run.
80- self .cpu_threshold = 95
93+ self .cpu_threshold = cpu_threshold
8194
8295 self .total_task_count : int = 0
8396 self .tasks_completed : int = 0
8497
8598 # Callback which allows the Scheduler to report its progress;
86- # the 1st input is the number of completed tasks, while the 2nd is
99+ # the 1st input is the number of completed tasks, and the 2nd is
87100 # the total number of tasks.
88101 self .progress_callback : Callable [[int , int ], None ] = progress_callback
89102
@@ -92,37 +105,70 @@ def add_task(self, task: Task) -> None:
92105 Adds a task to the Scheduler.
93106 """
94107 if self .started :
95- raise SchedulerException ("Do not add tasks to an running Scheduler." )
108+ raise SchedulerException ("add_task() cannot be called on a running Scheduler." )
96109
97110 self .tasks .append (task )
98111
112+ def add_tasks (self , * args : Task ) -> None :
113+ """
114+ Adds multiple tasks to the Scheduler.
115+ """
116+ if self .started :
117+ raise SchedulerException ("add_tasks() cannot be called on a running Scheduler." )
118+
119+ self .tasks .extend (args )
120+
99121 async def run (self ) -> List [tuple ]:
100122 """
101- Runs the tasks with coroutines. Returns a list containing
102- the output of each task, after all tasks are complete.
123+ Runs the tasks with coroutines.
103124
104- Important: the list is not ordered. Each task should return some
105- form of identifier in its results; for example, the name of the
106- signal.
125+ :returns an ordered list containing the output of each task
107126 """
108- # Initialize `self.output` so that it can be indexed into.
109- self .output = [() for _ in self .tasks ]
110- self .start ()
127+ self ._initialize_output ()
128+ self ._start ()
111129
112- while not self .stopped and not self .all_tasks_finished ():
113- await asyncio .sleep (self .delay_seconds )
114- self .update ()
130+ while not self .terminated and not self ._all_tasks_finished ():
131+ await asyncio .sleep (self .update_interval )
132+ self ._update ()
115133
116134 return self .output
117135
118- def update (self ):
119- should_update_tasks = False
136+ def run_blocking (self ) -> List [tuple ]:
137+ """
138+ Runs the tasks. Will block the current thread until all tasks are complete.
139+
140+ :returns an ordered list containing the output of each task
141+ """
142+ self ._initialize_output ()
143+ self ._start ()
144+
145+ while not self .terminated and not self ._all_tasks_finished ():
146+ time .sleep (self .update_interval )
147+ self ._update ()
148+
149+ return self .output
150+
151+ def terminate (self ) -> None :
152+ """Terminates all running tasks by killing their processes."""
153+ if not self .terminated :
154+ [t .terminate () for t in self .tasks ]
155+ self .terminated = True
156+
157+ def _initialize_output (self ) -> None :
158+ # Initialize `self.output` so that it can be indexed into.
159+ self .output = [None for _ in self .tasks ]
160+
161+ def _update (self ) -> None :
162+ """
163+ Checks whether tasks have finished, and schedules new tasks if applicable.
164+ """
165+ schedule_new_tasks = False
120166
121167 t = time .time ()
122- if t - self .time_cpu_checked > self .time_between_cpu_checks :
168+ if self . dynamic and t - self .time_cpu_checked > self .time_between_cpu_checks :
123169 self .time_cpu_checked = t
124170 total_remaining_tasks = sum (
125- [t .total_tasks () for t in self .available_tasks ()]
171+ [t .total_tasks () for t in self ._available_tasks ()]
126172 )
127173
128174 if total_remaining_tasks > self .concurrent_count :
@@ -135,91 +181,74 @@ def update(self):
135181 new_count += 1
136182
137183 self .concurrent_count = new_count
138- should_update_tasks = True
184+ schedule_new_tasks = True
139185
140186 for t in self .running_tasks :
141187 t .update ()
142188 if t .finished :
143189 index = self .tasks .index (t )
144190 self .output [index ] = t .queue .get ()
145191
146- self .on_task_completed (t )
147- should_update_tasks = True
192+ self ._on_task_completed (t )
193+ schedule_new_tasks = True
148194
149- if should_update_tasks :
150- self .schedule_tasks ()
195+ if schedule_new_tasks :
196+ self ._schedule_tasks ()
151197
152- def start (self ):
153- """Starts the scheduler running the assigned tasks."""
198+ def _start (self ) -> None :
199+ """
200+ Starts the scheduler running its tasks.
201+ """
154202 self .started = True
155203 self .total_task_count = sum ([t .total_tasks () for t in self .tasks ])
156204
157205 self .time_start = time .time ()
158206 self .time_cpu_checked = self .time_start
159- self .report_progress (0 )
207+ self ._report_progress (0 )
160208
161- self .schedule_tasks ()
209+ self ._schedule_tasks ()
162210
163- def schedule_tasks (self ):
211+ def _schedule_tasks (self ) -> None :
164212 """Updates the currently running tasks by starting new tasks if necessary."""
165- tasks = self .tasks_to_run ()
213+ tasks = self ._tasks_to_run ()
166214 self .running_tasks .extend (tasks )
167215 [t .start () for t in tasks ]
168216
169- def terminate (self ):
170- """Terminates all running tasks by killing their processes."""
171- if not (self .terminated or self .stopped ):
172- [t .terminate () for t in self .tasks ]
173- self .terminated = True
174- self .stop_timer ()
175-
176- def stop_timer (self ):
177- """
178- Stops the timer from checking whether tasks have finished.
179- This should be called when all tasks have been completed.
180- """
181- if not self .stopped :
182- self .stopped = True
183-
184- def on_task_completed (self , task ):
217+ def _on_task_completed (self , task ) -> None :
185218 """Called when a task finishes."""
186- self .report_progress (task .total_tasks ())
219+ self ._report_progress (task .total_tasks ())
187220 self .running_tasks .remove (task )
188221
189- def on_all_tasks_completed (self ):
190- """Called when all assigned tasks have been completed."""
191- self .stop_timer ()
192-
193- def report_progress (self , tasks_just_finished : int ):
222+ def _report_progress (self , tasks_just_finished : int ) -> None :
194223 self .tasks_completed += tasks_just_finished
195224 if self .progress_callback :
196225 self .progress_callback (self .tasks_completed , self .total_task_count )
197226
198- def available_tasks (self ) -> List [Task ]:
227+ def _available_tasks (self ) -> List [Task ]:
199228 """Gets all tasks which are available to run."""
200229 return [t for t in self .tasks if not (t .running or t .finished )]
201230
202- def all_tasks_finished (self ) -> bool :
231+ def _all_tasks_finished (self ) -> bool :
203232 """Returns whether all tasks have been finished."""
204233 return all ([t .finished for t in self .tasks ])
205234
206- def total_running_tasks (self ) -> int :
235+ def _total_running_tasks (self ) -> int :
207236 """Returns the total number of running tasks, including sub-tasks."""
208237 running = self .running_tasks
209238 return sum ([t .total_tasks () for t in running ])
210239
211- def tasks_to_run (self ) -> List [Task ]:
240+ def _tasks_to_run (self ) -> List [Task ]:
212241 """
213242 Gets the tasks that should be run, based on the core count
214243 and the current number of running tasks.
215244 """
216245 # Number of remaining tasks to run.
217- available = self .available_tasks ()
246+ available = self ._available_tasks ()
218247
219248 # The total number of tasks (including sub-tasks) for each available task.
220249 task_counts = [t .total_tasks () for t in available ]
221250
222- running_count = self .total_running_tasks ()
251+ running_count = self ._total_running_tasks ()
223252
224253 # Number of tasks that can be started without reducing efficiency.
225254 num_to_run = self .concurrent_count - running_count
0 commit comments