|
4 | 4 | import logging |
5 | 5 | from typing import Any, AsyncGenerator, cast |
6 | 6 |
|
7 | | -from uipath.core.errors import UiPathPendingTriggerError |
8 | | - |
9 | 7 | from uipath.runtime.base import ( |
10 | 8 | UiPathExecuteOptions, |
11 | 9 | UiPathRuntimeProtocol, |
|
25 | 23 | UiPathRuntimeResult, |
26 | 24 | UiPathRuntimeStatus, |
27 | 25 | ) |
28 | | -from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol |
| 26 | +from uipath.runtime.resumable.polling import TriggerPoller |
29 | 27 | from uipath.runtime.resumable.runtime import UiPathResumableRuntime |
30 | 28 | from uipath.runtime.resumable.trigger import ( |
31 | 29 | UiPathResumeTrigger, |
@@ -203,8 +201,7 @@ async def _stream_and_debug( |
203 | 201 | ) |
204 | 202 | else: |
205 | 203 | trigger_data = await self._poll_trigger( |
206 | | - final_result.trigger, |
207 | | - self.delegate.trigger_manager, |
| 204 | + final_result.trigger |
208 | 205 | ) |
209 | 206 | resume_data = {interrupt_id: trigger_data} |
210 | 207 | except UiPathDebugQuitError: |
@@ -245,77 +242,65 @@ async def dispose(self) -> None: |
245 | 242 | logger.warning(f"Error disconnecting debug bridge: {e}") |
246 | 243 |
|
247 | 244 | async def _poll_trigger( |
248 | | - self, trigger: UiPathResumeTrigger, reader: UiPathResumeTriggerReaderProtocol |
| 245 | + self, trigger: UiPathResumeTrigger |
249 | 246 | ) -> dict[str, Any] | None: |
250 | 247 | """Poll a resume trigger until data is available. |
251 | 248 |
|
252 | 249 | Args: |
253 | 250 | trigger: The trigger to poll |
254 | | - reader: The trigger reader to use for polling |
255 | 251 |
|
256 | 252 | Returns: |
257 | | - Resume data when available, or None if polling exhausted |
| 253 | + Resume data when available, or None if polling was stopped |
258 | 254 |
|
259 | 255 | Raises: |
260 | 256 | UiPathDebugQuitError: If quit is requested during polling |
261 | 257 | """ |
262 | | - attempt = 0 |
263 | | - while True: |
264 | | - attempt += 1 |
265 | | - |
266 | | - try: |
267 | | - resume_data = await reader.read_trigger(trigger) |
268 | | - |
269 | | - if resume_data is not None: |
270 | | - return resume_data |
271 | | - |
272 | | - await self.debug_bridge.emit_state_update( |
273 | | - UiPathRuntimeStateEvent( |
274 | | - node_name="<polling>", |
275 | | - payload={ |
276 | | - "attempt": attempt, |
277 | | - }, |
278 | | - ) |
| 258 | + self._quit_requested = False |
| 259 | + |
| 260 | + async def on_poll_attempt(attempt: int, info: str | None) -> None: |
| 261 | + """Callback for each poll attempt.""" |
| 262 | + payload: dict[str, Any] = {"attempt": attempt} |
| 263 | + if info: |
| 264 | + payload["info"] = info |
| 265 | + await self.debug_bridge.emit_state_update( |
| 266 | + UiPathRuntimeStateEvent( |
| 267 | + node_name="<polling>", |
| 268 | + payload=payload, |
279 | 269 | ) |
| 270 | + ) |
280 | 271 |
|
281 | | - await self._wait_with_quit_check() |
282 | | - |
283 | | - except UiPathDebugQuitError: |
284 | | - raise |
285 | | - |
286 | | - except UiPathPendingTriggerError as e: |
287 | | - await self.debug_bridge.emit_state_update( |
288 | | - UiPathRuntimeStateEvent( |
289 | | - node_name="<polling>", |
290 | | - payload={ |
291 | | - "attempt": attempt, |
292 | | - "info": str(e), |
293 | | - }, |
294 | | - ) |
| 272 | + async def should_stop() -> bool: |
| 273 | + """Check if quit was requested.""" |
| 274 | + # Check for termination request with a short timeout |
| 275 | + try: |
| 276 | + term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate()) |
| 277 | + done, _ = await asyncio.wait( |
| 278 | + {term_task}, |
| 279 | + timeout=0.01, # Very short timeout just to check |
295 | 280 | ) |
| 281 | + if term_task in done: |
| 282 | + self._quit_requested = True |
| 283 | + return True |
| 284 | + else: |
| 285 | + term_task.cancel() |
| 286 | + try: |
| 287 | + await term_task |
| 288 | + except asyncio.CancelledError: |
| 289 | + pass |
| 290 | + except Exception: |
| 291 | + pass |
| 292 | + return False |
296 | 293 |
|
297 | | - await self._wait_with_quit_check() |
298 | | - |
299 | | - async def _wait_with_quit_check(self) -> None: |
300 | | - """Wait for specified seconds, but allow quit command to interrupt. |
301 | | -
|
302 | | - Raises: |
303 | | - UiPathDebugQuitError: If quit is requested during wait |
304 | | - """ |
305 | | - sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval)) |
306 | | - term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate()) |
307 | | - |
308 | | - done, pending = await asyncio.wait( |
309 | | - {sleep_task, term_task}, |
310 | | - return_when=asyncio.FIRST_COMPLETED, |
| 294 | + poller = TriggerPoller( |
| 295 | + reader=self.delegate.trigger_manager, |
| 296 | + poll_interval=self.trigger_poll_interval, |
| 297 | + on_poll_attempt=on_poll_attempt, |
| 298 | + should_stop=should_stop, |
311 | 299 | ) |
312 | 300 |
|
313 | | - for task in pending: |
314 | | - task.cancel() |
315 | | - try: |
316 | | - await task |
317 | | - except asyncio.CancelledError: |
318 | | - pass |
| 301 | + result = await poller.poll_trigger(trigger) |
319 | 302 |
|
320 | | - if term_task in done: |
| 303 | + if self._quit_requested: |
321 | 304 | raise UiPathDebugQuitError("Debugging terminated during polling.") |
| 305 | + |
| 306 | + return result |
0 commit comments