33import logging
44from typing import Any , AsyncGenerator
55
6+ from uipath .core .errors import UiPathPendingTriggerError
7+
68from uipath .runtime .base import (
79 UiPathExecuteOptions ,
810 UiPathRuntimeProtocol ,
@@ -111,21 +113,32 @@ async def _restore_resume_input(
111113 input: User-provided input (takes precedence)
112114
113115 Returns:
114- Input to use for resume, either provided or from storage
116+ Input to use for resume: {interrupt_id: resume_data, ...}
115117 """
116118 # If user provided explicit input, use it
117119 if input is not None :
118120 return input
119121
120- # Otherwise, fetch from storage
121- trigger = await self .storage .get_latest_trigger (self .runtime_id )
122- if not trigger :
122+ # Fetch all triggers from storage
123+ triggers = await self .storage .get_triggers (self .runtime_id )
124+ if not triggers :
123125 return None
124126
125- # Read trigger data via trigger_manager
126- resume_data = await self .trigger_manager .read_trigger (trigger )
127-
128- return resume_data
127+ # Build resume map: {interrupt_id: resume_data}
128+ resume_map : dict [str , Any ] = {}
129+ for trigger in triggers :
130+ try :
131+ data = await self .trigger_manager .read_trigger (trigger )
132+ assert trigger .interrupt_id is not None , (
133+ "Trigger interrupt_id cannot be None"
134+ )
135+ resume_map [trigger .interrupt_id ] = data
136+ await self .storage .delete_trigger (self .runtime_id , trigger )
137+ except UiPathPendingTriggerError :
138+ # Trigger still pending, skip it
139+ pass
140+
141+ return resume_map
129142
130143 async def _handle_suspension (
131144 self , result : UiPathRuntimeResult
@@ -142,22 +155,39 @@ async def _handle_suspension(
142155 if isinstance (result , UiPathBreakpointResult ):
143156 return result
144157
145- # Check if trigger already exists in result
146- if result .trigger :
147- await self .storage .save_trigger (self .runtime_id , result .trigger )
148- return result
149-
150158 suspended_result = UiPathRuntimeResult (
151159 status = UiPathRuntimeStatus .SUSPENDED ,
152160 output = result .output ,
153161 )
154162
155- if result .output :
156- suspended_result .trigger = await self .trigger_manager .create_trigger (
157- result .output
163+ assert result .output is None or isinstance (result .output , dict ), (
164+ "Suspended runtime output must be a dict of interrupt IDs to resume data"
165+ )
166+
167+ # Get existing triggers and current interrupts
168+ suspended_result .triggers = (
169+ await self .storage .get_triggers (self .runtime_id ) or []
170+ )
171+ current_interrupts = result .output or {}
172+
173+ # Diff: find new interrupts
174+ existing_ids = {t .interrupt_id for t in suspended_result .triggers }
175+ new_ids = current_interrupts .keys () - existing_ids
176+
177+ # Create triggers only for new interrupts
178+ for interrupt_id in new_ids :
179+ trigger = await self .trigger_manager .create_trigger (
180+ current_interrupts [interrupt_id ]
158181 )
182+ trigger .interrupt_id = interrupt_id
183+ suspended_result .triggers .append (trigger )
184+
185+ if suspended_result .triggers :
186+ await self .storage .save_triggers (self .runtime_id , suspended_result .triggers )
159187
160- await self .storage .save_trigger (self .runtime_id , suspended_result .trigger )
188+ # Backward compatibility: set single trigger directly
189+ if len (suspended_result .triggers ) == 1 :
190+ suspended_result .trigger = suspended_result .triggers [0 ]
161191
162192 return suspended_result
163193
0 commit comments