-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathdynamic_workflow.py
More file actions
71 lines (56 loc) · 2.72 KB
/
Copy pathdynamic_workflow.py
File metadata and controls
71 lines (56 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
This is a dynamic workflow that can be created and executed at run time.
dynamic_workflow will run worker tasks get_user_email and send_email in the same order.
For use cases in which the workflow cannot be defined statically, dynamic workflows is a useful approach.
For detailed explanation, https://github.com/conductor-sdk/conductor-python/blob/main/workflows.md
"""
import asyncio
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration.configuration import Configuration
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
from conductor.asyncio_client.worker.worker_task import worker_task
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
@worker_task(task_definition_name="get_user_email")
def get_user_email(userid: str) -> str:
return f"{userid}@example.com"
@worker_task(task_definition_name="send_email")
def send_email(email: str, subject: str, body: str):
print(f"sending email to {email} with subject {subject} and body {body}")
async def main():
# defaults to reading the configuration using following env variables
# CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
# CONDUCTOR_AUTH_KEY : API Authentication Key
# CONDUCTOR_AUTH_SECRET: API Auth Secret
api_config = Configuration()
api_config.apply_logging_config()
task_handler = TaskHandler(configuration=api_config)
task_handler.start_processes()
async with ApiClient(api_config) as api_client:
clients = OrkesClients(api_client=api_client, configuration=api_config)
workflow_executor = clients.get_workflow_executor()
workflow = AsyncConductorWorkflow(
name="dynamic_workflow", version=1, executor=workflow_executor
)
get_email = get_user_email(
task_ref_name="get_user_email_ref", userid=workflow.input("userid")
)
sendmail = send_email(
task_ref_name="send_email_ref",
email=get_email.output("result"),
subject="Hello from Orkes",
body="Test Email",
)
workflow >> get_email >> sendmail
# Configure the output of the workflow
workflow.output_parameters(
output_parameters={"email": get_email.output("result")}
)
workflow_run = await workflow.execute(workflow_input={"userid": "user_a"})
print(f"\nworkflow output: {workflow_run.output}\n")
print(
f"check the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}"
)
task_handler.stop_processes()
if __name__ == "__main__":
asyncio.run(main())