Skip to content

Commit d70e375

Browse files
committed
fix: add polling for resumable runtimes
1 parent 23279d6 commit d70e375

File tree

5 files changed

+169
-13
lines changed

5 files changed

+169
-13
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-runtime"
3-
version = "0.0.8"
3+
version = "0.0.9"
44
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/runtime/debug/runtime.py

Lines changed: 161 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Debug runtime implementation."""
22

3+
import asyncio
34
import logging
45
from typing import Any, Optional
56

@@ -21,6 +22,9 @@
2122
UiPathRuntimeResult,
2223
UiPathRuntimeStatus,
2324
)
25+
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
26+
from uipath.runtime.resumable.runtime import UiPathResumableRuntime
27+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2428
from uipath.runtime.schema import UiPathRuntimeSchema
2529

2630
logger = logging.getLogger(__name__)
@@ -33,11 +37,19 @@ def __init__(
3337
self,
3438
delegate: UiPathRuntimeProtocol,
3539
debug_bridge: UiPathDebugBridgeProtocol,
40+
trigger_poll_interval: float = 5.0,
3641
):
37-
"""Initialize the UiPathDebugRuntime."""
42+
"""Initialize the UiPathDebugRuntime.
43+
44+
Args:
45+
delegate: The underlying runtime to wrap
46+
debug_bridge: Bridge for debug event communication
47+
trigger_poll_interval: Seconds between poll attempts for resume triggers (default: 5.0)
48+
"""
3849
super().__init__()
3950
self.delegate = delegate
4051
self.debug_bridge: UiPathDebugBridgeProtocol = debug_bridge
52+
self.trigger_poll_interval = trigger_poll_interval
4153

4254
async def execute(
4355
self,
@@ -90,12 +102,17 @@ async def _stream_and_debug(
90102
breakpoints=options.breakpoints if options else None,
91103
)
92104

105+
current_input = input
106+
93107
# Keep streaming until execution completes (not just paused at breakpoint)
94108
while not execution_completed:
95109
# Update breakpoints from debug bridge
96110
debug_options.breakpoints = self.debug_bridge.get_breakpoints()
111+
97112
# Stream events from inner runtime
98-
async for event in self.delegate.stream(input, options=debug_options):
113+
async for event in self.delegate.stream(
114+
current_input, options=debug_options
115+
):
99116
# Handle final result
100117
if isinstance(event, UiPathRuntimeResult):
101118
final_result = event
@@ -117,9 +134,59 @@ async def _stream_and_debug(
117134
execution_completed = True
118135
else:
119136
# Normal completion or suspension with dynamic interrupt
120-
execution_completed = True
121-
# Handle dynamic interrupts if present
122-
# In the future, poll for resume trigger completion here, using the debug bridge
137+
138+
# Check if this is a suspended execution that needs polling
139+
if (
140+
isinstance(self.delegate, UiPathResumableRuntime)
141+
and final_result.status == UiPathRuntimeStatus.SUSPENDED
142+
and final_result.trigger
143+
):
144+
logger.info(
145+
"Execution suspended. Polling for resume data..."
146+
)
147+
148+
# Emit suspension event to debug bridge
149+
await self.debug_bridge.emit_state_update(
150+
UiPathRuntimeStateEvent(
151+
node_name="<suspended>",
152+
payload={
153+
"status": "suspended",
154+
"trigger": final_result.trigger.model_dump(),
155+
},
156+
)
157+
)
158+
159+
resume_data: Optional[dict[str, Any]] = None
160+
try:
161+
resume_data = await self._poll_trigger(
162+
final_result.trigger
163+
)
164+
except UiPathDebugQuitError:
165+
final_result = UiPathRuntimeResult(
166+
status=UiPathRuntimeStatus.SUCCESSFUL,
167+
)
168+
execution_completed = True
169+
170+
if resume_data is not None:
171+
await self.debug_bridge.emit_state_update(
172+
UiPathRuntimeStateEvent(
173+
node_name="<resumed>",
174+
payload={
175+
"status": "resumed",
176+
"data": resume_data,
177+
},
178+
)
179+
)
180+
181+
# Continue with resumed execution
182+
current_input = resume_data
183+
debug_options.resume = True
184+
# Don't mark as completed - continue the loop
185+
else:
186+
execution_completed = True
187+
else:
188+
# Normal completion - mark as done
189+
execution_completed = True
123190

124191
# Handle state update events - send to debug bridge
125192
elif isinstance(event, UiPathRuntimeStateEvent):
@@ -137,3 +204,92 @@ async def dispose(self) -> None:
137204
await self.debug_bridge.disconnect()
138205
except Exception as e:
139206
logger.warning(f"Error disconnecting debug bridge: {e}")
207+
208+
async def _poll_trigger(
209+
self, trigger: UiPathResumeTrigger
210+
) -> Optional[dict[str, Any]]:
211+
"""Poll a resume trigger until data is available.
212+
213+
Args:
214+
trigger: The trigger to poll
215+
216+
Returns:
217+
Resume data when available, or None if polling exhausted
218+
219+
Raises:
220+
UiPathDebugQuitError: If quit is requested during polling
221+
"""
222+
reader: Optional[UiPathResumeTriggerReaderProtocol] = None
223+
if isinstance(self.delegate, UiPathResumableRuntime):
224+
reader = self.delegate.trigger_manager
225+
if not reader:
226+
return None
227+
228+
attempt = 0
229+
while True:
230+
attempt += 1
231+
232+
await self.debug_bridge.emit_state_update(
233+
UiPathRuntimeStateEvent(
234+
node_name="<polling>",
235+
payload={
236+
"status": "polling",
237+
"attempt": attempt,
238+
},
239+
)
240+
)
241+
242+
try:
243+
resume_data = await reader.read_trigger(trigger)
244+
245+
if resume_data is not None:
246+
return resume_data
247+
248+
await self._wait_with_quit_check()
249+
250+
except UiPathDebugQuitError:
251+
logger.info("Quit requested during polling")
252+
raise
253+
except Exception as e:
254+
logger.error(f"Error polling trigger: {e}", exc_info=True)
255+
await self.debug_bridge.emit_state_update(
256+
UiPathRuntimeStateEvent(
257+
node_name="<polling>",
258+
payload={
259+
"status": "poll_error",
260+
"attempt": attempt,
261+
"error": str(e),
262+
},
263+
)
264+
)
265+
266+
await self._wait_with_quit_check()
267+
268+
async def _wait_with_quit_check(self) -> None:
269+
"""Wait for specified seconds, but allow quit command to interrupt.
270+
271+
Raises:
272+
UiPathDebugQuitError: If quit is requested during wait
273+
"""
274+
sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval))
275+
resume_task = asyncio.create_task(self.debug_bridge.wait_for_resume())
276+
277+
done, pending = await asyncio.wait(
278+
{sleep_task, resume_task}, return_when=asyncio.FIRST_COMPLETED
279+
)
280+
281+
for task in pending:
282+
task.cancel()
283+
try:
284+
await task
285+
except asyncio.CancelledError:
286+
pass
287+
288+
# Check if quit was triggered
289+
if resume_task in done:
290+
try:
291+
await (
292+
resume_task
293+
) # This will raise UiPathDebugQuitError if it was a quit
294+
except UiPathDebugQuitError:
295+
raise

src/uipath/runtime/result.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class UiPathRuntimeResult(UiPathRuntimeEvent):
2323

2424
output: Optional[Union[dict[str, Any], BaseModel]] = None
2525
status: UiPathRuntimeStatus = UiPathRuntimeStatus.SUCCESSFUL
26-
resume: Optional[UiPathResumeTrigger] = None
26+
trigger: Optional[UiPathResumeTrigger] = None
2727
error: Optional[UiPathErrorContract] = None
2828

2929
event_type: UiPathRuntimeEventType = Field(
@@ -44,8 +44,8 @@ def to_dict(self) -> dict[str, Any]:
4444
"status": self.status,
4545
}
4646

47-
if self.resume:
48-
result["resume"] = self.resume.model_dump(by_alias=True)
47+
if self.trigger:
48+
result["resume"] = self.trigger.model_dump(by_alias=True)
4949

5050
if self.error:
5151
result["error"] = self.error.model_dump()

src/uipath/runtime/resumable/runtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ async def _handle_suspension(self, result: UiPathRuntimeResult) -> None:
139139
return
140140

141141
# Check if trigger already exists in result
142-
if result.resume:
143-
await self.storage.save_trigger(result.resume)
142+
if result.trigger:
143+
await self.storage.save_trigger(result.trigger)
144144
return
145145

146146
if result.output:
147147
trigger = await self.trigger_manager.create_trigger(result.output)
148148

149-
result.resume = trigger
149+
result.trigger = trigger
150150

151151
await self.storage.save_trigger(trigger)
152152

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)