11import importlib
22import logging
33import os
4- from multiprocessing import Process , freeze_support , Queue , set_start_method , get_context
4+ from multiprocessing import Process , freeze_support , Queue , set_start_method
55from sys import platform
66from typing import List
77
2222_mp_fork_set = False
2323if not _mp_fork_set :
2424 try :
25- if platform == ' win32' :
26- set_start_method (' spawn' )
25+ if platform == " win32" :
26+ set_start_method (" spawn" )
2727 else :
28- set_start_method (' fork' )
28+ set_start_method (" fork" )
2929 _mp_fork_set = True
3030 except Exception as e :
31- logger .info (f' error when setting multiprocessing.set_start_method - maybe the context is set { e .args } ' )
31+ logger .info (f" error when setting multiprocessing.set_start_method - maybe the context is set { e .args } " )
3232 if platform == "darwin" :
33- os .environ [' no_proxy' ] = '*'
33+ os .environ [" no_proxy" ] = "*"
3434
3535def register_decorated_fn (name : str , poll_interval : int , domain : str , worker_id : str , func ):
36- logger .info (f' decorated { name } ' )
36+ logger .info (f" decorated { name } " )
3737 _decorated_functions [(name , domain )] = {
38- ' func' : func ,
39- ' poll_interval' : poll_interval ,
40- ' domain' : domain ,
41- ' worker_id' : worker_id
38+ " func" : func ,
39+ " poll_interval" : poll_interval ,
40+ " domain" : domain ,
41+ " worker_id" : worker_id
4242 }
4343
4444
@@ -54,11 +54,11 @@ def __init__(
5454 self .logger_process , self .queue = _setup_logging_queue (configuration )
5555
5656 # imports
57- importlib .import_module (' conductor.client.http.models.task' )
58- importlib .import_module (' conductor.client.worker.worker_task' )
57+ importlib .import_module (" conductor.client.http.models.task" )
58+ importlib .import_module (" conductor.client.worker.worker_task" )
5959 if import_modules is not None :
6060 for module in import_modules :
61- logger .info (f' loading module { module } ' )
61+ logger .info (f" loading module { module } " )
6262 importlib .import_module (module )
6363
6464 if workers is None :
@@ -68,22 +68,22 @@ def __init__(
6868 if scan_for_annotated_workers is True :
6969 for (task_def_name , domain ) in _decorated_functions :
7070 record = _decorated_functions [(task_def_name , domain )]
71- fn = record [' func' ]
72- worker_id = record [' worker_id' ]
73- poll_interval = record [' poll_interval' ]
71+ fn = record [" func" ]
72+ worker_id = record [" worker_id" ]
73+ poll_interval = record [" poll_interval" ]
7474
7575 worker = Worker (
7676 task_definition_name = task_def_name ,
7777 execute_function = fn ,
7878 worker_id = worker_id ,
7979 domain = domain ,
8080 poll_interval = poll_interval )
81- logger .info (f' created worker with name={ task_def_name } and domain={ domain } ' )
81+ logger .info (f" created worker with name={ task_def_name } and domain={ domain } " )
8282 workers .append (worker )
8383
8484 self .__create_task_runner_processes (workers , configuration , metrics_settings )
8585 self .__create_metrics_provider_process (metrics_settings )
86- logger .info (' TaskHandler initialized' )
86+ logger .info (" TaskHandler initialized" )
8787
8888 def __enter__ (self ):
8989 return self
@@ -94,24 +94,24 @@ def __exit__(self, exc_type, exc_value, traceback):
9494 def stop_processes (self ) -> None :
9595 self .__stop_task_runner_processes ()
9696 self .__stop_metrics_provider_process ()
97- logger .info (' Stopped worker processes...' )
97+ logger .info (" Stopped worker processes..." )
9898 self .queue .put (None )
9999 self .logger_process .terminate ()
100100
101101 def start_processes (self ) -> None :
102- logger .info (' Starting worker processes...' )
102+ logger .info (" Starting worker processes..." )
103103 freeze_support ()
104104 self .__start_task_runner_processes ()
105105 self .__start_metrics_provider_process ()
106- logger .info (' Started all processes' )
106+ logger .info (" Started all processes" )
107107
108108 def join_processes (self ) -> None :
109109 try :
110110 self .__join_task_runner_processes ()
111111 self .__join_metrics_provider_process ()
112- logger .info (' Joined all processes' )
112+ logger .info (" Joined all processes" )
113113 except KeyboardInterrupt :
114- logger .info (' KeyboardInterrupt: Stopping all processes' )
114+ logger .info (" KeyboardInterrupt: Stopping all processes" )
115115 self .stop_processes ()
116116
117117 def __create_metrics_provider_process (self , metrics_settings : MetricsSettings ) -> None :
@@ -122,7 +122,7 @@ def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -
122122 target = MetricsCollector .provide_metrics ,
123123 args = (metrics_settings ,)
124124 )
125- logger .info (' Created MetricsProvider process' )
125+ logger .info (" Created MetricsProvider process" )
126126
127127 def __create_task_runner_processes (
128128 self ,
@@ -150,25 +150,25 @@ def __start_metrics_provider_process(self):
150150 if self .metrics_provider_process is None :
151151 return
152152 self .metrics_provider_process .start ()
153- logger .info (' Started MetricsProvider process' )
153+ logger .info (" Started MetricsProvider process" )
154154
155155 def __start_task_runner_processes (self ):
156156 n = 0
157157 for task_runner_process in self .task_runner_processes :
158158 task_runner_process .start ()
159159 n = n + 1
160- logger .info (f' Started { n } TaskRunner process' )
160+ logger .info (f" Started { n } TaskRunner process" )
161161
162162 def __join_metrics_provider_process (self ):
163163 if self .metrics_provider_process is None :
164164 return
165165 self .metrics_provider_process .join ()
166- logger .info (' Joined MetricsProvider processes' )
166+ logger .info (" Joined MetricsProvider processes" )
167167
168168 def __join_task_runner_processes (self ):
169169 for task_runner_process in self .task_runner_processes :
170170 task_runner_process .join ()
171- logger .info (' Joined TaskRunner processes' )
171+ logger .info (" Joined TaskRunner processes" )
172172
173173 def __stop_metrics_provider_process (self ):
174174 self .__stop_process (self .metrics_provider_process )
@@ -181,12 +181,12 @@ def __stop_process(self, process: Process):
181181 if process is None :
182182 return
183183 try :
184- logger .debug (f' Terminating process: { process .pid } ' )
184+ logger .debug (f" Terminating process: { process .pid } " )
185185 process .terminate ()
186186 except Exception as e :
187- logger .debug (f' Failed to terminate process: { process .pid } , reason: { e } ' )
187+ logger .debug (f" Failed to terminate process: { process .pid } , reason: { e } " )
188188 process .kill ()
189- logger .debug (f' Killed process: { process .pid } ' )
189+ logger .debug (f" Killed process: { process .pid } " )
190190
191191
192192# Setup centralized logging queue
0 commit comments