Skip to content

Commit 77668cd

Browse files
committed
feat: add streamable-http support
1 parent b43ea06 commit 77668cd

6 files changed

Lines changed: 453 additions & 174 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-mcp"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
description = "UiPath MCP SDK"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath_mcp/_cli/_runtime/_factory.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,26 @@ async def new_runtime(
110110
McpErrorCode.SERVER_NOT_FOUND,
111111
"MCP server not found",
112112
f"Server '{entrypoint}' not found. Available: {available}",
113-
UiPathErrorCategory.DEPLOYMENT,
113+
UiPathErrorCategory.USER,
114114
)
115115

116+
# Validate streamable-http configuration
117+
if server.is_streamable_http:
118+
if not server.url:
119+
raise UiPathMcpRuntimeError(
120+
McpErrorCode.CONFIGURATION_ERROR,
121+
"Invalid configuration",
122+
f"Server '{entrypoint}' uses streamable-http transport but 'url' is not specified in mcp.json",
123+
UiPathErrorCategory.USER,
124+
)
125+
if not server.command or server.command == "None":
126+
raise UiPathMcpRuntimeError(
127+
McpErrorCode.CONFIGURATION_ERROR,
128+
"Invalid configuration",
129+
f"Server '{entrypoint}' uses streamable-http transport but 'command' is not specified in mcp.json",
130+
UiPathErrorCategory.USER,
131+
)
132+
116133
# Validate runtime_id is a valid UUID, generate new one if not
117134
try:
118135
uuid.UUID(runtime_id)

src/uipath_mcp/_cli/_runtime/_runtime.py

Lines changed: 221 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from .._utils._config import McpServer
3838
from ._context import UiPathServerType
3939
from ._exception import McpErrorCode, UiPathMcpRuntimeError
40-
from ._session import SessionServer
40+
from ._session import BaseSessionServer, StdioSessionServer, StreamableHttpSessionServer
4141

4242
logger = logging.getLogger(__name__)
4343
tracer = trace.get_tracer(__name__)
@@ -76,10 +76,14 @@ def __init__(
7676
self._server_slug = server_slug
7777

7878
self._signalr_client: SignalRClient | None = None
79-
self._session_servers: dict[str, SessionServer] = {}
79+
self._session_servers: dict[str, BaseSessionServer] = {}
8080
self._session_output: str | None = None
8181
self._cancel_event = asyncio.Event()
8282
self._keep_alive_task: asyncio.Task[None] | None = None
83+
self._http_server_process: asyncio.subprocess.Process | None = None
84+
self._http_monitor_task: asyncio.Task[None] | None = None
85+
self._http_stderr_drain_task: asyncio.Task[None] | None = None
86+
self._http_server_stderr_lines: list[str] = []
8387
self._uipath = UiPath()
8488
self._cleanup_done = False
8589

@@ -223,6 +227,12 @@ async def _run_server(self) -> UiPathRuntimeResult:
223227
# Register the local server with UiPath MCP Server
224228
await self._register()
225229

230+
# Start HTTP server process monitor if using streamable-http
231+
if self._server.is_streamable_http:
232+
self._http_monitor_task = asyncio.create_task(
233+
self._monitor_http_server_process()
234+
)
235+
226236
run_task = asyncio.create_task(self._signalr_client.run())
227237
cancel_task = asyncio.create_task(self._cancel_event.wait())
228238
self._keep_alive_task = asyncio.create_task(self._keep_alive())
@@ -294,12 +304,15 @@ async def _cleanup(self) -> None:
294304
except asyncio.CancelledError:
295305
pass
296306

297-
for session_id, session_server in self._session_servers.items():
307+
for session_id, session_server in list(self._session_servers.items()):
298308
try:
299309
await session_server.stop()
300310
except Exception as e:
301311
logger.error(f"Error cleaning up session server {session_id}: {str(e)}")
302312

313+
# Stop the shared HTTP server process (streamable-http only)
314+
await self._stop_http_server_process()
315+
303316
if self._signalr_client and hasattr(self._signalr_client, "_transport"):
304317
transport = self._signalr_client._transport
305318
if transport and hasattr(transport, "_ws") and transport._ws:
@@ -358,8 +371,15 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
358371
try:
359372
# Check if we have a session server for this session_id
360373
if session_id not in self._session_servers:
361-
# Create and start a new session server
362-
session_server = SessionServer(self._server, self.slug, session_id)
374+
session_server: BaseSessionServer
375+
if self._server.is_streamable_http:
376+
session_server = StreamableHttpSessionServer(
377+
self._server, self.slug, session_id
378+
)
379+
else:
380+
session_server = StdioSessionServer(
381+
self._server, self.slug, session_id
382+
)
363383
try:
364384
await session_server.start()
365385
except Exception as e:
@@ -393,6 +413,156 @@ async def _handle_signalr_close(self) -> None:
393413
"""Handle SignalR connection close event."""
394414
logger.info("Websocket connection closed.")
395415

416+
async def _start_http_server_process(self) -> None:
417+
"""Spawn the streamable-http server process.
418+
419+
The process is started once and shared across all sessions.
420+
"""
421+
env_vars = self._server.env.copy()
422+
if self.server_type is UiPathServerType.Coded:
423+
for name, value in os.environ.items():
424+
if name not in env_vars:
425+
env_vars[name] = value
426+
427+
merged_env = {**os.environ, **env_vars} if env_vars else None
428+
self._http_server_stderr_lines = []
429+
self._http_server_process = await asyncio.create_subprocess_exec(
430+
self._server.command,
431+
*self._server.args,
432+
env=merged_env,
433+
stdout=asyncio.subprocess.DEVNULL,
434+
stderr=asyncio.subprocess.PIPE,
435+
)
436+
self._http_stderr_drain_task = asyncio.create_task(self._drain_http_stderr())
437+
logger.info(
438+
f"Started HTTP server process (PID: {self._http_server_process.pid}) "
439+
f"for {self._server.url}"
440+
)
441+
442+
async def _drain_http_stderr(self) -> None:
443+
"""Continuously read and log stderr from the HTTP server process.
444+
445+
Accumulates output in _http_server_stderr_lines for error reporting.
446+
"""
447+
if not self._http_server_process or not self._http_server_process.stderr:
448+
return
449+
try:
450+
async for line in self._http_server_process.stderr:
451+
decoded = line.decode("utf-8", errors="replace").rstrip()
452+
self._http_server_stderr_lines.append(decoded)
453+
logger.debug(f"HTTP server stderr: {decoded}")
454+
except asyncio.CancelledError:
455+
pass
456+
457+
async def _wait_for_http_server_ready(
458+
self,
459+
max_retries: int = 30,
460+
retry_delay: float = 1.0,
461+
) -> None:
462+
"""Wait for the HTTP server to start accepting connections."""
463+
import httpx
464+
465+
url = self._server.url
466+
if not url:
467+
raise ValueError("streamable-http transport requires url in config")
468+
469+
for attempt in range(max_retries):
470+
# Check if process has crashed
471+
if (
472+
self._http_server_process
473+
and self._http_server_process.returncode is not None
474+
):
475+
stderr_output = "\n".join(self._http_server_stderr_lines)
476+
raise UiPathMcpRuntimeError(
477+
McpErrorCode.INITIALIZATION_ERROR,
478+
"HTTP server process exited unexpectedly",
479+
f"Exit code: {self._http_server_process.returncode}\n{stderr_output}",
480+
UiPathErrorCategory.SYSTEM,
481+
)
482+
483+
try:
484+
async with httpx.AsyncClient() as client:
485+
response = await client.get(url, timeout=2.0)
486+
logger.info(
487+
f"HTTP server is ready (status: {response.status_code})"
488+
)
489+
return
490+
except (httpx.ConnectError, httpx.ConnectTimeout) as err:
491+
if attempt < max_retries - 1:
492+
logger.debug(
493+
f"HTTP server not ready yet, retrying in {retry_delay}s "
494+
f"(attempt {attempt + 1}/{max_retries})"
495+
)
496+
await asyncio.sleep(retry_delay)
497+
else:
498+
raise UiPathMcpRuntimeError(
499+
McpErrorCode.INITIALIZATION_ERROR,
500+
"HTTP server failed to start",
501+
f"Server at {url} did not become ready after {max_retries} attempts",
502+
UiPathErrorCategory.SYSTEM,
503+
) from err
504+
except httpx.HTTPStatusError:
505+
# Server responded with an error status code
506+
logger.info("HTTP server is ready (responded with error, but is up)")
507+
return
508+
509+
async def _stop_http_server_process(self) -> None:
510+
"""Stop the shared HTTP server process."""
511+
if self._http_monitor_task and not self._http_monitor_task.done():
512+
self._http_monitor_task.cancel()
513+
try:
514+
await self._http_monitor_task
515+
except asyncio.CancelledError:
516+
pass
517+
self._http_monitor_task = None
518+
519+
if self._http_server_process:
520+
try:
521+
self._http_server_process.terminate()
522+
try:
523+
await asyncio.wait_for(
524+
self._http_server_process.wait(), timeout=5.0
525+
)
526+
except asyncio.TimeoutError:
527+
self._http_server_process.kill()
528+
await self._http_server_process.wait()
529+
except ProcessLookupError:
530+
pass
531+
finally:
532+
logger.info("HTTP server process stopped")
533+
self._http_server_process = None
534+
535+
if self._http_stderr_drain_task and not self._http_stderr_drain_task.done():
536+
self._http_stderr_drain_task.cancel()
537+
try:
538+
await self._http_stderr_drain_task
539+
except asyncio.CancelledError:
540+
pass
541+
self._http_stderr_drain_task = None
542+
543+
async def _monitor_http_server_process(self) -> None:
544+
"""Monitor the HTTP server process and handle unexpected exits."""
545+
if not self._http_server_process:
546+
return
547+
try:
548+
returncode = await self._http_server_process.wait()
549+
if not self._cancel_event.is_set():
550+
logger.error(
551+
f"HTTP server process exited unexpectedly with code {returncode}"
552+
)
553+
# Stop all HTTP sessions, they will fail on next request anyway
554+
for session_id, session_server in list(self._session_servers.items()):
555+
if isinstance(session_server, StreamableHttpSessionServer):
556+
try:
557+
await session_server.stop()
558+
except Exception as e:
559+
logger.error(
560+
f"Error stopping session {session_id} after process crash: {e}"
561+
)
562+
self._session_servers.pop(session_id, None)
563+
except asyncio.CancelledError:
564+
pass
565+
396566
async def _register(self) -> None:
397567
"""Register the MCP server with UiPath."""
398568

@@ -409,36 +579,63 @@ async def _register(self) -> None:
409579
env_vars[name] = value
410580

411581
try:
412-
# Create a temporary session to get tools
413-
server_params = StdioServerParameters(
414-
command=self._server.command,
415-
args=self._server.args,
416-
env=env_vars,
417-
)
582+
if self._server.is_streamable_http:
583+
# spawn process, wait for readiness, connect via HTTP
584+
await self._start_http_server_process()
585+
await self._wait_for_http_server_ready()
586+
587+
from mcp.client.streamable_http import streamable_http_client
418588

419-
# Start a temporary stdio client to get tools
420-
# Use a temporary file to capture stderr
421-
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
422-
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
423-
async with stdio_client(server_params, errlog=stderr_temp) as (
589+
if self._server.url is None:
590+
raise UiPathMcpRuntimeError(
591+
McpErrorCode.CONFIGURATION_ERROR,
592+
"Missing URL for streamable-http server",
593+
"Please specify a 'url' in the server configuration for streamable-http transport.",
594+
UiPathErrorCategory.SYSTEM,
595+
)
596+
async with streamable_http_client(self._server.url) as (
424597
read,
425598
write,
599+
_,
426600
):
427601
async with ClientSession(read, write) as session:
428-
logger.info("Initializing client session...")
429-
# Try to initialize with timeout
602+
logger.info("Initializing client session (streamable-http)...")
430603
try:
431604
await asyncio.wait_for(session.initialize(), timeout=30)
432605
initialization_successful = True
433-
logger.info("Initialization successful")
434-
435-
# Only proceed if initialization was successful
436606
tools_result = await session.list_tools()
607+
logger.info(f"Discovered {len(tools_result.tools)} tool(s)")
437608
except Exception as err:
438609
logger.error(f"Initialization error: {err}")
439-
# Capture stderr output here, after the timeout
440-
stderr_temp.seek(0)
441-
server_stderr_output = stderr_temp.read()
610+
server_stderr_output = "\n".join(
611+
self._http_server_stderr_lines
612+
)
613+
logger.info("Registration session closed (DELETE sent to server)")
614+
else:
615+
# spawn temporary process, discover tools, process dies with context
616+
server_params = StdioServerParameters(
617+
command=self._server.command,
618+
args=self._server.args,
619+
env=env_vars,
620+
)
621+
622+
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
623+
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
624+
async with stdio_client(server_params, errlog=stderr_temp) as (
625+
read,
626+
write,
627+
):
628+
async with ClientSession(read, write) as session:
629+
logger.info("Initializing client session...")
630+
try:
631+
await asyncio.wait_for(session.initialize(), timeout=30)
632+
initialization_successful = True
633+
logger.info("Initialization successful")
634+
tools_result = await session.list_tools()
635+
except Exception as err:
636+
logger.error(f"Initialization error: {err}")
637+
stderr_temp.seek(0)
638+
server_stderr_output = stderr_temp.read()
442639

443640
except* Exception as eg:
444641
for e in eg.exceptions:

0 commit comments

Comments
 (0)