-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathconcurrent_executor.py
More file actions
101 lines (84 loc) · 3.39 KB
/
concurrent_executor.py
File metadata and controls
101 lines (84 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#
# Copyright 2025 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Concurrent executor provides concurrent executing function either in a
thread pool or a process pool."""
import solnlib.concurrent.process_pool as pp
import solnlib.concurrent.thread_pool as tp
class ConcurrentExecutor:
    """Run callables on an I/O thread pool or an optional compute process pool.

    The thread pool is always created. The process pool is created only when
    ``config`` carries a truthy ``process_size``; without it the
    ``run_compute_func_*`` methods raise ``AssertionError``.
    """

    def __init__(self, config):
        """
        :param config: dict like object (only ``config.get(key, default)`` is
            used). Recognised keys and the defaults applied when missing:
            thread_min_size (int, 0), thread_max_size (int, 0),
            task_queue_size (int, 1024), daemonize_thread (bool, True),
            process_size (int, 0 -- a falsy value means no process pool)
        """
        self._io_executor = tp.ThreadPool(
            config.get("thread_min_size", 0),
            config.get("thread_max_size", 0),
            config.get("task_queue_size", 1024),
            config.get("daemonize_thread", True),
        )
        # The compute pool is optional; read the size once and reuse it.
        process_size = config.get("process_size", 0)
        self._compute_executor = pp.ProcessPool(process_size) if process_size else None

    def start(self):
        """Start the I/O thread pool.

        Only the thread pool is started here; the process pool (if any) is
        not touched by this method.
        """
        self._io_executor.start()

    def tear_down(self):
        """Tear down the I/O thread pool, then the process pool if one exists."""
        self._io_executor.tear_down()
        if self._compute_executor is not None:
            self._compute_executor.tear_down()

    def run_io_func_sync(self, func, args=(), kwargs=None):
        """Delegate ``func`` to the thread pool's ``apply``.

        :param func: callable
        :param args: free params
        :param kwargs: named params, forwarded unchanged (``None`` included)
        :return: whatever the thread pool's ``apply`` returns
        """
        return self._io_executor.apply(func, args, kwargs)

    def run_io_func_async(self, func, args=(), kwargs=None, callback=None):
        """Delegate ``func`` to the thread pool's ``apply_async``.

        :param func: callable
        :param args: free params
        :param kwargs: named params, forwarded unchanged (``None`` included)
        :param callback: forwarded to the thread pool; presumably invoked when
            func finishes without exception -- confirm against thread_pool
        :return: whatever the thread pool's ``apply_async`` returns
        """
        return self._io_executor.apply_async(func, args, kwargs, callback)

    def enqueue_io_funcs(self, funcs, block=True):
        """Run jobs in a fire and forget way, no result will be handed over
        to clients.

        :param funcs: tuple/list-like or generator like object, func
            shall be callable
        :param block: forwarded as-is to the thread pool's ``enqueue_funcs``
        :return: whatever the thread pool's ``enqueue_funcs`` returns
        """
        return self._io_executor.enqueue_funcs(funcs, block)

    def _require_compute_executor(self):
        """Return the process pool, failing loudly when none was configured.

        :raises AssertionError: if no process pool was created in ``__init__``
        """
        if self._compute_executor is None:
            # Raised explicitly instead of via ``assert`` so the guard is not
            # stripped under ``python -O``; the exception type is kept as
            # AssertionError so existing callers observe the same error.
            raise AssertionError(
                "compute executor is not configured; "
                "pass a non-zero process_size in config"
            )
        return self._compute_executor

    def run_compute_func_sync(self, func, args=(), kwargs=None):
        """Delegate ``func`` to the process pool's ``apply``.

        :param func: callable
        :param args: free params
        :param kwargs: named params; ``None`` (the default) means no named
            params and is replaced by a fresh empty dict per call
        :return: whatever the process pool's ``apply`` returns
        :raises AssertionError: if no process pool was configured
        """
        executor = self._require_compute_executor()
        # A new dict per call avoids sharing one mutable default across calls.
        return executor.apply(func, args, {} if kwargs is None else kwargs)

    def run_compute_func_async(self, func, args=(), kwargs=None, callback=None):
        """Delegate ``func`` to the process pool's ``apply_async``.

        :param func: callable
        :param args: free params
        :param kwargs: named params; ``None`` (the default) means no named
            params and is replaced by a fresh empty dict per call
        :param callback: forwarded to the process pool; presumably invoked
            when func finishes without exception -- confirm against
            process_pool
        :return: whatever the process pool's ``apply_async`` returns
        :raises AssertionError: if no process pool was configured
        """
        executor = self._require_compute_executor()
        # A new dict per call avoids sharing one mutable default across calls.
        return executor.apply_async(
            func, args, {} if kwargs is None else kwargs, callback
        )