66import copy
77import json
88import traceback
9+ from datetime import timedelta
910from typing import AsyncGenerator , Union
1011from astrbot .core .conversation_mgr import Conversation
1112from astrbot .core import logger
@@ -185,21 +186,33 @@ async def _execute_local(
185186 handler = awaitable ,
186187 ** tool_args ,
187188 )
188- async for resp in wrapper :
189- if resp is not None :
190- if isinstance (resp , mcp .types .CallToolResult ):
191- yield resp
189+ # async for resp in wrapper:
190+ while True :
191+ try :
192+ resp = await asyncio .wait_for (
193+ anext (wrapper ),
194+ timeout = run_context .context .tool_call_timeout ,
195+ )
196+ if resp is not None :
197+ if isinstance (resp , mcp .types .CallToolResult ):
198+ yield resp
199+ else :
200+ text_content = mcp .types .TextContent (
201+ type = "text" ,
202+ text = str (resp ),
203+ )
204+ yield mcp .types .CallToolResult (content = [text_content ])
192205 else :
193- text_content = mcp . types . TextContent (
194- type = "text" ,
195- text = str ( resp ),
196- )
197- yield mcp . types . CallToolResult ( content = [ text_content ])
198- else :
199- # NOTE: Tool 在这里直接请求发送消息给用户
200- # TODO: 是否需要判断 event.get_result() 是否为空?
201- # 如果为空,则说明没有发送消息给用户,并且返回值为空,将返回一个特殊的 TextContent,其内容如"工具没有返回内容"
202- yield None
206+ # NOTE: Tool 在这里直接请求发送消息给用户
207+ # TODO: 是否需要判断 event.get_result() 是否为空?
208+ # 如果为空,则说明没有发送消息给用户,并且返回值为空,将返回一个特殊的 TextContent,其内容如"工具没有返回内容"
209+ yield None
210+ except asyncio . TimeoutError :
211+ raise Exception (
212+ f"tool { tool . name } execution timeout after { run_context . context . tool_call_timeout } seconds."
213+ )
214+ except StopAsyncIteration :
215+ break
203216
204217 @classmethod
205218 async def _execute_mcp (
@@ -217,6 +230,9 @@ async def _execute_mcp(
217230 res = await session .call_tool (
218231 name = tool .name ,
219232 arguments = tool_args ,
233+ read_timeout_seconds = timedelta (
234+ seconds = run_context .context .tool_call_timeout
235+ ),
220236 )
221237 if not res :
222238 return
@@ -307,6 +323,7 @@ async def initialize(self, ctx: PipelineContext) -> None:
307323 )
308324 self .streaming_response : bool = settings ["streaming_response" ]
309325 self .max_step : int = settings .get ("max_agent_step" , 30 )
326+ self .tool_call_timeout : int = settings .get ("tool_call_timeout" , 60 )
310327 if isinstance (self .max_step , bool ): # workaround: #2622
311328 self .max_step = 30
312329 self .show_tool_use : bool = settings .get ("show_tool_use_status" , True )
@@ -473,6 +490,7 @@ async def process(
473490 first_provider_request = req ,
474491 curr_provider_request = req ,
475492 streaming = self .streaming_response ,
493+ tool_call_timeout = self .tool_call_timeout ,
476494 )
477495 await agent_runner .reset (
478496 provider = provider ,
0 commit comments