Skip to content

Commit 22b0402

Browse files
committed
fix: add support for multiple resume triggers
1 parent 5bc94c3 commit 22b0402

File tree

3 files changed

+57
-21
lines changed

3 files changed

+57
-21
lines changed

src/uipath/runtime/resumable/protocols.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
class UiPathResumableStorageProtocol(Protocol):
99
"""Protocol for storing and retrieving resume triggers."""
1010

11-
async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> None:
12-
"""Save a resume trigger to storage.
11+
async def save_triggers(
12+
self, runtime_id: str, trigger: list[UiPathResumeTrigger]
13+
) -> None:
14+
"""Save resume triggers to storage.
1315
1416
Args:
1517
trigger: The resume trigger to persist
@@ -19,17 +21,31 @@ async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> N
1921
"""
2022
...
2123

22-
async def get_latest_trigger(self, runtime_id: str) -> UiPathResumeTrigger | None:
23-
"""Retrieve the most recent resume trigger from storage.
24+
async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None:
25+
"""Retrieve the resume triggers from storage.
2426
2527
Returns:
26-
The latest resume trigger, or None if no triggers exist
28+
The resume triggers, or None if no triggers exist
2729
2830
Raises:
2931
Exception: If retrieval operation fails
3032
"""
3133
...
3234

35+
async def delete_trigger(
36+
self, runtime_id: str, trigger: UiPathResumeTrigger
37+
) -> None:
38+
"""Delete resume trigger from storage.
39+
40+
Args:
41+
runtime_id: The runtime ID
42+
trigger: The resume trigger to delete
43+
44+
Raises:
45+
Exception: If deletion operation fails
46+
"""
47+
...
48+
3349
async def set_value(
3450
self, runtime_id: str, namespace: str, key: str, value: Any
3551
) -> None:

src/uipath/runtime/resumable/runtime.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import logging
44
from typing import Any, AsyncGenerator
55

6+
from uipath.core.errors import UiPathPendingTriggerError
7+
68
from uipath.runtime.base import (
79
UiPathExecuteOptions,
810
UiPathRuntimeProtocol,
@@ -111,21 +113,29 @@ async def _restore_resume_input(
111113
input: User-provided input (takes precedence)
112114
113115
Returns:
114-
Input to use for resume, either provided or from storage
116+
Input to use for resume: {interrupt_id: resume_data, ...}
115117
"""
116118
# If user provided explicit input, use it
117119
if input is not None:
118120
return input
119121

120-
# Otherwise, fetch from storage
121-
trigger = await self.storage.get_latest_trigger(self.runtime_id)
122-
if not trigger:
122+
# Fetch all triggers from storage
123+
triggers = await self.storage.get_triggers(self.runtime_id)
124+
if not triggers:
123125
return None
124126

125-
# Read trigger data via trigger_manager
126-
resume_data = await self.trigger_manager.read_trigger(trigger)
127+
# Build resume map: {interrupt_id: resume_data}
128+
resume_map: dict[str, Any] = {}
129+
for trigger in triggers:
130+
try:
131+
data = await self.trigger_manager.read_trigger(trigger)
132+
resume_map[trigger.interrupt_id] = data
133+
await self.storage.delete_trigger(self.runtime_id, trigger)
134+
except UiPathPendingTriggerError:
135+
# Trigger still pending, skip it
136+
pass
127137

128-
return resume_data
138+
return resume_map
129139

130140
async def _handle_suspension(
131141
self, result: UiPathRuntimeResult
@@ -142,22 +152,31 @@ async def _handle_suspension(
142152
if isinstance(result, UiPathBreakpointResult):
143153
return result
144154

145-
# Check if trigger already exists in result
146-
if result.trigger:
147-
await self.storage.save_trigger(self.runtime_id, result.trigger)
148-
return result
149-
150155
suspended_result = UiPathRuntimeResult(
151156
status=UiPathRuntimeStatus.SUSPENDED,
152157
output=result.output,
153158
)
154159

155-
if result.output:
156-
suspended_result.trigger = await self.trigger_manager.create_trigger(
157-
result.output
160+
# Get existing triggers and current interrupts
161+
suspended_result.triggers = (
162+
await self.storage.get_triggers(self.runtime_id) or []
163+
)
164+
current_interrupts = result.output or {}
165+
166+
# Diff: find new interrupts
167+
existing_ids = {t.interrupt_id for t in suspended_result.triggers}
168+
new_ids = current_interrupts.keys() - existing_ids
169+
170+
# Create triggers only for new interrupts
171+
for interrupt_id in new_ids:
172+
trigger = await self.trigger_manager.create_trigger(
173+
current_interrupts[interrupt_id]
158174
)
175+
trigger.interrupt_id = interrupt_id
176+
suspended_result.triggers.append(trigger)
159177

160-
await self.storage.save_trigger(self.runtime_id, suspended_result.trigger)
178+
if suspended_result.triggers:
179+
await self.storage.save_triggers(self.runtime_id, suspended_result.triggers)
161180

162181
return suspended_result
163182

src/uipath/runtime/resumable/trigger.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class UiPathApiTrigger(BaseModel):
4949
class UiPathResumeTrigger(BaseModel):
5050
"""Information needed to resume execution."""
5151

52+
interrupt_id: str = Field(alias="interruptId")
5253
trigger_type: UiPathResumeTriggerType = Field(
5354
default=UiPathResumeTriggerType.API, alias="triggerType"
5455
)

0 commit comments

Comments
 (0)