Skip to content

Commit 1276979

Browse files
Linting
1 parent f30f6ad commit 1276979

9 files changed

Lines changed: 110 additions & 62 deletions

File tree

src/conductor/asyncio_client/automator/task_runner.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,16 +240,14 @@ def __set_worker_properties(self) -> None:
240240
else:
241241
self.worker.domain = self.worker.get_domain()
242242

243-
# Try both naming conventions for polling interval
244243
polling_interval = self.__get_property_value_from_env(
245244
"poll_interval", task_type
246245
)
247246
if not polling_interval:
248-
# Fallback to the "polling_interval" naming convention
249247
polling_interval = self.__get_property_value_from_env(
250248
"polling_interval", task_type
251249
)
252-
250+
253251
if polling_interval:
254252
try:
255253
self.worker.poll_interval = float(polling_interval)
@@ -260,7 +258,7 @@ def __set_worker_properties(self) -> None:
260258
e,
261259
)
262260
self.worker.poll_interval = (
263-
self.worker.get_polling_interval_in_seconds() * 1000
261+
self.worker.get_polling_interval_in_seconds()
264262
)
265263

266264
def __get_property_value_from_env(self, prop, task_type):

src/conductor/asyncio_client/configuration/configuration.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,13 @@ def __init__(
121121
self.auth_secret = os.getenv("CONDUCTOR_AUTH_SECRET")
122122

123123
# Additional worker properties with environment variable fallback
124-
self.poll_interval = poll_interval or self._get_env_int("CONDUCTOR_WORKER_POLL_INTERVAL", 100)
124+
self.poll_interval = poll_interval or self._get_env_int(
125+
"CONDUCTOR_WORKER_POLL_INTERVAL", 100
126+
)
125127
self.domain = domain or os.getenv("CONDUCTOR_WORKER_DOMAIN", "default_domain")
126-
self.poll_interval_seconds = poll_interval_seconds or self._get_env_int("CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0)
128+
self.poll_interval_seconds = poll_interval_seconds or self._get_env_int(
129+
"CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0
130+
)
127131

128132
# Store additional worker properties
129133
self._worker_properties: Dict[str, Dict[str, Any]] = {}

src/conductor/asyncio_client/worker/worker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
from typing import Any, Callable, Optional, Union
1010

1111
from conductor.asyncio_client.adapters.models.task_adapter import TaskAdapter
12-
from conductor.asyncio_client.adapters.models.task_exec_log_adapter import \
13-
TaskExecLogAdapter
14-
from conductor.asyncio_client.adapters.models.task_result_adapter import \
15-
TaskResultAdapter
12+
from conductor.asyncio_client.adapters.models.task_exec_log_adapter import (
13+
TaskExecLogAdapter,
14+
)
15+
from conductor.asyncio_client.adapters.models.task_result_adapter import (
16+
TaskResultAdapter,
17+
)
1618
from conductor.asyncio_client.configuration import Configuration
1719
from conductor.asyncio_client.adapters import ApiClient
1820
from conductor.asyncio_client.worker.worker_interface import WorkerInterface

src/conductor/asyncio_client/worker/worker_task.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
def WorkerTask(
1212
task_definition_name: str,
1313
poll_interval: int = 100,
14-
domain: str = None,
15-
worker_id: str = None,
14+
domain: Optional[str] = None,
15+
worker_id: Optional[str] = None,
1616
poll_interval_seconds: int = 0,
1717
):
1818
config = Configuration()
@@ -21,7 +21,6 @@ def WorkerTask(
2121
domain = domain or config.get_domain()
2222
poll_interval_seconds = poll_interval_seconds or config.get_poll_interval_seconds()
2323

24-
# Calculate poll interval in milliseconds
2524
poll_interval_millis = poll_interval
2625
if poll_interval_seconds > 0:
2726
poll_interval_millis = 1000 * poll_interval_seconds

src/conductor/client/automator/task_runner.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,16 +241,14 @@ def __set_worker_properties(self) -> None:
241241
else:
242242
self.worker.domain = self.worker.get_domain()
243243

244-
# Try both naming conventions for polling interval
245244
polling_interval = self.__get_property_value_from_env(
246245
"poll_interval", task_type
247246
)
248247
if not polling_interval:
249-
# Fallback to the "polling_interval" naming convention
250248
polling_interval = self.__get_property_value_from_env(
251249
"polling_interval", task_type
252250
)
253-
251+
254252
if polling_interval:
255253
try:
256254
self.worker.poll_interval = float(polling_interval)
@@ -261,7 +259,7 @@ def __set_worker_properties(self) -> None:
261259
e,
262260
)
263261
self.worker.poll_interval = (
264-
self.worker.get_polling_interval_in_seconds() * 1000
262+
self.worker.get_polling_interval_in_seconds()
265263
)
266264

267265
def __get_property_value_from_env(self, prop, task_type):

src/conductor/client/configuration/configuration.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,13 @@ def __init__(
8383
self.auth_token_ttl_msec = auth_token_ttl_min * 60 * 1000
8484

8585
# Worker properties
86-
self.poll_interval = poll_interval or self._get_env_float("CONDUCTOR_WORKER_POLL_INTERVAL", 100)
86+
self.poll_interval = poll_interval or self._get_env_float(
87+
"CONDUCTOR_WORKER_POLL_INTERVAL", 100
88+
)
8789
self.domain = domain or os.getenv("CONDUCTOR_WORKER_DOMAIN", "default_domain")
88-
self.poll_interval_seconds = poll_interval_seconds or self._get_env_float("CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0)
90+
self.poll_interval_seconds = poll_interval_seconds or self._get_env_float(
91+
"CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0
92+
)
8993

9094
@property
9195
def debug(self):

src/conductor/client/worker/worker.py

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,41 +20,41 @@
2020
from conductor.shared.worker.exception import NonRetryableException
2121
from conductor.client.worker.worker_interface import WorkerInterface
2222

23-
ExecuteTaskFunction = Callable[
24-
[
25-
Union[Task, object]
26-
],
27-
Union[TaskResult, object]
28-
]
23+
ExecuteTaskFunction = Callable[[Union[Task, object]], Union[TaskResult, object]]
2924

30-
logger = logging.getLogger(
31-
Configuration.get_logging_formatted_name(
32-
__name__
33-
)
34-
)
25+
logger = logging.getLogger(Configuration.get_logging_formatted_name(__name__))
3526

3627

37-
def is_callable_input_parameter_a_task(callable: ExecuteTaskFunction, object_type: Any) -> bool:
28+
def is_callable_input_parameter_a_task(
29+
callable: ExecuteTaskFunction, object_type: Any
30+
) -> bool:
3831
parameters = inspect.signature(callable).parameters
3932
if len(parameters) != 1:
4033
return False
4134
parameter = parameters[next(iter(parameters.keys()))]
42-
return parameter.annotation == object_type or parameter.annotation == parameter.empty or parameter.annotation is object # noqa: PLR1714
35+
return (
36+
parameter.annotation == object_type
37+
or parameter.annotation == parameter.empty
38+
or parameter.annotation is object
39+
) # noqa: PLR1714
4340

4441

45-
def is_callable_return_value_of_type(callable: ExecuteTaskFunction, object_type: Any) -> bool:
42+
def is_callable_return_value_of_type(
43+
callable: ExecuteTaskFunction, object_type: Any
44+
) -> bool:
4645
return_annotation = inspect.signature(callable).return_annotation
4746
return return_annotation == object_type
4847

4948

5049
class Worker(WorkerInterface):
51-
def __init__(self,
52-
task_definition_name: str,
53-
execute_function: ExecuteTaskFunction,
54-
poll_interval: Optional[float] = None,
55-
domain: Optional[str] = None,
56-
worker_id: Optional[str] = None,
57-
) -> Self:
50+
def __init__(
51+
self,
52+
task_definition_name: str,
53+
execute_function: ExecuteTaskFunction,
54+
poll_interval: Optional[float] = None,
55+
domain: Optional[str] = None,
56+
worker_id: Optional[str] = None,
57+
) -> Self:
5858
super().__init__(task_definition_name)
5959
self.api_client = ApiClient()
6060
self.config = Configuration()
@@ -91,7 +91,9 @@ def execute(self, task: Task) -> TaskResult:
9191
if typ in utils.simple_types:
9292
task_input[input_name] = task.input_data[input_name]
9393
else:
94-
task_input[input_name] = convert_from_dict_or_list(typ, task.input_data[input_name])
94+
task_input[input_name] = convert_from_dict_or_list(
95+
typ, task.input_data[input_name]
96+
)
9597
elif default_value is not inspect.Parameter.empty:
9698
task_input[input_name] = default_value
9799
else:
@@ -118,8 +120,11 @@ def execute(self, task: Task) -> TaskResult:
118120
task.task_id,
119121
)
120122

121-
task_result.logs = [TaskExecLog(
122-
traceback.format_exc(), task_result.task_id, int(time.time()))]
123+
task_result.logs = [
124+
TaskExecLog(
125+
traceback.format_exc(), task_result.task_id, int(time.time())
126+
)
127+
]
123128
task_result.status = TaskResultStatus.FAILED
124129
if len(ne.args) > 0:
125130
task_result.reason_for_incompletion = ne.args[0]
@@ -130,7 +135,9 @@ def execute(self, task: Task) -> TaskResult:
130135
return task_result
131136
if not isinstance(task_result.output_data, dict):
132137
task_output = task_result.output_data
133-
task_result.output_data = self.api_client.sanitize_for_serialization(task_output)
138+
task_result.output_data = self.api_client.sanitize_for_serialization(
139+
task_output
140+
)
134141
if not isinstance(task_result.output_data, dict):
135142
task_result.output_data = {"result": task_result.output_data}
136143

@@ -146,11 +153,15 @@ def execute_function(self) -> ExecuteTaskFunction:
146153
@execute_function.setter
147154
def execute_function(self, execute_function: ExecuteTaskFunction) -> None:
148155
self._execute_function = execute_function
149-
self._is_execute_function_input_parameter_a_task = is_callable_input_parameter_a_task(
150-
callable=execute_function,
151-
object_type=Task,
156+
self._is_execute_function_input_parameter_a_task = (
157+
is_callable_input_parameter_a_task(
158+
callable=execute_function,
159+
object_type=Task,
160+
)
152161
)
153-
self._is_execute_function_return_value_a_task_result = is_callable_return_value_of_type(
154-
callable=execute_function,
155-
object_type=TaskResult,
162+
self._is_execute_function_return_value_a_task_result = (
163+
is_callable_return_value_of_type(
164+
callable=execute_function,
165+
object_type=TaskResult,
166+
)
156167
)

src/conductor/client/worker/worker_interface.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ def get_polling_interval_in_seconds(self) -> float:
4444
:return: float
4545
Default: 100ms
4646
"""
47-
return (self.poll_interval if self.poll_interval else Configuration().get_poll_interval()) / 1000
47+
return (
48+
self.poll_interval
49+
if self.poll_interval
50+
else Configuration().get_poll_interval()
51+
) / 1000
4852

4953
def get_task_definition_name(self) -> str:
5054
"""
@@ -73,7 +77,9 @@ def clear_task_definition_name_cache(self):
7377
def compute_task_definition_name(self):
7478
if isinstance(self.task_definition_name, list):
7579
task_definition_name = self.task_definition_name[self.next_task_index]
76-
self.next_task_index = (self.next_task_index + 1) % len(self.task_definition_name)
80+
self.next_task_index = (self.next_task_index + 1) % len(
81+
self.task_definition_name
82+
)
7783
return task_definition_name
7884
return self.task_definition_name
7985

@@ -87,7 +93,7 @@ def get_task_result_from_task(self, task: Task) -> TaskResult:
8793
return TaskResult(
8894
task_id=task.task_id,
8995
workflow_instance_id=task.workflow_instance_id,
90-
worker_id=self.get_identity()
96+
worker_id=self.get_identity(),
9197
)
9298

9399
def get_domain(self) -> str:

src/conductor/client/worker/worker_task.py

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66
from conductor.client.workflow.task.simple_task import SimpleTask
77

88

9-
def WorkerTask(task_definition_name: str, poll_interval: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None,
10-
poll_interval_seconds: int = 0):
9+
def WorkerTask(
10+
task_definition_name: str,
11+
poll_interval: int = 100,
12+
domain: Optional[str] = None,
13+
worker_id: Optional[str] = None,
14+
poll_interval_seconds: int = 0,
15+
):
1116
config = Configuration()
1217

1318
poll_interval = poll_interval or config.get_poll_interval()
@@ -20,13 +25,21 @@ def WorkerTask(task_definition_name: str, poll_interval: int = 100, domain: Opti
2025

2126
def worker_task_func(func):
2227

23-
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
24-
worker_id=worker_id, func=func)
28+
register_decorated_fn(
29+
name=task_definition_name,
30+
poll_interval=poll_interval_millis,
31+
domain=domain,
32+
worker_id=worker_id,
33+
func=func,
34+
)
2535

2636
@functools.wraps(func)
2737
def wrapper_func(*args, **kwargs):
2838
if "task_ref_name" in kwargs:
29-
task = SimpleTask(task_def_name=task_definition_name, task_reference_name=kwargs["task_ref_name"])
39+
task = SimpleTask(
40+
task_def_name=task_definition_name,
41+
task_reference_name=kwargs["task_ref_name"],
42+
)
3043
kwargs.pop("task_ref_name")
3144
task.input_parameters.update(kwargs)
3245
return task
@@ -37,20 +50,33 @@ def wrapper_func(*args, **kwargs):
3750
return worker_task_func
3851

3952

40-
def worker_task(task_definition_name: str, poll_interval_millis: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None):
53+
def worker_task(
54+
task_definition_name: str,
55+
poll_interval_millis: int = 100,
56+
domain: Optional[str] = None,
57+
worker_id: Optional[str] = None,
58+
):
4159
config = Configuration()
4260

4361
poll_interval_millis = poll_interval_millis or config.get_poll_interval()
4462
domain = domain or config.get_domain()
4563

4664
def worker_task_func(func):
47-
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
48-
worker_id=worker_id, func=func)
65+
register_decorated_fn(
66+
name=task_definition_name,
67+
poll_interval=poll_interval_millis,
68+
domain=domain,
69+
worker_id=worker_id,
70+
func=func,
71+
)
4972

5073
@functools.wraps(func)
5174
def wrapper_func(*args, **kwargs):
5275
if "task_ref_name" in kwargs:
53-
task = SimpleTask(task_def_name=task_definition_name, task_reference_name=kwargs["task_ref_name"])
76+
task = SimpleTask(
77+
task_def_name=task_definition_name,
78+
task_reference_name=kwargs["task_ref_name"],
79+
)
5480
kwargs.pop("task_ref_name")
5581
task.input_parameters.update(kwargs)
5682
return task

0 commit comments

Comments
 (0)