Skip to content

Commit 9a5fc2d

Browse files
edis-uipathclaude
andauthored
feat: mcp client with 404 recovery (#502)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent a48b8d6 commit 9a5fc2d

13 files changed

Lines changed: 2776 additions & 81 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-langchain"
3-
version = "0.5.27"
3+
version = "0.5.28"
44
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath_langchain/agent/tools/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from .extraction_tool import create_ixp_extraction_tool
66
from .integration_tool import create_integration_tool
77
from .ixp_escalation_tool import create_ixp_escalation_tool
8-
from .mcp_tool import create_mcp_tools
98
from .process_tool import create_process_tool
109
from .tool_factory import (
1110
create_tools_from_resources,
@@ -19,7 +18,6 @@
1918
"create_process_tool",
2019
"create_integration_tool",
2120
"create_escalation_tool",
22-
"create_mcp_tools",
2321
"create_ixp_extraction_tool",
2422
"create_ixp_escalation_tool",
2523
"UiPathToolNode",
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
"""MCP (Model Context Protocol) tools."""
2+
3+
from .mcp_client import McpClient
4+
from .mcp_tool import (
5+
create_mcp_tools,
6+
create_mcp_tools_from_agent,
7+
create_mcp_tools_from_metadata_for_mcp_server,
8+
)
9+
10+
__all__ = [
11+
"McpClient",
12+
"create_mcp_tools",
13+
"create_mcp_tools_from_agent",
14+
"create_mcp_tools_from_metadata_for_mcp_server",
15+
]

src/uipath_langchain/agent/tools/mcp/claude.md

Lines changed: 433 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
"""MCP Session management for tool invocations.
2+
3+
This module provides a session class that manages the lifecycle of MCP connections,
4+
including automatic reconnection on session disconnect errors.
5+
"""
6+
7+
import asyncio
8+
import logging
9+
from contextlib import AsyncExitStack
10+
from typing import Any
11+
12+
import httpx
13+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
14+
from mcp import ClientSession
15+
from mcp.client.streamable_http import (
16+
GetSessionIdCallback,
17+
streamable_http_client,
18+
)
19+
from mcp.shared.exceptions import McpError
20+
from mcp.shared.message import SessionMessage
21+
from mcp.types import CallToolResult
22+
from uipath._utils._ssl_context import get_httpx_client_kwargs
23+
from uipath.runtime.base import UiPathDisposableProtocol
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
class McpClient(UiPathDisposableProtocol):
29+
"""Manages an MCP session for tool invocations.
30+
31+
This class handles the lifecycle of MCP connections with two distinct phases:
32+
33+
1. **Client Initialization** (first call):
34+
- Creates HTTP client
35+
- Establishes streamable HTTP connection
36+
- Creates ClientSession
37+
- Calls session.initialize() to get session ID
38+
39+
2. **Session Reinitialization** (on 404 error):
40+
- Reuses existing HTTP client and streamable connection
41+
- Calls session.initialize() again to get new session ID
42+
43+
Thread-safety is ensured via asyncio.Lock for both phases.
44+
"""
45+
46+
# Error codes that indicate session disconnect/termination
47+
SESSION_ERROR_CODES = [32600, -32000]
48+
49+
def __init__(
50+
self,
51+
url: str,
52+
headers: dict[str, str] | None = None,
53+
timeout: httpx.Timeout | None = None,
54+
max_retries: int = 1,
55+
) -> None:
56+
"""Initialize the MCP tool session.
57+
58+
Args:
59+
url: The MCP server endpoint URL.
60+
headers: Optional headers to include in HTTP requests.
61+
timeout: Optional timeout configuration for HTTP requests.
62+
max_retries: Maximum number of retries on session disconnect errors.
63+
"""
64+
self._url = url
65+
self._headers = headers or {}
66+
self._timeout = timeout or httpx.Timeout(600)
67+
self._max_retries = max_retries
68+
69+
# Lock for both client initialization and session reinitialization
70+
self._lock = asyncio.Lock()
71+
72+
# Client state (created once, reused across session reinitializations)
73+
self._http_client: httpx.AsyncClient | None = None
74+
self._read_stream: (
75+
MemoryObjectReceiveStream[SessionMessage | Exception] | None
76+
) = None
77+
self._write_stream: MemoryObjectSendStream[SessionMessage] | None = None
78+
self._get_session_id: GetSessionIdCallback | None = None
79+
self._stack: AsyncExitStack | None = None
80+
81+
# Session state (can be reinitialized without recreating client)
82+
self._session: ClientSession | None = None
83+
self._session_id: str | None = None
84+
self._client_initialized: bool = False
85+
86+
@property
87+
def session_id(self) -> str | None:
88+
"""Get the current session ID."""
89+
return self._session_id
90+
91+
@property
92+
def is_client_initialized(self) -> bool:
93+
"""Check if the HTTP client and streamable connection are initialized."""
94+
return self._client_initialized
95+
96+
async def _initialize_client(self) -> None:
97+
"""Initialize the HTTP client and streamable connection.
98+
99+
This is called once on first use. Creates:
100+
- httpx.AsyncClient
101+
- Streamable HTTP connection (read/write streams)
102+
- ClientSession
103+
104+
Then calls _initialize_session() to complete the MCP handshake.
105+
"""
106+
logger.debug("Initializing MCP client")
107+
108+
# Create exit stack for resource management
109+
self._stack = AsyncExitStack()
110+
await self._stack.__aenter__()
111+
112+
# Create HTTP client with SSL, proxy, and redirect settings
113+
default_client_kwargs = get_httpx_client_kwargs()
114+
client_kwargs = {
115+
**default_client_kwargs,
116+
"headers": self._headers,
117+
"timeout": self._timeout,
118+
}
119+
self._http_client = await self._stack.enter_async_context(
120+
httpx.AsyncClient(**client_kwargs)
121+
)
122+
123+
# Create streamable HTTP connection
124+
(
125+
self._read_stream,
126+
self._write_stream,
127+
self._get_session_id,
128+
) = await self._stack.enter_async_context(
129+
streamable_http_client(
130+
url=self._url,
131+
http_client=self._http_client,
132+
)
133+
)
134+
135+
# Create ClientSession (but don't initialize yet)
136+
# These are guaranteed to be set by the context manager above
137+
assert self._read_stream is not None
138+
assert self._write_stream is not None
139+
self._session = await self._stack.enter_async_context(
140+
ClientSession(self._read_stream, self._write_stream)
141+
)
142+
143+
self._client_initialized = True
144+
logger.info("MCP client initialized")
145+
146+
# Now initialize the MCP session
147+
await self._initialize_session()
148+
149+
async def _initialize_session(self) -> None:
150+
"""Initialize or reinitialize the MCP session.
151+
152+
Calls session.initialize() to perform the MCP handshake and obtain
153+
a session ID from the server. Can be called multiple times on the
154+
same ClientSession to recover from session disconnects.
155+
156+
Requires: Client must be initialized first (_initialize_client).
157+
"""
158+
if self._session is None:
159+
raise RuntimeError("Cannot initialize session: client not initialized")
160+
161+
logger.debug(f"Initializing MCP session (previous: {self._session_id})")
162+
163+
await self._session.initialize()
164+
self._session_id = self._get_session_id() # type: ignore[misc]
165+
166+
logger.info(f"MCP session initialized: {self._session_id}")
167+
168+
async def _ensure_session(self) -> ClientSession:
169+
"""Ensure client and session are initialized, return the session.
170+
171+
Thread-safe via lock. Only initializes once; subsequent calls
172+
return the existing session immediately.
173+
174+
Returns:
175+
The initialized ClientSession.
176+
"""
177+
if not self._client_initialized:
178+
async with self._lock:
179+
if not self._client_initialized:
180+
await self._initialize_client()
181+
182+
return self._session # type: ignore[return-value]
183+
184+
async def _reinitialize_session(self) -> None:
185+
"""Reinitialize only the MCP session after a disconnect error.
186+
187+
Thread-safe via lock. Reuses existing HTTP client and streamable
188+
connection; only performs a new MCP handshake.
189+
"""
190+
async with self._lock:
191+
if not self._client_initialized:
192+
# Client not initialized, do full initialization
193+
await self._initialize_client()
194+
else:
195+
# Client exists, just reinitialize session
196+
await self._initialize_session()
197+
198+
def _is_session_error(self, error: McpError) -> bool:
199+
"""Check if an McpError indicates a session disconnect.
200+
201+
Args:
202+
error: The McpError to check.
203+
204+
Returns:
205+
True if the error indicates a session disconnect.
206+
"""
207+
return (
208+
hasattr(error, "error")
209+
and hasattr(error.error, "code")
210+
and error.error.code in self.SESSION_ERROR_CODES
211+
)
212+
213+
async def call_tool(
214+
self,
215+
name: str,
216+
arguments: dict[str, Any] | None = None,
217+
) -> CallToolResult:
218+
"""Call an MCP tool with automatic retry on session disconnect.
219+
220+
On first call, initializes the full client stack. On session
221+
disconnect (404/32600), reinitializes only the session and retries.
222+
223+
Args:
224+
name: The name of the tool to call.
225+
arguments: Optional arguments to pass to the tool.
226+
227+
Returns:
228+
The tool call result.
229+
230+
Raises:
231+
McpError: If the tool call fails after all retries.
232+
"""
233+
retry_count = 0
234+
235+
while retry_count <= self._max_retries:
236+
try:
237+
session = await self._ensure_session()
238+
logger.debug(
239+
f"Calling tool {name} (attempt {retry_count + 1}/{self._max_retries + 1})"
240+
)
241+
result = await session.call_tool(name, arguments=arguments)
242+
logger.info(f"Tool call successful: {name}")
243+
return result
244+
245+
except McpError as e:
246+
logger.info(f"McpError during tool call: {e}")
247+
248+
if self._is_session_error(e) and retry_count < self._max_retries:
249+
logger.warning(
250+
f"Session disconnected (error code: {e.error.code}), "
251+
f"reinitializing session"
252+
)
253+
await self._reinitialize_session()
254+
retry_count += 1
255+
continue
256+
else:
257+
if retry_count >= self._max_retries:
258+
logger.error(f"Max retries reached after session error: {e}")
259+
else:
260+
logger.error(f"Non-retryable MCP error: {e}")
261+
raise
262+
263+
# Should not reach here, but just in case
264+
raise RuntimeError("Exited retry loop unexpectedly")
265+
266+
async def dispose(self) -> None:
267+
"""Dispose of the client and release all resources.
268+
269+
Implements UiPathDisposableProtocol.
270+
Releases the HTTP client, streamable connection, and ClientSession.
271+
After calling dispose(), the client can be reused - a new call_tool()
272+
will reinitialize everything.
273+
"""
274+
async with self._lock:
275+
if self._stack is not None:
276+
try:
277+
await self._stack.__aexit__(None, None, None)
278+
except Exception as e:
279+
logger.debug(f"Error during cleanup: {e}")
280+
finally:
281+
self._stack = None
282+
self._session = None
283+
self._session_id = None
284+
self._http_client = None
285+
self._read_stream = None
286+
self._write_stream = None
287+
self._get_session_id = None
288+
self._client_initialized = False
289+
290+
logger.info("MCP client disposed")

0 commit comments

Comments
 (0)