-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathruntime.py
More file actions
164 lines (140 loc) · 6.21 KB
/
runtime.py
File metadata and controls
164 lines (140 loc) · 6.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""Chat runtime implementation."""
import logging
from typing import Any, AsyncGenerator, cast
from uipath.core.triggers import UiPathResumeTriggerType
from uipath.runtime.base import (
UiPathExecuteOptions,
UiPathRuntimeProtocol,
UiPathStreamOptions,
)
from uipath.runtime.chat.protocol import UiPathChatProtocol
from uipath.runtime.errors import UiPathBaseRuntimeError
from uipath.runtime.errors.contract import UiPathErrorCategory
from uipath.runtime.events import (
UiPathRuntimeEvent,
UiPathRuntimeMessageEvent,
)
from uipath.runtime.result import (
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.schema import UiPathRuntimeSchema
logger = logging.getLogger(__name__)
class UiPathChatRuntime:
"""Specialized runtime for chat mode that streams message events to a chat bridge."""
def __init__(
self,
delegate: UiPathRuntimeProtocol,
chat_bridge: UiPathChatProtocol,
):
"""Initialize the UiPathChatRuntime.
Args:
delegate: The underlying runtime to wrap
chat_bridge: Bridge for chat event communication
"""
super().__init__()
self.delegate = delegate
self.chat_bridge = chat_bridge
async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
"""Execute the workflow with chat support."""
result: UiPathRuntimeResult | None = None
async for event in self.stream(input, cast(UiPathStreamOptions, options)):
if isinstance(event, UiPathRuntimeResult):
result = event
return (
result
if result
else UiPathRuntimeResult(status=UiPathRuntimeStatus.SUCCESSFUL)
)
async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream execution events with chat support."""
try:
await self.chat_bridge.connect()
execution_completed = False
current_input = input
current_options = UiPathStreamOptions(
resume=options.resume if options else False,
breakpoints=options.breakpoints if options else None,
)
while not execution_completed:
async for event in self.delegate.stream(
current_input, options=current_options
):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)
if isinstance(event, UiPathRuntimeResult):
runtime_result = event
if (
runtime_result.status == UiPathRuntimeStatus.SUSPENDED
and runtime_result.triggers
):
api_triggers = [
t
for t in runtime_result.triggers
if t.trigger_type == UiPathResumeTriggerType.API
]
if api_triggers:
resume_map: dict[str, Any] = {}
for trigger in api_triggers:
await self.chat_bridge.emit_interrupt_event(trigger)
resume_data = (
await self.chat_bridge.wait_for_resume()
)
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = resume_data
current_input = resume_map
current_options.resume = True
break
else:
# No API triggers - yield result and complete
yield event
execution_completed = True
elif runtime_result.status == UiPathRuntimeStatus.FAULTED:
error = runtime_result.error
faulted_error = UiPathBaseRuntimeError(
code=error.code if error else "UNKNOWN",
title=error.title if error else "Unknown Error",
detail=error.detail if error else "",
category=error.category
if error
else UiPathErrorCategory.UNKNOWN,
status=error.status if error else None,
)
await self._emit_error_event(faulted_error)
yield event
execution_completed = True
else:
yield event
execution_completed = True
await self.chat_bridge.emit_exchange_end_event()
else:
yield event
except Exception as e:
await self._emit_error_event(e)
raise
async def _emit_error_event(self, error: Exception) -> None:
"""Emit an exchange error event to the chat bridge."""
try:
await self.chat_bridge.emit_exchange_error_event(error)
except Exception:
logger.warning("Failed to emit exchange error event", exc_info=True)
async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema from the delegate runtime."""
return await self.delegate.get_schema()
async def dispose(self) -> None:
"""Cleanup runtime resources."""
try:
await self.chat_bridge.disconnect()
except Exception as e:
logger.warning(f"Error disconnecting chat bridge: {e}")