22
33import asyncio
44import logging
5- from typing import Any , Optional
5+ from typing import Any , AsyncGenerator , Optional
66
77from uipath .runtime .base import (
88 UiPathExecuteOptions ,
1616 UiPathDebugQuitError ,
1717)
1818from uipath .runtime .events import (
19+ UiPathRuntimeEvent ,
1920 UiPathRuntimeStateEvent ,
2021)
2122from uipath .runtime .result import (
@@ -59,39 +60,55 @@ async def execute(
5960 options : Optional [UiPathExecuteOptions ] = None ,
6061 ) -> UiPathRuntimeResult :
6162 """Execute the workflow with debug support."""
63+ final_result = None
64+ async for event in self .stream (input , options ):
65+ if isinstance (event , UiPathRuntimeResult ):
66+ final_result = event
67+
68+ return (
69+ final_result
70+ if final_result
71+ else UiPathRuntimeResult (status = UiPathRuntimeStatus .SUCCESSFUL )
72+ )
73+
74+ async def stream (
75+ self ,
76+ input : Optional [dict [str , Any ]] = None ,
77+ options : Optional [UiPathStreamOptions ] = None ,
78+ ) -> AsyncGenerator [UiPathRuntimeEvent , None ]:
79+ """Stream execution events with debug support."""
6280 try :
6381 await self .debug_bridge .connect ()
64-
6582 await self .debug_bridge .emit_execution_started ()
6683
6784 result : UiPathRuntimeResult
6885
6986 # Try to stream events from inner runtime
7087 try :
71- result = await self ._stream_and_debug (input , options = options )
88+ async for event in self ._stream_and_debug (input , options = options ):
89+ yield event
90+ if isinstance (event , UiPathRuntimeResult ):
91+ result = event
7292 except UiPathStreamNotSupportedError :
7393 # Fallback to regular execute if streaming not supported
7494 logger .debug (
7595 f"Runtime { self .delegate .__class__ .__name__ } does not support "
7696 "streaming, falling back to execute()"
7797 )
7898 result = await self .delegate .execute (input , options = options )
99+ yield result
79100
80101 await self .debug_bridge .emit_execution_completed (result )
81102
82- return result
83-
84103 except Exception as e :
85- await self .debug_bridge .emit_execution_error (
86- error = str (e ),
87- )
104+ await self .debug_bridge .emit_execution_error (error = str (e ))
88105 raise
89106
90107 async def _stream_and_debug (
91108 self ,
92109 input : Optional [dict [str , Any ]] = None ,
93110 options : Optional [UiPathExecuteOptions ] = None ,
94- ) -> UiPathRuntimeResult :
111+ ) -> AsyncGenerator [ UiPathRuntimeEvent , None ] :
95112 """Stream events from inner runtime and handle debug interactions."""
96113 final_result : UiPathRuntimeResult
97114 execution_completed = False
@@ -103,9 +120,8 @@ async def _stream_and_debug(
103120 logger .warning (
104121 "Initial resume wait timed out after 60s, assuming debug bridge disconnected"
105122 )
106- return UiPathRuntimeResult (
107- status = UiPathRuntimeStatus .SUCCESSFUL ,
108- )
123+ yield UiPathRuntimeResult (status = UiPathRuntimeStatus .SUCCESSFUL )
124+ return
109125
110126 debug_options = UiPathStreamOptions (
111127 resume = options .resume if options else False ,
@@ -123,6 +139,8 @@ async def _stream_and_debug(
123139 async for event in self .delegate .stream (
124140 current_input , options = debug_options
125141 ):
142+ yield event
143+
126144 # Handle final result
127145 if isinstance (event , UiPathRuntimeResult ):
128146 final_result = event
@@ -141,6 +159,7 @@ async def _stream_and_debug(
141159 final_result = UiPathRuntimeResult (
142160 status = UiPathRuntimeStatus .SUCCESSFUL ,
143161 )
162+ yield final_result
144163 execution_completed = True
145164 else :
146165 # Normal completion or suspension with dynamic interrupt
@@ -152,15 +171,14 @@ async def _stream_and_debug(
152171 and final_result .status == UiPathRuntimeStatus .SUSPENDED
153172 and final_result .trigger
154173 ):
155- await self .debug_bridge .emit_state_update (
156- UiPathRuntimeStateEvent (
157- node_name = "<suspended>" ,
158- payload = {
159- "status" : "suspended" ,
160- "trigger" : final_result .trigger .model_dump (),
161- },
162- )
174+ state_event = UiPathRuntimeStateEvent (
175+ node_name = "<suspended>" ,
176+ payload = {
177+ "status" : "suspended" ,
178+ "trigger" : final_result .trigger .model_dump (),
179+ },
163180 )
181+ await self .debug_bridge .emit_state_update (state_event )
164182
165183 resume_data : Optional [dict [str , Any ]] = None
166184 try :
@@ -171,18 +189,18 @@ async def _stream_and_debug(
171189 final_result = UiPathRuntimeResult (
172190 status = UiPathRuntimeStatus .SUCCESSFUL ,
173191 )
192+ yield final_result
174193 execution_completed = True
175194
176195 if resume_data is not None :
177- await self .debug_bridge .emit_state_update (
178- UiPathRuntimeStateEvent (
179- node_name = "<resumed>" ,
180- payload = {
181- "status" : "resumed" ,
182- "data" : resume_data ,
183- },
184- )
196+ resumed_event = UiPathRuntimeStateEvent (
197+ node_name = "<resumed>" ,
198+ payload = {
199+ "status" : "resumed" ,
200+ "data" : resume_data ,
201+ },
185202 )
203+ await self .debug_bridge .emit_state_update (resumed_event )
186204
187205 # Continue with resumed execution
188206 current_input = resume_data
@@ -198,8 +216,6 @@ async def _stream_and_debug(
198216 elif isinstance (event , UiPathRuntimeStateEvent ):
199217 await self .debug_bridge .emit_state_update (event )
200218
201- return final_result
202-
203219 async def get_schema (self ) -> UiPathRuntimeSchema :
204220 """Passthrough schema for the delegate."""
205221 return await self .delegate .get_schema ()
0 commit comments