Skip to content

Commit 3fd3fa6

Browse files
committed
feat: add streamable-http support
1 parent c2e8d7a commit 3fd3fa6

4 files changed

Lines changed: 418 additions & 164 deletions

File tree

src/uipath_mcp/_cli/_runtime/_factory.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,26 @@ async def new_runtime(
110110
McpErrorCode.SERVER_NOT_FOUND,
111111
"MCP server not found",
112112
f"Server '{entrypoint}' not found. Available: {available}",
113-
UiPathErrorCategory.DEPLOYMENT,
113+
UiPathErrorCategory.USER,
114114
)
115115

116+
# Validate streamable-http configuration
117+
if server.is_streamable_http:
118+
if not server.url:
119+
raise UiPathMcpRuntimeError(
120+
McpErrorCode.CONFIGURATION_ERROR,
121+
"Invalid configuration",
122+
f"Server '{entrypoint}' uses streamable-http transport but 'url' is not specified in mcp.json",
123+
UiPathErrorCategory.USER,
124+
)
125+
if not server.command or server.command == "None":
126+
raise UiPathMcpRuntimeError(
127+
McpErrorCode.CONFIGURATION_ERROR,
128+
"Invalid configuration",
129+
f"Server '{entrypoint}' uses streamable-http transport but 'command' is not specified in mcp.json",
130+
UiPathErrorCategory.USER,
131+
)
132+
116133
# Validate runtime_id is a valid UUID, generate new one if not
117134
try:
118135
uuid.UUID(runtime_id)

src/uipath_mcp/_cli/_runtime/_runtime.py

Lines changed: 196 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from .._utils._config import McpServer
3838
from ._context import UiPathServerType
3939
from ._exception import McpErrorCode, UiPathMcpRuntimeError
40-
from ._session import SessionServer
40+
from ._session import BaseSessionServer, SessionServer, StreamableHttpSessionServer
4141

4242
logger = logging.getLogger(__name__)
4343
tracer = trace.get_tracer(__name__)
@@ -76,10 +76,12 @@ def __init__(
7676
self._server_slug = server_slug
7777

7878
self._signalr_client: SignalRClient | None = None
79-
self._session_servers: dict[str, SessionServer] = {}
79+
self._session_servers: dict[str, BaseSessionServer] = {}
8080
self._session_output: str | None = None
8181
self._cancel_event = asyncio.Event()
8282
self._keep_alive_task: asyncio.Task[None] | None = None
83+
self._http_server_process: asyncio.subprocess.Process | None = None
84+
self._http_monitor_task: asyncio.Task[None] | None = None
8385
self._uipath = UiPath()
8486
self._cleanup_done = False
8587

@@ -223,6 +225,12 @@ async def _run_server(self) -> UiPathRuntimeResult:
223225
# Register the local server with UiPath MCP Server
224226
await self._register()
225227

228+
# Start HTTP server process monitor if using streamable-http
229+
if self._server.is_streamable_http:
230+
self._http_monitor_task = asyncio.create_task(
231+
self._monitor_http_server_process()
232+
)
233+
226234
run_task = asyncio.create_task(self._signalr_client.run())
227235
cancel_task = asyncio.create_task(self._cancel_event.wait())
228236
self._keep_alive_task = asyncio.create_task(self._keep_alive())
@@ -300,6 +308,9 @@ async def _cleanup(self) -> None:
300308
except Exception as e:
301309
logger.error(f"Error cleaning up session server {session_id}: {str(e)}")
302310

311+
# Stop the shared HTTP server process (streamable-http only)
312+
await self._stop_http_server_process()
313+
303314
if self._signalr_client and hasattr(self._signalr_client, "_transport"):
304315
transport = self._signalr_client._transport
305316
if transport and hasattr(transport, "_ws") and transport._ws:
@@ -358,8 +369,13 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
358369
try:
359370
# Check if we have a session server for this session_id
360371
if session_id not in self._session_servers:
361-
# Create and start a new session server
362-
session_server = SessionServer(self._server, self.slug, session_id)
372+
session_server: BaseSessionServer
373+
if self._server.is_streamable_http:
374+
session_server = StreamableHttpSessionServer(
375+
self._server, self.slug, session_id
376+
)
377+
else:
378+
session_server = SessionServer(self._server, self.slug, session_id)
363379
try:
364380
await session_server.start()
365381
except Exception as e:
@@ -393,6 +409,134 @@ async def _handle_signalr_close(self) -> None:
393409
"""Handle SignalR connection close event."""
394410
logger.info("Websocket connection closed.")
395411

412+
async def _start_http_server_process(self) -> None:
413+
"""Spawn the streamable-http server process.
414+
415+
The process is started once and shared across all sessions.
416+
"""
417+
env_vars = self._server.env.copy()
418+
if self.server_type is UiPathServerType.Coded:
419+
for name, value in os.environ.items():
420+
if name not in env_vars:
421+
env_vars[name] = value
422+
423+
merged_env = {**os.environ, **env_vars} if env_vars else None
424+
self._http_server_process = await asyncio.create_subprocess_exec(
425+
self._server.command,
426+
*self._server.args,
427+
env=merged_env,
428+
stdout=asyncio.subprocess.PIPE,
429+
stderr=asyncio.subprocess.PIPE,
430+
)
431+
logger.info(
432+
f"Started HTTP server process (PID: {self._http_server_process.pid}) "
433+
f"for {self._server.url}"
434+
)
435+
436+
async def _wait_for_http_server_ready(
437+
self,
438+
max_retries: int = 30,
439+
retry_delay: float = 1.0,
440+
) -> None:
441+
"""Wait for the HTTP server to start accepting connections."""
442+
import httpx
443+
444+
url = self._server.url
445+
if not url:
446+
raise ValueError("streamable-http transport requires url in config")
447+
448+
for attempt in range(max_retries):
449+
# Check if process has crashed
450+
if (
451+
self._http_server_process
452+
and self._http_server_process.returncode is not None
453+
):
454+
stderr_output = ""
455+
if self._http_server_process.stderr:
456+
stderr_bytes = await self._http_server_process.stderr.read()
457+
stderr_output = stderr_bytes.decode("utf-8", errors="replace")
458+
raise UiPathMcpRuntimeError(
459+
McpErrorCode.INITIALIZATION_ERROR,
460+
"HTTP server process exited unexpectedly",
461+
f"Exit code: {self._http_server_process.returncode}\n{stderr_output}",
462+
UiPathErrorCategory.SYSTEM,
463+
)
464+
465+
try:
466+
async with httpx.AsyncClient() as client:
467+
response = await client.get(url, timeout=2.0)
468+
logger.info(
469+
f"HTTP server is ready (status: {response.status_code})"
470+
)
471+
return
472+
except (httpx.ConnectError, httpx.ConnectTimeout) as err:
473+
if attempt < max_retries - 1:
474+
logger.debug(
475+
f"HTTP server not ready yet, retrying in {retry_delay}s "
476+
f"(attempt {attempt + 1}/{max_retries})"
477+
)
478+
await asyncio.sleep(retry_delay)
479+
else:
480+
raise UiPathMcpRuntimeError(
481+
McpErrorCode.INITIALIZATION_ERROR,
482+
"HTTP server failed to start",
483+
f"Server at {url} did not become ready after {max_retries} attempts",
484+
UiPathErrorCategory.SYSTEM,
485+
) from err
486+
except httpx.HTTPError:
487+
# Any other HTTP error means server is listening
488+
logger.info("HTTP server is ready (responded with error, but is up)")
489+
return
490+
491+
async def _stop_http_server_process(self) -> None:
492+
"""Stop the shared HTTP server process."""
493+
if self._http_monitor_task and not self._http_monitor_task.done():
494+
self._http_monitor_task.cancel()
495+
try:
496+
await self._http_monitor_task
497+
except asyncio.CancelledError:
498+
pass
499+
self._http_monitor_task = None
500+
501+
if self._http_server_process:
502+
try:
503+
self._http_server_process.terminate()
504+
try:
505+
await asyncio.wait_for(
506+
self._http_server_process.wait(), timeout=5.0
507+
)
508+
except asyncio.TimeoutError:
509+
self._http_server_process.kill()
510+
await self._http_server_process.wait()
511+
except ProcessLookupError:
512+
pass
513+
finally:
514+
logger.info("HTTP server process stopped")
515+
self._http_server_process = None
516+
517+
async def _monitor_http_server_process(self) -> None:
518+
"""Monitor the HTTP server process and handle unexpected exits."""
519+
if not self._http_server_process:
520+
return
521+
try:
522+
returncode = await self._http_server_process.wait()
523+
if not self._cancel_event.is_set():
524+
logger.error(
525+
f"HTTP server process exited unexpectedly with code {returncode}"
526+
)
527+
# Stop all HTTP sessions, they will fail on next request anyway
528+
for session_id, session_server in list(self._session_servers.items()):
529+
if isinstance(session_server, StreamableHttpSessionServer):
530+
try:
531+
await session_server.stop()
532+
except Exception as e:
533+
logger.error(
534+
f"Error stopping session {session_id} after process crash: {e}"
535+
)
536+
self._session_servers.pop(session_id, None)
537+
except asyncio.CancelledError:
538+
pass
539+
396540
async def _register(self) -> None:
397541
"""Register the MCP server with UiPath."""
398542

@@ -409,36 +553,65 @@ async def _register(self) -> None:
409553
env_vars[name] = value
410554

411555
try:
412-
# Create a temporary session to get tools
413-
server_params = StdioServerParameters(
414-
command=self._server.command,
415-
args=self._server.args,
416-
env=env_vars,
417-
)
556+
if self._server.is_streamable_http:
557+
# spawn process, wait for readiness, connect via HTTP
558+
await self._start_http_server_process()
559+
await self._wait_for_http_server_ready()
560+
561+
from mcp.client.streamable_http import streamable_http_client
418562

419-
# Start a temporary stdio client to get tools
420-
# Use a temporary file to capture stderr
421-
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
422-
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
423-
async with stdio_client(server_params, errlog=stderr_temp) as (
563+
if self._server.url is None:
564+
raise UiPathMcpRuntimeError(
565+
McpErrorCode.CONFIGURATION_ERROR,
566+
"Missing URL for streamable-http server",
567+
"Please specify a 'url' in the server configuration for streamable-http transport.",
568+
UiPathErrorCategory.SYSTEM,
569+
)
570+
async with streamable_http_client(self._server.url) as (
424571
read,
425572
write,
573+
_,
426574
):
427575
async with ClientSession(read, write) as session:
428-
logger.info("Initializing client session...")
429-
# Try to initialize with timeout
576+
logger.info("Initializing client session (streamable-http)...")
430577
try:
431578
await asyncio.wait_for(session.initialize(), timeout=30)
432579
initialization_successful = True
433-
logger.info("Initialization successful")
434-
435-
# Only proceed if initialization was successful
580+
logger.info("Initialization successful (streamable-http)")
581+
logger.info("Sending notifications/initialized...")
582+
logger.info("Opening SSE stream (GET)...")
583+
logger.info("Listing tools (tools/list)...")
436584
tools_result = await session.list_tools()
585+
logger.info(f"Discovered {len(tools_result.tools)} tool(s)")
437586
except Exception as err:
438587
logger.error(f"Initialization error: {err}")
439-
# Capture stderr output here, after the timeout
440-
stderr_temp.seek(0)
441-
server_stderr_output = stderr_temp.read()
588+
logger.info("Registration session closed (DELETE sent to server)")
589+
# Process stays alive for future sessions
590+
else:
591+
# spawn temporary process, discover tools, process dies with context
592+
server_params = StdioServerParameters(
593+
command=self._server.command,
594+
args=self._server.args,
595+
env=env_vars,
596+
)
597+
598+
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
599+
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
600+
async with stdio_client(server_params, errlog=stderr_temp) as (
601+
read,
602+
write,
603+
):
604+
async with ClientSession(read, write) as session:
605+
logger.info("Initializing client session...")
606+
try:
607+
await asyncio.wait_for(session.initialize(), timeout=30)
608+
initialization_successful = True
609+
logger.info("Initialization successful")
610+
tools_result = await session.list_tools()
611+
except Exception as err:
612+
logger.error(f"Initialization error: {err}")
613+
stderr_temp.seek(0)
614+
server_stderr_output = stderr_temp.read()
442615

443616
except* Exception as eg:
444617
for e in eg.exceptions:

0 commit comments

Comments
 (0)