Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.2.9"
version = "0.3.0"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
27 changes: 25 additions & 2 deletions src/uipath/runtime/chat/protocol.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
"""Abstract conversation bridge interface."""

from typing import Protocol
from typing import Any, Protocol

from uipath.core.chat import UiPathConversationMessageEvent
from uipath.core.chat import (
UiPathConversationMessageEvent,
)

from uipath.runtime.result import UiPathRuntimeResult


class UiPathChatProtocol(Protocol):
Expand All @@ -28,3 +32,22 @@ async def emit_message_event(
message_event: UiPathConversationMessageEvent to wrap and send
"""
...

async def emit_interrupt_event(
self,
interrupt_event: UiPathRuntimeResult,
) -> None:
"""Wrap and send an interrupt event.
Args:
interrupt_event: UiPathConversationInterruptEvent to wrap and send
"""
...

async def emit_exchange_end_event(self) -> None:
"""Send an exchange end event."""
...

async def wait_for_resume(self) -> dict[str, Any]:
"""Wait for the interrupt_end event to be received."""
...
43 changes: 38 additions & 5 deletions src/uipath/runtime/chat/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable.trigger import UiPathResumeTriggerType
from uipath.runtime.schema import UiPathRuntimeSchema

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,12 +66,44 @@ async def stream(
"""Stream execution events with chat support."""
await self.chat_bridge.connect()

async for event in self.delegate.stream(input, options=options):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)
execution_completed = False
Comment thread
cristipufu marked this conversation as resolved.
current_input = input
current_options = UiPathStreamOptions(
resume=options.resume if options else False,
breakpoints=options.breakpoints if options else None,
)

while not execution_completed:
async for event in self.delegate.stream(
current_input, options=current_options
):
Comment thread
cristipufu marked this conversation as resolved.
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)

if isinstance(event, UiPathRuntimeResult):
runtime_result = event

if (
runtime_result.status == UiPathRuntimeStatus.SUSPENDED
and runtime_result.trigger
and runtime_result.trigger.trigger_type
== UiPathResumeTriggerType.API
):
await self.chat_bridge.emit_interrupt_event(runtime_result)
resume_data = await self.chat_bridge.wait_for_resume()

# Continue with resumed execution
current_input = resume_data
current_options.resume = True
break
else:
yield event
execution_completed = True
else:
yield event

yield event
await self.chat_bridge.emit_exchange_end_event()

async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema from the delegate runtime."""
Expand Down
2 changes: 1 addition & 1 deletion src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
UiPathRuntimeProtocol,
UiPathStreamOptions,
)
from uipath.runtime.debug import UiPathBreakpointResult
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.resumable.protocols import (
Expand Down
147 changes: 147 additions & 0 deletions tests/test_chat_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from uipath.runtime import (
UiPathExecuteOptions,
UiPathResumeTrigger,
UiPathResumeTriggerType,
UiPathRuntimeResult,
UiPathRuntimeStatus,
UiPathStreamOptions,
Expand All @@ -32,6 +34,8 @@ def make_chat_bridge_mock() -> UiPathChatProtocol:
bridge_mock.connect = AsyncMock()
bridge_mock.disconnect = AsyncMock()
bridge_mock.emit_message_event = AsyncMock()
bridge_mock.emit_interrupt_event = AsyncMock()
bridge_mock.wait_for_resume = AsyncMock()

return cast(UiPathChatProtocol, bridge_mock)

Expand Down Expand Up @@ -94,6 +98,79 @@ async def get_schema(self) -> UiPathRuntimeSchema:
raise NotImplementedError()


class SuspendingMockRuntime:
"""Mock runtime that can suspend with API triggers."""

def __init__(
self,
suspend_at_message: int | None = None,
) -> None:
self.suspend_at_message = suspend_at_message

async def dispose(self) -> None:
pass

async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
"""Fallback execute path."""
return UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUCCESSFUL,
output={"mode": "execute"},
)

async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream events with potential API trigger suspension."""
is_resume = options and options.resume

if not is_resume:
# Initial execution - yield message and then suspend
message_event = UiPathConversationMessageEvent(
message_id="msg-0",
start=UiPathConversationMessageStartEvent(
role="assistant",
timestamp="2025-01-01T00:00:00.000Z",
),
)
yield UiPathRuntimeMessageEvent(payload=message_event)

if self.suspend_at_message is not None:
# Suspend with API trigger
yield UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
trigger=UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType.API,
payload={"action": "confirm_tool_call"},
),
)
return
else:
# Resumed execution - yield another message and complete
message_event = UiPathConversationMessageEvent(
message_id="msg-1",
start=UiPathConversationMessageStartEvent(
role="assistant",
timestamp="2025-01-01T00:00:01.000Z",
),
)
yield UiPathRuntimeMessageEvent(payload=message_event)

# Final successful result
yield UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUCCESSFUL,
output={"resumed": is_resume, "input": input},
)

async def get_schema(self) -> UiPathRuntimeSchema:
raise NotImplementedError()


@pytest.mark.asyncio
async def test_chat_runtime_streams_and_emits_messages():
"""UiPathChatRuntime should stream events and emit message events to bridge."""
Expand Down Expand Up @@ -221,3 +298,73 @@ async def test_chat_runtime_dispose_suppresses_disconnect_errors():
await chat_runtime.dispose()

cast(AsyncMock, bridge.disconnect).assert_awaited_once()


@pytest.mark.asyncio
async def test_chat_runtime_handles_api_trigger_suspension():
"""UiPathChatRuntime should intercept suspensions and resume execution."""

runtime_impl = SuspendingMockRuntime(suspend_at_message=0)
bridge = make_chat_bridge_mock()

cast(AsyncMock, bridge.wait_for_resume).return_value = {"approved": True}

chat_runtime = UiPathChatRuntime(
delegate=runtime_impl,
chat_bridge=bridge,
)

result = await chat_runtime.execute({})

await chat_runtime.dispose()

# Result should be SUCCESSFUL
assert isinstance(result, UiPathRuntimeResult)
assert result.status == UiPathRuntimeStatus.SUCCESSFUL
assert result.output == {"resumed": True, "input": {"approved": True}}

cast(AsyncMock, bridge.connect).assert_awaited_once()
cast(AsyncMock, bridge.disconnect).assert_awaited_once()

cast(AsyncMock, bridge.emit_interrupt_event).assert_awaited_once()
cast(AsyncMock, bridge.wait_for_resume).assert_awaited_once()

# Message events emitted (one before suspend, one after resume)
assert cast(AsyncMock, bridge.emit_message_event).await_count == 2


@pytest.mark.asyncio
async def test_chat_runtime_yields_events_during_suspension_flow():
"""UiPathChatRuntime.stream() should not yield SUSPENDED result, only final result."""

runtime_impl = SuspendingMockRuntime(suspend_at_message=0)
bridge = make_chat_bridge_mock()

# wait_for_resume returns approval data
cast(AsyncMock, bridge.wait_for_resume).return_value = {"approved": True}

chat_runtime = UiPathChatRuntime(
delegate=runtime_impl,
chat_bridge=bridge,
)

events = []
async for event in chat_runtime.stream({}):
events.append(event)

await chat_runtime.dispose()

# Should have 2 message events + 1 final SUCCESSFUL result
# SUSPENDED result should NOT be yielded
assert len(events) == 3
assert isinstance(events[0], UiPathRuntimeMessageEvent)
assert events[0].payload.message_id == "msg-0"
assert isinstance(events[1], UiPathRuntimeMessageEvent)
assert events[1].payload.message_id == "msg-1"
assert isinstance(events[2], UiPathRuntimeResult)
assert events[2].status == UiPathRuntimeStatus.SUCCESSFUL

# Verify no SUSPENDED result was yielded
for event in events:
if isinstance(event, UiPathRuntimeResult):
assert event.status != UiPathRuntimeStatus.SUSPENDED
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.