Skip to content

Commit c360384

Browse files
committed
feat: resume runtime on fired triggers
1 parent 700c188 commit c360384

File tree

4 files changed

+352
-106
lines changed

4 files changed

+352
-106
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-runtime"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/runtime/resumable/runtime.py

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
)
1313
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
1414
from uipath.runtime.events import UiPathRuntimeEvent
15-
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
15+
from uipath.runtime.result import (
16+
UiPathRuntimeResult,
17+
UiPathRuntimeStatus,
18+
)
1619
from uipath.runtime.resumable.protocols import (
1720
UiPathResumableStorageProtocol,
1821
UiPathResumeTriggerProtocol,
1922
)
23+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2024
from uipath.runtime.schema import UiPathRuntimeSchema
2125

2226
logger = logging.getLogger(__name__)
@@ -51,6 +55,7 @@ def __init__(
5155
self.storage = storage
5256
self.trigger_manager = trigger_manager
5357
self.runtime_id = runtime_id
58+
self._fired_triggers_map: dict[str, Any] = {}
5459

5560
async def execute(
5661
self,
@@ -66,14 +71,29 @@ async def execute(
6671
Returns:
6772
Execution result, potentially with resume trigger attached
6873
"""
69-
# If resuming, restore trigger from storage
74+
# check if we are resuming
7075
if options and options.resume:
71-
input = await self._restore_resume_input(input)
76+
if self._fired_triggers_map:
77+
input = self._fired_triggers_map
78+
self._fired_triggers_map = {}
79+
else:
80+
# restore trigger from storage
81+
input = await self._restore_resume_input(input)
7282

7383
# Execute the delegate
7484
result = await self.delegate.execute(input, options=options)
7585
# If suspended, create and persist trigger
76-
return await self._handle_suspension(result)
86+
suspension_result = await self._handle_suspension(result)
87+
if not self._fired_triggers_map:
88+
return suspension_result
89+
90+
# some triggers are already fired, runtime can be resumed
91+
resume_options = options or UiPathExecuteOptions(resume=True)
92+
if not resume_options.resume:
93+
resume_options = UiPathExecuteOptions(resume=True)
94+
return await self.execute(
95+
options=resume_options,
96+
)
7797

7898
async def stream(
7999
self,
@@ -89,9 +109,14 @@ async def stream(
89109
Yields:
90110
Runtime events during execution, final event is UiPathRuntimeResult
91111
"""
92-
# If resuming, restore trigger from storage
112+
# check if we are resuming
93113
if options and options.resume:
94-
input = await self._restore_resume_input(input)
114+
if self._fired_triggers_map:
115+
input = self._fired_triggers_map
116+
self._fired_triggers_map = {}
117+
else:
118+
# restore trigger from storage
119+
input = await self._restore_resume_input(input)
95120

96121
final_result: UiPathRuntimeResult | None = None
97122
async for event in self.delegate.stream(input, options=options):
@@ -102,7 +127,20 @@ async def stream(
102127

103128
# If suspended, create and persist trigger
104129
if final_result:
105-
yield await self._handle_suspension(final_result)
130+
suspension_result = await self._handle_suspension(final_result)
131+
132+
if not self._fired_triggers_map:
133+
yield suspension_result
134+
return
135+
136+
# some triggers are already fired, runtime can be resumed
137+
resume_options = options or UiPathStreamOptions(resume=True)
138+
if not resume_options.resume:
139+
resume_options = UiPathStreamOptions(resume=True)
140+
async for event in self.stream(
141+
options=resume_options,
142+
):
143+
yield event
106144

107145
async def _restore_resume_input(
108146
self, input: dict[str, Any] | None
@@ -142,6 +180,11 @@ async def _restore_resume_input(
142180
if not triggers:
143181
return None
144182

183+
return await self._build_resume_map(triggers)
184+
185+
async def _build_resume_map(
186+
self, triggers: list[UiPathResumeTrigger]
187+
) -> dict[str, Any]:
145188
# Build resume map: {interrupt_id: resume_data}
146189
resume_map: dict[str, Any] = {}
147190
for trigger in triggers:
@@ -166,11 +209,10 @@ async def _handle_suspension(
166209
Args:
167210
result: The execution result to check for suspension
168211
"""
169-
# Only handle suspensions
170-
if result.status != UiPathRuntimeStatus.SUSPENDED:
171-
return result
172-
173-
if isinstance(result, UiPathBreakpointResult):
212+
# Only handle interrupt suspensions
213+
if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance(
214+
result, UiPathBreakpointResult
215+
):
174216
return result
175217

176218
suspended_result = UiPathRuntimeResult(
@@ -205,6 +247,13 @@ async def _handle_suspension(
205247
# Backward compatibility: set single trigger directly
206248
suspended_result.trigger = suspended_result.triggers[0]
207249

250+
# check if any trigger can be resumed
251+
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
252+
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
253+
triggers = await self.storage.get_triggers(self.runtime_id)
254+
if triggers:
255+
self._fired_triggers_map = await self._build_resume_map(triggers)
256+
208257
return suspended_result
209258

210259
async def get_schema(self) -> UiPathRuntimeSchema:

0 commit comments

Comments
 (0)