-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathruntime.py
More file actions
178 lines (151 loc) · 6.39 KB
/
runtime.py
File metadata and controls
178 lines (151 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Chat runtime implementation."""
import logging
from datetime import datetime, timezone
from typing import Any, AsyncGenerator, cast
from uipath.core.chat import (
UiPathConversationMessageEvent,
UiPathConversationToolCallEvent,
UiPathConversationToolCallStartEvent,
)
from uipath.core.triggers import UiPathResumeTrigger, UiPathResumeTriggerType
from uipath.runtime.base import (
UiPathExecuteOptions,
UiPathRuntimeProtocol,
UiPathStreamOptions,
)
from uipath.runtime.chat.protocol import UiPathChatProtocol
from uipath.runtime.events import (
UiPathRuntimeEvent,
UiPathRuntimeMessageEvent,
)
from uipath.runtime.result import (
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.schema import UiPathRuntimeSchema
logger = logging.getLogger(__name__)
class UiPathChatRuntime:
    """Specialized runtime for chat mode that streams message events to a chat bridge.

    Wraps a delegate runtime. Message events produced by the delegate are
    forwarded to the chat bridge, and suspensions that carry API resume
    triggers are resolved in-process: the bridge is asked for resume data and
    the delegate is then re-run in resume mode.
    """

    def __init__(
        self,
        delegate: UiPathRuntimeProtocol,
        chat_bridge: UiPathChatProtocol,
    ):
        """Initialize the UiPathChatRuntime.

        Args:
            delegate: The underlying runtime to wrap
            chat_bridge: Bridge for chat event communication
        """
        super().__init__()
        self.delegate = delegate
        self.chat_bridge = chat_bridge
        # message_id of the latest message event (with a payload) seen from the
        # delegate; used to attach a startToolCall event to that message.
        self._current_message_id: str | None = None

    async def execute(
        self,
        input: dict[str, Any] | None = None,
        options: UiPathExecuteOptions | None = None,
    ) -> UiPathRuntimeResult:
        """Execute the workflow with chat support.

        Drains :meth:`stream` and keeps the last ``UiPathRuntimeResult`` it
        yields.

        Args:
            input: Input passed through to :meth:`stream`.
            options: Execution options; forwarded to :meth:`stream` as stream
                options.

        Returns:
            The last result yielded by the stream, or a ``SUCCESSFUL`` result
            when the stream produced none.
        """
        result: UiPathRuntimeResult | None = None
        async for event in self.stream(input, cast(UiPathStreamOptions, options)):
            if isinstance(event, UiPathRuntimeResult):
                result = event
        return (
            result
            if result
            else UiPathRuntimeResult(status=UiPathRuntimeStatus.SUCCESSFUL)
        )

    async def stream(
        self,
        input: dict[str, Any] | None = None,
        options: UiPathStreamOptions | None = None,
    ) -> AsyncGenerator[UiPathRuntimeEvent, None]:
        """Stream execution events with chat support.

        Connects the chat bridge, then runs the delegate stream. Message events
        with a payload are emitted to the bridge, and every non-result event is
        yielded to the caller. When the delegate returns a SUSPENDED result
        with API triggers, each trigger is sent to the bridge as an interrupt,
        the resume data is collected, and the delegate is run again in resume
        mode with a map of ``interrupt_id -> resume data`` as input.

        Args:
            input: Initial input for the delegate runtime.
            options: Stream options; only ``resume`` and ``breakpoints`` are
                read.

        Yields:
            Events from the delegate. Suspended results that are resolved
            through the chat bridge are not yielded.
        """
        await self.chat_bridge.connect()

        execution_completed = False
        current_input = input
        current_options = UiPathStreamOptions(
            resume=options.resume if options else False,
            breakpoints=options.breakpoints if options else None,
        )

        while not execution_completed:
            # Set only when this pass ends by scheduling a resume of the delegate.
            resume_requested = False

            async for event in self.delegate.stream(
                current_input, options=current_options
            ):
                if isinstance(event, UiPathRuntimeMessageEvent):
                    if event.payload:
                        self._current_message_id = event.payload.message_id
                        await self.chat_bridge.emit_message_event(event.payload)

                if isinstance(event, UiPathRuntimeResult):
                    runtime_result = event

                    if (
                        runtime_result.status == UiPathRuntimeStatus.SUSPENDED
                        and runtime_result.triggers
                    ):
                        api_triggers = [
                            t
                            for t in runtime_result.triggers
                            if t.trigger_type == UiPathResumeTriggerType.API
                        ]

                        if api_triggers:
                            resume_map: dict[str, Any] = {}

                            for trigger in api_triggers:
                                await self.chat_bridge.emit_interrupt_event(trigger)

                                resume_data = await self.chat_bridge.wait_for_resume()

                                await (
                                    self._emit_start_tool_call_on_confirmation_approval(
                                        trigger, resume_data
                                    )
                                )

                                assert trigger.interrupt_id is not None, (
                                    "Trigger interrupt_id cannot be None"
                                )
                                resume_map[trigger.interrupt_id] = resume_data

                            # Re-run the delegate in resume mode with the
                            # collected answers instead of surfacing the
                            # suspended result to the caller.
                            current_input = resume_map
                            current_options.resume = True
                            resume_requested = True
                            break
                        else:
                            # No API triggers - yield result and complete
                            yield event
                            execution_completed = True
                    else:
                        yield event
                        execution_completed = True
                        await self.chat_bridge.emit_exchange_end_event()
                else:
                    yield event

            if not resume_requested:
                # The delegate stream ended without scheduling a resume. Stop
                # here even if it never produced a result; looping again would
                # re-run the delegate with the same input indefinitely.
                break

    async def _emit_start_tool_call_on_confirmation_approval(
        self, trigger: UiPathResumeTrigger, resume_data: dict[str, Any]
    ) -> None:
        """Emit a startToolCall event when a HITL tool call confirmation is approved.

        Does nothing when no message id has been recorded yet or the trigger
        has no ``api_resume``. Failures while emitting are logged as warnings
        and not raised.

        Args:
            trigger: Trigger whose ``api_resume.request`` supplies
                ``toolCallId`` and ``toolName`` (empty string when missing).
            resume_data: Resume payload; ``resume_data["value"]["input"]`` is
                used as the tool input when present.
        """
        # NOTE(review): no approval flag is inspected here - presumably the
        # bridge only resumes on approval; confirm against the bridge contract.
        if self._current_message_id is None or trigger.api_resume is None:
            return

        request = trigger.api_resume.request or {}
        value = resume_data.get("value") or {}
        tool_input = value.get("input")

        tool_call_start_event = UiPathConversationMessageEvent(
            message_id=self._current_message_id,
            tool_call=UiPathConversationToolCallEvent(
                tool_call_id=request.get("toolCallId", ""),
                start=UiPathConversationToolCallStartEvent(
                    tool_name=request.get("toolName", ""),
                    # UTC timestamp with millisecond precision and a "Z" suffix.
                    timestamp=datetime.now(timezone.utc)
                    .isoformat(timespec="milliseconds")
                    .replace("+00:00", "Z"),
                    input=tool_input,
                ),
            ),
        )

        try:
            await self.chat_bridge.emit_message_event(tool_call_start_event)
        except Exception as e:
            logger.warning(f"Error emitting startToolCall on approval: {e}")

    async def get_schema(self) -> UiPathRuntimeSchema:
        """Get schema from the delegate runtime."""
        return await self.delegate.get_schema()

    async def dispose(self) -> None:
        """Cleanup runtime resources.

        Disconnects the chat bridge; a failure to disconnect is logged as a
        warning and not raised.
        """
        try:
            await self.chat_bridge.disconnect()
        except Exception as e:
            logger.warning(f"Error disconnecting chat bridge: {e}")