Skip to content

Commit 11003a9

Browse files
jsonmp-k8EItanya
andauthored
fix: return MCP connection errors to LLM instead of raising (#1531)
## Summary - Wrap `McpTool` instances with `ConnectionSafeMcpTool` that catches persistent connection errors and returns them as error text to the LLM - Catches `ConnectionError` (stdlib), `TimeoutError` (stdlib), `httpx.TransportError` (httpx network/timeout/protocol errors), and `McpError` (MCP session stream drops and read timeouts) - The error message includes the tool name, error type, and instructs the LLM not to retry - `KAgentMcpToolset.get_tools()` automatically wraps all `McpTool` instances ## Root cause When an MCP HTTP tool call fails with "connection reset by peer", the error propagates up to the ADK flow handler, which sends it back to the LLM as a function error. The LLM interprets this as a transient failure and retries the same tool call — creating a tight loop of LLM call → tool call → connection error → LLM call for up to `max_llm_calls` (500) iterations, burning 100% CPU. The MCP client wraps transport-level errors into `McpError` via `mcp.shared.session.send_request()` before they reach the tool, so catching only stdlib/httpx errors is insufficient — `McpError` must also be handled. ## Testing - `python -m pytest python/packages/kagent-adk/tests/unittests/test_mcp_connection_error_handling.py -v` (10 tests) - `python -m pytest python/packages/kagent-adk/tests/unittests/ -v` (170 passed) Test coverage: - `ConnectionResetError`, `ConnectionRefusedError`, `TimeoutError` — caught, returned as error dict - `httpx.ConnectError`, `httpx.ReadError`, `httpx.ConnectTimeout` — caught via `httpx.TransportError` - `McpError` (session read timeout) — caught, returned as error dict - `ValueError`, `CancelledError` — still raised (not connection errors) - `KAgentMcpToolset.get_tools()` wraps `McpTool` → `ConnectionSafeMcpTool` Fixes #1530 --------- Signed-off-by: Jaison Paul <paul.jaison@gmail.com> Co-authored-by: Eitan Yarmush <eitan.yarmush@solo.io>
1 parent 213f53d commit 11003a9

2 files changed

Lines changed: 272 additions & 2 deletions

File tree

python/packages/kagent-adk/src/kagent/adk/_mcp_toolset.py

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,65 @@
22

33
import asyncio
44
import logging
5-
from typing import Optional
5+
from typing import Any, Optional
66

7+
import httpx
78
from google.adk.tools import BaseTool
9+
from google.adk.tools.mcp_tool.mcp_tool import McpTool
810
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset, ReadonlyContext
11+
from google.adk.tools.tool_context import ToolContext
12+
from mcp.shared.exceptions import McpError
913

1014
logger = logging.getLogger("kagent_adk." + __name__)
1115

16+
# Connection errors that indicate an unreachable MCP server.
17+
# When these occur, the tool should return an error message to the LLM
18+
# instead of raising, so the LLM can respond to the user rather than
19+
# retrying the broken tool indefinitely.
20+
#
21+
# - ConnectionError: stdlib base for ConnectionResetError, ConnectionRefusedError, etc.
22+
# - TimeoutError: stdlib timeout (e.g. socket.timeout)
23+
# - httpx.TransportError: covers httpx.NetworkError (ConnectError, ReadError,
24+
# WriteError, CloseError), httpx.TimeoutException, httpx.ProtocolError, etc.
25+
# These do NOT inherit from stdlib ConnectionError/OSError.
26+
#
27+
# McpError is handled separately in ConnectionSafeMcpTool.run_async() because
28+
# it is the general MCP protocol error class. Only transport-level McpErrors
29+
# (e.g., session read timeouts) should be caught; protocol-level McpErrors
30+
# (e.g., invalid tool arguments) must propagate so the LLM can correct itself.
31+
_CONNECTION_ERROR_TYPES = (
32+
ConnectionError,
33+
TimeoutError,
34+
httpx.TransportError,
35+
)
36+
37+
# Keywords in McpError messages that indicate transport-level failures
38+
# (as opposed to protocol-level errors like invalid arguments).
39+
_TRANSPORT_MCP_ERROR_KEYWORDS = (
40+
"timeout",
41+
"timed out",
42+
"connection",
43+
"eof",
44+
"reset",
45+
"closed",
46+
"transport",
47+
"stream",
48+
"unreachable",
49+
)
50+
51+
52+
def _is_transport_mcp_error(error: McpError) -> bool:
53+
"""Check if an McpError represents a transport-level failure.
54+
55+
McpError wraps all MCP protocol errors, but only transport-level failures
56+
(e.g., session read timeouts, stream closures) should be caught and
57+
returned to the LLM as non-retryable errors. Protocol-level errors
58+
(e.g., invalid tool arguments, server validation failures) should
59+
propagate so the LLM can correct its behavior.
60+
"""
61+
message = error.error.message.lower()
62+
return any(keyword in message for keyword in _TRANSPORT_MCP_ERROR_KEYWORDS)
63+
1264

1365
def _enrich_cancelled_error(error: BaseException) -> asyncio.CancelledError:
1466
message = "Failed to create MCP session: operation cancelled"
@@ -17,6 +69,57 @@ def _enrich_cancelled_error(error: BaseException) -> asyncio.CancelledError:
1769
return asyncio.CancelledError(message)
1870

1971

72+
class ConnectionSafeMcpTool(McpTool):
73+
"""McpTool wrapper that catches connection errors and returns them as
74+
error text to the LLM instead of raising.
75+
76+
Without this, a persistent connection failure (e.g. "connection reset by
77+
peer") causes the LLM to retry the tool call in a tight loop, burning
78+
100% CPU for up to max_llm_calls iterations.
79+
80+
Uses composition: delegates to an inner McpTool instance via __getattr__,
81+
avoiding the fragile __new__ + __dict__ copy pattern that would break if
82+
upstream McpTool adds __slots__, properties, or post-init hooks.
83+
84+
See: https://github.com/kagent-dev/kagent/issues/1530
85+
"""
86+
87+
_inner_tool: McpTool
88+
89+
def __init__(self, inner_tool: McpTool):
90+
# Store the inner tool without calling McpTool.__init__
91+
# (which requires connection params we don't have).
92+
object.__setattr__(self, "_inner_tool", inner_tool)
93+
94+
def __getattr__(self, name: str) -> Any:
95+
return getattr(self._inner_tool, name)
96+
97+
def _connection_error_response(self, error: Exception) -> dict[str, Any]:
98+
error_message = (
99+
f"MCP tool '{self.name}' failed due to a connection error: "
100+
f"{type(error).__name__}: {error}. "
101+
"The MCP server may be unreachable. "
102+
"Do not retry this tool — inform the user about the failure."
103+
)
104+
logger.error(error_message, exc_info=error)
105+
return {"error": error_message}
106+
107+
async def run_async(
108+
self,
109+
*,
110+
args: dict[str, Any],
111+
tool_context: ToolContext,
112+
) -> dict[str, Any]:
113+
try:
114+
return await self._inner_tool.run_async(args=args, tool_context=tool_context)
115+
except _CONNECTION_ERROR_TYPES as error:
116+
return self._connection_error_response(error)
117+
except McpError as error:
118+
if not _is_transport_mcp_error(error):
119+
raise
120+
return self._connection_error_response(error)
121+
122+
20123
class KAgentMcpToolset(McpToolset):
21124
"""McpToolset variant that catches and enriches errors during MCP session setup
22125
and handles cancel scope issues during cleanup.
@@ -27,10 +130,20 @@ class KAgentMcpToolset(McpToolset):
27130

28131
async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> list[BaseTool]:
29132
try:
30-
return await super().get_tools(readonly_context)
133+
tools = await super().get_tools(readonly_context)
31134
except asyncio.CancelledError as error:
32135
raise _enrich_cancelled_error(error) from error
33136

137+
# Wrap each McpTool with ConnectionSafeMcpTool so that connection
138+
# errors are returned as error text instead of raised.
139+
wrapped_tools: list[BaseTool] = []
140+
for tool in tools:
141+
if isinstance(tool, McpTool) and not isinstance(tool, ConnectionSafeMcpTool):
142+
wrapped_tools.append(ConnectionSafeMcpTool(tool))
143+
else:
144+
wrapped_tools.append(tool)
145+
return wrapped_tools
146+
34147
async def close(self) -> None:
35148
"""Close MCP sessions and suppress known anyio cancel scope cleanup errors.
36149
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
"""Tests for ConnectionSafeMcpTool — connection errors are returned as
2+
error text to the LLM instead of raised, preventing tight retry loops.
3+
4+
See: https://github.com/kagent-dev/kagent/issues/1530
5+
"""
6+
7+
import asyncio
8+
from unittest.mock import AsyncMock, MagicMock, patch
9+
10+
import httpx
11+
import pytest
12+
from google.adk.tools.mcp_tool.mcp_tool import McpTool
13+
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
14+
from mcp.shared.exceptions import McpError
15+
from mcp.types import ErrorData
16+
17+
from kagent.adk._mcp_toolset import ConnectionSafeMcpTool, KAgentMcpToolset
18+
19+
20+
def _make_connection_safe_tool(side_effect):
21+
"""Create a ConnectionSafeMcpTool wrapping a mock McpTool."""
22+
inner_tool = MagicMock(spec=McpTool)
23+
inner_tool.name = "test-tool"
24+
inner_tool.run_async = AsyncMock(side_effect=side_effect)
25+
return ConnectionSafeMcpTool(inner_tool)
26+
27+
28+
@pytest.mark.asyncio
29+
async def test_connection_reset_error_returns_error_dict():
30+
"""ConnectionResetError should be caught and returned as error text."""
31+
tool = _make_connection_safe_tool(ConnectionResetError("Connection reset by peer"))
32+
33+
result = await tool.run_async(args={"key": "value"}, tool_context=MagicMock())
34+
35+
assert "error" in result
36+
assert "ConnectionResetError" in result["error"]
37+
assert "Connection reset by peer" in result["error"]
38+
assert "Do not retry" in result["error"]
39+
40+
41+
@pytest.mark.asyncio
42+
async def test_connection_refused_error_returns_error_dict():
43+
"""ConnectionRefusedError should be caught and returned as error text."""
44+
tool = _make_connection_safe_tool(ConnectionRefusedError("Connection refused"))
45+
46+
result = await tool.run_async(args={}, tool_context=MagicMock())
47+
48+
assert "error" in result
49+
assert "ConnectionRefusedError" in result["error"]
50+
51+
52+
@pytest.mark.asyncio
53+
async def test_timeout_error_returns_error_dict():
54+
"""TimeoutError should be caught and returned as error text."""
55+
tool = _make_connection_safe_tool(TimeoutError("timed out"))
56+
57+
result = await tool.run_async(args={}, tool_context=MagicMock())
58+
59+
assert "error" in result
60+
assert "TimeoutError" in result["error"]
61+
62+
63+
@pytest.mark.asyncio
64+
async def test_httpx_connect_error_returns_error_dict():
65+
"""httpx.ConnectError should be caught via httpx.TransportError."""
66+
tool = _make_connection_safe_tool(httpx.ConnectError("connection refused"))
67+
68+
result = await tool.run_async(args={}, tool_context=MagicMock())
69+
70+
assert "error" in result
71+
assert "ConnectError" in result["error"]
72+
73+
74+
@pytest.mark.asyncio
75+
async def test_httpx_read_error_returns_error_dict():
76+
"""httpx.ReadError (connection reset by peer) should be caught."""
77+
tool = _make_connection_safe_tool(httpx.ReadError("peer closed connection"))
78+
79+
result = await tool.run_async(args={}, tool_context=MagicMock())
80+
81+
assert "error" in result
82+
assert "ReadError" in result["error"]
83+
84+
85+
@pytest.mark.asyncio
86+
async def test_httpx_connect_timeout_returns_error_dict():
87+
"""httpx.ConnectTimeout should be caught via httpx.TransportError."""
88+
tool = _make_connection_safe_tool(httpx.ConnectTimeout("timed out"))
89+
90+
result = await tool.run_async(args={}, tool_context=MagicMock())
91+
92+
assert "error" in result
93+
assert "ConnectTimeout" in result["error"]
94+
95+
96+
@pytest.mark.asyncio
97+
async def test_transport_mcp_error_returns_error_dict():
98+
"""McpError with a transport-level message (e.g., session read timeout) should be caught."""
99+
tool = _make_connection_safe_tool(McpError(ErrorData(code=-1, message="session read timeout")))
100+
101+
result = await tool.run_async(args={}, tool_context=MagicMock())
102+
103+
assert "error" in result
104+
assert "McpError" in result["error"]
105+
assert "session read timeout" in result["error"]
106+
107+
108+
@pytest.mark.asyncio
109+
async def test_protocol_mcp_error_still_raises():
110+
"""McpError with a protocol-level message (e.g., invalid arguments) should propagate."""
111+
tool = _make_connection_safe_tool(McpError(ErrorData(code=-32602, message="Invalid params: unknown tool")))
112+
113+
with pytest.raises(McpError, match="Invalid params"):
114+
await tool.run_async(args={}, tool_context=MagicMock())
115+
116+
117+
@pytest.mark.asyncio
118+
async def test_non_connection_error_still_raises():
119+
"""Non-connection errors (e.g. ValueError) should still propagate."""
120+
tool = _make_connection_safe_tool(ValueError("bad argument"))
121+
122+
with pytest.raises(ValueError, match="bad argument"):
123+
await tool.run_async(args={}, tool_context=MagicMock())
124+
125+
126+
@pytest.mark.asyncio
127+
async def test_cancelled_error_still_raises():
128+
"""CancelledError must propagate — it's not a connection error."""
129+
tool = _make_connection_safe_tool(asyncio.CancelledError("cancelled"))
130+
131+
with pytest.raises(asyncio.CancelledError):
132+
await tool.run_async(args={}, tool_context=MagicMock())
133+
134+
135+
@pytest.mark.asyncio
136+
async def test_get_tools_wraps_mcp_tools():
137+
"""KAgentMcpToolset.get_tools should wrap McpTool instances with ConnectionSafeMcpTool."""
138+
fake_mcp_tool = McpTool.__new__(McpTool)
139+
fake_mcp_tool.name = "wrapped-tool"
140+
fake_mcp_tool._some_attr = "value"
141+
142+
fake_other_tool = MagicMock()
143+
fake_other_tool.name = "other-tool"
144+
145+
toolset = KAgentMcpToolset.__new__(KAgentMcpToolset)
146+
147+
async def mock_super_get_tools(self_arg, readonly_context=None):
148+
return [fake_mcp_tool, fake_other_tool]
149+
150+
with patch.object(McpToolset, "get_tools", mock_super_get_tools):
151+
tools = await toolset.get_tools()
152+
153+
assert len(tools) == 2
154+
assert isinstance(tools[0], ConnectionSafeMcpTool)
155+
assert tools[0].name == "wrapped-tool"
156+
assert tools[0]._some_attr == "value"
157+
assert tools[1] is fake_other_tool

0 commit comments

Comments
 (0)