Skip to content

Commit 55baa43

Browse files
committed
feat: resume runtime on fired triggers
1 parent f539244 commit 55baa43

File tree

4 files changed

+350
-112
lines changed

4 files changed

+350
-112
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-runtime"
3-
version = "0.8.2"
3+
version = "0.8.3"
44
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/runtime/resumable/runtime.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
)
1313
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
1414
from uipath.runtime.events import UiPathRuntimeEvent
15-
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
15+
from uipath.runtime.result import (
16+
UiPathRuntimeResult,
17+
UiPathRuntimeStatus,
18+
)
1619
from uipath.runtime.resumable.protocols import (
1720
UiPathResumableStorageProtocol,
1821
UiPathResumeTriggerProtocol,
1922
)
23+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2024
from uipath.runtime.schema import UiPathRuntimeSchema
2125

2226
logger = logging.getLogger(__name__)
@@ -68,12 +72,28 @@ async def execute(
6872
"""
6973
# If resuming, restore trigger from storage
7074
if options and options.resume:
75+
# restore trigger from storage
7176
input = await self._restore_resume_input(input)
7277

73-
# Execute the delegate
74-
result = await self.delegate.execute(input, options=options)
75-
# If suspended, create and persist trigger
76-
return await self._handle_suspension(result)
78+
while True:
79+
# Execute the delegate
80+
result = await self.delegate.execute(input, options=options)
81+
# If suspended, create and persist trigger
82+
suspension_result = await self._handle_suspension(result)
83+
84+
# check if any trigger may be resumed
85+
if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
86+
fired_triggers := await self._restore_resume_input(None)
87+
):
88+
return suspension_result
89+
90+
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
91+
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
92+
input = fired_triggers
93+
if not options:
94+
options = UiPathExecuteOptions(resume=True)
95+
else:
96+
options.resume = True
7797

7898
async def stream(
7999
self,
@@ -94,15 +114,33 @@ async def stream(
94114
input = await self._restore_resume_input(input)
95115

96116
final_result: UiPathRuntimeResult | None = None
97-
async for event in self.delegate.stream(input, options=options):
98-
if isinstance(event, UiPathRuntimeResult):
99-
final_result = event
100-
else:
101-
yield event
117+
execution_completed = False
118+
119+
while not execution_completed:
120+
async for event in self.delegate.stream(input, options=options):
121+
if isinstance(event, UiPathRuntimeResult):
122+
final_result = event
123+
else:
124+
yield event
125+
126+
# If suspended, create and persist trigger
127+
if final_result:
128+
suspension_result = await self._handle_suspension(final_result)
102129

103-
# If suspended, create and persist trigger
104-
if final_result:
105-
yield await self._handle_suspension(final_result)
130+
if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
131+
fired_triggers := await self._restore_resume_input(None)
132+
):
133+
yield suspension_result
134+
execution_completed = True
135+
136+
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
137+
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
138+
input = fired_triggers
139+
140+
if not options:
141+
options = UiPathStreamOptions(resume=True)
142+
else:
143+
options.resume = True
106144

107145
async def _restore_resume_input(
108146
self, input: dict[str, Any] | None
@@ -142,6 +180,11 @@ async def _restore_resume_input(
142180
if not triggers:
143181
return None
144182

183+
return await self._build_resume_map(triggers)
184+
185+
async def _build_resume_map(
186+
self, triggers: list[UiPathResumeTrigger]
187+
) -> dict[str, Any]:
145188
# Build resume map: {interrupt_id: resume_data}
146189
resume_map: dict[str, Any] = {}
147190
for trigger in triggers:
@@ -166,11 +209,10 @@ async def _handle_suspension(
166209
Args:
167210
result: The execution result to check for suspension
168211
"""
169-
# Only handle suspensions
170-
if result.status != UiPathRuntimeStatus.SUSPENDED:
171-
return result
172-
173-
if isinstance(result, UiPathBreakpointResult):
212+
# Only handle interrupt suspensions
213+
if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance(
214+
result, UiPathBreakpointResult
215+
):
174216
return result
175217

176218
suspended_result = UiPathRuntimeResult(

0 commit comments

Comments
 (0)