-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathruntime.py
More file actions
287 lines (238 loc) · 10.4 KB
/
runtime.py
File metadata and controls
287 lines (238 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""Resumable runtime protocol and implementation."""
import logging
from typing import Any, AsyncGenerator
from uipath.core.errors import UiPathPendingTriggerError
from uipath.runtime.base import (
UiPathExecuteOptions,
UiPathRuntimeProtocol,
UiPathStreamOptions,
)
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.result import (
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable import UiPathResumeTriggerType
from uipath.runtime.resumable.protocols import (
UiPathResumableStorageProtocol,
UiPathResumeTriggerProtocol,
)
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
from uipath.runtime.schema import UiPathRuntimeSchema
logger = logging.getLogger(__name__)
class UiPathResumableRuntime:
"""Generic runtime wrapper that adds resume trigger management to any runtime.
This class wraps any UiPathRuntimeProtocol implementation and handles:
- Detecting suspensions in execution results
- Creating and persisting resume triggers via handler
- Restoring resume triggers from storage on resume
- Passing through all other runtime operations unchanged
"""
def __init__(
self,
delegate: UiPathRuntimeProtocol,
storage: UiPathResumableStorageProtocol,
trigger_manager: UiPathResumeTriggerProtocol,
runtime_id: str,
):
"""Initialize the resumable runtime wrapper.
Args:
delegate: The underlying runtime to wrap
storage: Storage for persisting/retrieving resume triggers
trigger_manager: Manager for creating and reading resume triggers
runtime_id: Id used for runtime orchestration
"""
self.delegate = delegate
self.storage = storage
self.trigger_manager = trigger_manager
self.runtime_id = runtime_id
async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
) -> UiPathRuntimeResult:
"""Execute with resume trigger handling.
Args:
input: Input data for execution
options: Execution options including resume flag
Returns:
Execution result, potentially with resume trigger attached
"""
# If resuming, restore trigger from storage
if options and options.resume:
# restore trigger from storage
input = await self._restore_resume_input(input)
while True:
# Execute the delegate
result = await self.delegate.execute(input, options=options)
# If suspended, create and persist trigger
suspension_result = await self._handle_suspension(result)
# check if any trigger may be resumed
if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
fired_triggers := await self._get_fired_triggers()
):
return suspension_result
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
input = fired_triggers
if not options:
options = UiPathExecuteOptions(resume=True)
else:
options.resume = True
async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream with resume trigger handling.
Args:
input: Input data for execution
options: Stream options including resume flag
Yields:
Runtime events during execution, final event is UiPathRuntimeResult
"""
# If resuming, restore trigger from storage
if options and options.resume:
input = await self._restore_resume_input(input)
final_result: UiPathRuntimeResult | None = None
execution_completed = False
fired_triggers = None
while not execution_completed:
async for event in self.delegate.stream(input, options=options):
if isinstance(event, UiPathRuntimeResult):
final_result = event
else:
yield event
# If suspended, create and persist trigger
if final_result:
suspension_result = await self._handle_suspension(final_result)
# check if any trigger may be resumed
if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
fired_triggers := await self._get_fired_triggers()
):
yield suspension_result
execution_completed = True
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
input = fired_triggers
if not options:
options = UiPathStreamOptions(resume=True)
else:
options.resume = True
async def _get_fired_triggers(self) -> dict[str, Any] | None:
"""Check stored triggers for any that have already fired (excluding API triggers).
API triggers cannot be completed before suspending the job, so they are skipped.
Returns:
A resume map of {interrupt_id: resume_data} for fired triggers, or None.
"""
triggers = await self.storage.get_triggers(self.runtime_id)
if not triggers:
return None
non_api_triggers = [
t for t in triggers if t.trigger_type != UiPathResumeTriggerType.API
]
return await self._build_resume_map(non_api_triggers)
async def _restore_resume_input(
self,
input: dict[str, Any] | None,
) -> dict[str, Any] | None:
"""Restore resume input from storage if not provided.
Args:
input: User-provided input (takes precedence)
Returns:
Input to use for resume: {interrupt_id: resume_data, ...}
"""
# Fetch all triggers from storage
triggers = await self.storage.get_triggers(self.runtime_id)
# If user provided explicit input, use it
if input is not None:
if triggers:
if len(triggers) == 1:
# Single trigger - just delete it
await self.storage.delete_trigger(self.runtime_id, triggers[0])
else:
# Multiple triggers - match by interrupt_id
found = False
for trigger in triggers:
if trigger.interrupt_id in input:
await self.storage.delete_trigger(self.runtime_id, trigger)
found = True
if not found:
logger.warning(
f"Multiple triggers detected but none match the provided input. "
f"Please specify which trigger to resume by {{interrupt_id: value}}. "
f"Available interrupt_ids: {[t.interrupt_id for t in triggers]}."
)
return input
if not triggers:
return None
return await self._build_resume_map(triggers)
async def _build_resume_map(
self,
triggers: list[UiPathResumeTrigger],
) -> dict[str, Any]:
"""Build resume map from triggers: {interrupt_id: resume_data}.
Args:
triggers: List of triggers to read and map
Returns:
A dict mapping interrupt_id to the trigger's resume data.
"""
resume_map: dict[str, Any] = {}
for trigger in triggers:
try:
data = await self.trigger_manager.read_trigger(trigger)
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = data
await self.storage.delete_trigger(self.runtime_id, trigger)
except UiPathPendingTriggerError:
# Trigger still pending, skip it
pass
return resume_map
async def _handle_suspension(
self, result: UiPathRuntimeResult
) -> UiPathRuntimeResult:
"""Create and persist resume trigger if execution was suspended.
Args:
result: The execution result to check for suspension
"""
# Only handle interrupt suspensions
if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance(
result, UiPathBreakpointResult
):
return result
suspended_result = UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
output=result.output,
)
assert result.output is None or isinstance(result.output, dict), (
"Suspended runtime output must be a dict of interrupt IDs to resume data"
)
# Get existing triggers and current interrupts
suspended_result.triggers = (
await self.storage.get_triggers(self.runtime_id) or []
)
current_interrupts = result.output or {}
# Diff: find new interrupts
existing_ids = [t.interrupt_id for t in suspended_result.triggers]
new_ids = [key for key in current_interrupts.keys() if key not in existing_ids]
# Create triggers only for new interrupts
for interrupt_id in new_ids:
trigger = await self.trigger_manager.create_trigger(
current_interrupts[interrupt_id]
)
trigger.interrupt_id = interrupt_id
suspended_result.triggers.append(trigger)
if suspended_result.triggers:
await self.storage.save_triggers(self.runtime_id, suspended_result.triggers)
# Backward compatibility: set single trigger directly
suspended_result.trigger = suspended_result.triggers[0]
return suspended_result
async def get_schema(self) -> UiPathRuntimeSchema:
"""Passthrough schema from delegate runtime."""
return await self.delegate.get_schema()
async def dispose(self) -> None:
"""Cleanup resources for both wrapper and delegate."""
await self.delegate.dispose()