11"""Breakpoint management for the Agent Framework runtime.
22
3+ Provides:
4+ - AgentInterruptException: raised by middleware to suspend agent execution
5+ - BreakpointMiddleware: intercepts tools matching breakpoint configuration
6+
37Implements breakpoints by wrapping executor.execute() methods so that
48execution pauses BEFORE the executor runs. This works regardless of
59the inner agent type (RawAgent, Agent, etc.) because interception
1418
1519from __future__ import annotations
1620
21+ from collections .abc import Awaitable , Callable
1722from typing import Any
1823from uuid import uuid4
1924
2025from agent_framework import AgentExecutor , WorkflowAgent
26+ from agent_framework ._middleware import (
27+ FunctionInvocationContext ,
28+ FunctionMiddleware ,
29+ )
2130from uipath .runtime .debug import UiPathBreakpointResult
2231
23- from .interrupt import AgentInterruptException
2432from .schema import get_agent_tools , get_tool_name
2533
34+
35+ class AgentInterruptException (Exception ):
36+ """Raised by middleware to suspend agent execution for HITL.
37+
38+ Carries an interrupt_id and suspend_value that the runtime uses
39+ to create a UiPathRuntimeResult with SUSPENDED status.
40+ When is_breakpoint is True, the runtime returns UiPathBreakpointResult
41+ instead, which bypasses trigger management and is handled by the
42+ debug runtime layer.
43+ """
44+
45+ def __init__ (
46+ self ,
47+ interrupt_id : str ,
48+ suspend_value : Any ,
49+ * ,
50+ is_breakpoint : bool = False ,
51+ ) -> None :
52+ self .interrupt_id = interrupt_id
53+ self .suspend_value = suspend_value
54+ self .is_breakpoint = is_breakpoint
55+ super ().__init__ (f"Agent interrupted: { interrupt_id } " )
56+
57+
58+ class BreakpointMiddleware (FunctionMiddleware ):
59+ """Intercepts tools matching breakpoint configuration.
60+
61+ Breakpoint flow (orchestrated by UiPathDebugRuntime):
62+
63+ 1. UiPathDebugRuntime gets breakpoints from debug bridge and passes
64+ them via ``options.breakpoints`` to the integration runtime.
65+ 2. The integration runtime injects this middleware into the agent's
66+ middleware chain with the breakpoint list.
67+ 3. When the agent calls a matching tool, this middleware raises
68+ ``AgentInterruptException(is_breakpoint=True)`` BEFORE the tool runs.
69+ 4. The runtime catches the exception and returns
70+ ``UiPathBreakpointResult`` (a SUSPENDED result subclass).
71+ 5. ``UiPathResumableRuntime`` passes the breakpoint result through
72+ (no trigger management — breakpoints bypass the trigger system).
73+ 6. ``UiPathDebugRuntime`` sees ``UiPathBreakpointResult``, notifies
74+ the debug bridge, and waits for a resume command.
75+ 7. On resume, ``UiPathDebugRuntime`` re-invokes the runtime with
76+ ``options.resume=True, input=None``. The runtime re-injects this
77+ middleware with ``skip_tool`` set to the previously-interrupted
78+ tool name so the first matching call is let through (one-shot).
79+ 8. After the skipped call completes, subsequent breakpoint-matching
80+ tool calls will pause again.
81+ """
82+
83+ def __init__ (
84+ self ,
85+ breakpoints : list [str ] | str ,
86+ skip_tool : str | None = None ,
87+ ) -> None :
88+ self .breakpoints = breakpoints
89+ self ._skip_tool = skip_tool
90+
91+ def _matches (self , tool_name : str ) -> bool :
92+ if self .breakpoints == "*" :
93+ return True
94+ if isinstance (self .breakpoints , list ):
95+ return tool_name in self .breakpoints
96+ return False
97+
98+ async def process (
99+ self ,
100+ context : FunctionInvocationContext ,
101+ call_next : Callable [[], Awaitable [None ]],
102+ ) -> None :
103+ tool = context .function
104+ tool_name = getattr (tool , "name" , "" )
105+
106+ if not self ._matches (tool_name ):
107+ await call_next ()
108+ return
109+
110+ # One-shot skip for the tool we just resumed from
111+ if self ._skip_tool and tool_name == self ._skip_tool :
112+ self ._skip_tool = None
113+ await call_next ()
114+ return
115+
116+ # Legacy metadata-based resume (kept for backward compatibility)
117+ if context .metadata .get ("_breakpoint_continue" ):
118+ await call_next ()
119+ return
120+
121+ interrupt_id = str (uuid4 ())
122+
123+ input_value = None
124+ if context .arguments is not None :
125+ try :
126+ input_value = context .arguments .model_dump ()
127+ except Exception :
128+ input_value = str (context .arguments )
129+
130+ suspend_value = {
131+ "type" : "breakpoint" ,
132+ "tool_name" : tool_name ,
133+ "input_value" : input_value ,
134+ }
135+
136+ raise AgentInterruptException (
137+ interrupt_id = interrupt_id ,
138+ suspend_value = suspend_value ,
139+ is_breakpoint = True ,
140+ )
141+
142+
26143_ORIGINAL_EXECUTE_ATTR = "_bp_original_execute"
27144
28145
@@ -81,22 +198,35 @@ def _resolve_to_executor_ids(
81198def inject_breakpoint_middleware (
82199 agent : WorkflowAgent ,
83200 breakpoints : list [str ] | str ,
84- skip_nodes : set [str ] | None = None ,
201+ skip_nodes : dict [str , int ] | None = None ,
85202) -> None :
86203 """Wrap executor.execute() to pause before breakpointed executors run.
87204
88205 Replaces each matching executor's execute() with a wrapper that raises
89206 AgentInterruptException(is_breakpoint=True) before the executor runs.
90207
208+ For executors in *skip_nodes*, the wrapper allows *N* pass-through calls
209+ (running the original execute) before re-arming the breakpoint. The
210+ count *N* equals the number of times that executor has previously been
211+ breakpointed and resumed — this correctly handles both:
212+
213+ * **GroupChat star topology** where the orchestrator is called multiple
214+ times per workflow run (initial + once per participant response).
215+ * **Cyclic graphs** where each executor is visited on every cycle.
216+
217+ Each resume increments the count so the executor passes through all the
218+ calls that happened *before* the breakpoint, then breaks on the next new
219+ call.
220+
91221 Args:
92222 agent: The workflow agent whose executors to wrap.
93223 breakpoints: ``"*"`` or a list of node IDs from the debug UI.
94- skip_nodes: Executor IDs to skip (for resume after breakpoint).
95- In concurrent workflows multiple executors may have been
96- breakpointed across sequential resumes within the same
97- superstep, so all of them must be skipped.
224+ skip_nodes: Mapping of executor_id → pass-through count.
225+ Each value is the number of calls to let through before
226+ re-arming the breakpoint on that executor.
98227 """
99228 executor_ids = _resolve_to_executor_ids (agent , breakpoints )
229+ skip = skip_nodes or {}
100230
101231 for exec_id in executor_ids :
102232 executor = agent .workflow .executors .get (exec_id )
@@ -107,11 +237,8 @@ def inject_breakpoint_middleware(
107237 if hasattr (executor , _ORIGINAL_EXECUTE_ATTR ):
108238 continue
109239
110- # Skip executors already resumed past
111- if skip_nodes and exec_id in skip_nodes :
112- continue
113-
114240 original = executor .execute
241+ pass_count = skip .get (exec_id , 0 )
115242
116243 async def wrapped_execute (
117244 message : Any ,
@@ -121,8 +248,20 @@ async def wrapped_execute(
121248 trace_contexts : list [dict [str , str ]] | None = None ,
122249 source_span_ids : list [str ] | None = None ,
123250 * ,
251+ _original : Any = original ,
124252 _exec_id : str = exec_id ,
253+ _remaining : list [int ] = [pass_count ],
125254 ) -> None :
255+ if _remaining [0 ] > 0 :
256+ _remaining [0 ] -= 1
257+ return await _original (
258+ message ,
259+ source_executor_ids ,
260+ state ,
261+ runner_context ,
262+ trace_contexts ,
263+ source_span_ids ,
264+ )
126265 raise AgentInterruptException (
127266 interrupt_id = str (uuid4 ()),
128267 suspend_value = {
@@ -162,6 +301,8 @@ def create_breakpoint_result(
162301
163302
164303__all__ = [
304+ "AgentInterruptException" ,
305+ "BreakpointMiddleware" ,
165306 "create_breakpoint_result" ,
166307 "inject_breakpoint_middleware" ,
167308 "remove_breakpoint_middleware" ,
0 commit comments