-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathhelloworld.py
More file actions
50 lines (38 loc) · 1.61 KB
/
Copy pathhelloworld.py
File metadata and controls
50 lines (38 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
from greetings_workflow import greetings_workflow
from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration import Configuration
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
from conductor.asyncio_client.workflow.executor.workflow_executor import (
AsyncWorkflowExecutor,
)
async def register_workflow(
workflow_executor: AsyncWorkflowExecutor,
) -> AsyncConductorWorkflow:
workflow = greetings_workflow(workflow_executor=workflow_executor)
await workflow.register(True)
return workflow
async def main():
# points to http://localhost:8080/api by default
api_config = Configuration()
async with ApiClient(api_config) as api_client:
workflow_executor = AsyncWorkflowExecutor(
configuration=api_config, api_client=api_client
)
# Needs to be done only when registering a workflow one-time
workflow = await register_workflow(workflow_executor)
task_handler = TaskHandler(configuration=api_config)
task_handler.start_processes()
workflow_run = await workflow_executor.execute(
name=workflow.name,
version=workflow.version,
workflow_input={"name": "World"},
)
print(f"\nworkflow result: {workflow_run.output}\n")
print(
f"see the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}\n"
)
task_handler.stop_processes()
if __name__ == "__main__":
asyncio.run(main())