Skip to content

Commit 558776c

Browse files
committed
fix: add polling for resumable runtimes
1 parent 23279d6 commit 558776c

File tree

5 files changed

+165
-13
lines changed

5 files changed

+165
-13
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-runtime"
3-
version = "0.0.8"
3+
version = "0.0.9"
44
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/runtime/debug/runtime.py

Lines changed: 157 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Debug runtime implementation."""
22

3+
import asyncio
34
import logging
45
from typing import Any, Optional
56

@@ -21,6 +22,9 @@
2122
UiPathRuntimeResult,
2223
UiPathRuntimeStatus,
2324
)
25+
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
26+
from uipath.runtime.resumable.runtime import UiPathResumableRuntime
27+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2428
from uipath.runtime.schema import UiPathRuntimeSchema
2529

2630
logger = logging.getLogger(__name__)
@@ -33,11 +37,19 @@ def __init__(
3337
self,
3438
delegate: UiPathRuntimeProtocol,
3539
debug_bridge: UiPathDebugBridgeProtocol,
40+
trigger_poll_interval: float = 5.0,
3641
):
37-
"""Initialize the UiPathDebugRuntime."""
42+
"""Initialize the UiPathDebugRuntime.
43+
44+
Args:
45+
delegate: The underlying runtime to wrap
46+
debug_bridge: Bridge for debug event communication
47+
trigger_poll_interval: Seconds between poll attempts for resume triggers (default: 5.0)
48+
"""
3849
super().__init__()
3950
self.delegate = delegate
4051
self.debug_bridge: UiPathDebugBridgeProtocol = debug_bridge
52+
self.trigger_poll_interval = trigger_poll_interval
4153

4254
async def execute(
4355
self,
@@ -90,12 +102,17 @@ async def _stream_and_debug(
90102
breakpoints=options.breakpoints if options else None,
91103
)
92104

105+
current_input = input
106+
93107
# Keep streaming until execution completes (not just paused at breakpoint)
94108
while not execution_completed:
95109
# Update breakpoints from debug bridge
96110
debug_options.breakpoints = self.debug_bridge.get_breakpoints()
111+
97112
# Stream events from inner runtime
98-
async for event in self.delegate.stream(input, options=debug_options):
113+
async for event in self.delegate.stream(
114+
current_input, options=debug_options
115+
):
99116
# Handle final result
100117
if isinstance(event, UiPathRuntimeResult):
101118
final_result = event
@@ -117,9 +134,55 @@ async def _stream_and_debug(
117134
execution_completed = True
118135
else:
119136
# Normal completion or suspension with dynamic interrupt
120-
execution_completed = True
121-
# Handle dynamic interrupts if present
122-
# In the future, poll for resume trigger completion here, using the debug bridge
137+
138+
# Check if this is a suspended execution that needs polling
139+
if (
140+
isinstance(self.delegate, UiPathResumableRuntime)
141+
and self.trigger_poll_interval > 0
142+
and final_result.status == UiPathRuntimeStatus.SUSPENDED
143+
and final_result.trigger
144+
):
145+
await self.debug_bridge.emit_state_update(
146+
UiPathRuntimeStateEvent(
147+
node_name="<suspended>",
148+
payload={
149+
"status": "suspended",
150+
"trigger": final_result.trigger.model_dump(),
151+
},
152+
)
153+
)
154+
155+
resume_data: Optional[dict[str, Any]] = None
156+
try:
157+
resume_data = await self._poll_trigger(
158+
final_result.trigger
159+
)
160+
except UiPathDebugQuitError:
161+
final_result = UiPathRuntimeResult(
162+
status=UiPathRuntimeStatus.SUCCESSFUL,
163+
)
164+
execution_completed = True
165+
166+
if resume_data is not None:
167+
await self.debug_bridge.emit_state_update(
168+
UiPathRuntimeStateEvent(
169+
node_name="<resumed>",
170+
payload={
171+
"status": "resumed",
172+
"data": resume_data,
173+
},
174+
)
175+
)
176+
177+
# Continue with resumed execution
178+
current_input = resume_data
179+
debug_options.resume = True
180+
# Don't mark as completed - continue the loop
181+
else:
182+
execution_completed = True
183+
else:
184+
# Normal completion - mark as done
185+
execution_completed = True
123186

124187
# Handle state update events - send to debug bridge
125188
elif isinstance(event, UiPathRuntimeStateEvent):
@@ -137,3 +200,92 @@ async def dispose(self) -> None:
137200
await self.debug_bridge.disconnect()
138201
except Exception as e:
139202
logger.warning(f"Error disconnecting debug bridge: {e}")
203+
204+
async def _poll_trigger(
205+
self, trigger: UiPathResumeTrigger
206+
) -> Optional[dict[str, Any]]:
207+
"""Poll a resume trigger until data is available.
208+
209+
Args:
210+
trigger: The trigger to poll
211+
212+
Returns:
213+
Resume data when available, or None if polling exhausted
214+
215+
Raises:
216+
UiPathDebugQuitError: If quit is requested during polling
217+
"""
218+
reader: Optional[UiPathResumeTriggerReaderProtocol] = None
219+
if isinstance(self.delegate, UiPathResumableRuntime):
220+
reader = self.delegate.trigger_manager
221+
if not reader:
222+
return None
223+
224+
attempt = 0
225+
while True:
226+
attempt += 1
227+
228+
await self.debug_bridge.emit_state_update(
229+
UiPathRuntimeStateEvent(
230+
node_name="<polling>",
231+
payload={
232+
"status": "polling",
233+
"attempt": attempt,
234+
},
235+
)
236+
)
237+
238+
try:
239+
resume_data = await reader.read_trigger(trigger)
240+
241+
if resume_data is not None:
242+
return resume_data
243+
244+
await self._wait_with_quit_check()
245+
246+
except UiPathDebugQuitError:
247+
logger.info("Quit requested during polling")
248+
raise
249+
except Exception as e:
250+
logger.error(f"Error polling trigger: {e}", exc_info=True)
251+
await self.debug_bridge.emit_state_update(
252+
UiPathRuntimeStateEvent(
253+
node_name="<polling>",
254+
payload={
255+
"status": "poll_error",
256+
"attempt": attempt,
257+
"error": str(e),
258+
},
259+
)
260+
)
261+
262+
await self._wait_with_quit_check()
263+
264+
async def _wait_with_quit_check(self) -> None:
265+
"""Wait for specified seconds, but allow quit command to interrupt.
266+
267+
Raises:
268+
UiPathDebugQuitError: If quit is requested during wait
269+
"""
270+
sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval))
271+
resume_task = asyncio.create_task(self.debug_bridge.wait_for_resume())
272+
273+
done, pending = await asyncio.wait(
274+
{sleep_task, resume_task}, return_when=asyncio.FIRST_COMPLETED
275+
)
276+
277+
for task in pending:
278+
task.cancel()
279+
try:
280+
await task
281+
except asyncio.CancelledError:
282+
pass
283+
284+
# Check if quit was triggered
285+
if resume_task in done:
286+
try:
287+
await (
288+
resume_task
289+
) # This will raise UiPathDebugQuitError if it was a quit
290+
except UiPathDebugQuitError:
291+
raise

src/uipath/runtime/result.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class UiPathRuntimeResult(UiPathRuntimeEvent):
2323

2424
output: Optional[Union[dict[str, Any], BaseModel]] = None
2525
status: UiPathRuntimeStatus = UiPathRuntimeStatus.SUCCESSFUL
26-
resume: Optional[UiPathResumeTrigger] = None
26+
trigger: Optional[UiPathResumeTrigger] = None
2727
error: Optional[UiPathErrorContract] = None
2828

2929
event_type: UiPathRuntimeEventType = Field(
@@ -44,8 +44,8 @@ def to_dict(self) -> dict[str, Any]:
4444
"status": self.status,
4545
}
4646

47-
if self.resume:
48-
result["resume"] = self.resume.model_dump(by_alias=True)
47+
if self.trigger:
48+
result["resume"] = self.trigger.model_dump(by_alias=True)
4949

5050
if self.error:
5151
result["error"] = self.error.model_dump()

src/uipath/runtime/resumable/runtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ async def _handle_suspension(self, result: UiPathRuntimeResult) -> None:
139139
return
140140

141141
# Check if trigger already exists in result
142-
if result.resume:
143-
await self.storage.save_trigger(result.resume)
142+
if result.trigger:
143+
await self.storage.save_trigger(result.trigger)
144144
return
145145

146146
if result.output:
147147
trigger = await self.trigger_manager.create_trigger(result.output)
148148

149-
result.resume = trigger
149+
result.trigger = trigger
150150

151151
await self.storage.save_trigger(trigger)
152152

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)