Skip to content

Commit 2716a98

Browse files
Added worker configuration
1 parent cbc28f7 commit 2716a98

9 files changed

Lines changed: 154 additions & 59 deletions

File tree

src/conductor/asyncio_client/automator/task_runner.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,24 +240,28 @@ def __set_worker_properties(self) -> None:
240240
else:
241241
self.worker.domain = self.worker.get_domain()
242242

243+
# Try both naming conventions for polling interval
243244
polling_interval = self.__get_property_value_from_env(
244-
"polling_interval", task_type
245+
"poll_interval", task_type
245246
)
247+
if not polling_interval:
248+
# Fallback to the "polling_interval" naming convention
249+
polling_interval = self.__get_property_value_from_env(
250+
"polling_interval", task_type
251+
)
252+
246253
if polling_interval:
247254
try:
248255
self.worker.poll_interval = float(polling_interval)
249-
except Exception:
256+
except Exception as e:
250257
logger.error(
251-
"Error converting polling_interval to float value: %s",
258+
"Error converting polling_interval to float value: %s, exception: %s",
252259
polling_interval,
260+
e,
253261
)
254262
self.worker.poll_interval = (
255-
self.worker.get_polling_interval_in_seconds()
263+
self.worker.get_polling_interval_in_seconds() * 1000
256264
)
257-
else:
258-
logger.error(
259-
"Exception in reading polling_interval from environment variable",
260-
)
261265

262266
def __get_property_value_from_env(self, prop, task_type):
263267
"""

src/conductor/asyncio_client/configuration/configuration.py

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ class Configuration:
2626
2727
Worker Properties (via environment variables):
2828
----------------------------------------------
29-
CONDUCTOR_WORKER_POLLING_INTERVAL: Default polling interval in seconds
3029
CONDUCTOR_WORKER_DOMAIN: Default worker domain
30+
CONDUCTOR_WORKER_POLL_INTERVAL: Polling interval in milliseconds (default: 100)
31+
CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS: Polling interval in seconds (default: 0)
3132
CONDUCTOR_WORKER_<TASK_TYPE>_POLLING_INTERVAL: Task-specific polling interval
3233
CONDUCTOR_WORKER_<TASK_TYPE>_DOMAIN: Task-specific domain
3334
@@ -57,8 +58,9 @@ def __init__(
5758
auth_secret: Optional[str] = None,
5859
debug: bool = False,
5960
# Worker properties
60-
default_polling_interval: Optional[float] = None,
61-
default_domain: Optional[str] = None,
61+
poll_interval: Optional[int] = None,
62+
domain: Optional[str] = None,
63+
poll_interval_seconds: Optional[int] = None,
6264
# HTTP Configuration parameters
6365
api_key: Optional[Dict[str, str]] = None,
6466
api_key_prefix: Optional[Dict[str, str]] = None,
@@ -88,10 +90,12 @@ def __init__(
8890
Authentication key secret. If not provided, reads from CONDUCTOR_AUTH_SECRET env var.
8991
debug : bool, optional
9092
Enable debug logging. Default is False.
91-
default_polling_interval : float, optional
92-
Default polling interval for workers in seconds.
93-
default_domain : str, optional
94-
Default domain for workers.
93+
poll_interval : int, optional
94+
Polling interval in milliseconds. If not provided, reads from CONDUCTOR_WORKER_POLL_INTERVAL env var.
95+
domain : str, optional
96+
Worker domain. If not provided, reads from CONDUCTOR_WORKER_DOMAIN env var.
97+
poll_interval_seconds : int, optional
98+
Polling interval in seconds. If not provided, reads from CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS env var.
9599
**kwargs : Any
96100
Additional parameters passed to HttpConfiguration.
97101
"""
@@ -116,11 +120,10 @@ def __init__(
116120
else:
117121
self.auth_secret = os.getenv("CONDUCTOR_AUTH_SECRET")
118122

119-
# Worker properties with environment variable fallback
120-
self.default_polling_interval = default_polling_interval or self._get_env_float(
121-
"CONDUCTOR_WORKER_POLLING_INTERVAL", 1.0
122-
)
123-
self.default_domain = default_domain or os.getenv("CONDUCTOR_WORKER_DOMAIN")
123+
# Additional worker properties with environment variable fallback
124+
self.poll_interval = poll_interval or self._get_env_int("CONDUCTOR_WORKER_POLL_INTERVAL", 100)
125+
self.domain = domain or os.getenv("CONDUCTOR_WORKER_DOMAIN", "default_domain")
126+
self.poll_interval_seconds = poll_interval_seconds or self._get_env_int("CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0)
124127

125128
# Store additional worker properties
126129
self._worker_properties: Dict[str, Dict[str, Any]] = {}
@@ -234,10 +237,12 @@ def get_worker_property_value(
234237
return self._convert_property_value(property_name, value)
235238

236239
# Return default value
237-
if property_name == "polling_interval":
238-
return self.default_polling_interval
239240
elif property_name == "domain":
240-
return self.default_domain
241+
return self.domain
242+
elif property_name == "poll_interval":
243+
return self.poll_interval
244+
elif property_name == "poll_interval_seconds":
245+
return self.poll_interval_seconds
241246

242247
return None
243248

@@ -247,8 +252,14 @@ def _convert_property_value(self, property_name: str, value: str) -> Any:
247252
try:
248253
return float(value)
249254
except (ValueError, TypeError):
250-
self.logger.warning("Invalid polling_interval value: %s", value)
251-
return self.default_polling_interval
255+
self.logger.warning("Invalid poll_interval value: %s", value)
256+
return self.poll_interval
257+
elif property_name == "poll_interval_seconds":
258+
try:
259+
return float(value)
260+
except (ValueError, TypeError):
261+
self.logger.warning("Invalid poll_interval_seconds value: %s", value)
262+
return self.poll_interval_seconds
252263

253264
# For other properties, return as string
254265
return value
@@ -325,6 +336,37 @@ def get_domain(self, task_type: Optional[str] = None) -> Optional[str]:
325336
"""
326337
return self.get_worker_property_value("domain", task_type)
327338

339+
def get_poll_interval(self, task_type: Optional[str] = None) -> int:
340+
"""
341+
Get polling interval in milliseconds for a task type with environment variable support.
342+
343+
Parameters:
344+
-----------
345+
task_type : str, optional
346+
Task type for task-specific configuration
347+
348+
Returns:
349+
--------
350+
int
351+
Polling interval in milliseconds
352+
"""
353+
if task_type:
354+
value = self.get_worker_property_value("poll_interval", task_type)
355+
if value is not None:
356+
return int(value)
357+
return self.poll_interval
358+
359+
def get_poll_interval_seconds(self) -> int:
360+
"""
361+
Get polling interval in seconds.
362+
363+
Returns:
364+
--------
365+
int
366+
Polling interval in seconds
367+
"""
368+
return self.poll_interval_seconds
369+
328370
# Properties for commonly used HTTP configuration attributes
329371
@property
330372
def host(self) -> str:

src/conductor/asyncio_client/worker/worker.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
TaskResultAdapter
1616
from conductor.asyncio_client.configuration import Configuration
1717
from conductor.asyncio_client.adapters import ApiClient
18-
from conductor.asyncio_client.worker.worker_interface import (
19-
DEFAULT_POLLING_INTERVAL, WorkerInterface)
18+
from conductor.asyncio_client.worker.worker_interface import WorkerInterface
2019
from conductor.shared.automator import utils
2120
from conductor.shared.automator.utils import convert_from_dict_or_list
2221
from conductor.shared.http.enums import TaskResultStatus
@@ -60,15 +59,10 @@ def __init__(
6059
):
6160
super().__init__(task_definition_name)
6261
self.api_client = ApiClient()
63-
if poll_interval is None:
64-
self.poll_interval = DEFAULT_POLLING_INTERVAL
65-
else:
66-
self.poll_interval = deepcopy(poll_interval)
67-
self.domain = deepcopy(domain)
68-
if worker_id is None:
69-
self.worker_id = deepcopy(super().get_identity())
70-
else:
71-
self.worker_id = deepcopy(worker_id)
62+
self.config = Configuration()
63+
self.poll_interval = poll_interval or self.config.get_poll_interval()
64+
self.domain = domain or self.config.get_domain()
65+
self.worker_id = worker_id or super().get_identity()
7266
self.execute_function = deepcopy(execute_function)
7367

7468
def execute(self, task: TaskAdapter) -> TaskResultAdapter:

src/conductor/asyncio_client/worker/worker_task.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,30 @@
33
import functools
44
from typing import Optional
55

6-
from conductor.asyncio_client.automator.task_handler import \
7-
register_decorated_fn
6+
from conductor.asyncio_client.automator.task_handler import register_decorated_fn
7+
from conductor.asyncio_client.configuration.configuration import Configuration
88
from conductor.asyncio_client.workflow.task.simple_task import SimpleTask
99

1010

1111
def WorkerTask(
1212
task_definition_name: str,
1313
poll_interval: int = 100,
14-
domain: Optional[str] = None,
15-
worker_id: Optional[str] = None,
14+
domain: str = None,
15+
worker_id: str = None,
1616
poll_interval_seconds: int = 0,
1717
):
18+
config = Configuration()
19+
20+
poll_interval = poll_interval or config.get_poll_interval()
21+
domain = domain or config.get_domain()
22+
poll_interval_seconds = poll_interval_seconds or config.get_poll_interval_seconds()
23+
24+
# Calculate poll interval in milliseconds
1825
poll_interval_millis = poll_interval
1926
if poll_interval_seconds > 0:
2027
poll_interval_millis = 1000 * poll_interval_seconds
2128

2229
def worker_task_func(func):
23-
2430
register_decorated_fn(
2531
name=task_definition_name,
2632
poll_interval=poll_interval_millis,
@@ -52,6 +58,11 @@ def worker_task(
5258
domain: Optional[str] = None,
5359
worker_id: Optional[str] = None,
5460
):
61+
config = Configuration()
62+
63+
poll_interval_millis = poll_interval_millis or config.get_poll_interval()
64+
domain = domain or config.get_domain()
65+
5566
def worker_task_func(func):
5667
register_decorated_fn(
5768
name=task_definition_name,

src/conductor/client/automator/task_runner.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -241,28 +241,27 @@ def __set_worker_properties(self) -> None:
241241
else:
242242
self.worker.domain = self.worker.get_domain()
243243

244+
# Try both naming conventions for polling interval
244245
polling_interval = self.__get_property_value_from_env(
245-
"polling_interval", task_type
246+
"poll_interval", task_type
246247
)
248+
if not polling_interval:
249+
# Fallback to the "polling_interval" naming convention
250+
polling_interval = self.__get_property_value_from_env(
251+
"polling_interval", task_type
252+
)
253+
247254
if polling_interval:
248255
try:
249256
self.worker.poll_interval = float(polling_interval)
250-
except Exception:
257+
except Exception as e:
251258
logger.error(
252-
"Error converting polling_interval to float value: %s",
259+
"Error converting polling_interval to float value: %s, exception: %s",
253260
polling_interval,
261+
e,
254262
)
255263
self.worker.poll_interval = (
256-
self.worker.get_polling_interval_in_seconds()
257-
)
258-
259-
if polling_interval:
260-
try:
261-
self.worker.poll_interval = float(polling_interval)
262-
except Exception as e:
263-
logger.error(
264-
"Exception in reading polling interval from environment variable: %s",
265-
e,
264+
self.worker.get_polling_interval_in_seconds() * 1000
266265
)
267266

268267
def __get_property_value_from_env(self, prop, task_type):

src/conductor/client/configuration/configuration.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ def __init__(
2020
authentication_settings: AuthenticationSettings = None,
2121
server_api_url: Optional[str] = None,
2222
auth_token_ttl_min: int = 45,
23+
poll_interval: Optional[float] = None,
24+
domain: Optional[str] = None,
25+
poll_interval_seconds: Optional[float] = None,
2326
):
2427
if server_api_url is not None:
2528
self.host = server_api_url
@@ -79,6 +82,11 @@ def __init__(
7982
self.token_update_time = 0
8083
self.auth_token_ttl_msec = auth_token_ttl_min * 60 * 1000
8184

85+
# Worker properties
86+
self.poll_interval = poll_interval or self._get_env_float("CONDUCTOR_WORKER_POLL_INTERVAL", 100)
87+
self.domain = domain or os.getenv("CONDUCTOR_WORKER_DOMAIN", "default_domain")
88+
self.poll_interval_seconds = poll_interval_seconds or self._get_env_float("CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0)
89+
8290
@property
8391
def debug(self):
8492
"""Debug status
@@ -162,3 +170,22 @@ def get_logging_formatted_name(name):
162170
def update_token(self, token: str) -> None:
163171
self.AUTH_TOKEN = token
164172
self.token_update_time = round(time.time() * 1000)
173+
174+
def _get_env_float(self, env_var: str, default: float) -> float:
175+
"""Get float value from environment variable with default fallback."""
176+
try:
177+
value = os.getenv(env_var)
178+
if value is not None:
179+
return float(value)
180+
except (ValueError, TypeError):
181+
pass
182+
return default
183+
184+
def get_poll_interval_seconds(self):
185+
return self.poll_interval_seconds
186+
187+
def get_poll_interval(self):
188+
return self.poll_interval
189+
190+
def get_domain(self):
191+
return self.domain

src/conductor/client/worker/worker.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from conductor.client.http.models.task_result import TaskResult
1919
from conductor.shared.http.enums import TaskResultStatus
2020
from conductor.shared.worker.exception import NonRetryableException
21-
from conductor.client.worker.worker_interface import WorkerInterface, DEFAULT_POLLING_INTERVAL
21+
from conductor.client.worker.worker_interface import WorkerInterface
2222

2323
ExecuteTaskFunction = Callable[
2424
[
@@ -57,11 +57,16 @@ def __init__(self,
5757
) -> Self:
5858
super().__init__(task_definition_name)
5959
self.api_client = ApiClient()
60+
self.config = Configuration()
61+
6062
if poll_interval is None:
61-
self.poll_interval = DEFAULT_POLLING_INTERVAL
63+
self.poll_interval = self.config.get_poll_interval()
6264
else:
6365
self.poll_interval = deepcopy(poll_interval)
64-
self.domain = deepcopy(domain)
66+
if domain is None:
67+
self.domain = self.config.get_domain()
68+
else:
69+
self.domain = deepcopy(domain)
6570
if worker_id is None:
6671
self.worker_id = deepcopy(super().get_identity())
6772
else:

src/conductor/client/worker/worker_interface.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from conductor.client.http.models.task import Task
77
from conductor.client.http.models.task_result import TaskResult
8+
from conductor.client.configuration.configuration import Configuration
89

910
DEFAULT_POLLING_INTERVAL = 100 # ms
1011

@@ -15,7 +16,7 @@ def __init__(self, task_definition_name: Union[str, list]):
1516
self.next_task_index = 0
1617
self._task_definition_name_cache = None
1718
self._domain = None
18-
self._poll_interval = DEFAULT_POLLING_INTERVAL
19+
self._poll_interval = Configuration().get_poll_interval()
1920

2021
@abc.abstractmethod
2122
def execute(self, task: Task) -> TaskResult:
@@ -43,7 +44,7 @@ def get_polling_interval_in_seconds(self) -> float:
4344
:return: float
4445
Default: 100ms
4546
"""
46-
return (self.poll_interval if self.poll_interval else DEFAULT_POLLING_INTERVAL) / 1000
47+
return (self.poll_interval if self.poll_interval else Configuration().get_poll_interval()) / 1000
4748

4849
def get_task_definition_name(self) -> str:
4950
"""

0 commit comments

Comments
 (0)