-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathtask_status_change_audit.py
More file actions
146 lines (128 loc) · 4.61 KB
/
Copy pathtask_status_change_audit.py
File metadata and controls
146 lines (128 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import asyncio
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.adapters.models import (
ExtendedWorkflowDef,
StartWorkflowRequest,
StateChangeEvent,
Task,
TaskDef,
TaskResult,
WorkflowTask,
)
from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration.configuration import Configuration
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
from conductor.asyncio_client.worker.worker_task import worker_task
from conductor.shared.http.enums import TaskResultStatus
@worker_task(task_definition_name="audit_log")
def audit_log(workflow_input: object, status: str, name: str):
print(f"task {name} is in {status} status, with workflow input as {workflow_input}")
@worker_task(task_definition_name="simple_task_1")
def simple_task_1(task: Task) -> str:
return "OK"
@worker_task(task_definition_name="simple_task_2")
def simple_task_2(task: Task) -> TaskResult:
return TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
status=TaskResultStatus.FAILED_WITH_TERMINAL_ERROR,
)
async def main():
api_config = Configuration()
api_config.apply_logging_config()
task_handler = TaskHandler(
workers=[],
configuration=api_config,
scan_for_annotated_workers=True,
)
task_handler.start_processes()
async with ApiClient(api_config) as api_client:
clients = OrkesClients(configuration=api_config, api_client=api_client)
metadata_client = clients.get_metadata_client()
workflow_client = clients.get_workflow_client()
task1 = WorkflowTask(
type="SIMPLE",
name="simple_task_1",
task_reference_name="simple_task_1_ref",
on_state_change={
"onStart": [
StateChangeEvent(
type="audit_log",
payload={
"workflow_input": "${workflow.input}",
"status": "${simple_task_1_ref.status}",
"name": "simple_task_1_ref",
},
)
]
},
)
task_def = TaskDef(
name="simple_task_2",
retry_count=0,
timeout_seconds=600,
total_timeout_seconds=600,
)
task2 = WorkflowTask(
type="SIMPLE",
name="simple_task_2",
task_reference_name="simple_task_2_ref",
task_definition=task_def,
on_state_change={
"onScheduled": [
StateChangeEvent(
type="audit_log",
payload={
"workflow_input": "${workflow.input}",
"status": "${simple_task_2_ref.status}",
"name": "simple_task_2_ref",
},
)
],
"onStart": [
StateChangeEvent(
type="audit_log",
payload={
"workflow_input": "${workflow.input}",
"status": "${simple_task_2_ref.status}",
"name": "simple_task_2_ref",
},
)
],
"onFailed": [
StateChangeEvent(
type="audit_log",
payload={
"workflow_input": "${workflow.input}",
"status": "${simple_task_2_ref.status}",
"name": "simple_task_2_ref",
},
)
],
},
)
workflow = ExtendedWorkflowDef(
name="test_audit_logs",
version=1,
timeoutSeconds=600,
tasks=[
task1,
task2,
],
)
await metadata_client.register_workflow_def(
extended_workflow_def=workflow, overwrite=True
)
request = StartWorkflowRequest(
name=workflow.name,
version=workflow.version,
input={"a": "aa", "b": "bb", "c": 42},
)
workflow_id = await workflow_client.start_workflow(
start_workflow_request=request
)
print(f"workflow_id {workflow_id}")
task_handler.join_processes()
if __name__ == "__main__":
asyncio.run(main())