-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathacp.py.j2
More file actions
151 lines (119 loc) · 4.91 KB
/
Copy pathacp.py.j2
File metadata and controls
151 lines (119 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
from typing import AsyncGenerator, List
from agentex.lib import adk
from agentex.lib.adk.providers._modules.sync_provider import SyncStreamingProvider, convert_openai_to_agentex_events
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.protocol.acp import SendMessageParams
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.model_utils import BaseModel
from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull
from agentex.types.task_message_content import TaskMessageContent
from agentex.types.text_content import TextContent
from agentex.lib.utils.logging import make_logger
from agents import Agent, Runner, RunConfig, function_tool, set_tracing_disabled
# The openai-agents SDK ships with its own tracer, which would send traces to
# api.openai.com authenticated by OPENAI_API_KEY. That variable may hold a
# LiteLLM proxy key here, so the native tracer is switched off. SGP tracing,
# registered below through the Agentex tracing manager, is unaffected.
set_tracing_disabled(True)

logger = make_logger(__name__)

# LiteLLM proxy auth: mirror LITELLM_API_KEY into OPENAI_API_KEY (when it is
# set) for OpenAI client compatibility.
if _litellm_key := os.getenv("LITELLM_API_KEY"):
    os.environ["OPENAI_API_KEY"] = _litellm_key

# SGP tracing settings; each falls back to an empty string when unset.
SGP_API_KEY = os.getenv("SGP_API_KEY", "")
SGP_ACCOUNT_ID = os.getenv("SGP_ACCOUNT_ID", "")
SGP_CLIENT_BASE_URL = os.getenv("SGP_CLIENT_BASE_URL", "")

# The SGP tracing processor is registered only when both the API key and the
# account id are present; the base URL is passed through as-is.
if SGP_API_KEY and SGP_ACCOUNT_ID:
    add_tracing_processor_config(
        SGPTracingProcessorConfig(
            sgp_api_key=SGP_API_KEY,
            sgp_account_id=SGP_ACCOUNT_ID,
            sgp_base_url=SGP_CLIENT_BASE_URL,
        )
    )
# Model name handed to the Agent constructed in handle_message_send.
MODEL = "gpt-4o-mini"
# Instructions handed to the Agent constructed in handle_message_send.
# The text between the triple quotes is sent as-is, so edit it deliberately.
SYSTEM_PROMPT = """
<role>
You are a helpful assistant. Use your tools to help the user.
</role>
<communication_style>
Communicate in a witty and friendly manner
</communication_style>
"""
# Name given to the Agent. The value is a `{{ ... }}` template placeholder;
# presumably it is substituted when this .j2 template is rendered — confirm
# against the project generator.
AGENT_NAME = "{{ agent_name }}"
# Demo tool: it is listed in the Agent's `tools` in handle_message_send.
# NOTE(review): @function_tool presumably derives the tool description shown
# to the model from the docstring below — confirm before rewording it.
@function_tool
async def get_weather() -> str:
    """
    Get the current weather.
    This is a dummy activity that returns a hardcoded string for demo purposes.
    Replace this with a real weather API call in your implementation.
    Returns:
    A string describing the current weather conditions.
    """
    # Log each invocation so tool calls are visible in the agent's logs.
    logger.info("get_weather activity called")
    # Hardcoded placeholder result; no external service is contacted.
    return "Sunny, 72°F"
# ACP server instance in "sync" mode; message handlers are attached to it
# through its decorators (see @acp.on_message_send below).
acp = FastACP.create(acp_type="sync")
# Conversation state that handle_message_send stores per task/agent pair
# through adk.state and reloads on every incoming message.
class StateModel(BaseModel):
    # Conversation history in the list form passed to Runner.run_streamed.
    input_list: list[dict]
    # Incremented once for every message handled for the task.
    turn_number: int
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Stream the agent's reply to one incoming message and save the conversation.

    The handler loads (or creates) the state stored for the task/agent pair,
    appends the user's message to the history, runs the agent in streaming
    mode, yields each converted event, and finally writes the full history
    back to the stored state.

    Args:
        params: The incoming message together with its task and agent.

    Yields:
        Task message updates. When LITELLM_API_KEY is not set, a single full
        message explaining how to configure it is yielded and nothing else.
    """
    # Without a LiteLLM key there is nothing useful to run: reply with one
    # explanatory message and end the stream.
    litellm_key = os.environ.get("LITELLM_API_KEY")
    if not litellm_key:
        missing_key_notice = TextContent(
            author="agent",
            content="Hey, sorry I'm unable to respond to your message because you're running this example without a LiteLLM API key. Please set the LITELLM_API_KEY environment variable to run this example. Do this by either adding a .env file to the project/ directory or by setting the environment variable in your terminal.",
        )
        yield StreamTaskMessageFull(index=0, type="full", content=missing_key_notice)
        return

    prompt_text = params.content.content
    task_id = params.task.id
    agent_id = params.agent.id

    # Every message is handled as a new turn, so fetch the state saved for
    # this task; the first message of a task starts from an empty history.
    stored_state = await adk.state.get_by_task_and_agent(task_id=task_id, agent_id=agent_id)
    if stored_state:
        conversation = StateModel.model_validate(stored_state.state)
    else:
        conversation = StateModel(input_list=[], turn_number=0)
        stored_state = await adk.state.create(task_id=task_id, agent_id=agent_id, state=conversation)

    conversation.turn_number += 1
    conversation.input_list.append({"role": "user", "content": prompt_text})

    # The sync streaming provider is keyed by the task id so the run can be traced.
    tracing_config = RunConfig(model_provider=SyncStreamingProvider(trace_id=task_id))

    assistant = Agent(
        name=AGENT_NAME,
        instructions=SYSTEM_PROMPT,
        model=MODEL,
        tools=[get_weather],
    )

    # Run the agent over the conversation history accumulated so far.
    run = Runner.run_streamed(assistant, conversation.input_list, run_config=tracing_config)

    # Translate OpenAI stream events into Agentex events and forward them to the client.
    async for update in convert_openai_to_agentex_events(run.stream_events()):
        yield update

    # Only after the stream has finished, persist the full conversation history.
    conversation.input_list = run.to_input_list()
    await adk.state.update(
        state_id=stored_state.id,
        task_id=task_id,
        agent_id=agent_id,
        state=conversation,
        trace_id=task_id,
    )