-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathworkflow.py.j2
More file actions
66 lines (53 loc) · 3.13 KB
/
Copy pathworkflow.py.j2
File metadata and controls
66 lines (53 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import json
from temporalio import workflow
from agentex.lib import adk
from agentex.protocol.acp import CreateTaskParams, SendEventParams
from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
from agentex.lib.core.temporal.types.workflow import SignalName
from agentex.lib.utils.logging import make_logger
from agentex.types.text_content import TextContent
from agentex.lib.environment_variables import EnvironmentVariables
environment_variables = EnvironmentVariables.refresh()
if environment_variables.WORKFLOW_NAME is None:
raise ValueError("Environment variable WORKFLOW_NAME is not set")
if environment_variables.AGENT_NAME is None:
raise ValueError("Environment variable AGENT_NAME is not set")
logger = make_logger(__name__)
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
class {{ workflow_class }}(BaseWorkflow):
"""
Minimal async workflow template for AgentEx Temporal agents.
"""
def __init__(self):
super().__init__(display_name=environment_variables.AGENT_NAME)
self._complete_task = False
@workflow.signal(name=SignalName.RECEIVE_EVENT)
async def on_task_event_send(self, params: SendEventParams) -> None:
logger.info(f"Received task message instruction: {params}")
# 2. Echo back the client's message to show it in the UI. This is not done by default so the agent developer has full control over what is shown to the user.
await adk.messages.create(task_id=params.task.id, content=params.event.content)
# 3. Send a simple response message.
# In future tutorials, this is where we'll add more sophisticated response logic.
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your message. I can't respond right now, but in future tutorials we'll see how you can get me to intelligently respond to your message.",
),
)
@workflow.run
async def on_task_create(self, params: CreateTaskParams) -> str:
logger.info(f"Received task create params: {params}")
# 1. Acknowledge that the task has been created.
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
return "Task completed"