1212)
1313from uipath .runtime .debug .breakpoint import UiPathBreakpointResult
1414from uipath .runtime .events import UiPathRuntimeEvent
15- from uipath .runtime .result import UiPathRuntimeResult , UiPathRuntimeStatus
15+ from uipath .runtime .result import (
16+ UiPathRuntimeResult ,
17+ UiPathRuntimeStatus ,
18+ )
1619from uipath .runtime .resumable .protocols import (
1720 UiPathResumableStorageProtocol ,
1821 UiPathResumeTriggerProtocol ,
1922)
23+ from uipath .runtime .resumable .trigger import UiPathResumeTrigger
2024from uipath .runtime .schema import UiPathRuntimeSchema
2125
2226logger = logging .getLogger (__name__ )
@@ -68,12 +72,28 @@ async def execute(
6872 """
6973 # If resuming, restore trigger from storage
7074 if options and options .resume :
75+ # restore trigger from storage
7176 input = await self ._restore_resume_input (input )
7277
73- # Execute the delegate
74- result = await self .delegate .execute (input , options = options )
75- # If suspended, create and persist trigger
76- return await self ._handle_suspension (result )
78+ while True :
79+ # Execute the delegate
80+ result = await self .delegate .execute (input , options = options )
81+ # If suspended, create and persist trigger
82+ suspension_result = await self ._handle_suspension (result )
83+
84+ # check if any trigger may be resumed
85+ if suspension_result .status != UiPathRuntimeStatus .SUSPENDED or not (
86+ fired_triggers := await self ._restore_resume_input (None )
87+ ):
88+ return suspension_result
89+
90+ # Note: when resuming a job, orchestrator deletes all triggers associated with it,
91+ # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
92+ input = fired_triggers
93+ if not options :
94+ options = UiPathExecuteOptions (resume = True )
95+ else :
96+ options .resume = True
7797
7898 async def stream (
7999 self ,
@@ -94,15 +114,33 @@ async def stream(
94114 input = await self ._restore_resume_input (input )
95115
96116 final_result : UiPathRuntimeResult | None = None
97- async for event in self .delegate .stream (input , options = options ):
98- if isinstance (event , UiPathRuntimeResult ):
99- final_result = event
100- else :
101- yield event
117+ execution_completed = False
118+
119+ while not execution_completed :
120+ async for event in self .delegate .stream (input , options = options ):
121+ if isinstance (event , UiPathRuntimeResult ):
122+ final_result = event
123+ else :
124+ yield event
125+
126+ # If suspended, create and persist trigger
127+ if final_result :
128+ suspension_result = await self ._handle_suspension (final_result )
102129
103- # If suspended, create and persist trigger
104- if final_result :
105- yield await self ._handle_suspension (final_result )
130+ if suspension_result .status != UiPathRuntimeStatus .SUSPENDED or not (
131+ fired_triggers := await self ._restore_resume_input (None )
132+ ):
133+ yield suspension_result
134+ execution_completed = True
135+
136+ # Note: when resuming a job, orchestrator deletes all triggers associated with it,
137+ # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
138+ input = fired_triggers
139+
140+ if not options :
141+ options = UiPathStreamOptions (resume = True )
142+ else :
143+ options .resume = True
106144
107145 async def _restore_resume_input (
108146 self , input : dict [str , Any ] | None
@@ -142,6 +180,11 @@ async def _restore_resume_input(
142180 if not triggers :
143181 return None
144182
183+ return await self ._build_resume_map (triggers )
184+
185+ async def _build_resume_map (
186+ self , triggers : list [UiPathResumeTrigger ]
187+ ) -> dict [str , Any ]:
145188 # Build resume map: {interrupt_id: resume_data}
146189 resume_map : dict [str , Any ] = {}
147190 for trigger in triggers :
@@ -166,11 +209,10 @@ async def _handle_suspension(
166209 Args:
167210 result: The execution result to check for suspension
168211 """
169- # Only handle suspensions
170- if result .status != UiPathRuntimeStatus .SUSPENDED :
171- return result
172-
173- if isinstance (result , UiPathBreakpointResult ):
212+ # Only handle interrupt suspensions
213+ if result .status != UiPathRuntimeStatus .SUSPENDED or isinstance (
214+ result , UiPathBreakpointResult
215+ ):
174216 return result
175217
176218 suspended_result = UiPathRuntimeResult (
0 commit comments