Skip to content

Commit 9342ba5

Browse files
committed
feat(langchain|langgraph): add AGUI protocol support with async generator handlers and lifecycle hooks
This commit introduces comprehensive AGUI protocol support with lifecycle hooks for both LangChain and LangGraph integrations. The changes include: 1. Added convert function exports to both LangChain and LangGraph integration modules with usage examples 2. Refactored model adapter to use default_headers instead of async_client for ChatOpenAI 3. Enhanced server invoker to handle both coroutine and async generator functions 4. Updated protocol handlers to support async response formatting and safe iterator handling 5. Refined AgentRequest model to be protocol-agnostic with core fields 6. Added CORS middleware configuration to AgentRunServer 7. Updated quick start example to use async streaming with event conversion The changes enable proper streaming support, tool call event handling, and improved async performance for both LangChain and LangGraph integrations. feat(langchain|langgraph): 添加 AGUI 协议支持和异步生成器处理器以及生命周期钩子 此提交为 LangChain 和 LangGraph 集成引入了全面的 AGUI 协议支持和生命周期钩子。变更包括: 1. 为 LangChain 和 LangGraph 集成模块添加了 convert 函数导出以及使用示例 2. 重构模型适配器,使用 default_headers 替代 async_client 用于 ChatOpenAI 3. 增强服务器调用器以处理协程和异步生成器函数 4. 更新协议处理器以支持异步响应格式化和安全的迭代器处理 5. 精简 AgentRequest 模型以实现协议无关性并使用核心字段 6. 为 AgentRunServer 添加 CORS 中间件配置 7. 更新快速开始示例以使用带事件转换的异步流 这些变更启用了适当的流式传输支持、工具调用事件处理以及改进的异步性能。 Change-Id: I941d1d797b930243282555b5a6db0e6d420f3691 Signed-off-by: OhYee <oyohyee@oyohyee.com>
1 parent 5c24837 commit 9342ba5

File tree

15 files changed

+440
-600
lines changed

15 files changed

+440
-600
lines changed

agentrun/integration/langchain/__init__.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
1-
"""LangChain 集成模块,提供 AgentRun 模型与沙箱的 LangChain 适配。 / LangChain 集成 Module"""
1+
"""LangChain 集成模块
2+
3+
Example:
4+
>>> from langchain.agents import create_agent
5+
>>> from agentrun.integration.langchain import convert, model, toolset
6+
>>>
7+
>>> agent = create_agent(model=model("my-model"), tools=toolset("my-tools"))
8+
>>>
9+
>>> async def invoke_agent(request: AgentRequest):
10+
... input_data = {"messages": [...]}
11+
... async for event in agent.astream_events(input_data, version="v2"):
12+
... for item in convert(event, request.hooks):
13+
... yield item
14+
"""
15+
16+
from agentrun.integration.langgraph.agent_converter import convert
217

318
from .builtin import model, sandbox_toolset, toolset
419

520
__all__ = [
21+
"convert",
622
"model",
723
"toolset",
824
"sandbox_toolset",

agentrun/integration/langchain/model_adapter.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ def __init__(self):
2323

2424
def wrap_model(self, common_model: Any) -> Any:
2525
"""包装 CommonModel 为 LangChain BaseChatModel / LangChain Model Adapter"""
26-
from httpx import AsyncClient
2726
from langchain_openai import ChatOpenAI
2827

2928
info = common_model.get_model_info() # 确保模型可用
@@ -32,6 +31,6 @@ def wrap_model(self, common_model: Any) -> Any:
3231
api_key=info.api_key,
3332
model=info.model,
3433
base_url=info.base_url,
35-
async_client=AsyncClient(headers=info.headers),
34+
default_headers=info.headers,
3635
stream_usage=True,
3736
)

agentrun/integration/langgraph/__init__.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1-
"""LangGraph 集成模块。 / LangGraph 集成 Module
1+
"""LangGraph 集成模块
22
3-
提供 AgentRun 模型与沙箱工具的 LangGraph 适配入口。 / 提供 AgentRun 模型with沙箱工具的 LangGraph 适配入口。
4-
LangGraph 与 LangChain 兼容,因此直接复用 LangChain 的转换逻辑。 / LangGraph with LangChain 兼容,因此直接复用 LangChain 的转换逻辑。
3+
Example:
4+
>>> from langgraph.prebuilt import create_react_agent
5+
>>> from agentrun.integration.langgraph import convert
6+
>>>
7+
>>> agent = create_react_agent(llm, tools)
8+
>>>
9+
>>> async def invoke_agent(request: AgentRequest):
10+
... input_data = {"messages": [...]}
11+
... async for event in agent.astream_events(input_data, version="v2"):
12+
... for item in convert(event, request.hooks):
13+
... yield item
514
"""
615

16+
from .agent_converter import convert
717
from .builtin import model, sandbox_toolset, toolset
818

919
__all__ = [
20+
"convert",
1021
"model",
1122
"toolset",
1223
"sandbox_toolset",
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""LangGraph/LangChain Agent 事件转换器
2+
3+
将 LangGraph/LangChain astream_events 的单个事件转换为 AgentRun 事件。
4+
5+
支持两种事件格式:
6+
1. on_chat_model_stream - LangGraph create_react_agent 的流式输出
7+
2. on_chain_stream - LangChain create_agent 的输出
8+
9+
Example:
10+
>>> async def invoke_agent(request: AgentRequest):
11+
... async for event in agent.astream_events(input_data, version="v2"):
12+
... for item in convert(event, request.hooks):
13+
... yield item
14+
"""
15+
16+
import json
17+
from typing import Any, Dict, Generator, List, Optional, Union
18+
19+
from agentrun.server.model import AgentEvent, AgentLifecycleHooks
20+
21+
22+
def convert(
    event: Dict[str, Any],
    hooks: Optional[AgentLifecycleHooks] = None,
) -> Generator[Union[AgentEvent, str], None, None]:
    """Convert one LangGraph/LangChain ``astream_events`` event to AgentRun output.

    Dispatches on the event's ``event`` type and yields zero or more items:
    plain ``str`` for streamed text content, or ``AgentEvent`` objects built
    through *hooks* for tool-call lifecycle notifications.

    Args:
        event: A single event dict produced by ``astream_events(..., version="v2")``.
        hooks: AgentLifecycleHooks used to create tool-call events. When
            ``None``, only text content is yielded and tool events are skipped.

    Yields:
        str (text content) or AgentEvent (tool-call events).
    """
    event_type = event.get("event", "")
    data = event.get("data", {})

    # 1. LangGraph format: on_chat_model_stream
    #    (streamed chunks from create_react_agent's chat model)
    if event_type == "on_chat_model_stream":
        chunk = data.get("chunk")
        if chunk:
            content = _get_content(chunk)
            if content:
                yield content

            # Streamed tool-call argument deltas
            if hooks:
                for tc in _get_tool_chunks(chunk):
                    # Chunks may carry only an index before an id is assigned;
                    # fall back to the index as the call identifier.
                    tc_id = tc.get("id") or str(tc.get("index", ""))
                    if tc.get("name") and tc_id:
                        yield hooks.on_tool_call_start(
                            id=tc_id, name=tc["name"]
                        )
                    if tc.get("args") and tc_id:
                        yield hooks.on_tool_call_args_delta(
                            id=tc_id, delta=tc["args"]
                        )

    # 2. LangChain format: on_chain_stream (from create_agent)
    # Only handle name == "model" events to avoid duplicates (LangGraph emits
    # both a name="model" and a name="LangGraph" event with identical content).
    elif event_type == "on_chain_stream" and event.get("name") == "model":
        chunk_data = data.get("chunk", {})
        if isinstance(chunk_data, dict):
            # chunk shape: {"messages": [AIMessage(...)]}
            messages = chunk_data.get("messages", [])

            for msg in messages:
                # Extract text content
                content = _get_content(msg)
                if content:
                    yield content

                # Extract complete (non-streamed) tool calls
                if hooks:
                    tool_calls = _get_tool_calls(msg)
                    for tc in tool_calls:
                        tc_id = tc.get("id", "")
                        tc_name = tc.get("name", "")
                        tc_args = tc.get("args", {})
                        if tc_id and tc_name:
                            yield hooks.on_tool_call_start(
                                id=tc_id, name=tc_name
                            )
                        if tc_args:
                            yield hooks.on_tool_call_args(
                                id=tc_id, args=_to_json(tc_args)
                            )

    # 3. Tool start (LangGraph)
    elif event_type == "on_tool_start" and hooks:
        run_id = event.get("run_id", "")
        tool_name = event.get("name", "")
        tool_input = data.get("input", {})

        if run_id:
            yield hooks.on_tool_call_start(id=run_id, name=tool_name)
            if tool_input:
                yield hooks.on_tool_call_args(
                    id=run_id, args=_to_json(tool_input)
                )

    # 4. Tool end (LangGraph)
    elif event_type == "on_tool_end" and hooks:
        run_id = event.get("run_id", "")
        output = data.get("output", "")

        if run_id:
            yield hooks.on_tool_call_result(
                id=run_id, result=str(output) if output else ""
            )
            yield hooks.on_tool_call_end(id=run_id)
113+
114+
def _get_content(obj: Any) -> Optional[str]:
115+
"""提取文本内容"""
116+
if obj is None:
117+
return None
118+
119+
# 字符串
120+
if isinstance(obj, str):
121+
return obj if obj else None
122+
123+
# 有 content 属性的对象 (AIMessage, AIMessageChunk, etc.)
124+
if hasattr(obj, "content"):
125+
c = obj.content
126+
if isinstance(c, str) and c:
127+
return c
128+
if isinstance(c, list):
129+
parts = []
130+
for item in c:
131+
if isinstance(item, str):
132+
parts.append(item)
133+
elif isinstance(item, dict):
134+
parts.append(item.get("text", ""))
135+
return "".join(parts) or None
136+
137+
return None
138+
139+
140+
def _get_tool_chunks(chunk: Any) -> List[Dict[str, Any]]:
141+
"""提取工具调用增量 (AIMessageChunk.tool_call_chunks)"""
142+
result: List[Dict[str, Any]] = []
143+
if hasattr(chunk, "tool_call_chunks") and chunk.tool_call_chunks:
144+
for tc in chunk.tool_call_chunks:
145+
if isinstance(tc, dict):
146+
result.append(tc)
147+
else:
148+
result.append({
149+
"id": getattr(tc, "id", None),
150+
"name": getattr(tc, "name", None),
151+
"args": getattr(tc, "args", None),
152+
"index": getattr(tc, "index", None),
153+
})
154+
return result
155+
156+
157+
def _get_tool_calls(msg: Any) -> List[Dict[str, Any]]:
158+
"""提取完整工具调用 (AIMessage.tool_calls)"""
159+
result: List[Dict[str, Any]] = []
160+
if hasattr(msg, "tool_calls") and msg.tool_calls:
161+
for tc in msg.tool_calls:
162+
if isinstance(tc, dict):
163+
result.append(tc)
164+
else:
165+
result.append({
166+
"id": getattr(tc, "id", None),
167+
"name": getattr(tc, "name", None),
168+
"args": getattr(tc, "args", None),
169+
})
170+
return result
171+
172+
173+
def _to_json(obj: Any) -> str:
174+
"""转 JSON 字符串"""
175+
if isinstance(obj, str):
176+
return obj
177+
return json.dumps(obj, ensure_ascii=False)

agentrun/server/__init__.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,54 @@
11
"""AgentRun Server 模块 / AgentRun Server Module
22
3-
提供 HTTP Server 集成能力,支持符合 AgentRun 规范的 Agent 调用接口。
3+
提供 HTTP Server 集成能力支持符合 AgentRun 规范的 Agent 调用接口。
44
支持 OpenAI Chat Completions 和 AG-UI 两种协议。
55
6-
Example (基本使用 - 同步):
6+
Example (基本使用 - 返回字符串):
77
>>> from agentrun.server import AgentRunServer, AgentRequest
88
>>>
99
>>> def invoke_agent(request: AgentRequest):
10-
... # 实现你的 Agent 逻辑
1110
... return "Hello, world!"
1211
>>>
1312
>>> server = AgentRunServer(invoke_agent=invoke_agent)
14-
>>> server.start(host="0.0.0.0", port=8080)
13+
>>> server.start(port=9000)
1514
16-
Example (使用生命周期钩子 - 同步,推荐):
15+
Example (流式输出):
16+
>>> def invoke_agent(request: AgentRequest):
17+
... for word in ["Hello", ", ", "world", "!"]:
18+
... yield word
19+
>>>
20+
>>> AgentRunServer(invoke_agent=invoke_agent).start()
21+
22+
Example (使用生命周期钩子):
1723
>>> def invoke_agent(request: AgentRequest):
1824
... hooks = request.hooks
1925
...
20-
... # 发送步骤开始事件 (使用 emit_* 同步方法)
21-
... yield hooks.emit_step_start("processing")
26+
... # 发送步骤开始事件
27+
... yield hooks.on_step_start("processing")
2228
...
23-
... # 处理逻辑...
29+
... # 流式输出内容
2430
... yield "Hello, "
2531
... yield "world!"
2632
...
2733
... # 发送步骤结束事件
28-
... yield hooks.emit_step_finish("processing")
34+
... yield hooks.on_step_finish("processing")
2935
30-
Example (使用生命周期钩子 - 异步):
31-
>>> async def invoke_agent(request: AgentRequest):
36+
Example (工具调用事件):
37+
>>> def invoke_agent(request: AgentRequest):
3238
... hooks = request.hooks
3339
...
34-
... # 发送步骤开始事件 (使用 on_* 异步方法)
35-
... async for event in hooks.on_step_start("processing"):
36-
... yield event
40+
... # 工具调用开始
41+
... yield hooks.on_tool_call_start(id="call_1", name="get_time")
42+
... yield hooks.on_tool_call_args(id="call_1", args={"timezone": "UTC"})
3743
...
38-
... # 处理逻辑...
39-
... yield "Hello, world!"
44+
... # 执行工具
45+
... result = "2024-01-01 12:00:00"
4046
...
41-
... # 发送步骤结束事件
42-
... async for event in hooks.on_step_finish("processing"):
43-
... yield event
47+
... # 工具调用结果
48+
... yield hooks.on_tool_call_result(id="call_1", result=result)
49+
... yield hooks.on_tool_call_end(id="call_1")
50+
...
51+
... yield f"当前时间: {result}"
4452
4553
Example (访问原始请求):
4654
>>> def invoke_agent(request: AgentRequest):

agentrun/server/agui_protocol.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"""
1515

1616
from enum import Enum
17+
import inspect
1718
import json
1819
import time
1920
from typing import (
@@ -536,6 +537,9 @@ async def run_agent(request: Request):
536537
event_stream = self.format_response(
537538
agent_result, agent_request, context
538539
)
540+
# 支持 format_response 返回 coroutine 或者 async iterator
541+
if inspect.isawaitable(event_stream):
542+
event_stream = await event_stream
539543

540544
# 4. 返回 SSE 流
541545
return StreamingResponse(
@@ -779,13 +783,20 @@ async def _iterate_content(
779783
loop = asyncio.get_event_loop()
780784
iterator = iter(content) # type: ignore
781785

782-
while True:
786+
# 使用哨兵值来检测迭代结束,避免 StopIteration 传播到 Future
787+
_STOP = object()
788+
789+
def _safe_next():
783790
try:
784-
# 在线程池中执行 next(),避免 time.sleep 阻塞事件循环
785-
chunk = await loop.run_in_executor(None, next, iterator)
786-
yield chunk
791+
return next(iterator)
787792
except StopIteration:
793+
return _STOP
794+
795+
while True:
796+
chunk = await loop.run_in_executor(None, _safe_next)
797+
if chunk is _STOP:
788798
break
799+
yield chunk
789800

790801

791802
# ============================================================================

0 commit comments

Comments
 (0)