-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathworker_task.py
More file actions
49 lines (37 loc) · 1.95 KB
/
worker_task.py
File metadata and controls
49 lines (37 loc) · 1.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from __future__ import annotations
import functools
from typing import Optional
from conductor.client.automator.task_handler import register_decorated_fn
from conductor.client.workflow.task.simple_task import SimpleTask
def WorkerTask(task_definition_name: str, poll_interval: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None,
poll_interval_seconds: int = 0):
poll_interval_millis = poll_interval
if poll_interval_seconds > 0:
poll_interval_millis = 1000 * poll_interval_seconds
def worker_task_func(func):
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
worker_id=worker_id, func=func)
@functools.wraps(func)
def wrapper_func(*args, **kwargs):
if 'task_ref_name' in kwargs:
task = SimpleTask(task_def_name=task_definition_name, task_reference_name=kwargs['task_ref_name'])
kwargs.pop('task_ref_name')
task.input_parameters.update(kwargs)
return task
return func(*args, **kwargs)
return wrapper_func
return worker_task_func
def worker_task(task_definition_name: str, poll_interval_millis: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None):
def worker_task_func(func):
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
worker_id=worker_id, func=func)
@functools.wraps(func)
def wrapper_func(*args, **kwargs):
if 'task_ref_name' in kwargs:
task = SimpleTask(task_def_name=task_definition_name, task_reference_name=kwargs['task_ref_name'])
kwargs.pop('task_ref_name')
task.input_parameters.update(kwargs)
return task
return func(*args, **kwargs)
return wrapper_func
return worker_task_func