Skip to content

Commit 6bdc01f

Browse files
mjnoviceclaude
andcommitted
feat: add wait_for_triggers option to poll instead of suspend
Add a new execution option `wait_for_triggers` that allows the runtime to poll triggers until completion instead of suspending and returning. This keeps the process running and automatically resumes execution when triggers complete. Changes: - Add `wait_for_triggers` and `trigger_poll_interval` options to UiPathExecuteOptions - Create shared TriggerPoller utility for reusable polling logic - Update UiPathResumableRuntime to loop and poll when wait_for_triggers=True - Refactor UiPathDebugRuntime to use the shared TriggerPoller Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent d20dee7 commit 6bdc01f

File tree

5 files changed

+327
-81
lines changed

5 files changed

+327
-81
lines changed

src/uipath/runtime/base.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ class UiPathExecuteOptions(BaseModel):
4141
default=None,
4242
description="List of nodes or '*' to break on all steps.",
4343
)
44+
wait_for_triggers: bool = Field(
45+
default=False,
46+
description="When True, poll triggers until completion instead of suspending. "
47+
"This keeps the process running and automatically resumes when triggers complete.",
48+
)
49+
trigger_poll_interval: float = Field(
50+
default=5.0,
51+
description="Seconds between poll attempts when wait_for_triggers is True.",
52+
ge=0.1,
53+
)
4454

4555
model_config = {"arbitrary_types_allowed": True, "extra": "allow"}
4656

src/uipath/runtime/debug/runtime.py

Lines changed: 45 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import logging
55
from typing import Any, AsyncGenerator, cast
66

7-
from uipath.core.errors import UiPathPendingTriggerError
8-
97
from uipath.runtime.base import (
108
UiPathExecuteOptions,
119
UiPathRuntimeProtocol,
@@ -25,7 +23,7 @@
2523
UiPathRuntimeResult,
2624
UiPathRuntimeStatus,
2725
)
28-
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
26+
from uipath.runtime.resumable.polling import TriggerPoller
2927
from uipath.runtime.resumable.runtime import UiPathResumableRuntime
3028
from uipath.runtime.resumable.trigger import (
3129
UiPathResumeTrigger,
@@ -203,8 +201,7 @@ async def _stream_and_debug(
203201
)
204202
else:
205203
trigger_data = await self._poll_trigger(
206-
final_result.trigger,
207-
self.delegate.trigger_manager,
204+
final_result.trigger
208205
)
209206
resume_data = {interrupt_id: trigger_data}
210207
except UiPathDebugQuitError:
@@ -245,77 +242,65 @@ async def dispose(self) -> None:
245242
logger.warning(f"Error disconnecting debug bridge: {e}")
246243

247244
async def _poll_trigger(
248-
self, trigger: UiPathResumeTrigger, reader: UiPathResumeTriggerReaderProtocol
245+
self, trigger: UiPathResumeTrigger
249246
) -> dict[str, Any] | None:
250247
"""Poll a resume trigger until data is available.
251248
252249
Args:
253250
trigger: The trigger to poll
254-
reader: The trigger reader to use for polling
255251
256252
Returns:
257-
Resume data when available, or None if polling exhausted
253+
Resume data when available, or None if polling was stopped
258254
259255
Raises:
260256
UiPathDebugQuitError: If quit is requested during polling
261257
"""
262-
attempt = 0
263-
while True:
264-
attempt += 1
265-
266-
try:
267-
resume_data = await reader.read_trigger(trigger)
268-
269-
if resume_data is not None:
270-
return resume_data
271-
272-
await self.debug_bridge.emit_state_update(
273-
UiPathRuntimeStateEvent(
274-
node_name="<polling>",
275-
payload={
276-
"attempt": attempt,
277-
},
278-
)
258+
self._quit_requested = False
259+
260+
async def on_poll_attempt(attempt: int, info: str | None) -> None:
261+
"""Callback for each poll attempt."""
262+
payload: dict[str, Any] = {"attempt": attempt}
263+
if info:
264+
payload["info"] = info
265+
await self.debug_bridge.emit_state_update(
266+
UiPathRuntimeStateEvent(
267+
node_name="<polling>",
268+
payload=payload,
279269
)
270+
)
280271

281-
await self._wait_with_quit_check()
282-
283-
except UiPathDebugQuitError:
284-
raise
285-
286-
except UiPathPendingTriggerError as e:
287-
await self.debug_bridge.emit_state_update(
288-
UiPathRuntimeStateEvent(
289-
node_name="<polling>",
290-
payload={
291-
"attempt": attempt,
292-
"info": str(e),
293-
},
294-
)
272+
async def should_stop() -> bool:
273+
"""Check if quit was requested."""
274+
# Check for termination request with a short timeout
275+
try:
276+
term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate())
277+
done, _ = await asyncio.wait(
278+
{term_task},
279+
timeout=0.01, # Very short timeout just to check
295280
)
281+
if term_task in done:
282+
self._quit_requested = True
283+
return True
284+
else:
285+
term_task.cancel()
286+
try:
287+
await term_task
288+
except asyncio.CancelledError:
289+
pass
290+
except Exception:
291+
pass
292+
return False
296293

297-
await self._wait_with_quit_check()
298-
299-
async def _wait_with_quit_check(self) -> None:
300-
"""Wait for specified seconds, but allow quit command to interrupt.
301-
302-
Raises:
303-
UiPathDebugQuitError: If quit is requested during wait
304-
"""
305-
sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval))
306-
term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate())
307-
308-
done, pending = await asyncio.wait(
309-
{sleep_task, term_task},
310-
return_when=asyncio.FIRST_COMPLETED,
294+
poller = TriggerPoller(
295+
reader=self.delegate.trigger_manager,
296+
poll_interval=self.trigger_poll_interval,
297+
on_poll_attempt=on_poll_attempt,
298+
should_stop=should_stop,
311299
)
312300

313-
for task in pending:
314-
task.cancel()
315-
try:
316-
await task
317-
except asyncio.CancelledError:
318-
pass
301+
result = await poller.poll_trigger(trigger)
319302

320-
if term_task in done:
303+
if self._quit_requested:
321304
raise UiPathDebugQuitError("Debugging terminated during polling.")
305+
306+
return result

src/uipath/runtime/resumable/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Module for resumable runtime features."""
22

3+
from uipath.runtime.resumable.polling import TriggerPoller
34
from uipath.runtime.resumable.protocols import (
45
UiPathResumableStorageProtocol,
56
UiPathResumeTriggerCreatorProtocol,
@@ -20,4 +21,5 @@
2021
"UiPathResumeTrigger",
2122
"UiPathResumeTriggerType",
2223
"UiPathApiTrigger",
24+
"TriggerPoller",
2325
]
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Trigger polling utilities for resumable runtime."""
2+
3+
import asyncio
4+
import logging
5+
from typing import Any, Callable, Coroutine
6+
7+
from uipath.core.errors import UiPathPendingTriggerError
8+
9+
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
10+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class TriggerPoller:
16+
"""Utility for polling resume triggers until completion.
17+
18+
This class provides reusable polling logic for waiting on triggers
19+
to complete, used by both debug runtime and resumable runtime when
20+
wait_for_triggers is enabled.
21+
"""
22+
23+
def __init__(
24+
self,
25+
reader: UiPathResumeTriggerReaderProtocol,
26+
poll_interval: float = 5.0,
27+
on_poll_attempt: Callable[[int, str | None], Coroutine[Any, Any, None]]
28+
| None = None,
29+
should_stop: Callable[[], Coroutine[Any, Any, bool]] | None = None,
30+
):
31+
"""Initialize the trigger poller.
32+
33+
Args:
34+
reader: The trigger reader to use for polling
35+
poll_interval: Seconds between poll attempts
36+
on_poll_attempt: Optional callback for each poll attempt (attempt_num, info)
37+
should_stop: Optional async callback to check if polling should stop early
38+
"""
39+
self.reader = reader
40+
self.poll_interval = poll_interval
41+
self.on_poll_attempt = on_poll_attempt
42+
self.should_stop = should_stop
43+
44+
async def poll_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
45+
"""Poll a single trigger until data is available.
46+
47+
Args:
48+
trigger: The trigger to poll
49+
50+
Returns:
51+
Resume data when available, or None if polling was stopped
52+
53+
Raises:
54+
Exception: If trigger reading fails with non-pending error
55+
"""
56+
attempt = 0
57+
while True:
58+
attempt += 1
59+
60+
# Check if we should stop
61+
if self.should_stop and await self.should_stop():
62+
logger.debug("Polling stopped by should_stop callback")
63+
return None
64+
65+
try:
66+
resume_data = await self.reader.read_trigger(trigger)
67+
68+
if resume_data is not None:
69+
logger.debug(
70+
f"Trigger {trigger.interrupt_id} completed after {attempt} attempts"
71+
)
72+
return resume_data
73+
74+
# Notify about poll attempt
75+
if self.on_poll_attempt:
76+
await self.on_poll_attempt(attempt, None)
77+
78+
await asyncio.sleep(self.poll_interval)
79+
80+
except UiPathPendingTriggerError as e:
81+
# Trigger still pending, notify and continue polling
82+
if self.on_poll_attempt:
83+
await self.on_poll_attempt(attempt, str(e))
84+
85+
await asyncio.sleep(self.poll_interval)
86+
87+
async def poll_all_triggers(
88+
self, triggers: list[UiPathResumeTrigger]
89+
) -> dict[str, Any]:
90+
"""Poll all triggers until they complete.
91+
92+
Args:
93+
triggers: List of triggers to poll
94+
95+
Returns:
96+
Dict mapping interrupt_id to resume data for completed triggers
97+
"""
98+
resume_map: dict[str, Any] = {}
99+
100+
# Poll triggers concurrently
101+
async def poll_single(trigger: UiPathResumeTrigger) -> tuple[str | None, Any]:
102+
data = await self.poll_trigger(trigger)
103+
return trigger.interrupt_id, data
104+
105+
results = await asyncio.gather(
106+
*[poll_single(trigger) for trigger in triggers],
107+
return_exceptions=True,
108+
)
109+
110+
for result in results:
111+
if isinstance(result, Exception):
112+
logger.error(f"Trigger polling failed: {result}")
113+
raise result
114+
interrupt_id, data = result
115+
if interrupt_id and data is not None:
116+
resume_map[interrupt_id] = data
117+
118+
return resume_map

0 commit comments

Comments
 (0)