22
33import asyncio
44import logging
5- from typing import Any , Optional
5+ from typing import Any , AsyncGenerator , Optional , cast
66
77from uipath .runtime .base import (
88 UiPathExecuteOptions ,
1616 UiPathDebugQuitError ,
1717)
1818from uipath .runtime .events import (
19+ UiPathRuntimeEvent ,
1920 UiPathRuntimeStateEvent ,
2021)
2122from uipath .runtime .result import (
@@ -59,39 +60,56 @@ async def execute(
5960 options : Optional [UiPathExecuteOptions ] = None ,
6061 ) -> UiPathRuntimeResult :
6162 """Execute the workflow with debug support."""
63+ final_result = None
64+ async for event in self .stream (input , cast (UiPathStreamOptions , options )):
65+ if isinstance (event , UiPathRuntimeResult ):
66+ final_result = event
67+
68+ return (
69+ final_result
70+ if final_result
71+ else UiPathRuntimeResult (status = UiPathRuntimeStatus .SUCCESSFUL )
72+ )
73+
74+ async def stream (
75+ self ,
76+ input : Optional [dict [str , Any ]] = None ,
77+ options : Optional [UiPathStreamOptions ] = None ,
78+ ) -> AsyncGenerator [UiPathRuntimeEvent , None ]:
79+ """Stream execution events with debug support."""
6280 try :
6381 await self .debug_bridge .connect ()
64-
6582 await self .debug_bridge .emit_execution_started ()
6683
67- result : UiPathRuntimeResult
84+ result : Optional [ UiPathRuntimeResult ] = None
6885
6986 # Try to stream events from inner runtime
7087 try :
71- result = await self ._stream_and_debug (input , options = options )
88+ async for event in self ._stream_and_debug (input , options = options ):
89+ yield event
90+ if isinstance (event , UiPathRuntimeResult ):
91+ result = event
7292 except UiPathStreamNotSupportedError :
7393 # Fallback to regular execute if streaming not supported
7494 logger .debug (
7595 f"Runtime { self .delegate .__class__ .__name__ } does not support "
7696 "streaming, falling back to execute()"
7797 )
7898 result = await self .delegate .execute (input , options = options )
99+ yield result
79100
80- await self .debug_bridge .emit_execution_completed (result )
81-
82- return result
101+ if result :
102+ await self .debug_bridge .emit_execution_completed (result )
83103
84104 except Exception as e :
85- await self .debug_bridge .emit_execution_error (
86- error = str (e ),
87- )
105+ await self .debug_bridge .emit_execution_error (error = str (e ))
88106 raise
89107
90108 async def _stream_and_debug (
91109 self ,
92110 input : Optional [dict [str , Any ]] = None ,
93111 options : Optional [UiPathExecuteOptions ] = None ,
94- ) -> UiPathRuntimeResult :
112+ ) -> AsyncGenerator [ UiPathRuntimeEvent , None ] :
95113 """Stream events from inner runtime and handle debug interactions."""
96114 final_result : UiPathRuntimeResult
97115 execution_completed = False
@@ -103,9 +121,8 @@ async def _stream_and_debug(
103121 logger .warning (
104122 "Initial resume wait timed out after 60s, assuming debug bridge disconnected"
105123 )
106- return UiPathRuntimeResult (
107- status = UiPathRuntimeStatus .SUCCESSFUL ,
108- )
124+ yield UiPathRuntimeResult (status = UiPathRuntimeStatus .SUCCESSFUL )
125+ return
109126
110127 debug_options = UiPathStreamOptions (
111128 resume = options .resume if options else False ,
@@ -123,6 +140,8 @@ async def _stream_and_debug(
123140 async for event in self .delegate .stream (
124141 current_input , options = debug_options
125142 ):
143+ yield event
144+
126145 # Handle final result
127146 if isinstance (event , UiPathRuntimeResult ):
128147 final_result = event
@@ -141,6 +160,7 @@ async def _stream_and_debug(
141160 final_result = UiPathRuntimeResult (
142161 status = UiPathRuntimeStatus .SUCCESSFUL ,
143162 )
163+ yield final_result
144164 execution_completed = True
145165 else :
146166 # Normal completion or suspension with dynamic interrupt
@@ -152,15 +172,14 @@ async def _stream_and_debug(
152172 and final_result .status == UiPathRuntimeStatus .SUSPENDED
153173 and final_result .trigger
154174 ):
155- await self .debug_bridge .emit_state_update (
156- UiPathRuntimeStateEvent (
157- node_name = "<suspended>" ,
158- payload = {
159- "status" : "suspended" ,
160- "trigger" : final_result .trigger .model_dump (),
161- },
162- )
175+ state_event = UiPathRuntimeStateEvent (
176+ node_name = "<suspended>" ,
177+ payload = {
178+ "status" : "suspended" ,
179+ "trigger" : final_result .trigger .model_dump (),
180+ },
163181 )
182+ await self .debug_bridge .emit_state_update (state_event )
164183
165184 resume_data : Optional [dict [str , Any ]] = None
166185 try :
@@ -171,18 +190,18 @@ async def _stream_and_debug(
171190 final_result = UiPathRuntimeResult (
172191 status = UiPathRuntimeStatus .SUCCESSFUL ,
173192 )
193+ yield final_result
174194 execution_completed = True
175195
176196 if resume_data is not None :
177- await self .debug_bridge .emit_state_update (
178- UiPathRuntimeStateEvent (
179- node_name = "<resumed>" ,
180- payload = {
181- "status" : "resumed" ,
182- "data" : resume_data ,
183- },
184- )
197+ resumed_event = UiPathRuntimeStateEvent (
198+ node_name = "<resumed>" ,
199+ payload = {
200+ "status" : "resumed" ,
201+ "data" : resume_data ,
202+ },
185203 )
204+ await self .debug_bridge .emit_state_update (resumed_event )
186205
187206 # Continue with resumed execution
188207 current_input = resume_data
@@ -198,8 +217,6 @@ async def _stream_and_debug(
198217 elif isinstance (event , UiPathRuntimeStateEvent ):
199218 await self .debug_bridge .emit_state_update (event )
200219
201- return final_result
202-
203220 async def get_schema (self ) -> UiPathRuntimeSchema :
204221 """Passthrough schema for the delegate."""
205222 return await self .delegate .get_schema ()
0 commit comments