Skip to content

Commit e8dcd7b

Browse files
committed
fix: add polling for resumable runtimes
1 parent 23279d6 commit e8dcd7b

File tree

5 files changed

+168
-13
lines changed

5 files changed

+168
-13
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-runtime"
3-
version = "0.0.8"
3+
version = "0.0.9"
44
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/runtime/debug/runtime.py

Lines changed: 160 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Debug runtime implementation."""
22

3+
import asyncio
34
import logging
45
from typing import Any, Optional
56

@@ -21,6 +22,9 @@
2122
UiPathRuntimeResult,
2223
UiPathRuntimeStatus,
2324
)
25+
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
26+
from uipath.runtime.resumable.runtime import UiPathResumableRuntime
27+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2428
from uipath.runtime.schema import UiPathRuntimeSchema
2529

2630
logger = logging.getLogger(__name__)
@@ -33,11 +37,21 @@ def __init__(
3337
self,
3438
delegate: UiPathRuntimeProtocol,
3539
debug_bridge: UiPathDebugBridgeProtocol,
40+
trigger_poll_interval: float = 5.0,
3641
):
37-
"""Initialize the UiPathDebugRuntime."""
42+
"""Initialize the UiPathDebugRuntime.
43+
44+
Args:
45+
delegate: The underlying runtime to wrap
46+
debug_bridge: Bridge for debug event communication
47+
trigger_poll_interval: Seconds between poll attempts for resume triggers (default: 5.0, disabled: 0.0)
48+
"""
3849
super().__init__()
3950
self.delegate = delegate
4051
self.debug_bridge: UiPathDebugBridgeProtocol = debug_bridge
52+
if trigger_poll_interval < 0:
53+
raise ValueError("trigger_poll_interval must be >= 0")
54+
self.trigger_poll_interval = trigger_poll_interval
4155

4256
async def execute(
4357
self,
@@ -90,12 +104,17 @@ async def _stream_and_debug(
90104
breakpoints=options.breakpoints if options else None,
91105
)
92106

107+
current_input = input
108+
93109
# Keep streaming until execution completes (not just paused at breakpoint)
94110
while not execution_completed:
95111
# Update breakpoints from debug bridge
96112
debug_options.breakpoints = self.debug_bridge.get_breakpoints()
113+
97114
# Stream events from inner runtime
98-
async for event in self.delegate.stream(input, options=debug_options):
115+
async for event in self.delegate.stream(
116+
current_input, options=debug_options
117+
):
99118
# Handle final result
100119
if isinstance(event, UiPathRuntimeResult):
101120
final_result = event
@@ -117,9 +136,55 @@ async def _stream_and_debug(
117136
execution_completed = True
118137
else:
119138
# Normal completion or suspension with dynamic interrupt
120-
execution_completed = True
121-
# Handle dynamic interrupts if present
122-
# In the future, poll for resume trigger completion here, using the debug bridge
139+
140+
# Check if this is a suspended execution that needs polling
141+
if (
142+
isinstance(self.delegate, UiPathResumableRuntime)
143+
and self.trigger_poll_interval > 0
144+
and final_result.status == UiPathRuntimeStatus.SUSPENDED
145+
and final_result.trigger
146+
):
147+
await self.debug_bridge.emit_state_update(
148+
UiPathRuntimeStateEvent(
149+
node_name="<suspended>",
150+
payload={
151+
"status": "suspended",
152+
"trigger": final_result.trigger.model_dump(),
153+
},
154+
)
155+
)
156+
157+
resume_data: Optional[dict[str, Any]] = None
158+
try:
159+
resume_data = await self._poll_trigger(
160+
final_result.trigger
161+
)
162+
except UiPathDebugQuitError:
163+
final_result = UiPathRuntimeResult(
164+
status=UiPathRuntimeStatus.SUCCESSFUL,
165+
)
166+
execution_completed = True
167+
168+
if resume_data is not None:
169+
await self.debug_bridge.emit_state_update(
170+
UiPathRuntimeStateEvent(
171+
node_name="<resumed>",
172+
payload={
173+
"status": "resumed",
174+
"data": resume_data,
175+
},
176+
)
177+
)
178+
179+
# Continue with resumed execution
180+
current_input = resume_data
181+
debug_options.resume = True
182+
# Don't mark as completed - continue the loop
183+
else:
184+
execution_completed = True
185+
else:
186+
# Normal completion - mark as done
187+
execution_completed = True
123188

124189
# Handle state update events - send to debug bridge
125190
elif isinstance(event, UiPathRuntimeStateEvent):
@@ -137,3 +202,93 @@ async def dispose(self) -> None:
137202
await self.debug_bridge.disconnect()
138203
except Exception as e:
139204
logger.warning(f"Error disconnecting debug bridge: {e}")
205+
206+
async def _poll_trigger(
207+
self, trigger: UiPathResumeTrigger
208+
) -> Optional[dict[str, Any]]:
209+
"""Poll a resume trigger until data is available.
210+
211+
Args:
212+
trigger: The trigger to poll
213+
214+
Returns:
215+
Resume data when available, or None if polling exhausted
216+
217+
Raises:
218+
UiPathDebugQuitError: If quit is requested during polling
219+
"""
220+
reader: Optional[UiPathResumeTriggerReaderProtocol] = None
221+
if isinstance(self.delegate, UiPathResumableRuntime):
222+
reader = self.delegate.trigger_manager
223+
if not reader:
224+
return None
225+
226+
attempt = 0
227+
while True:
228+
attempt += 1
229+
230+
await self.debug_bridge.emit_state_update(
231+
UiPathRuntimeStateEvent(
232+
node_name="<polling>",
233+
payload={
234+
"status": "polling",
235+
"attempt": attempt,
236+
},
237+
)
238+
)
239+
240+
try:
241+
resume_data = await reader.read_trigger(trigger)
242+
243+
if resume_data is not None:
244+
return resume_data
245+
246+
await self._wait_with_quit_check()
247+
248+
except UiPathDebugQuitError:
249+
logger.info("Quit requested during polling")
250+
raise
251+
except Exception as e:
252+
logger.error(f"Error polling trigger: {e}", exc_info=True)
253+
await self.debug_bridge.emit_state_update(
254+
UiPathRuntimeStateEvent(
255+
node_name="<polling>",
256+
payload={
257+
"status": "poll_error",
258+
"attempt": attempt,
259+
"error": str(e),
260+
},
261+
)
262+
)
263+
264+
await self._wait_with_quit_check()
265+
266+
async def _wait_with_quit_check(self) -> None:
267+
"""Wait for specified seconds, but allow quit command to interrupt.
268+
269+
Raises:
270+
UiPathDebugQuitError: If quit is requested during wait
271+
"""
272+
sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval))
273+
resume_task = asyncio.create_task(self.debug_bridge.wait_for_resume())
274+
275+
done, pending = await asyncio.wait(
276+
{sleep_task, resume_task}, return_when=asyncio.FIRST_COMPLETED
277+
)
278+
279+
for task in pending:
280+
task.cancel()
281+
try:
282+
await task
283+
except asyncio.CancelledError:
284+
# Expected when cancelling pending tasks; safe to ignore.
285+
pass
286+
287+
# Check if quit was triggered
288+
if resume_task in done:
289+
try:
290+
await (
291+
resume_task
292+
) # This will raise UiPathDebugQuitError if it was a quit
293+
except UiPathDebugQuitError:
294+
raise

src/uipath/runtime/result.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class UiPathRuntimeResult(UiPathRuntimeEvent):
2323

2424
output: Optional[Union[dict[str, Any], BaseModel]] = None
2525
status: UiPathRuntimeStatus = UiPathRuntimeStatus.SUCCESSFUL
26-
resume: Optional[UiPathResumeTrigger] = None
26+
trigger: Optional[UiPathResumeTrigger] = None
2727
error: Optional[UiPathErrorContract] = None
2828

2929
event_type: UiPathRuntimeEventType = Field(
@@ -44,8 +44,8 @@ def to_dict(self) -> dict[str, Any]:
4444
"status": self.status,
4545
}
4646

47-
if self.resume:
48-
result["resume"] = self.resume.model_dump(by_alias=True)
47+
if self.trigger:
48+
result["resume"] = self.trigger.model_dump(by_alias=True)
4949

5050
if self.error:
5151
result["error"] = self.error.model_dump()

src/uipath/runtime/resumable/runtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ async def _handle_suspension(self, result: UiPathRuntimeResult) -> None:
139139
return
140140

141141
# Check if trigger already exists in result
142-
if result.resume:
143-
await self.storage.save_trigger(result.resume)
142+
if result.trigger:
143+
await self.storage.save_trigger(result.trigger)
144144
return
145145

146146
if result.output:
147147
trigger = await self.trigger_manager.create_trigger(result.output)
148148

149-
result.resume = trigger
149+
result.trigger = trigger
150150

151151
await self.storage.save_trigger(trigger)
152152

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)