Skip to content

Commit 138622d

Browse files
Add 'run_in_thread' to Scheduler constructor
1 parent 78c73b3 commit 138622d

4 files changed

Lines changed: 136 additions & 31 deletions

File tree

scheduler/ProcessTask.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# MIT License
2+
#
3+
# Copyright (c) 2020 Sam McCormack
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in all
13+
# copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
from multiprocessing import Process, Queue
23+
24+
from scheduler.Task import Task
25+
from scheduler.utils import terminate_tree
26+
27+
28+
class ProcessTask(Task):
    """
    A task which is executed by a `multiprocessing.Process`.

    The queue should have been passed to the process, so that the process can
    put its output in the queue.
    """

    def __init__(self, process: Process, queue: Queue, subtasks: int = 0):
        """
        :param process: the process which runs this task
        :param queue: the queue associated with the process
        :param subtasks: the number of processes which will be spawned as part of this task
        """
        super().__init__(queue, subtasks)

        self.process = process

    def start(self) -> None:
        """Starts the task, unless it is already running or has finished."""
        if not self.running and not self.finished:
            self.process.start()
            self.running = True

    def terminate(self) -> None:
        """
        Terminates the task and all running sub-tasks.

        Termination is best-effort: ordinary errors raised while terminating
        the process or closing the queue are suppressed, and the task is
        always marked as not running afterwards.
        """
        try:
            terminate_tree(self.process)
        except Exception:
            # Best-effort: only ordinary errors are suppressed, so that
            # KeyboardInterrupt / SystemExit still propagate to the caller.
            pass
        finally:
            # Close the queue even if terminating the process failed, so the
            # queue is not leaked.
            try:
                self.queue.close()
            except Exception:
                pass
            finally:
                self.running = False

scheduler/Scheduler.py

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@
2323
import asyncio
2424
import functools
2525
import multiprocessing
26-
import time
2726
import sys
27+
import threading
28+
import time
2829
from multiprocessing import cpu_count, Process, Queue
30+
from queue import Queue as MTQueue
2931
from typing import List, Callable, Type, Optional, Union, Any, Tuple, Iterable
3032

3133
import psutil
3234

35+
from scheduler.ProcessTask import ProcessTask
3336
from scheduler.Task import Task
37+
from scheduler.ThreadTask import ThreadTask
3438
from scheduler.utils import SchedulerException
3539

3640

@@ -54,7 +58,8 @@ def __init__(
5458
cpu_threshold: float = 95,
5559
cpu_update_interval: float = 5,
5660
shared_memory: bool = False,
57-
shared_memory_threshold:int = 1e7,
61+
shared_memory_threshold: int = 1e7,
62+
run_in_thread: bool = False,
5863
):
5964
"""
6065
:param progress_callback: a function taking the number of finished tasks and the total number of tasks, which is
@@ -66,7 +71,15 @@ def __init__(
6671
:param cpu_update_interval: the time, in seconds, between consecutive CPU usage checks when `dynamic` is enabled
6772
:param shared_memory: whether to use shared memory if possible
6873
:param shared_memory_threshold: the minimum size of a Numpy array which will cause it to be transferred using shared memory if possible
74+
:param run_in_thread: if True, a single task will be run in a thread instead of a process.
75+
This reduces the overhead (caused by spawning processes instead of forking) on Windows/macOS systems
6976
"""
77+
self.run_in_thread = run_in_thread
78+
if self.run_in_thread and shared_memory:
79+
raise SchedulerException(
80+
f"Shared memory cannot currently be used when 'run_in_thread' is True."
81+
)
82+
7083
self.dynamic = dynamic
7184
self.update_interval = update_interval
7285

@@ -125,7 +138,7 @@ def add_process(self, process: Process, queue: Queue, subtasks: int = 0) -> None
125138
"add() cannot be called on a Scheduler which has already been started."
126139
)
127140

128-
task = Task(process, queue, subtasks)
141+
task = ProcessTask(process, queue, subtasks)
129142
self.tasks.append(task)
130143

131144
def add(
@@ -150,13 +163,27 @@ def add(
150163
"add_function() cannot be called on a scheduler which has already been started."
151164
)
152165

153-
queue = queue_type()
166+
thread = self.run_in_thread and len(self.tasks) == 0
167+
168+
if thread:
169+
queue = MTQueue()
170+
171+
_args = (queue, self.mgr, self.shared_memory_threshold) + args
172+
_wrapper = functools.partial(wrapper, target)
173+
174+
task = ThreadTask(
175+
threading.Thread(target=_wrapper, args=_args), queue, subtasks=subtasks
176+
)
177+
else:
178+
queue = queue_type()
179+
180+
_args = (queue, self.mgr, self.shared_memory_threshold) + args
181+
_wrapper = functools.partial(wrapper, target)
154182

155-
_args = (queue, self.mgr, self.shared_memory_threshold) + args
156-
_wrapper = functools.partial(wrapper, target)
183+
process = process_type(target=_wrapper, args=_args)
184+
task = ProcessTask(process, queue, subtasks=subtasks)
157185

158-
process = process_type(target=_wrapper, args=_args)
159-
self.tasks.append(Task(process, queue, subtasks=subtasks))
186+
self.tasks.append(task)
160187

161188
async def map(
162189
self,
@@ -463,7 +490,7 @@ def wrapper(
463490
queue: Queue,
464491
manager: Optional["SharedMemoryManager"],
465492
threshold: int,
466-
*args: Any
493+
*args: Any,
467494
) -> None:
468495
"""
469496
Wrapper which calls a function with its specified arguments and puts the output in a queue.

scheduler/Task.py

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,45 +19,30 @@
1919
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2020
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
# SOFTWARE.
22+
from abc import ABC, abstractmethod
2223

23-
from multiprocessing import Process, Queue
2424

25-
from scheduler.utils import terminate_tree
26-
27-
28-
class Task:
25+
class Task(ABC):
2926
"""
3027
A simple class containing a process and an associated queue. The queue should have been
3128
passed to the process, so that the process can put its output in the queue.
3229
"""
3330

34-
def __init__(self, process: Process, queue: Queue, subtasks: int = 0):
35-
self.process = process
31+
def __init__(self, queue, subtasks: int):
3632
self.queue = queue
37-
3833
self.running = False
3934
self.finished = False
4035

4136
# The number of processes which will be spawned as part of this task.
4237
self.subtasks: int = subtasks
4338

39+
@abstractmethod
4440
def start(self) -> None:
45-
"""Starts the task."""
46-
if not self.running and not self.finished:
47-
self.process.start()
48-
self.running = True
41+
pass
4942

43+
@abstractmethod
5044
def terminate(self) -> None:
51-
"""
52-
Terminates the task and all running sub-tasks.
53-
"""
54-
try:
55-
terminate_tree(self.process)
56-
self.queue.close()
57-
except:
58-
pass
59-
finally:
60-
self.running = False
45+
pass
6146

6247
def total_tasks(self) -> int:
6348
"""

scheduler/ThreadTask.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# MIT License
2+
#
3+
# Copyright (c) 2020 Sam McCormack
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in all
13+
# copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
import warnings
23+
from queue import Queue
24+
from threading import Thread
25+
26+
from scheduler.Task import Task
27+
28+
29+
class ThreadTask(Task):
    """
    A task which is executed by a `threading.Thread` instead of a process.
    """

    def __init__(self, thread: Thread, queue: Queue, subtasks: int = 0):
        """
        :param thread: the thread which runs this task
        :param queue: the queue associated with the thread
        :param subtasks: the number of sub-tasks which are part of this task
        """
        super().__init__(queue, subtasks)

        self.thread = thread

    def start(self) -> None:
        """Starts the task, unless it is already running or has finished."""
        if not self.running and not self.finished:
            self.thread.start()
            self.running = True

    def terminate(self) -> None:
        """
        Marks the task as not running and emits a `RuntimeWarning`.

        The underlying thread is left untouched by this method; only the
        `running` flag is reset.
        """
        # Plain string literal: the message has no placeholders to format.
        warnings.warn("Cannot terminate a thread task.", RuntimeWarning)

        self.running = False

0 commit comments

Comments
 (0)