@@ -97,20 +97,26 @@ def __init__(self):
9797 self ._approval_events : Dict [str , asyncio .Event ] = {}
9898 self ._clarification_events : Dict [str , asyncio .Event ] = {}
9999
100+ # Track which user each pending plan belongs to, and which plans are superseded
101+ self ._plan_to_user : Dict [str , str ] = {} # plan_id -> user_id
102+ self ._superseded_plans : set = set () # plan IDs cancelled by a new task
103+
100104 # Default timeout (seconds) for waiting operations
101105 self .default_timeout : float = 300.0
102106
103107 def get_current_orchestration (self , user_id : str ) -> Any :
104108 """Get existing orchestration workflow instance for user_id."""
105109 return self .orchestrations .get (user_id , None )
106110
107- def set_approval_pending (self , plan_id : str ) -> None :
111+ def set_approval_pending (self , plan_id : str , user_id : str = None ) -> None :
108112 """Mark approval pending and create/reset its event."""
109113 self .approvals [plan_id ] = None
110114 if plan_id not in self ._approval_events :
111115 self ._approval_events [plan_id ] = asyncio .Event ()
112116 else :
113117 self ._approval_events [plan_id ].clear ()
118+ if user_id :
119+ self ._plan_to_user [plan_id ] = user_id
114120
115121 def set_approval_result (self , plan_id : str , approved : bool ) -> None :
116122 """Set approval decision and trigger its event."""
@@ -214,6 +220,30 @@ def cleanup_approval(self, plan_id: str) -> None:
214220 """Remove approval tracking data and event."""
215221 self .approvals .pop (plan_id , None )
216222 self ._approval_events .pop (plan_id , None )
223+ self ._plan_to_user .pop (plan_id , None )
224+ self ._superseded_plans .discard (plan_id )
225+
226+ def cancel_pending_approvals_for_user (self , user_id : str ) -> None :
227+ """Cancel all pending approvals for a user (called when a new task starts).
228+
229+ Wakes up any blocking wait_for_approval calls so they return immediately.
230+ The plan is marked as superseded so the orchestration can terminate silently
231+ without sending error messages to the user's current WebSocket.
232+ """
233+ plans_to_cancel = [
234+ pid for pid , uid in self ._plan_to_user .items ()
235+ if uid == user_id and pid in self .approvals and self .approvals [pid ] is None
236+ ]
237+ for plan_id in plans_to_cancel :
238+ logger .info ("Superseding stale pending approval: %s (user: %s)" , plan_id , user_id )
239+ self ._superseded_plans .add (plan_id )
240+ self .approvals [plan_id ] = False
241+ if plan_id in self ._approval_events :
242+ self ._approval_events [plan_id ].set () # wake up the blocked wait
243+
244+ def is_plan_superseded (self , plan_id : str ) -> bool :
245+ """Check if a plan was superseded by a newer task from the same user."""
246+ return plan_id in self ._superseded_plans
217247
218248 def cleanup_clarification (self , request_id : str ) -> None :
219249 """Remove clarification tracking data and event."""
0 commit comments