diff --git a/src/google/adk/tools/mcp_tool/mcp_thread_utils.py b/src/google/adk/tools/mcp_tool/mcp_thread_utils.py new file mode 100644 index 0000000000..769843c987 --- /dev/null +++ b/src/google/adk/tools/mcp_tool/mcp_thread_utils.py @@ -0,0 +1,155 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Thread-isolated MCP helpers for environments with anyio cancel scope constraints. + +Root cause +---------- +``anyio``'s ``CancelScope`` binds to the ``asyncio.Task`` that *enters* it. +On Vertex AI Agent Engine, the scheduler can context-switch tasks between +entering and exiting the scope inside ``streamablehttp_client``'s +``anyio.create_task_group()``, which raises:: + + Attempted to exit cancel scope in a different task than it was entered in + +Fix +--- +Run each MCP operation inside a dedicated thread via ``asyncio.to_thread()``. +Inside that thread, ``asyncio.new_event_loop()`` creates an isolated event +loop. The ``anyio`` cancel scope is created *and* destroyed entirely within +that loop, so it never crosses task boundaries in the caller's scheduler. + +Trade-offs +---------- +* A new HTTP connection is opened per tool call (no session reuse). +* ``progress_callback`` and MCP sampling are not supported in this path. +* Auth headers are threaded through, so ``auth_scheme``/``auth_credential`` + and ``header_provider`` on ``McpToolset`` remain functional. +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any, Dict, Optional + +from mcp import types +from mcp.client import streamable_http +from mcp.client.session import ClientSession + + +def _cancel_pending(loop: asyncio.AbstractEventLoop) -> None: + """Cancel all pending tasks so the loop can be closed without warnings.""" + pending = asyncio.all_tasks(loop) + if not pending: + return + for task in pending: + task.cancel() + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + +def list_tools_in_thread( + url: str, + headers: Optional[Dict[str, str]] = None, +) -> list[Any]: + """Return tools/list results from an MCP server via an isolated event loop. + + Opens a fresh connection for every call; must be invoked from a non-async + context (i.e. via ``asyncio.to_thread``). + + Args: + url: The MCP server URL. + headers: Optional HTTP headers to include in the request. + + Returns: + A list of ``mcp.types.Tool`` objects. + """ + + async def _async(): + kwargs = {"headers": headers} if headers else {} + async with streamable_http.streamable_http_client(url, **kwargs) as ( + read, + write, + _, + ): + async with ClientSession(read, write) as session: + await session.initialize() + result = await session.list_tools() + return result.tools + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(_async()) + finally: + _cancel_pending(loop) + loop.close() + + +def call_tool_in_thread( + url: str, + tool_name: str, + arguments: Dict[str, Any], + headers: Optional[Dict[str, str]] = None, +) -> Any: + """Call an MCP tool via an isolated event loop and return the parsed result. + + Opens a fresh connection for every call; must be invoked from a non-async + context (i.e. via ``asyncio.to_thread``). + + Args: + url: The MCP server URL. + tool_name: The name of the tool to call. + arguments: The arguments to pass to the tool. + headers: Optional HTTP headers to include in the request. + + Returns: + The parsed tool result (JSON-decoded dict, or ``{"text": ...}`` fallback). + + Raises: + RuntimeError: If the MCP server returns an error response. + """ + + async def _async(): + kwargs = {"headers": headers} if headers else {} + async with streamable_http.streamable_http_client(url, **kwargs) as ( + read, + write, + _, + ): + async with ClientSession(read, write) as session: + await session.initialize() + result = await session.call_tool(tool_name, arguments) + if result.isError: + raise RuntimeError( + f"MCP tool '{tool_name}' returned an error: {result.content}" + ) + if result.content: + first = result.content[0] + if isinstance(first, types.TextContent): + try: + return json.loads(first.text) + except json.JSONDecodeError: + return {"text": first.text} + return {"content": str(first)} + return {} + + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(_async()) + finally: + _cancel_pending(loop) + loop.close() diff --git a/src/google/adk/tools/mcp_tool/mcp_tool.py b/src/google/adk/tools/mcp_tool/mcp_tool.py index 6a24651f92..7e78fc3b9e 100644 --- a/src/google/adk/tools/mcp_tool/mcp_tool.py +++ b/src/google/adk/tools/mcp_tool/mcp_tool.py @@ -57,6 +57,7 @@ from ..tool_context import ToolContext from .mcp_session_manager import MCPSessionManager from .mcp_session_manager import retry_on_errors +from .mcp_session_manager import StreamableHTTPConnectionParams from .session_context import SessionContext logger = logging.getLogger("google_adk." + __name__) @@ -148,6 +149,7 @@ def __init__( progress_callback: Optional[ Union[ProgressFnT, ProgressCallbackFactory] ] = None, + use_isolated_event_loop: bool = False, ): """Initializes an McpTool. @@ -174,9 +176,19 @@ def __init__( The factory receives (tool_name, callback_context, **kwargs) and returns a ProgressFnT or None. This allows callbacks to access and modify runtime context like session state. + + use_isolated_event_loop: When ``True``, each tool call runs in a + dedicated thread with an isolated ``asyncio`` event loop. This avoids + the anyio ``CancelScope`` cross-task error that occurs on Vertex AI + Agent Engine when using ``StreamableHTTPConnectionParams``. Only + supported with ``StreamableHTTPConnectionParams``; raises + ``ValueError`` for other transport types. Note: ``progress_callback`` + is not invoked in this mode. Raises: - ValueError: If mcp_tool or mcp_session_manager is None. + ValueError: If mcp_tool or mcp_session_manager is None, or if + ``use_isolated_event_loop=True`` is combined with a non-streamable- + HTTP transport. """ # --- BEGIN BOUND TOKEN PATCH --- @@ -197,11 +209,25 @@ def __init__( if auth_scheme else None, ) + if use_isolated_event_loop and not isinstance( + mcp_session_manager._connection_params, StreamableHTTPConnectionParams + ): + raise ValueError( + "use_isolated_event_loop=True is only supported with" + " StreamableHTTPConnectionParams." + ) + self._mcp_tool = mcp_tool self._mcp_session_manager = mcp_session_manager self._require_confirmation = require_confirmation self._header_provider = header_provider self._progress_callback = progress_callback + self._use_isolated_event_loop = use_isolated_event_loop + self._server_url: Optional[str] = None + if use_isolated_event_loop and isinstance( + mcp_session_manager._connection_params, StreamableHTTPConnectionParams + ): + self._server_url = mcp_session_manager._connection_params.url @override def _get_declaration(self) -> FunctionDeclaration: @@ -397,6 +423,21 @@ async def _run_async_impl( headers.update(dynamic_headers) final_headers = headers if headers else None + if self._use_isolated_event_loop: + # Run in a dedicated thread with an isolated event loop to avoid the + # anyio CancelScope cross-task error on Vertex AI Agent Engine. + # See mcp_thread_utils.py for the full explanation. + from .mcp_thread_utils import call_tool_in_thread # pylint: disable=g-import-not-at-top + + assert self._server_url is not None + return await asyncio.to_thread( + call_tool_in_thread, + self._server_url, + self.name, + args, + final_headers, + ) + # Propagate trace context in the _meta field as sprcified by MCP protocol. # See https://agentclientprotocol.com/protocol/extensibility#the-meta-field trace_carrier: Dict[str, str] = {} @@ -576,7 +617,8 @@ async def _get_headers( ) } elif credential.service_account: - # Service accounts should be exchanged for access tokens before reaching this point + # Service accounts should be exchanged for access tokens before + # reaching this point logger.warning( "Service account credentials should be exchanged before MCP" " session creation" diff --git a/src/google/adk/tools/mcp_tool/mcp_toolset.py b/src/google/adk/tools/mcp_tool/mcp_toolset.py index c566c52746..a6ec6cbe39 100644 --- a/src/google/adk/tools/mcp_tool/mcp_toolset.py +++ b/src/google/adk/tools/mcp_tool/mcp_toolset.py @@ -118,6 +118,7 @@ def __init__( use_mcp_resources: Optional[bool] = False, sampling_callback: Optional[SamplingFnT] = None, sampling_capabilities: Optional[SamplingCapability] = None, + use_isolated_event_loop: bool = False, ): """Initializes the McpToolset. @@ -157,6 +158,14 @@ def __init__( sampling_callback: Optional callback to handle sampling requests from the MCP server. sampling_capabilities: Optional capabilities for sampling. + use_isolated_event_loop: When ``True``, each MCP operation (tool + discovery and tool calls) runs in a dedicated thread with an isolated + ``asyncio`` event loop. This avoids the anyio ``CancelScope`` cross-task + error that occurs on Vertex AI Agent Engine when using + ``StreamableHTTPConnectionParams``. Only supported with + ``StreamableHTTPConnectionParams``; raises ``ValueError`` for other + transport types. Note: ``progress_callback`` and MCP sampling are not + invoked in this mode. """ # --- BEGIN BOUND TOKEN PATCH --- @@ -176,6 +185,14 @@ def __init__( if not connection_params: raise ValueError("Missing connection params in McpToolset.") + if use_isolated_event_loop and not isinstance( + connection_params, StreamableHTTPConnectionParams + ): + raise ValueError( + "use_isolated_event_loop=True is only supported with" + " StreamableHTTPConnectionParams." + ) + self._connection_params = connection_params self._errlog = errlog self._header_provider = header_provider @@ -202,6 +219,7 @@ def __init__( else None ) self._use_mcp_resources = use_mcp_resources + self._use_isolated_event_loop = use_isolated_event_loop def _get_auth_headers( self, readonly_context: Optional[ReadonlyContext] = None @@ -340,6 +358,50 @@ async def get_tools( Returns: List[BaseTool]: A list of tools available under the specified context. """ + + if self._use_isolated_event_loop: + # Discover tools via an isolated thread to avoid the anyio CancelScope + # cross-task error on Vertex AI Agent Engine. + # See mcp_thread_utils.py for the full explanation. + from .mcp_thread_utils import list_tools_in_thread # pylint: disable=g-import-not-at-top + + headers: Dict[str, str] = {} + auth_headers = self._get_auth_headers(readonly_context) + if auth_headers: + headers.update(auth_headers) + if self._header_provider and readonly_context: + provider_headers = self._header_provider(readonly_context) + if provider_headers: + headers.update(provider_headers) + + assert isinstance( + self._connection_params, StreamableHTTPConnectionParams + ) + raw_tools = await asyncio.to_thread( + list_tools_in_thread, + self._connection_params.url, + headers or None, + ) + tools = [] + for tool in raw_tools: + mcp_tool = MCPTool( + mcp_tool=tool, + mcp_session_manager=self._mcp_session_manager, + auth_scheme=self._auth_scheme, + auth_credential=self._auth_credential, + require_confirmation=self._require_confirmation, + header_provider=self._header_provider, + use_isolated_event_loop=True, + ) + if readonly_context is None or self._is_tool_selected( + mcp_tool, readonly_context + ): + tools.append(mcp_tool) + if self._use_mcp_resources: + tools.append(LoadMcpResourceTool(mcp_toolset=self)) + if self._use_mcp_resources: + tools.append(LoadMcpResourceTool(mcp_toolset=self)) + return tools # Fetch available tools from the MCP server tools_response: ListToolsResult = await self._execute_with_session( lambda session: session.list_tools(),