Skip to content

Commit 6cb6f2b

Browse files
cristipufuclaude
andcommitted
fix: prevent breakpoint+HITL infinite loop and duplicate events
- Load breakpoint skip counts on HITL resume so the executor passes through without re-firing the breakpoint (prevents the infinite breakpoint → HITL → breakpoint loop on same node) - Clear _last_checkpoint_id after loading breakpoint state to prevent the next fresh turn from being mistaken for a breakpoint resume - Detect stale checkpoints from previous turns by capturing a baseline before workflow.run() and comparing after breakpoint fires - Fall back to _resumed_from_checkpoint_id (instead of None) when no new checkpoint was created, preventing replay from scratch that caused duplicate handoff_to_billing_agent events (4x) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4a31d71 commit 6cb6f2b

4 files changed

Lines changed: 779 additions & 49 deletions

File tree

packages/uipath-agent-framework/samples/hitl-workflow/pyproject.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,3 @@ dev = [
1818

1919
[tool.uv]
2020
prerelease = "allow"
21-
22-
[tool.uv.sources]
23-
uipath-dev = { path = "../../../../../uipath-dev-python", editable = true }
24-
uipath-agent-framework = { path = "../../", editable = true }

packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py

Lines changed: 90 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,14 @@ async def _save_breakpoint_state(
9999
to run before re-arming its breakpoint. The count is incremented
100100
every time the same executor hits a breakpoint again (cyclic
101101
graphs, GroupChat orchestrators).
102+
103+
checkpoint_id may be None when the breakpoint fired before any
104+
new checkpoint was created (e.g. the very first executor of a
105+
fresh turn). In that case the resume path will replay from
106+
original_input with skip counts instead of restoring a checkpoint.
102107
"""
103108
if not self._resumable_storage:
104109
return
105-
if checkpoint_id is None:
106-
checkpoint_id = await self._get_latest_checkpoint_id()
107110
state = {
108111
"skip_nodes": dict(self._breakpoint_skip_nodes),
109112
"last_breakpoint_node": self._last_breakpoint_node,
@@ -308,13 +311,31 @@ async def execute(
308311

309312
workflow = self.agent.workflow
310313

314+
# Capture the latest checkpoint BEFORE workflow.run() so we can
315+
# detect whether a NEW checkpoint was created during this execution.
316+
# Without this, breakpoints that fire before any new checkpoint
317+
# (e.g. the first executor of turn 2) would save a stale
318+
# checkpoint from the previous turn, causing the resume to
319+
# restore completed state instead of replaying from input.
320+
baseline_checkpoint_id = await self._get_latest_checkpoint_id()
321+
311322
if is_resuming and input is not None:
312323
# HITL resume: checkpoint restores executor state (including session)
313324
self._resume_responses = await self._convert_resume_responses(input)
314325

315-
# Inject breakpoints (no skip needed for HITL resume)
326+
# Inject breakpoints with accumulated skip counts so that
327+
# breakpoints don't re-fire on the same executor after HITL
328+
# approval (prevents breakpoint→HITL→breakpoint loop).
316329
if options and options.breakpoints:
317-
inject_breakpoint_middleware(self.agent, options.breakpoints)
330+
await self._load_breakpoint_state()
331+
inject_breakpoint_middleware(
332+
self.agent,
333+
options.breakpoints,
334+
self._get_breakpoint_skip(),
335+
)
336+
# _load_breakpoint_state sets _last_checkpoint_id as a
337+
# side effect. Clear it so it doesn't contaminate later runs.
338+
self._last_checkpoint_id = None
318339

319340
if self._resume_responses:
320341
checkpoint_id = await self._get_latest_checkpoint_id()
@@ -419,8 +440,21 @@ async def execute(
419440
)
420441
self._last_breakpoint_node = node_id
421442
original_input = self._prepare_input(input) if not is_resuming else ""
443+
# Only save checkpoint_id if it was created during THIS run.
444+
# If latest == baseline, no new checkpoint was created (e.g.
445+
# breakpoint on the first executor of a fresh turn) — save
446+
# the checkpoint we resumed from (if any) so we don't lose
447+
# it and replay from scratch on the next resume.
448+
# For fresh turns _resumed_from_checkpoint_id is None, which
449+
# correctly prevents using a stale checkpoint from the
450+
# previous turn.
451+
effective_checkpoint = (
452+
latest_checkpoint
453+
if latest_checkpoint != baseline_checkpoint_id
454+
else self._resumed_from_checkpoint_id
455+
)
422456
await self._save_breakpoint_state(
423-
original_input, checkpoint_id=latest_checkpoint
457+
original_input, checkpoint_id=effective_checkpoint
424458
)
425459
return create_breakpoint_result(e)
426460
return self._create_suspended_result(e)
@@ -444,9 +478,20 @@ async def stream(
444478
self._resume_responses = await self._convert_resume_responses(input)
445479
user_input = self._prepare_input(None)
446480

447-
# Inject breakpoints (no skip needed for HITL resume)
481+
# Inject breakpoints with accumulated skip counts so that
482+
# breakpoints don't re-fire on the same executor after HITL
483+
# approval (prevents breakpoint→HITL→breakpoint loop).
448484
if options and options.breakpoints:
449-
inject_breakpoint_middleware(self.agent, options.breakpoints)
485+
await self._load_breakpoint_state()
486+
inject_breakpoint_middleware(
487+
self.agent,
488+
options.breakpoints,
489+
self._get_breakpoint_skip(),
490+
)
491+
# _load_breakpoint_state sets _last_checkpoint_id as a
492+
# side effect. Clear it so _stream_workflow doesn't
493+
# mistake a subsequent fresh run for a breakpoint resume.
494+
self._last_checkpoint_id = None
450495

451496
elif is_resuming:
452497
# Breakpoint resume: restore original_input and session
@@ -465,8 +510,9 @@ async def stream(
465510
)
466511

467512
else:
468-
# Fresh run
513+
# Fresh run — clear stale resume state from previous turns
469514
self._resume_responses = None
515+
self._last_checkpoint_id = None
470516
user_input = self._prepare_input(input)
471517

472518
# Load session for multi-turn conversation history
@@ -519,6 +565,10 @@ async def _stream_workflow(
519565
)
520566
self._pending_tool_nodes.clear()
521567

568+
# Capture the latest checkpoint BEFORE workflow.run() so we can
569+
# detect whether a NEW checkpoint was created during this execution.
570+
baseline_checkpoint_id = await self._get_latest_checkpoint_id()
571+
522572
# Choose workflow.run() mode based on resume type
523573
if self._resume_responses:
524574
# HITL resume: pass responses to workflow with checkpoint
@@ -609,20 +659,21 @@ async def _stream_workflow(
609659
):
610660
if tool_event.phase not in emitted_phases:
611661
# Track pending tool nodes
612-
if (
613-
tool_event.phase
614-
== UiPathRuntimeStatePhase.STARTED
615-
):
616-
self._pending_tool_nodes.add(
617-
tool_event.node_name
618-
)
619-
elif (
620-
tool_event.phase
621-
== UiPathRuntimeStatePhase.COMPLETED
622-
):
623-
self._pending_tool_nodes.discard(
624-
tool_event.node_name
625-
)
662+
if tool_event.node_name:
663+
if (
664+
tool_event.phase
665+
== UiPathRuntimeStatePhase.STARTED
666+
):
667+
self._pending_tool_nodes.add(
668+
tool_event.node_name
669+
)
670+
elif (
671+
tool_event.phase
672+
== UiPathRuntimeStatePhase.COMPLETED
673+
):
674+
self._pending_tool_nodes.discard(
675+
tool_event.node_name
676+
)
626677
yield tool_event
627678
yield UiPathRuntimeStateEvent(
628679
payload=self._serialize_event_data(
@@ -637,14 +688,15 @@ async def _stream_workflow(
637688
event.data, executor_id
638689
)
639690
for tool_event in tool_events:
640-
executor_tool_phases.setdefault(
641-
executor_id, set()
642-
).add(tool_event.phase)
691+
executor_tool_phases.setdefault(executor_id, set()).add(
692+
tool_event.phase
693+
)
643694
# Track pending tool nodes across stream iterations
644-
if tool_event.phase == UiPathRuntimeStatePhase.STARTED:
645-
self._pending_tool_nodes.add(tool_event.node_name)
646-
elif tool_event.phase == UiPathRuntimeStatePhase.COMPLETED:
647-
self._pending_tool_nodes.discard(tool_event.node_name)
695+
if tool_event.node_name:
696+
if tool_event.phase == UiPathRuntimeStatePhase.STARTED:
697+
self._pending_tool_nodes.add(tool_event.node_name)
698+
elif tool_event.phase == UiPathRuntimeStatePhase.COMPLETED:
699+
self._pending_tool_nodes.discard(tool_event.node_name)
648700
yield tool_event
649701
for msg_event in self._extract_workflow_messages(event.data):
650702
yield UiPathRuntimeMessageEvent(payload=msg_event)
@@ -691,8 +743,16 @@ async def _stream_workflow(
691743
self._breakpoint_skip_nodes.get(node_id, 0) + 1
692744
)
693745
self._last_breakpoint_node = node_id
746+
# Only save checkpoint_id if it was created during THIS run.
747+
# Fall back to the checkpoint we resumed from (if any) to
748+
# avoid replaying from scratch on the next resume.
749+
effective_checkpoint = (
750+
latest_checkpoint
751+
if latest_checkpoint != baseline_checkpoint_id
752+
else self._resumed_from_checkpoint_id
753+
)
694754
await self._save_breakpoint_state(
695-
user_input, checkpoint_id=latest_checkpoint
755+
user_input, checkpoint_id=effective_checkpoint
696756
)
697757
yield create_breakpoint_result(e)
698758
else:

0 commit comments

Comments
 (0)