Skip to content

Commit 4884c3e

Browse files
edis-uipathclaude
andcommitted
fix: remove stream() method and emit_state events
Streaming events add unnecessary complexity for MCP servers. The execute() path is sufficient. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 427cd65 commit 4884c3e

2 files changed

Lines changed: 16 additions & 90 deletions

File tree

src/uipath_mcp/_cli/_runtime/_factory.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
import logging
44
import uuid
5+
from typing import Any
56

67
from uipath.runtime import (
78
UiPathRuntimeContext,
89
UiPathRuntimeFactorySettings,
910
UiPathRuntimeProtocol,
1011
)
1112
from uipath.runtime.errors import UiPathErrorCategory
13+
from uipath.runtime.storage import UiPathRuntimeStorageProtocol
1214

1315
from uipath_mcp._cli._runtime._exception import McpErrorCode, UiPathMcpRuntimeError
1416
from uipath_mcp._cli._runtime._runtime import UiPathMcpRuntime
@@ -66,7 +68,7 @@ async def discover_runtimes(self) -> list[UiPathRuntimeProtocol]:
6668
return runtimes
6769

6870
async def new_runtime(
69-
self, entrypoint: str, runtime_id: str
71+
self, entrypoint: str, runtime_id: str, **kwargs: Any
7072
) -> UiPathRuntimeProtocol:
7173
"""Create a new MCP runtime instance.
7274
@@ -115,6 +117,14 @@ async def new_runtime(
115117
server_slug=self.context.mcp_server_slug,
116118
)
117119

120+
async def get_storage(self) -> UiPathRuntimeStorageProtocol | None:
121+
"""Get factory storage.
122+
123+
MCP servers are long-running processes and don't need
124+
cross-invocation state persistence.
125+
"""
126+
return None
127+
118128
async def get_settings(self) -> UiPathRuntimeFactorySettings | None:
119129
"""Get factory settings."""
120130
return None

src/uipath_mcp/_cli/_runtime/_runtime.py

Lines changed: 5 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
UiPathErrorCategory,
2828
UiPathErrorCode,
2929
)
30-
from uipath.runtime.events import UiPathRuntimeStateEvent
3130

3231
from .._utils._config import McpServer
3332
from ._context import UiPathServerType
@@ -153,86 +152,23 @@ async def stream(
153152
input: dict[str, Any] | None = None,
154153
options: UiPathStreamOptions | None = None,
155154
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
156-
"""Stream execution events from MCP server runtime in real-time.
155+
"""Stream execution for MCP server runtime.
157156
158-
Yields state events at key lifecycle points (initialization, registration,
159-
connection, sessions) and finally yields the runtime result.
160-
161-
Args:
162-
input: Optional input dictionary (unused for MCP servers).
163-
options: Optional stream options.
164-
165-
Yields:
166-
UiPathRuntimeStateEvent: State updates during server lifecycle.
167-
UiPathRuntimeResult: Final result when server stops.
168-
169-
Raises:
170-
UiPathMcpRuntimeError: If execution fails.
157+
MCP servers don't emit intermediate events, so this just yields the final result.
171158
"""
172-
# Queue for real-time event streaming
173-
# None is used as sentinel to signal completion
174-
event_queue: asyncio.Queue[UiPathRuntimeEvent | None] = asyncio.Queue()
175-
run_exception: list[Exception] = []
176-
177-
async def run_with_queue() -> None:
178-
try:
179-
result = await self._run_server(event_queue=event_queue)
180-
await event_queue.put(result)
181-
except Exception as e:
182-
run_exception.append(e)
183-
finally:
184-
await event_queue.put(None) # Signal completion
185-
186-
# Start the server task
187-
run_task = asyncio.create_task(run_with_queue())
188-
189-
try:
190-
# Consume events from queue in real-time
191-
while True:
192-
event = await event_queue.get()
193-
if event is None:
194-
break
195-
yield event
196-
finally:
197-
# Ensure the run task is properly awaited
198-
if not run_task.done():
199-
run_task.cancel()
200-
try:
201-
await run_task
202-
except asyncio.CancelledError:
203-
pass
204-
205-
# Re-raise any exception from the run task
206-
if run_exception:
207-
raise run_exception[0]
159+
result = await self._run_server()
160+
yield result
208161

209-
async def _run_server(
210-
self,
211-
event_queue: asyncio.Queue[UiPathRuntimeEvent | None] | None = None,
212-
) -> UiPathRuntimeResult:
162+
async def _run_server(self) -> UiPathRuntimeResult:
213163
"""Core server execution logic.
214164
215-
Args:
216-
event_queue: Optional queue to push state events for real-time streaming.
217-
218165
Returns:
219166
UiPathRuntimeResult with execution results.
220167
221168
Raises:
222169
UiPathMcpRuntimeError: If execution fails.
223170
"""
224-
225-
async def emit_state(node_name: str, payload: dict[str, Any]) -> None:
226-
if event_queue:
227-
event = UiPathRuntimeStateEvent(
228-
payload=payload,
229-
node_name=node_name,
230-
)
231-
await event_queue.put(event)
232-
233171
try:
234-
await emit_state("initializing", {"status": "validating_auth"})
235-
236172
# Validate authentication configuration
237173
self._validate_auth()
238174

@@ -262,11 +198,6 @@ async def emit_state(node_name: str, payload: dict[str, Any]) -> None:
262198

263199
logger.info(f"Folder key: {self._folder_key}")
264200

265-
await emit_state(
266-
"initializing",
267-
{"status": "folder_resolved", "folder_key": self._folder_key},
268-
)
269-
270201
with tracer.start_as_current_span(self.slug) as root_span:
271202
root_span.set_attribute("runtime_id", self._runtime_id)
272203
root_span.set_attribute("command", str(self._server.command))
@@ -290,26 +221,13 @@ async def emit_state(node_name: str, payload: dict[str, Any]) -> None:
290221
self._signalr_client.on_open(self._handle_signalr_open)
291222
self._signalr_client.on_close(self._handle_signalr_close)
292223

293-
await emit_state("registering", {"status": "registering_server"})
294-
295224
# Register the local server with UiPath MCP Server
296225
await self._register()
297226

298-
await emit_state(
299-
"registered",
300-
{
301-
"status": "server_registered",
302-
"slug": self.slug,
303-
"server_type": self.server_type.value,
304-
},
305-
)
306-
307227
run_task = asyncio.create_task(self._signalr_client.run())
308228
cancel_task = asyncio.create_task(self._cancel_event.wait())
309229
self._keep_alive_task = asyncio.create_task(self._keep_alive())
310230

311-
await emit_state("running", {"status": "server_running"})
312-
313231
try:
314232
# Wait for either the run to complete or cancellation
315233
done, pending = await asyncio.wait(
@@ -334,8 +252,6 @@ async def emit_state(node_name: str, payload: dict[str, Any]) -> None:
334252
if self._session_output:
335253
output_result["content"] = self._session_output
336254

337-
await emit_state("completed", {"status": "server_stopped"})
338-
339255
return UiPathRuntimeResult(
340256
output=output_result,
341257
status=UiPathRuntimeStatus.SUCCESSFUL,

0 commit comments

Comments
 (0)