-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathacp.py.j2
More file actions
93 lines (76 loc) · 3.2 KB
/
Copy pathacp.py.j2
File metadata and controls
93 lines (76 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""ACP (Agent Communication Protocol) handler for {{ agent_name }}.
API layer — owns the agent lifecycle and streams tokens and tool calls
from the Pydantic AI agent to the Agentex frontend. Wraps each message in
an Agentex tracing span so that the per-message turn (and any tool calls
underneath it) shows up in the Agentex UI / SGP.
"""
from __future__ import annotations
import os
from typing import AsyncGenerator
from dotenv import load_dotenv
load_dotenv()
from project.agent import create_agent
import agentex.lib.adk as adk
from agentex.lib.adk import (
create_pydantic_ai_tracing_handler,
convert_pydantic_ai_to_agentex_events,
)
from agentex.protocol.acp import SendMessageParams
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
logger = make_logger(__name__)

# SGP tracing export is opt-in: the exporter below is registered only when
# both SGP_API_KEY and SGP_ACCOUNT_ID are set to non-empty values. Without
# it, spans still reach the AgentEx backend (and surface in the per-task
# spans dropdown) through the default Agentex processor, which is
# lazy-initialised on the first span.
SGP_API_KEY = os.getenv("SGP_API_KEY", "")
SGP_ACCOUNT_ID = os.getenv("SGP_ACCOUNT_ID", "")

if SGP_API_KEY and SGP_ACCOUNT_ID:
    add_tracing_processor_config(
        SGPTracingProcessorConfig(
            sgp_api_key=SGP_API_KEY,
            sgp_account_id=SGP_ACCOUNT_ID,
            # An unset SGP_CLIENT_BASE_URL is forwarded as an empty string.
            sgp_base_url=os.getenv("SGP_CLIENT_BASE_URL", ""),
        )
    )

# ACP server object; the message handler registers itself on it through the
# ``@acp.on_message_send`` decorator.
acp = FastACP.create(acp_type="sync")
# The Pydantic AI Agent is built lazily: constructing it on the first request
# keeps that work inside the running event loop instead of at import time.
_agent = None


def get_agent():
    """Return the module-level agent, building it on first use.

    The first call stores the result of ``create_agent()`` in the module
    global ``_agent``. Once ``_agent`` holds a non-None value, later calls
    return it without calling ``create_agent()`` again.
    """
    global _agent
    if _agent is not None:
        # Already built by an earlier call.
        return _agent
    _agent = create_agent()
    return _agent
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Run the agent on one incoming message and stream the resulting events.

    This is an async generator. The whole turn runs inside an
    ``adk.tracing.span`` named "message" whose trace id and task id are both
    the task's id. A Pydantic AI tracing handler is created with that span's
    id as ``parent_span_id`` (``None`` when the span object is falsy) and is
    passed to the event converter.

    Args:
        params: The incoming request. ``params.task.id`` and
            ``params.content.content`` are read.

    Yields:
        Every event produced by ``convert_pydantic_ai_to_agentex_events``
        for the agent's event stream, in the order it produces them.
    """
    pydantic_agent = get_agent()
    task_id = params.task.id
    # NOTE(review): presumably params.content is text content whose
    # ``.content`` is the user's prompt string; confirm against callers.
    prompt = params.content.content

    logger.info(f"Processing message for task {task_id}")

    # One span per user message. The tracing handler receives this span's id
    # as parent_span_id so that tool calls nest underneath it.
    turn_span_cm = adk.tracing.span(
        trace_id=task_id,
        task_id=task_id,
        name="message",
        input={"message": prompt},
        data={"__span_type__": "AGENT_WORKFLOW"},
    )
    async with turn_span_cm as turn_span:
        # The span object may be falsy; then no parent id is passed on.
        parent_span_id = None
        if turn_span:
            parent_span_id = turn_span.id

        handler = create_pydantic_ai_tracing_handler(
            trace_id=task_id,
            parent_span_id=parent_span_id,
            task_id=task_id,
        )

        # NOTE(review): relies on run_stream_events() returning an async
        # context manager; confirm against the installed Pydantic AI version.
        async with pydantic_agent.run_stream_events(prompt) as event_stream:
            agentex_events = convert_pydantic_ai_to_agentex_events(
                event_stream, tracing_handler=handler
            )
            async for update in agentex_events:
                yield update