Skip to content

Commit f9c8cd9

Browse files
committed
feat: add streamable-http support
1 parent c2e8d7a commit f9c8cd9

4 files changed

Lines changed: 440 additions & 171 deletions

File tree

src/uipath_mcp/_cli/_runtime/_factory.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,26 @@ async def new_runtime(
110110
McpErrorCode.SERVER_NOT_FOUND,
111111
"MCP server not found",
112112
f"Server '{entrypoint}' not found. Available: {available}",
113-
UiPathErrorCategory.DEPLOYMENT,
113+
UiPathErrorCategory.USER,
114114
)
115115

116+
# Validate streamable-http configuration
117+
if server.is_streamable_http:
118+
if not server.url:
119+
raise UiPathMcpRuntimeError(
120+
McpErrorCode.CONFIGURATION_ERROR,
121+
"Invalid configuration",
122+
f"Server '{entrypoint}' uses streamable-http transport but 'url' is not specified in mcp.json",
123+
UiPathErrorCategory.USER,
124+
)
125+
if not server.command or server.command == "None":
126+
raise UiPathMcpRuntimeError(
127+
McpErrorCode.CONFIGURATION_ERROR,
128+
"Invalid configuration",
129+
f"Server '{entrypoint}' uses streamable-http transport but 'command' is not specified in mcp.json",
130+
UiPathErrorCategory.USER,
131+
)
132+
116133
# Validate runtime_id is a valid UUID, generate new one if not
117134
try:
118135
uuid.UUID(runtime_id)

src/uipath_mcp/_cli/_runtime/_runtime.py

Lines changed: 218 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from .._utils._config import McpServer
3838
from ._context import UiPathServerType
3939
from ._exception import McpErrorCode, UiPathMcpRuntimeError
40-
from ._session import SessionServer
40+
from ._session import BaseSessionServer, SessionServer, StreamableHttpSessionServer
4141

4242
logger = logging.getLogger(__name__)
4343
tracer = trace.get_tracer(__name__)
@@ -76,10 +76,14 @@ def __init__(
7676
self._server_slug = server_slug
7777

7878
self._signalr_client: SignalRClient | None = None
79-
self._session_servers: dict[str, SessionServer] = {}
79+
self._session_servers: dict[str, BaseSessionServer] = {}
8080
self._session_output: str | None = None
8181
self._cancel_event = asyncio.Event()
8282
self._keep_alive_task: asyncio.Task[None] | None = None
83+
self._http_server_process: asyncio.subprocess.Process | None = None
84+
self._http_monitor_task: asyncio.Task[None] | None = None
85+
self._http_stderr_drain_task: asyncio.Task[None] | None = None
86+
self._http_server_stderr_lines: list[str] = []
8387
self._uipath = UiPath()
8488
self._cleanup_done = False
8589

@@ -223,6 +227,12 @@ async def _run_server(self) -> UiPathRuntimeResult:
223227
# Register the local server with UiPath MCP Server
224228
await self._register()
225229

230+
# Start HTTP server process monitor if using streamable-http
231+
if self._server.is_streamable_http:
232+
self._http_monitor_task = asyncio.create_task(
233+
self._monitor_http_server_process()
234+
)
235+
226236
run_task = asyncio.create_task(self._signalr_client.run())
227237
cancel_task = asyncio.create_task(self._cancel_event.wait())
228238
self._keep_alive_task = asyncio.create_task(self._keep_alive())
@@ -300,6 +310,9 @@ async def _cleanup(self) -> None:
300310
except Exception as e:
301311
logger.error(f"Error cleaning up session server {session_id}: {str(e)}")
302312

313+
# Stop the shared HTTP server process (streamable-http only)
314+
await self._stop_http_server_process()
315+
303316
if self._signalr_client and hasattr(self._signalr_client, "_transport"):
304317
transport = self._signalr_client._transport
305318
if transport and hasattr(transport, "_ws") and transport._ws:
@@ -358,8 +371,13 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
358371
try:
359372
# Check if we have a session server for this session_id
360373
if session_id not in self._session_servers:
361-
# Create and start a new session server
362-
session_server = SessionServer(self._server, self.slug, session_id)
374+
session_server: BaseSessionServer
375+
if self._server.is_streamable_http:
376+
session_server = StreamableHttpSessionServer(
377+
self._server, self.slug, session_id
378+
)
379+
else:
380+
session_server = SessionServer(self._server, self.slug, session_id)
363381
try:
364382
await session_server.start()
365383
except Exception as e:
@@ -393,6 +411,156 @@ async def _handle_signalr_close(self) -> None:
393411
"""Handle SignalR connection close event."""
394412
logger.info("Websocket connection closed.")
395413

414+
async def _start_http_server_process(self) -> None:
415+
"""Spawn the streamable-http server process.
416+
417+
The process is started once and shared across all sessions.
418+
"""
419+
env_vars = self._server.env.copy()
420+
if self.server_type is UiPathServerType.Coded:
421+
for name, value in os.environ.items():
422+
if name not in env_vars:
423+
env_vars[name] = value
424+
425+
merged_env = {**os.environ, **env_vars} if env_vars else None
426+
self._http_server_stderr_lines = []
427+
self._http_server_process = await asyncio.create_subprocess_exec(
428+
self._server.command,
429+
*self._server.args,
430+
env=merged_env,
431+
stdout=asyncio.subprocess.DEVNULL,
432+
stderr=asyncio.subprocess.PIPE,
433+
)
434+
self._http_stderr_drain_task = asyncio.create_task(self._drain_http_stderr())
435+
logger.info(
436+
f"Started HTTP server process (PID: {self._http_server_process.pid}) "
437+
f"for {self._server.url}"
438+
)
439+
440+
async def _drain_http_stderr(self) -> None:
441+
"""Continuously read and log stderr from the HTTP server process.
442+
443+
Accumulates output in _http_server_stderr_lines for error reporting.
444+
"""
445+
if not self._http_server_process or not self._http_server_process.stderr:
446+
return
447+
try:
448+
async for line in self._http_server_process.stderr:
449+
decoded = line.decode("utf-8", errors="replace").rstrip()
450+
self._http_server_stderr_lines.append(decoded)
451+
logger.debug(f"HTTP server stderr: {decoded}")
452+
except asyncio.CancelledError:
453+
pass
454+
455+
async def _wait_for_http_server_ready(
456+
self,
457+
max_retries: int = 30,
458+
retry_delay: float = 1.0,
459+
) -> None:
460+
"""Wait for the HTTP server to start accepting connections."""
461+
import httpx
462+
463+
url = self._server.url
464+
if not url:
465+
raise ValueError("streamable-http transport requires url in config")
466+
467+
for attempt in range(max_retries):
468+
# Check if process has crashed
469+
if (
470+
self._http_server_process
471+
and self._http_server_process.returncode is not None
472+
):
473+
stderr_output = "\n".join(self._http_server_stderr_lines)
474+
raise UiPathMcpRuntimeError(
475+
McpErrorCode.INITIALIZATION_ERROR,
476+
"HTTP server process exited unexpectedly",
477+
f"Exit code: {self._http_server_process.returncode}\n{stderr_output}",
478+
UiPathErrorCategory.SYSTEM,
479+
)
480+
481+
try:
482+
async with httpx.AsyncClient() as client:
483+
response = await client.get(url, timeout=2.0)
484+
logger.info(
485+
f"HTTP server is ready (status: {response.status_code})"
486+
)
487+
return
488+
except (httpx.ConnectError, httpx.ConnectTimeout) as err:
489+
if attempt < max_retries - 1:
490+
logger.debug(
491+
f"HTTP server not ready yet, retrying in {retry_delay}s "
492+
f"(attempt {attempt + 1}/{max_retries})"
493+
)
494+
await asyncio.sleep(retry_delay)
495+
else:
496+
raise UiPathMcpRuntimeError(
497+
McpErrorCode.INITIALIZATION_ERROR,
498+
"HTTP server failed to start",
499+
f"Server at {url} did not become ready after {max_retries} attempts",
500+
UiPathErrorCategory.SYSTEM,
501+
) from err
502+
except httpx.HTTPError:
503+
# Any other HTTP error means server is listening
504+
logger.info("HTTP server is ready (responded with error, but is up)")
505+
return
506+
507+
async def _stop_http_server_process(self) -> None:
508+
"""Stop the shared HTTP server process."""
509+
if self._http_monitor_task and not self._http_monitor_task.done():
510+
self._http_monitor_task.cancel()
511+
try:
512+
await self._http_monitor_task
513+
except asyncio.CancelledError:
514+
pass
515+
self._http_monitor_task = None
516+
517+
if self._http_server_process:
518+
try:
519+
self._http_server_process.terminate()
520+
try:
521+
await asyncio.wait_for(
522+
self._http_server_process.wait(), timeout=5.0
523+
)
524+
except asyncio.TimeoutError:
525+
self._http_server_process.kill()
526+
await self._http_server_process.wait()
527+
except ProcessLookupError:
528+
pass
529+
finally:
530+
logger.info("HTTP server process stopped")
531+
self._http_server_process = None
532+
533+
if self._http_stderr_drain_task and not self._http_stderr_drain_task.done():
534+
self._http_stderr_drain_task.cancel()
535+
try:
536+
await self._http_stderr_drain_task
537+
except asyncio.CancelledError:
538+
pass
539+
self._http_stderr_drain_task = None
540+
541+
async def _monitor_http_server_process(self) -> None:
542+
"""Monitor the HTTP server process and handle unexpected exits."""
543+
if not self._http_server_process:
544+
return
545+
try:
546+
returncode = await self._http_server_process.wait()
547+
if not self._cancel_event.is_set():
548+
logger.error(
549+
f"HTTP server process exited unexpectedly with code {returncode}"
550+
)
551+
# Stop all HTTP sessions, they will fail on next request anyway
552+
for session_id, session_server in list(self._session_servers.items()):
553+
if isinstance(session_server, StreamableHttpSessionServer):
554+
try:
555+
await session_server.stop()
556+
except Exception as e:
557+
logger.error(
558+
f"Error stopping session {session_id} after process crash: {e}"
559+
)
560+
self._session_servers.pop(session_id, None)
561+
except asyncio.CancelledError:
562+
pass
563+
396564
async def _register(self) -> None:
397565
"""Register the MCP server with UiPath."""
398566

@@ -409,36 +577,63 @@ async def _register(self) -> None:
409577
env_vars[name] = value
410578

411579
try:
412-
# Create a temporary session to get tools
413-
server_params = StdioServerParameters(
414-
command=self._server.command,
415-
args=self._server.args,
416-
env=env_vars,
417-
)
580+
if self._server.is_streamable_http:
581+
# spawn process, wait for readiness, connect via HTTP
582+
await self._start_http_server_process()
583+
await self._wait_for_http_server_ready()
418584

419-
# Start a temporary stdio client to get tools
420-
# Use a temporary file to capture stderr
421-
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
422-
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
423-
async with stdio_client(server_params, errlog=stderr_temp) as (
585+
from mcp.client.streamable_http import streamable_http_client
586+
587+
if self._server.url is None:
588+
raise UiPathMcpRuntimeError(
589+
McpErrorCode.CONFIGURATION_ERROR,
590+
"Missing URL for streamable-http server",
591+
"Please specify a 'url' in the server configuration for streamable-http transport.",
592+
UiPathErrorCategory.SYSTEM,
593+
)
594+
async with streamable_http_client(self._server.url) as (
424595
read,
425596
write,
597+
_,
426598
):
427599
async with ClientSession(read, write) as session:
428-
logger.info("Initializing client session...")
429-
# Try to initialize with timeout
600+
logger.info("Initializing client session (streamable-http)...")
430601
try:
431602
await asyncio.wait_for(session.initialize(), timeout=30)
432603
initialization_successful = True
433-
logger.info("Initialization successful")
434-
435-
# Only proceed if initialization was successful
436604
tools_result = await session.list_tools()
605+
logger.info(f"Discovered {len(tools_result.tools)} tool(s)")
437606
except Exception as err:
438607
logger.error(f"Initialization error: {err}")
439-
# Capture stderr output here, after the timeout
440-
stderr_temp.seek(0)
441-
server_stderr_output = stderr_temp.read()
608+
server_stderr_output = "\n".join(
609+
self._http_server_stderr_lines
610+
)
611+
logger.info("Registration session closed (DELETE sent to server)")
612+
else:
613+
# spawn temporary process, discover tools, process dies with context
614+
server_params = StdioServerParameters(
615+
command=self._server.command,
616+
args=self._server.args,
617+
env=env_vars,
618+
)
619+
620+
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
621+
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
622+
async with stdio_client(server_params, errlog=stderr_temp) as (
623+
read,
624+
write,
625+
):
626+
async with ClientSession(read, write) as session:
627+
logger.info("Initializing client session...")
628+
try:
629+
await asyncio.wait_for(session.initialize(), timeout=30)
630+
initialization_successful = True
631+
logger.info("Initialization successful")
632+
tools_result = await session.list_tools()
633+
except Exception as err:
634+
logger.error(f"Initialization error: {err}")
635+
stderr_temp.seek(0)
636+
server_stderr_output = stderr_temp.read()
442637

443638
except* Exception as eg:
444639
for e in eg.exceptions:

0 commit comments

Comments
 (0)