Skip to content

Commit 5fab8aa

Browse files
committed
fix: separate init signal from client lifetime in MCP task wrapper
The previous design awaited task completion, but tasks only finish on shutdown (after event.wait()), causing asyncio.wait to always hit the 20s timeout and cancel all clients. Fix: introduce a dedicated ready_event that is set immediately after _init_mcp_client completes. init_mcp_clients now waits only for ready_event (with 20s timeout), while the long-lived client task continues running in the background until shutdown_event is set. This ensures startup returns promptly once clients are ready.
1 parent 38e99cf commit 5fab8aa

1 file changed

Lines changed: 48 additions & 32 deletions

File tree

astrbot/core/provider/func_tool_manager.py

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -213,43 +213,57 @@ async def init_mcp_clients(self) -> None:
213213
open(mcp_json_file, encoding="utf-8"),
214214
)["mcpServers"]
215215

216-
tasks: dict[str, asyncio.Task] = {}
216+
# name -> (shutdown_event, ready_event, task)
217+
# shutdown_event: 关机时 set,通知长期运行的 task 退出
218+
# ready_event: 初始化完成时 set,用于等待初始化,不阻塞到关机
219+
client_info: dict[str, tuple[asyncio.Event, asyncio.Event, asyncio.Task]] = {}
217220

218221
for name, cfg in mcp_server_json_obj.items():
219222
if cfg.get("active", True):
220-
event = asyncio.Event()
223+
shutdown_event = asyncio.Event()
224+
ready_event = asyncio.Event()
221225
task = asyncio.create_task(
222-
self._init_mcp_client_task_wrapper(name, cfg, event),
226+
self._init_mcp_client_task_wrapper(
227+
name, cfg, shutdown_event, ready_event
228+
),
223229
)
224-
tasks[name] = task
225-
self.mcp_client_event[name] = event
230+
client_info[name] = (shutdown_event, ready_event, task)
231+
self.mcp_client_event[name] = shutdown_event
226232

227-
if tasks:
228-
logger.info(f"等待 {len(tasks)} 个 MCP 服务初始化...")
233+
if client_info:
234+
logger.info(f"等待 {len(client_info)} 个 MCP 服务初始化...")
229235

230-
done, pending = await asyncio.wait(tasks.values(), timeout=20.0)
236+
# 只等待初始化完成信号,不等待整个 task 的生命周期结束
237+
ready_events = {name: info[1] for name, info in client_info.items()}
238+
tasks_by_name = {name: info[2] for name, info in client_info.items()}
231239

232-
if pending:
233-
logger.warning(
234-
"MCP 服务初始化超时(20秒),部分服务可能未完全加载。"
235-
"建议检查 MCP 服务器配置和网络连接。"
240+
async def _wait_ready(name: str, ev: asyncio.Event) -> str:
241+
await ev.wait()
242+
return name
243+
244+
wait_coros = [_wait_ready(n, e) for n, e in ready_events.items()]
245+
try:
246+
await asyncio.wait_for(
247+
asyncio.gather(*wait_coros, return_exceptions=True),
248+
timeout=20.0,
236249
)
237-
for task in pending:
238-
task.cancel()
250+
except asyncio.TimeoutError:
251+
pass
239252

240253
success_count = 0
241254
failed_services: list[str] = []
242255

243-
for name, task in tasks.items():
244-
if task in pending:
256+
for name, task in tasks_by_name.items():
257+
ready_ev = ready_events[name]
258+
if not ready_ev.is_set():
259+
# 超时,初始化未完成,取消 task
245260
logger.error(f"MCP 服务 {name} 初始化超时")
261+
task.cancel()
246262
failed_services.append(name)
247-
continue
248-
249-
exc = task.exception()
250-
if exc is not None:
263+
elif task.done() and task.exception() is not None:
264+
# 初始化期间抛出异常
265+
exc = task.exception()
251266
logger.error(f"MCP 服务 {name} 初始化失败: {exc}")
252-
# 仅在 debug 级别输出完整配置,避免在生产日志中泄露敏感信息
253267
cfg = mcp_server_json_obj.get(name, {})
254268
if "command" in cfg:
255269
logger.debug(f" 命令: {cfg['command']}")
@@ -268,27 +282,29 @@ async def init_mcp_clients(self) -> None:
268282
f"请检查配置文件 mcp_server.json 和服务器可用性。"
269283
)
270284

271-
logger.info(f"MCP 服务初始化完成: {success_count}/{len(tasks)} 成功")
285+
logger.info(f"MCP 服务初始化完成: {success_count}/{len(client_info)} 成功")
272286

273287
async def _init_mcp_client_task_wrapper(
274288
self,
275289
name: str,
276290
cfg: dict,
277-
event: asyncio.Event,
291+
shutdown_event: asyncio.Event,
292+
ready_event: asyncio.Event,
278293
) -> None:
279-
"""初始化 MCP 客户端的包装函数,用于捕获异常"""
280-
initialized = False
294+
"""初始化 MCP 客户端的包装函数。
295+
296+
初始化完成后立即 set ready_event,让 init_mcp_clients 可以
297+
及时返回,而无需等待整个客户端的生命周期结束。
298+
"""
281299
try:
282300
await self._init_mcp_client(name, cfg)
283-
initialized = True
284-
await event.wait()
301+
ready_event.set()
302+
await shutdown_event.wait()
285303
logger.info(f"收到 MCP 客户端 {name} 终止信号")
286304
except Exception:
287-
if not initialized:
288-
# 初始化阶段失败,记录错误并向上抛出让 task.exception() 捕获
289-
logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
290-
raise
291-
# 初始化已成功,此处异常来自 event.wait() 被取消,属于正常终止流程
305+
ready_event.set() # 确保即使失败也能解除等待
306+
logger.error(f"初始化 MCP 客户端 {name} 失败", exc_info=True)
307+
raise
292308
finally:
293309
await self._terminate_mcp_client(name)
294310

0 commit comments

Comments
 (0)