Skip to content

Commit 279979a

Browse files
Worker properties config (#331)
1 parent 80bda79 commit 279979a

12 files changed

Lines changed: 839 additions & 98 deletions

File tree

src/conductor/asyncio_client/automator/task_runner.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,21 +243,19 @@ def __set_worker_properties(self) -> None:
243243
polling_interval = self.__get_property_value_from_env(
244244
"polling_interval", task_type
245245
)
246+
246247
if polling_interval:
247248
try:
248249
self.worker.poll_interval = float(polling_interval)
249-
except Exception:
250+
except Exception as e:
250251
logger.error(
251-
"Error converting polling_interval to float value: %s",
252+
"Error converting polling_interval to float value: %s, exception: %s",
252253
polling_interval,
254+
e,
253255
)
254256
self.worker.poll_interval = (
255257
self.worker.get_polling_interval_in_seconds()
256258
)
257-
else:
258-
logger.error(
259-
"Exception in reading polling_interval from environment variable",
260-
)
261259

262260
def __get_property_value_from_env(self, prop, task_type):
263261
"""

src/conductor/asyncio_client/configuration/configuration.py

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ class Configuration:
2626
2727
Worker Properties (via environment variables):
2828
----------------------------------------------
29-
CONDUCTOR_WORKER_POLLING_INTERVAL: Default polling interval in seconds
3029
CONDUCTOR_WORKER_DOMAIN: Default worker domain
30+
CONDUCTOR_WORKER_POLL_INTERVAL: Polling interval in milliseconds (default: 100)
31+
CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS: Polling interval in seconds (default: 0)
3132
CONDUCTOR_WORKER_<TASK_TYPE>_POLLING_INTERVAL: Task-specific polling interval
3233
CONDUCTOR_WORKER_<TASK_TYPE>_DOMAIN: Task-specific domain
3334
@@ -57,8 +58,9 @@ def __init__(
5758
auth_secret: Optional[str] = None,
5859
debug: bool = False,
5960
# Worker properties
60-
default_polling_interval: Optional[float] = None,
61-
default_domain: Optional[str] = None,
61+
polling_interval: Optional[int] = None,
62+
domain: Optional[str] = None,
63+
polling_interval_seconds: Optional[int] = None,
6264
# HTTP Configuration parameters
6365
api_key: Optional[Dict[str, str]] = None,
6466
api_key_prefix: Optional[Dict[str, str]] = None,
@@ -88,10 +90,12 @@ def __init__(
8890
Authentication key secret. If not provided, reads from CONDUCTOR_AUTH_SECRET env var.
8991
debug : bool, optional
9092
Enable debug logging. Default is False.
91-
default_polling_interval : float, optional
92-
Default polling interval for workers in seconds.
93-
default_domain : str, optional
94-
Default domain for workers.
93+
polling_interval : int, optional
94+
Polling interval in milliseconds. If not provided, reads from CONDUCTOR_WORKER_POLL_INTERVAL env var.
95+
domain : str, optional
96+
Worker domain. If not provided, reads from CONDUCTOR_WORKER_DOMAIN env var.
97+
polling_interval_seconds : int, optional
98+
Polling interval in seconds. If not provided, reads from CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS env var.
9599
**kwargs : Any
96100
Additional parameters passed to HttpConfiguration.
97101
"""
@@ -116,11 +120,14 @@ def __init__(
116120
else:
117121
self.auth_secret = os.getenv("CONDUCTOR_AUTH_SECRET")
118122

119-
# Worker properties with environment variable fallback
120-
self.default_polling_interval = default_polling_interval or self._get_env_float(
121-
"CONDUCTOR_WORKER_POLLING_INTERVAL", 1.0
123+
# Additional worker properties with environment variable fallback
124+
self.polling_interval = polling_interval or self._get_env_int(
125+
"CONDUCTOR_WORKER_POLL_INTERVAL", 100
126+
)
127+
self.domain = domain or os.getenv("CONDUCTOR_WORKER_DOMAIN", "default_domain")
128+
self.polling_interval_seconds = polling_interval_seconds or self._get_env_int(
129+
"CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0
122130
)
123-
self.default_domain = default_domain or os.getenv("CONDUCTOR_WORKER_DOMAIN")
124131

125132
# Store additional worker properties
126133
self._worker_properties: Dict[str, Dict[str, Any]] = {}
@@ -234,10 +241,12 @@ def get_worker_property_value(
234241
return self._convert_property_value(property_name, value)
235242

236243
# Return default value
237-
if property_name == "polling_interval":
238-
return self.default_polling_interval
239244
elif property_name == "domain":
240-
return self.default_domain
245+
return self.domain
246+
elif property_name == "polling_interval":
247+
return self.polling_interval
248+
elif property_name == "poll_interval_seconds":
249+
return self.polling_interval_seconds
241250

242251
return None
243252

@@ -248,7 +257,13 @@ def _convert_property_value(self, property_name: str, value: str) -> Any:
248257
return float(value)
249258
except (ValueError, TypeError):
250259
self.logger.warning("Invalid polling_interval value: %s", value)
251-
return self.default_polling_interval
260+
return self.polling_interval
261+
elif property_name == "polling_interval_seconds":
262+
try:
263+
return float(value)
264+
except (ValueError, TypeError):
265+
self.logger.warning("Invalid polling_interval_seconds value: %s", value)
266+
return self.polling_interval_seconds
252267

253268
# For other properties, return as string
254269
return value
@@ -325,6 +340,37 @@ def get_domain(self, task_type: Optional[str] = None) -> Optional[str]:
325340
"""
326341
return self.get_worker_property_value("domain", task_type)
327342

343+
def get_poll_interval(self, task_type: Optional[str] = None) -> int:
344+
"""
345+
Get polling interval in milliseconds for a task type with environment variable support.
346+
347+
Parameters:
348+
-----------
349+
task_type : str, optional
350+
Task type for task-specific configuration
351+
352+
Returns:
353+
--------
354+
int
355+
Polling interval in milliseconds
356+
"""
357+
if task_type:
358+
value = self.get_worker_property_value("polling_interval", task_type)
359+
if value is not None:
360+
return int(value)
361+
return self.polling_interval
362+
363+
def get_poll_interval_seconds(self) -> int:
364+
"""
365+
Get polling interval in seconds.
366+
367+
Returns:
368+
--------
369+
int
370+
Polling interval in seconds
371+
"""
372+
return self.polling_interval_seconds
373+
328374
# Properties for commonly used HTTP configuration attributes
329375
@property
330376
def host(self) -> str:

src/conductor/asyncio_client/worker/worker.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99
from typing import Any, Callable, Optional, Union
1010

1111
from conductor.asyncio_client.adapters.models.task_adapter import TaskAdapter
12-
from conductor.asyncio_client.adapters.models.task_exec_log_adapter import \
13-
TaskExecLogAdapter
14-
from conductor.asyncio_client.adapters.models.task_result_adapter import \
15-
TaskResultAdapter
12+
from conductor.asyncio_client.adapters.models.task_exec_log_adapter import (
13+
TaskExecLogAdapter,
14+
)
15+
from conductor.asyncio_client.adapters.models.task_result_adapter import (
16+
TaskResultAdapter,
17+
)
1618
from conductor.asyncio_client.configuration import Configuration
1719
from conductor.asyncio_client.adapters import ApiClient
18-
from conductor.asyncio_client.worker.worker_interface import (
19-
DEFAULT_POLLING_INTERVAL, WorkerInterface)
20+
from conductor.asyncio_client.worker.worker_interface import WorkerInterface
2021
from conductor.shared.automator import utils
2122
from conductor.shared.automator.utils import convert_from_dict_or_list
2223
from conductor.shared.http.enums import TaskResultStatus
@@ -60,15 +61,10 @@ def __init__(
6061
):
6162
super().__init__(task_definition_name)
6263
self.api_client = ApiClient()
63-
if poll_interval is None:
64-
self.poll_interval = DEFAULT_POLLING_INTERVAL
65-
else:
66-
self.poll_interval = deepcopy(poll_interval)
67-
self.domain = deepcopy(domain)
68-
if worker_id is None:
69-
self.worker_id = deepcopy(super().get_identity())
70-
else:
71-
self.worker_id = deepcopy(worker_id)
64+
self.config = Configuration()
65+
self.poll_interval = poll_interval or self.config.get_poll_interval()
66+
self.domain = domain or self.config.get_domain()
67+
self.worker_id = worker_id or super().get_identity()
7268
self.execute_function = deepcopy(execute_function)
7369

7470
def execute(self, task: TaskAdapter) -> TaskResultAdapter:

src/conductor/asyncio_client/worker/worker_task.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import functools
44
from typing import Optional
55

6-
from conductor.asyncio_client.automator.task_handler import \
7-
register_decorated_fn
6+
from conductor.asyncio_client.automator.task_handler import register_decorated_fn
7+
from conductor.asyncio_client.configuration.configuration import Configuration
88
from conductor.asyncio_client.workflow.task.simple_task import SimpleTask
99

1010

@@ -15,12 +15,17 @@ def WorkerTask(
1515
worker_id: Optional[str] = None,
1616
poll_interval_seconds: int = 0,
1717
):
18+
config = Configuration()
19+
20+
poll_interval = poll_interval or config.get_poll_interval()
21+
domain = domain or config.get_domain()
22+
poll_interval_seconds = poll_interval_seconds or config.get_poll_interval_seconds()
23+
1824
poll_interval_millis = poll_interval
1925
if poll_interval_seconds > 0:
2026
poll_interval_millis = 1000 * poll_interval_seconds
2127

2228
def worker_task_func(func):
23-
2429
register_decorated_fn(
2530
name=task_definition_name,
2631
poll_interval=poll_interval_millis,
@@ -52,6 +57,11 @@ def worker_task(
5257
domain: Optional[str] = None,
5358
worker_id: Optional[str] = None,
5459
):
60+
config = Configuration()
61+
62+
poll_interval_millis = poll_interval_millis or config.get_poll_interval()
63+
domain = domain or config.get_domain()
64+
5565
def worker_task_func(func):
5666
register_decorated_fn(
5767
name=task_definition_name,

src/conductor/client/automator/task_runner.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,27 +244,20 @@ def __set_worker_properties(self) -> None:
244244
polling_interval = self.__get_property_value_from_env(
245245
"polling_interval", task_type
246246
)
247+
247248
if polling_interval:
248249
try:
249250
self.worker.poll_interval = float(polling_interval)
250-
except Exception:
251+
except Exception as e:
251252
logger.error(
252-
"Error converting polling_interval to float value: %s",
253+
"Error converting polling_interval to float value: %s, exception: %s",
253254
polling_interval,
255+
e,
254256
)
255257
self.worker.poll_interval = (
256258
self.worker.get_polling_interval_in_seconds()
257259
)
258260

259-
if polling_interval:
260-
try:
261-
self.worker.poll_interval = float(polling_interval)
262-
except Exception as e:
263-
logger.error(
264-
"Exception in reading polling interval from environment variable: %s",
265-
e,
266-
)
267-
268261
def __get_property_value_from_env(self, prop, task_type):
269262
"""
270263
get the property from the env variable

src/conductor/client/configuration/configuration.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ def __init__(
2020
authentication_settings: AuthenticationSettings = None,
2121
server_api_url: Optional[str] = None,
2222
auth_token_ttl_min: int = 45,
23+
polling_interval: Optional[float] = None,
24+
domain: Optional[str] = None,
25+
polling_interval_seconds: Optional[float] = None,
2326
):
2427
if server_api_url is not None:
2528
self.host = server_api_url
@@ -79,6 +82,15 @@ def __init__(
7982
self.token_update_time = 0
8083
self.auth_token_ttl_msec = auth_token_ttl_min * 60 * 1000
8184

85+
# Worker properties
86+
self.polling_interval = polling_interval or self._get_env_float(
87+
"CONDUCTOR_WORKER_POLL_INTERVAL", 100
88+
)
89+
self.domain = domain or os.getenv("CONDUCTOR_WORKER_DOMAIN", "default_domain")
90+
self.polling_interval_seconds = polling_interval_seconds or self._get_env_float(
91+
"CONDUCTOR_WORKER_POLL_INTERVAL_SECONDS", 0
92+
)
93+
8294
@property
8395
def debug(self):
8496
"""Debug status
@@ -162,3 +174,22 @@ def get_logging_formatted_name(name):
162174
def update_token(self, token: str) -> None:
163175
self.AUTH_TOKEN = token
164176
self.token_update_time = round(time.time() * 1000)
177+
178+
def _get_env_float(self, env_var: str, default: float) -> float:
179+
"""Get float value from environment variable with default fallback."""
180+
try:
181+
value = os.getenv(env_var)
182+
if value is not None:
183+
return float(value)
184+
except (ValueError, TypeError):
185+
pass
186+
return default
187+
188+
def get_poll_interval_seconds(self):
189+
return self.polling_interval_seconds
190+
191+
def get_poll_interval(self):
192+
return self.polling_interval
193+
194+
def get_domain(self):
195+
return self.domain

0 commit comments

Comments
 (0)