Skip to content

Commit 6c2b820

Browse files
committed
fix: add polling for resumable runtimes
1 parent 23279d6 commit 6c2b820

File tree

5 files changed

+172
-14
lines changed

5 files changed

+172
-14
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-runtime"
3-
version = "0.0.8"
3+
version = "0.0.9"
44
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/runtime/debug/runtime.py

Lines changed: 164 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Debug runtime implementation."""
22

3+
import asyncio
34
import logging
45
from typing import Any, Optional
56

@@ -21,6 +22,9 @@
2122
UiPathRuntimeResult,
2223
UiPathRuntimeStatus,
2324
)
25+
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
26+
from uipath.runtime.resumable.runtime import UiPathResumableRuntime
27+
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
2428
from uipath.runtime.schema import UiPathRuntimeSchema
2529

2630
logger = logging.getLogger(__name__)
@@ -33,11 +37,21 @@ def __init__(
3337
self,
3438
delegate: UiPathRuntimeProtocol,
3539
debug_bridge: UiPathDebugBridgeProtocol,
40+
trigger_poll_interval: float = 5.0,
3641
):
37-
"""Initialize the UiPathDebugRuntime."""
42+
"""Initialize the UiPathDebugRuntime.
43+
44+
Args:
45+
delegate: The underlying runtime to wrap
46+
debug_bridge: Bridge for debug event communication
47+
trigger_poll_interval: Seconds between poll attempts for resume triggers (default: 5.0, disabled: 0.0)
48+
"""
3849
super().__init__()
3950
self.delegate = delegate
4051
self.debug_bridge: UiPathDebugBridgeProtocol = debug_bridge
52+
if trigger_poll_interval < 0:
53+
raise ValueError("trigger_poll_interval must be >= 0")
54+
self.trigger_poll_interval = trigger_poll_interval
4155

4256
async def execute(
4357
self,
@@ -83,19 +97,32 @@ async def _stream_and_debug(
8397
execution_completed = False
8498

8599
# Starting in paused state - wait for breakpoints and resume
86-
await self.debug_bridge.wait_for_resume()
100+
try:
101+
await asyncio.wait_for(self.debug_bridge.wait_for_resume(), timeout=60.0)
102+
except asyncio.TimeoutError:
103+
logger.warning(
104+
"Initial resume wait timed out after 60s, assuming debug bridge disconnected"
105+
)
106+
return UiPathRuntimeResult(
107+
status=UiPathRuntimeStatus.SUCCESSFUL,
108+
)
87109

88110
debug_options = UiPathStreamOptions(
89111
resume=options.resume if options else False,
90112
breakpoints=options.breakpoints if options else None,
91113
)
92114

115+
current_input = input
116+
93117
# Keep streaming until execution completes (not just paused at breakpoint)
94118
while not execution_completed:
95119
# Update breakpoints from debug bridge
96120
debug_options.breakpoints = self.debug_bridge.get_breakpoints()
121+
97122
# Stream events from inner runtime
98-
async for event in self.delegate.stream(input, options=debug_options):
123+
async for event in self.delegate.stream(
124+
current_input, options=debug_options
125+
):
99126
# Handle final result
100127
if isinstance(event, UiPathRuntimeResult):
101128
final_result = event
@@ -117,9 +144,55 @@ async def _stream_and_debug(
117144
execution_completed = True
118145
else:
119146
# Normal completion or suspension with dynamic interrupt
120-
execution_completed = True
121-
# Handle dynamic interrupts if present
122-
# In the future, poll for resume trigger completion here, using the debug bridge
147+
148+
# Check if this is a suspended execution that needs polling
149+
if (
150+
isinstance(self.delegate, UiPathResumableRuntime)
151+
and self.trigger_poll_interval > 0
152+
and final_result.status == UiPathRuntimeStatus.SUSPENDED
153+
and final_result.trigger
154+
):
155+
await self.debug_bridge.emit_state_update(
156+
UiPathRuntimeStateEvent(
157+
node_name="<suspended>",
158+
payload={
159+
"status": "suspended",
160+
"trigger": final_result.trigger.model_dump(),
161+
},
162+
)
163+
)
164+
165+
resume_data: Optional[dict[str, Any]] = None
166+
try:
167+
resume_data = await self._poll_trigger(
168+
final_result.trigger, self.delegate.trigger_manager
169+
)
170+
except UiPathDebugQuitError:
171+
final_result = UiPathRuntimeResult(
172+
status=UiPathRuntimeStatus.SUCCESSFUL,
173+
)
174+
execution_completed = True
175+
176+
if resume_data is not None:
177+
await self.debug_bridge.emit_state_update(
178+
UiPathRuntimeStateEvent(
179+
node_name="<resumed>",
180+
payload={
181+
"status": "resumed",
182+
"data": resume_data,
183+
},
184+
)
185+
)
186+
187+
# Continue with resumed execution
188+
current_input = resume_data
189+
debug_options.resume = True
190+
# Don't mark as completed - continue the loop
191+
else:
192+
execution_completed = True
193+
else:
194+
# Normal completion - mark as done
195+
execution_completed = True
123196

124197
# Handle state update events - send to debug bridge
125198
elif isinstance(event, UiPathRuntimeStateEvent):
@@ -137,3 +210,88 @@ async def dispose(self) -> None:
137210
await self.debug_bridge.disconnect()
138211
except Exception as e:
139212
logger.warning(f"Error disconnecting debug bridge: {e}")
213+
214+
async def _poll_trigger(
215+
self, trigger: UiPathResumeTrigger, reader: UiPathResumeTriggerReaderProtocol
216+
) -> Optional[dict[str, Any]]:
217+
"""Poll a resume trigger until data is available.
218+
219+
Args:
220+
trigger: The trigger to poll
221+
reader: The trigger reader to use for polling
222+
223+
Returns:
224+
Resume data when available, or None if polling exhausted
225+
226+
Raises:
227+
UiPathDebugQuitError: If quit is requested during polling
228+
"""
229+
attempt = 0
230+
while True:
231+
attempt += 1
232+
233+
await self.debug_bridge.emit_state_update(
234+
UiPathRuntimeStateEvent(
235+
node_name="<polling>",
236+
payload={
237+
"status": "polling",
238+
"attempt": attempt,
239+
},
240+
)
241+
)
242+
243+
try:
244+
resume_data = await reader.read_trigger(trigger)
245+
246+
if resume_data is not None:
247+
return resume_data
248+
249+
await self._wait_with_quit_check()
250+
251+
except UiPathDebugQuitError:
252+
logger.info("Quit requested during polling")
253+
raise
254+
except Exception as e:
255+
logger.error(f"Error polling trigger: {e}", exc_info=True)
256+
await self.debug_bridge.emit_state_update(
257+
UiPathRuntimeStateEvent(
258+
node_name="<polling>",
259+
payload={
260+
"status": "poll_error",
261+
"attempt": attempt,
262+
"error": str(e),
263+
},
264+
)
265+
)
266+
267+
await self._wait_with_quit_check()
268+
269+
async def _wait_with_quit_check(self) -> None:
270+
"""Wait for specified seconds, but allow quit command to interrupt.
271+
272+
Raises:
273+
UiPathDebugQuitError: If quit is requested during wait
274+
"""
275+
sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval))
276+
resume_task = asyncio.create_task(self.debug_bridge.wait_for_resume())
277+
278+
done, pending = await asyncio.wait(
279+
{sleep_task, resume_task}, return_when=asyncio.FIRST_COMPLETED
280+
)
281+
282+
for task in pending:
283+
task.cancel()
284+
try:
285+
await task
286+
except asyncio.CancelledError:
287+
# Expected when cancelling pending tasks; safe to ignore.
288+
pass
289+
290+
# Check if quit was triggered
291+
if resume_task in done:
292+
try:
293+
await (
294+
resume_task
295+
) # This will raise UiPathDebugQuitError if it was a quit
296+
except UiPathDebugQuitError:
297+
raise

src/uipath/runtime/result.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class UiPathRuntimeResult(UiPathRuntimeEvent):
2323

2424
output: Optional[Union[dict[str, Any], BaseModel]] = None
2525
status: UiPathRuntimeStatus = UiPathRuntimeStatus.SUCCESSFUL
26-
resume: Optional[UiPathResumeTrigger] = None
26+
trigger: Optional[UiPathResumeTrigger] = None
2727
error: Optional[UiPathErrorContract] = None
2828

2929
event_type: UiPathRuntimeEventType = Field(
@@ -44,8 +44,8 @@ def to_dict(self) -> dict[str, Any]:
4444
"status": self.status,
4545
}
4646

47-
if self.resume:
48-
result["resume"] = self.resume.model_dump(by_alias=True)
47+
if self.trigger:
48+
result["resume"] = self.trigger.model_dump(by_alias=True)
4949

5050
if self.error:
5151
result["error"] = self.error.model_dump()

src/uipath/runtime/resumable/runtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ async def _handle_suspension(self, result: UiPathRuntimeResult) -> None:
139139
return
140140

141141
# Check if trigger already exists in result
142-
if result.resume:
143-
await self.storage.save_trigger(result.resume)
142+
if result.trigger:
143+
await self.storage.save_trigger(result.trigger)
144144
return
145145

146146
if result.output:
147147
trigger = await self.trigger_manager.create_trigger(result.output)
148148

149-
result.resume = trigger
149+
result.trigger = trigger
150150

151151
await self.storage.save_trigger(trigger)
152152

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)