-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathruntime.py
More file actions
117 lines (99 loc) · 3.88 KB
/
runtime.py
File metadata and controls
117 lines (99 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Chat runtime implementation."""
import logging
from typing import Any, AsyncGenerator, cast
from uipath.runtime.base import (
UiPathExecuteOptions,
UiPathRuntimeProtocol,
UiPathStreamOptions,
)
from uipath.runtime.chat.protocol import UiPathChatProtocol
from uipath.runtime.events import (
UiPathRuntimeEvent,
UiPathRuntimeMessageEvent,
)
from uipath.runtime.result import (
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable.trigger import UiPathResumeTriggerType
from uipath.runtime.schema import UiPathRuntimeSchema
logger = logging.getLogger(__name__)
class UiPathChatRuntime:
"""Specialized runtime for chat mode that streams message events to a chat bridge."""
def __init__(
self,
delegate: UiPathRuntimeProtocol,
chat_bridge: UiPathChatProtocol,
):
"""Initialize the UiPathChatRuntime.
Args:
delegate: The underlying runtime to wrap
chat_bridge: Bridge for chat event communication
"""
super().__init__()
self.delegate = delegate
self.chat_bridge = chat_bridge
async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
"""Execute the workflow with chat support."""
result: UiPathRuntimeResult | None = None
async for event in self.stream(input, cast(UiPathStreamOptions, options)):
if isinstance(event, UiPathRuntimeResult):
result = event
return (
result
if result
else UiPathRuntimeResult(status=UiPathRuntimeStatus.SUCCESSFUL)
)
async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream execution events with chat support."""
await self.chat_bridge.connect()
execution_completed = False
current_input = input
current_options = UiPathStreamOptions(
resume=options.resume if options else False,
breakpoints=options.breakpoints if options else None,
)
while not execution_completed:
async for event in self.delegate.stream(
current_input, options=current_options
):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)
if isinstance(event, UiPathRuntimeResult):
runtime_result = event
if (
runtime_result.status == UiPathRuntimeStatus.SUSPENDED
and runtime_result.trigger
and runtime_result.trigger.trigger_type
== UiPathResumeTriggerType.API
):
await self.chat_bridge.emit_interrupt_event(runtime_result)
resume_data = await self.chat_bridge.wait_for_resume()
# Continue with resumed execution
current_input = resume_data
current_options.resume = True
break
else:
yield event
execution_completed = True
else:
yield event
await self.chat_bridge.emit_exchange_end_event()
async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema from the delegate runtime."""
return await self.delegate.get_schema()
async def dispose(self) -> None:
"""Cleanup runtime resources."""
try:
await self.chat_bridge.disconnect()
except Exception as e:
logger.warning(f"Error disconnecting chat bridge: {e}")