Skip to content

Commit 16f2325

Browse files
Updated async task worker
1 parent c5642b7 commit 16f2325

2 files changed

Lines changed: 9 additions & 2 deletions

File tree

src/conductor/asyncio_client/automator/task_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,21 @@
3030
logger.error(
3131
"Error when setting multiprocessing.set_start_method - maybe the context is set %s",
3232
e.args,
33-
3433
)
3534
if platform == "darwin":
3635
os.environ["no_proxy"] = "*"
3736

3837

3938
def register_decorated_fn(
40-
name: str, poll_interval: int, domain: str, worker_id: str, func
39+
name: str, poll_interval: int, domain: str, worker_id: str, func, task_options=None
4140
):
4241
logger.info("Registering decorated function: %s", name)
4342
_decorated_functions[(name, domain)] = {
4443
"func": func,
4544
"poll_interval": poll_interval,
4645
"domain": domain,
4746
"worker_id": worker_id,
47+
"task_options": task_options,
4848
}
4949

5050

src/conductor/asyncio_client/worker/worker_task.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from conductor.asyncio_client.automator.task_handler import register_decorated_fn
77
from conductor.asyncio_client.configuration.configuration import Configuration
88
from conductor.asyncio_client.workflow.task.simple_task import SimpleTask
9+
from conductor.shared.worker.task_options import get_task_options
910

1011

1112
def WorkerTask(
@@ -26,12 +27,15 @@ def WorkerTask(
2627
poll_interval_millis = 1000 * poll_interval_seconds
2728

2829
def worker_task_func(func):
30+
task_opts = get_task_options(func)
31+
2932
register_decorated_fn(
3033
name=task_definition_name,
3134
poll_interval=poll_interval_millis,
3235
domain=domain,
3336
worker_id=worker_id,
3437
func=func,
38+
task_options=task_opts,
3539
)
3640

3741
@functools.wraps(func)
@@ -63,12 +67,15 @@ def worker_task(
6367
domain = domain or config.get_domain()
6468

6569
def worker_task_func(func):
70+
task_opts = get_task_options(func)
71+
6672
register_decorated_fn(
6773
name=task_definition_name,
6874
poll_interval=poll_interval_millis,
6975
domain=domain,
7076
worker_id=worker_id,
7177
func=func,
78+
task_options=task_opts,
7279
)
7380

7481
@functools.wraps(func)

0 commit comments

Comments
 (0)