3737from .._utils ._config import McpServer
3838from ._context import UiPathServerType
3939from ._exception import McpErrorCode , UiPathMcpRuntimeError
40- from ._session import SessionServer
40+ from ._session import BaseSessionServer , StdioSessionServer , StreamableHttpSessionServer
4141
4242logger = logging .getLogger (__name__ )
4343tracer = trace .get_tracer (__name__ )
@@ -76,10 +76,14 @@ def __init__(
7676 self ._server_slug = server_slug
7777
7878 self ._signalr_client : SignalRClient | None = None
79- self ._session_servers : dict [str , SessionServer ] = {}
79+ self ._session_servers : dict [str , BaseSessionServer ] = {}
8080 self ._session_output : str | None = None
8181 self ._cancel_event = asyncio .Event ()
8282 self ._keep_alive_task : asyncio .Task [None ] | None = None
83+ self ._http_server_process : asyncio .subprocess .Process | None = None
84+ self ._http_monitor_task : asyncio .Task [None ] | None = None
85+ self ._http_stderr_drain_task : asyncio .Task [None ] | None = None
86+ self ._http_server_stderr_lines : list [str ] = []
8387 self ._uipath = UiPath ()
8488 self ._cleanup_done = False
8589
@@ -223,6 +227,12 @@ async def _run_server(self) -> UiPathRuntimeResult:
223227 # Register the local server with UiPath MCP Server
224228 await self ._register ()
225229
230+ # Start HTTP server process monitor if using streamable-http
231+ if self ._server .is_streamable_http :
232+ self ._http_monitor_task = asyncio .create_task (
233+ self ._monitor_http_server_process ()
234+ )
235+
226236 run_task = asyncio .create_task (self ._signalr_client .run ())
227237 cancel_task = asyncio .create_task (self ._cancel_event .wait ())
228238 self ._keep_alive_task = asyncio .create_task (self ._keep_alive ())
@@ -300,6 +310,9 @@ async def _cleanup(self) -> None:
300310 except Exception as e :
301311 logger .error (f"Error cleaning up session server { session_id } : { str (e )} " )
302312
313+ # Stop the shared HTTP server process (streamable-http only)
314+ await self ._stop_http_server_process ()
315+
303316 if self ._signalr_client and hasattr (self ._signalr_client , "_transport" ):
304317 transport = self ._signalr_client ._transport
305318 if transport and hasattr (transport , "_ws" ) and transport ._ws :
@@ -358,8 +371,15 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
358371 try :
359372 # Check if we have a session server for this session_id
360373 if session_id not in self ._session_servers :
361- # Create and start a new session server
362- session_server = SessionServer (self ._server , self .slug , session_id )
374+ session_server : BaseSessionServer
375+ if self ._server .is_streamable_http :
376+ session_server = StreamableHttpSessionServer (
377+ self ._server , self .slug , session_id
378+ )
379+ else :
380+ session_server = StdioSessionServer (
381+ self ._server , self .slug , session_id
382+ )
363383 try :
364384 await session_server .start ()
365385 except Exception as e :
@@ -393,6 +413,156 @@ async def _handle_signalr_close(self) -> None:
393413 """Handle SignalR connection close event."""
394414 logger .info ("Websocket connection closed." )
395415
416+ async def _start_http_server_process (self ) -> None :
417+ """Spawn the streamable-http server process.
418+
419+ The process is started once and shared across all sessions.
420+ """
421+ env_vars = self ._server .env .copy ()
422+ if self .server_type is UiPathServerType .Coded :
423+ for name , value in os .environ .items ():
424+ if name not in env_vars :
425+ env_vars [name ] = value
426+
427+ merged_env = {** os .environ , ** env_vars } if env_vars else None
428+ self ._http_server_stderr_lines = []
429+ self ._http_server_process = await asyncio .create_subprocess_exec (
430+ self ._server .command ,
431+ * self ._server .args ,
432+ env = merged_env ,
433+ stdout = asyncio .subprocess .DEVNULL ,
434+ stderr = asyncio .subprocess .PIPE ,
435+ )
436+ self ._http_stderr_drain_task = asyncio .create_task (self ._drain_http_stderr ())
437+ logger .info (
438+ f"Started HTTP server process (PID: { self ._http_server_process .pid } ) "
439+ f"for { self ._server .url } "
440+ )
441+
442+ async def _drain_http_stderr (self ) -> None :
443+ """Continuously read and log stderr from the HTTP server process.
444+
445+ Accumulates output in _http_server_stderr_lines for error reporting.
446+ """
447+ if not self ._http_server_process or not self ._http_server_process .stderr :
448+ return
449+ try :
450+ async for line in self ._http_server_process .stderr :
451+ decoded = line .decode ("utf-8" , errors = "replace" ).rstrip ()
452+ self ._http_server_stderr_lines .append (decoded )
453+ logger .debug (f"HTTP server stderr: { decoded } " )
454+ except asyncio .CancelledError :
455+ pass
456+
457+ async def _wait_for_http_server_ready (
458+ self ,
459+ max_retries : int = 30 ,
460+ retry_delay : float = 1.0 ,
461+ ) -> None :
462+ """Wait for the HTTP server to start accepting connections."""
463+ import httpx
464+
465+ url = self ._server .url
466+ if not url :
467+ raise ValueError ("streamable-http transport requires url in config" )
468+
469+ for attempt in range (max_retries ):
470+ # Check if process has crashed
471+ if (
472+ self ._http_server_process
473+ and self ._http_server_process .returncode is not None
474+ ):
475+ stderr_output = "\n " .join (self ._http_server_stderr_lines )
476+ raise UiPathMcpRuntimeError (
477+ McpErrorCode .INITIALIZATION_ERROR ,
478+ "HTTP server process exited unexpectedly" ,
479+ f"Exit code: { self ._http_server_process .returncode } \n { stderr_output } " ,
480+ UiPathErrorCategory .SYSTEM ,
481+ )
482+
483+ try :
484+ async with httpx .AsyncClient () as client :
485+ response = await client .get (url , timeout = 2.0 )
486+ logger .info (
487+ f"HTTP server is ready (status: { response .status_code } )"
488+ )
489+ return
490+ except (httpx .ConnectError , httpx .ConnectTimeout ) as err :
491+ if attempt < max_retries - 1 :
492+ logger .debug (
493+ f"HTTP server not ready yet, retrying in { retry_delay } s "
494+ f"(attempt { attempt + 1 } /{ max_retries } )"
495+ )
496+ await asyncio .sleep (retry_delay )
497+ else :
498+ raise UiPathMcpRuntimeError (
499+ McpErrorCode .INITIALIZATION_ERROR ,
500+ "HTTP server failed to start" ,
501+ f"Server at { url } did not become ready after { max_retries } attempts" ,
502+ UiPathErrorCategory .SYSTEM ,
503+ ) from err
504+ except httpx .HTTPStatusError :
505+ # Server responded with an error status code
506+ logger .info ("HTTP server is ready (responded with error, but is up)" )
507+ return
508+
509+ async def _stop_http_server_process (self ) -> None :
510+ """Stop the shared HTTP server process."""
511+ if self ._http_monitor_task and not self ._http_monitor_task .done ():
512+ self ._http_monitor_task .cancel ()
513+ try :
514+ await self ._http_monitor_task
515+ except asyncio .CancelledError :
516+ pass
517+ self ._http_monitor_task = None
518+
519+ if self ._http_server_process :
520+ try :
521+ self ._http_server_process .terminate ()
522+ try :
523+ await asyncio .wait_for (
524+ self ._http_server_process .wait (), timeout = 5.0
525+ )
526+ except asyncio .TimeoutError :
527+ self ._http_server_process .kill ()
528+ await self ._http_server_process .wait ()
529+ except ProcessLookupError :
530+ pass
531+ finally :
532+ logger .info ("HTTP server process stopped" )
533+ self ._http_server_process = None
534+
535+ if self ._http_stderr_drain_task and not self ._http_stderr_drain_task .done ():
536+ self ._http_stderr_drain_task .cancel ()
537+ try :
538+ await self ._http_stderr_drain_task
539+ except asyncio .CancelledError :
540+ pass
541+ self ._http_stderr_drain_task = None
542+
543+ async def _monitor_http_server_process (self ) -> None :
544+ """Monitor the HTTP server process and handle unexpected exits."""
545+ if not self ._http_server_process :
546+ return
547+ try :
548+ returncode = await self ._http_server_process .wait ()
549+ if not self ._cancel_event .is_set ():
550+ logger .error (
551+ f"HTTP server process exited unexpectedly with code { returncode } "
552+ )
553+ # Stop all HTTP sessions, they will fail on next request anyway
554+ for session_id , session_server in list (self ._session_servers .items ()):
555+ if isinstance (session_server , StreamableHttpSessionServer ):
556+ try :
557+ await session_server .stop ()
558+ except Exception as e :
559+ logger .error (
560+ f"Error stopping session { session_id } after process crash: { e } "
561+ )
562+ self ._session_servers .pop (session_id , None )
563+ except asyncio .CancelledError :
564+ pass
565+
396566 async def _register (self ) -> None :
397567 """Register the MCP server with UiPath."""
398568
@@ -409,36 +579,63 @@ async def _register(self) -> None:
409579 env_vars [name ] = value
410580
411581 try :
412- # Create a temporary session to get tools
413- server_params = StdioServerParameters (
414- command = self ._server . command ,
415- args = self ._server . args ,
416- env = env_vars ,
417- )
582+ if self . _server . is_streamable_http :
583+ # spawn process, wait for readiness, connect via HTTP
584+ await self ._start_http_server_process ()
585+ await self ._wait_for_http_server_ready ()
586+
587+ from mcp . client . streamable_http import streamable_http_client
418588
419- # Start a temporary stdio client to get tools
420- # Use a temporary file to capture stderr
421- with tempfile .TemporaryFile (mode = "w+b" ) as stderr_temp_binary :
422- stderr_temp = io .TextIOWrapper (stderr_temp_binary , encoding = "utf-8" )
423- async with stdio_client (server_params , errlog = stderr_temp ) as (
589+ if self ._server .url is None :
590+ raise UiPathMcpRuntimeError (
591+ McpErrorCode .CONFIGURATION_ERROR ,
592+ "Missing URL for streamable-http server" ,
593+ "Please specify a 'url' in the server configuration for streamable-http transport." ,
594+ UiPathErrorCategory .SYSTEM ,
595+ )
596+ async with streamable_http_client (self ._server .url ) as (
424597 read ,
425598 write ,
599+ _ ,
426600 ):
427601 async with ClientSession (read , write ) as session :
428- logger .info ("Initializing client session..." )
429- # Try to initialize with timeout
602+ logger .info ("Initializing client session (streamable-http)..." )
430603 try :
431604 await asyncio .wait_for (session .initialize (), timeout = 30 )
432605 initialization_successful = True
433- logger .info ("Initialization successful" )
434-
435- # Only proceed if initialization was successful
436606 tools_result = await session .list_tools ()
607+ logger .info (f"Discovered { len (tools_result .tools )} tool(s)" )
437608 except Exception as err :
438609 logger .error (f"Initialization error: { err } " )
439- # Capture stderr output here, after the timeout
440- stderr_temp .seek (0 )
441- server_stderr_output = stderr_temp .read ()
610+ server_stderr_output = "\n " .join (
611+ self ._http_server_stderr_lines
612+ )
613+ logger .info ("Registration session closed (DELETE sent to server)" )
614+ else :
615+ # spawn temporary process, discover tools, process dies with context
616+ server_params = StdioServerParameters (
617+ command = self ._server .command ,
618+ args = self ._server .args ,
619+ env = env_vars ,
620+ )
621+
622+ with tempfile .TemporaryFile (mode = "w+b" ) as stderr_temp_binary :
623+ stderr_temp = io .TextIOWrapper (stderr_temp_binary , encoding = "utf-8" )
624+ async with stdio_client (server_params , errlog = stderr_temp ) as (
625+ read ,
626+ write ,
627+ ):
628+ async with ClientSession (read , write ) as session :
629+ logger .info ("Initializing client session..." )
630+ try :
631+ await asyncio .wait_for (session .initialize (), timeout = 30 )
632+ initialization_successful = True
633+ logger .info ("Initialization successful" )
634+ tools_result = await session .list_tools ()
635+ except Exception as err :
636+ logger .error (f"Initialization error: { err } " )
637+ stderr_temp .seek (0 )
638+ server_stderr_output = stderr_temp .read ()
442639
443640 except* Exception as eg :
444641 for e in eg .exceptions :
0 commit comments