-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathacp.py.j2
More file actions
98 lines (78 loc) · 3.25 KB
/
Copy pathacp.py.j2
File metadata and controls
98 lines (78 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
ACP (Agent Communication Protocol) handler for Agentex.
This is the API layer — it manages the graph lifecycle and streams
tokens and tool calls from the LangGraph graph to the Agentex frontend.
"""
from typing import AsyncGenerator
import agentex.lib.adk as adk
from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.protocol.acp import SendMessageParams
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.types.task_message_content import TaskMessageContent
from agentex.types.task_message_delta import TextDelta
from agentex.types.task_message_update import TaskMessageUpdate
from dotenv import load_dotenv
# Load variables from a .env file into the process environment before any of
# the os.environ lookups further down this module run.
load_dotenv()
import os
# LiteLLM proxy auth: the OpenAI client reads OPENAI_API_KEY, so mirror a
# non-empty LITELLM_API_KEY into it.
if _litellm_key := os.environ.get("LITELLM_API_KEY"):
    os.environ["OPENAI_API_KEY"] = _litellm_key
from project.graph import create_graph
# Module-level logger, named after this module.
logger = make_logger(__name__)
# Register the Agentex tracing processor so spans are shipped to the backend.
# The API key, account id and base URL are read from the environment at import
# time; each one falls back to an empty string when its variable is unset.
add_tracing_processor_config(
SGPTracingProcessorConfig(
sgp_api_key=os.environ.get("SGP_API_KEY", ""),
sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
))
# Create the ACP server in "sync" mode. Handlers are attached to it through
# decorators such as @acp.on_message_send.
acp = FastACP.create(acp_type="sync")
# Compiled graph held at module level. Starts as None and is filled in by
# get_graph() on first use (lazy initialization).
_graph = None
async def get_graph():
    """Return the module-level compiled graph, building it on first use."""
    global _graph
    # Fast path: the graph has already been created by an earlier call.
    if _graph is not None:
        return _graph
    _graph = await create_graph()
    return _graph
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Stream the graph's response to one incoming Agentex message.

    Runs the LangGraph graph on the user's message inside a tracing span and
    yields every converted Agentex event as soon as it is produced. Text
    deltas are collected along the way so the complete reply can be stored as
    the span's output once the stream has finished.

    Args:
        params: Incoming request. ``params.task.id`` is used as both the
            LangGraph thread id and the trace id; ``params.content.content``
            is sent to the graph as the user message.

    Yields:
        The events produced by ``convert_langgraph_to_agentex_events`` for
        the graph's ``messages`` and ``updates`` streams.
    """
    graph = await get_graph()
    thread_id = params.task.id
    user_message = params.content.content

    logger.info(f"Processing message for thread {thread_id}")

    graph_input = {"messages": [{"role": "user", "content": user_message}]}

    async with adk.tracing.span(
        trace_id=thread_id,
        name="message",
        input={"message": user_message},
        data={"__span_type__": "AGENT_WORKFLOW"},
    ) as turn_span:
        # Parent the LangGraph spans under this turn when a span is available.
        parent_span_id = None
        if turn_span:
            parent_span_id = turn_span.id
        tracing_callback = create_langgraph_tracing_handler(
            trace_id=thread_id,
            parent_span_id=parent_span_id,
        )

        run_config = {
            "configurable": {"thread_id": thread_id},
            "callbacks": [tracing_callback],
        }
        raw_stream = graph.astream(
            graph_input,
            config=run_config,
            stream_mode=["messages", "updates"],
        )

        # Collect text deltas so the full reply can be recorded on the span.
        text_chunks = []
        async for event in convert_langgraph_to_agentex_events(raw_stream):
            delta = getattr(event, "delta", None)
            if isinstance(delta, TextDelta) and delta.text_delta:
                text_chunks.append(delta.text_delta)
            # Every event is forwarded, not only the text deltas.
            yield event

        if turn_span:
            turn_span.output = {"final_output": "".join(text_chunks)}