Skip to content

Commit 5f7db59

Browse files
committed
feat(langgraph|langchain): introduce AgentRunConverter for consistent tool_call_id handling
adds new AgentRunConverter class to maintain tool_call_id consistency in streaming tool calls, where the first chunk provides the id and subsequent chunks only have index. also updates integration modules to use the new converter class and maintains backward compatibility. The new AgentRunConverter class manages the mapping between tool call indices and IDs to ensure consistent tool_call_id across streaming chunks, which is crucial for proper tool call correlation in the AG-UI protocol. // 中文翻译: 添加了新的 AgentRunConverter 类来维护流式工具调用中 tool_call_id 的一致性, 其中第一个数据块提供 ID,后续的数据块只有索引。同时更新了集成模块以使用新 的转换器类并保持向后兼容性。 新的 AgentRunConverter 类管理工具调用索引和 ID 之间的映射,确保跨流式数据块的 tool_call_id 一致性,这对于 AG-UI 协议中的正确工具调用关联至关重要。 Change-Id: I35ce9ac6db8a05d0b052df77aa716bf4402ced64 Signed-off-by: OhYee <oyohyee@oyohyee.com>
1 parent 2a0edf4 commit 5f7db59

File tree

5 files changed

+647
-16
lines changed

5 files changed

+647
-16
lines changed

agentrun/integration/langchain/__init__.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
"""LangChain 集成模块
22
3-
使用 to_agui_events 将 LangChain 事件转换为 AG-UI 协议事件:
3+
使用 AgentRunConverter 将 LangChain 事件转换为 AG-UI 协议事件:
44
5-
>>> from agentrun.integration.langchain import to_agui_events
5+
>>> from agentrun.integration.langchain import AgentRunConverter
66
>>>
77
>>> async def invoke_agent(request: AgentRequest):
8-
... input_data = {"messages": [...]}
8+
... converter = AgentRunConverter()
99
... async for event in agent.astream_events(input_data, version="v2"):
10-
... for item in to_agui_events(event):
10+
... for item in converter.convert(event):
1111
... yield item
1212
1313
支持多种调用方式:
@@ -17,15 +17,21 @@
1717
"""
1818

1919
from agentrun.integration.langgraph.agent_converter import (
20+
AguiEventConverter,
21+
) # 向后兼容
22+
from agentrun.integration.langgraph.agent_converter import (
23+
AgentRunConverter,
2024
convert,
2125
to_agui_events,
2226
)
2327

2428
from .builtin import model, sandbox_toolset, toolset
2529

2630
__all__ = [
27-
"to_agui_events",
28-
"convert", # 兼容旧代码
31+
"AgentRunConverter",
32+
"AguiEventConverter", # 向后兼容
33+
"to_agui_events", # 向后兼容
34+
"convert", # 向后兼容
2935
"model",
3036
"toolset",
3137
"sandbox_toolset",

agentrun/integration/langgraph/__init__.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
"""LangGraph 集成模块
22
3-
使用 to_agui_events 将 LangGraph 事件转换为 AG-UI 协议事件:
3+
使用 AgentRunConverter 将 LangGraph 事件转换为 AG-UI 协议事件:
44
5-
>>> from agentrun.integration.langgraph import to_agui_events
5+
>>> from agentrun.integration.langgraph import AgentRunConverter
66
>>>
77
>>> async def invoke_agent(request: AgentRequest):
8-
... input_data = {"messages": [...]}
8+
... converter = AgentRunConverter()
99
... async for event in agent.astream_events(input_data, version="v2"):
10-
... for item in to_agui_events(event):
10+
... for item in converter.convert(event):
1111
... yield item
1212
1313
支持多种调用方式:
@@ -16,12 +16,15 @@
1616
- agent.astream(input, stream_mode="updates") - 异步按节点输出
1717
"""
1818

19-
from .agent_converter import convert, to_agui_events
19+
from .agent_converter import AguiEventConverter # 向后兼容
20+
from .agent_converter import AgentRunConverter, convert, to_agui_events
2021
from .builtin import model, sandbox_toolset, toolset
2122

2223
__all__ = [
23-
"to_agui_events",
24-
"convert", # 兼容旧代码
24+
"AgentRunConverter",
25+
"AguiEventConverter", # 向后兼容
26+
"to_agui_events", # 向后兼容
27+
"convert", # 向后兼容
2528
"model",
2629
"toolset",
2730
"sandbox_toolset",

agentrun/integration/langgraph/agent_converter.py

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -498,11 +498,15 @@ def _convert_stream_values_event(
498498

499499
def _convert_astream_events_event(
500500
event_dict: Dict[str, Any],
501+
tool_call_id_map: Optional[Dict[int, str]] = None,
501502
) -> Iterator[Union[AgentResult, str]]:
502503
"""转换 astream_events 格式的单个事件
503504
504505
Args:
505506
event_dict: 事件字典,格式为 {"event": "on_xxx", "data": {...}}
507+
tool_call_id_map: 可选的 index -> tool_call_id 映射字典。
508+
在流式工具调用中,第一个 chunk 有 id,后续只有 index。
509+
此映射用于确保所有 chunk 使用一致的 tool_call_id。
506510
507511
Yields:
508512
str (文本内容) 或 AgentResult (事件)
@@ -521,9 +525,34 @@ def _convert_astream_events_event(
521525

522526
# 流式工具调用参数
523527
for tc in _extract_tool_call_chunks(chunk):
524-
tc_id = tc.get("id") or str(tc.get("index", ""))
528+
tc_index = tc.get("index")
529+
tc_raw_id = tc.get("id")
525530
tc_args = tc.get("args", "")
526531

532+
# 解析 tool_call_id:
533+
# 1. 如果有 id 且非空,使用它并更新映射
534+
# 2. 如果 id 为空但有 index,从映射中查找
535+
# 3. 最后回退到使用 index 字符串
536+
if tc_raw_id:
537+
tc_id = tc_raw_id
538+
# 更新映射(如果提供了映射字典)
539+
# 重要:即使这个 chunk 没有 args,也要更新映射,
540+
# 因为后续 chunk 可能只有 index 没有 id
541+
if tool_call_id_map is not None and tc_index is not None:
542+
tool_call_id_map[tc_index] = tc_id
543+
elif tc_index is not None:
544+
# 从映射中查找,如果没有则使用 index
545+
if (
546+
tool_call_id_map is not None
547+
and tc_index in tool_call_id_map
548+
):
549+
tc_id = tool_call_id_map[tc_index]
550+
else:
551+
tc_id = str(tc_index)
552+
else:
553+
tc_id = ""
554+
555+
# 只有有 args 时才生成 TOOL_CALL_ARGS 事件
527556
if tc_args and tc_id:
528557
if isinstance(tc_args, (dict, list)):
529558
tc_args = _safe_json_dumps(tc_args)
@@ -622,6 +651,7 @@ def _convert_astream_events_event(
622651
def to_agui_events(
623652
event: Union[Dict[str, Any], Any],
624653
messages_key: str = "messages",
654+
tool_call_id_map: Optional[Dict[int, str]] = None,
625655
) -> Iterator[Union[AgentResult, str]]:
626656
"""将 LangGraph/LangChain 流式事件转换为 AG-UI 协议事件
627657
@@ -635,12 +665,14 @@ def to_agui_events(
635665
Args:
636666
event: LangGraph/LangChain 流式事件(StreamEvent 对象或 Dict)
637667
messages_key: state 中消息列表的 key,默认 "messages"
668+
tool_call_id_map: 可选的 index -> tool_call_id 映射字典,用于流式工具调用
669+
的 ID 一致性。如果提供,函数会自动更新此映射。
638670
639671
Yields:
640672
str (文本内容) 或 AgentResult (AG-UI 事件)
641673
642674
Example:
643-
>>> # 使用 astream_events
675+
>>> # 使用 astream_events(推荐使用 AguiEventConverter 类)
644676
>>> async for event in agent.astream_events(input, version="v2"):
645677
... for item in to_agui_events(event):
646678
... yield item
@@ -660,7 +692,7 @@ def to_agui_events(
660692
# 根据事件格式选择对应的转换器
661693
if _is_astream_events_format(event_dict):
662694
# astream_events 格式:{"event": "on_xxx", "data": {...}}
663-
yield from _convert_astream_events_event(event_dict)
695+
yield from _convert_astream_events_event(event_dict, tool_call_id_map)
664696

665697
elif _is_stream_updates_format(event_dict):
666698
# stream/astream(stream_mode="updates") 格式:{node_name: state_update}
@@ -671,5 +703,55 @@ def to_agui_events(
671703
yield from _convert_stream_values_event(event_dict, messages_key)
672704

673705

706+
class AgentRunConverter:
707+
"""AgentRun 事件转换器
708+
709+
将 LangGraph/LangChain 流式事件转换为 AG-UI 协议事件。
710+
此类维护必要的状态以确保流式工具调用的 tool_call_id 一致性。
711+
712+
在流式工具调用中,第一个 chunk 包含 id,后续 chunk 只有 index。
713+
此类维护 index -> id 的映射,确保所有相关事件使用相同的 tool_call_id。
714+
715+
Example:
716+
>>> from agentrun.integration.langchain import AgentRunConverter
717+
>>>
718+
>>> async def invoke_agent(request: AgentRequest):
719+
... converter = AgentRunConverter()
720+
... async for event in agent.astream_events(input, version="v2"):
721+
... for item in converter.convert(event):
722+
... yield item
723+
"""
724+
725+
def __init__(self):
726+
self._tool_call_id_map: Dict[int, str] = {}
727+
728+
def convert(
729+
self,
730+
event: Union[Dict[str, Any], Any],
731+
messages_key: str = "messages",
732+
) -> Iterator[Union[AgentResult, str]]:
733+
"""转换单个事件为 AG-UI 协议事件
734+
735+
Args:
736+
event: LangGraph/LangChain 流式事件(StreamEvent 对象或 Dict)
737+
messages_key: state 中消息列表的 key,默认 "messages"
738+
739+
Yields:
740+
str (文本内容) 或 AgentResult (AG-UI 事件)
741+
"""
742+
yield from to_agui_events(event, messages_key, self._tool_call_id_map)
743+
744+
def reset(self):
745+
"""重置状态,清空 tool_call_id 映射
746+
747+
在处理新的请求时,建议创建新的 AgentRunConverter 实例,
748+
而不是复用旧实例并调用 reset。
749+
"""
750+
self._tool_call_id_map.clear()
751+
752+
753+
# 保留向后兼容的别名
754+
AguiEventConverter = AgentRunConverter
755+
674756
# 保留 convert 作为别名,兼容旧代码
675757
convert = to_agui_events

0 commit comments

Comments
 (0)