-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathpython_worker.py
More file actions
107 lines (87 loc) · 3.58 KB
/
Copy pathpython_worker.py
File metadata and controls
107 lines (87 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from conductor.client.worker.worker_task import WorkerTask
class FaultyExecutionWorker(WorkerInterface):
def execute(self, task: Task) -> TaskResult:
raise Exception('faulty execution')
class ClassWorker(WorkerInterface):
def __init__(self, task_definition_name):
super().__init__(task_definition_name)
self.poll_interval = 375.0
def execute(self, task: Task) -> TaskResult:
task_result = self.get_task_result_from_task(task)
task_result.add_output_data('worker_style', 'class')
task_result.add_output_data('secret_number', 1234)
task_result.add_output_data('is_it_true', False)
task_result.status = TaskResultStatus.COMPLETED
return task_result
class ClassWorkerWithDomain(WorkerInterface):
def __init__(self, task_definition_name):
super().__init__(task_definition_name)
self.poll_interval = 850.0
self.domain = 'simple_python_worker'
def execute(self, task: Task) -> TaskResult:
task_result = self.get_task_result_from_task(task)
task_result.add_output_data('worker_style', 'class')
task_result.add_output_data('secret_number', 1234)
task_result.add_output_data('is_it_true', False)
task_result.status = TaskResultStatus.COMPLETED
return task_result
@WorkerTask(task_definition_name='test_python_decorated_worker')
def decorated_worker(obj: object) -> object:
return {
'worker_style': 'function',
'worker_input': 'Task',
'worker_output': 'object',
'task_input': obj,
'status': 'COMPLETED'
}
@WorkerTask(task_definition_name='test_python_decorated_worker', domain='cool', poll_interval=500.0)
def decorated_worker_with_domain_and_poll_interval(obj: object) -> object:
return {
'worker_style': 'function',
'worker_input': 'Task',
'worker_output': 'object',
'domain': 'cool',
'task_input': obj,
'status': 'COMPLETED'
}
def worker_with_task_input_and_task_result_output(task: Task) -> TaskResult:
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id='your_custom_id'
)
task_result.add_output_data('worker_style', 'function')
task_result.add_output_data('worker_input', 'Task')
task_result.add_output_data('worker_output', 'TaskResult')
task_result.status = TaskResultStatus.COMPLETED
return task_result
def worker_with_task_input_and_generic_output(task: Task) -> object:
return {
'worker_style': 'function',
'worker_input': 'Task',
'worker_output': 'object',
'task_id': task.task_id,
'task_input': task.input_data,
}
def worker_with_generic_input_and_task_result_output(obj: object) -> TaskResult:
task_result = TaskResult(
task_id='',
workflow_instance_id='',
worker_id=''
)
task_result.add_output_data('worker_style', 'function')
task_result.add_output_data('worker_input', 'object')
task_result.add_output_data('worker_output', 'TaskResult')
task_result.status = TaskResultStatus.COMPLETED
return task_result
def worker_with_generic_input_and_generic_output(obj: object) -> object:
return {
'worker_style': 'function',
'worker_input': 'object',
'worker_output': 'object',
'input': obj,
}