@@ -99,11 +99,14 @@ async def _save_breakpoint_state(
9999 to run before re-arming its breakpoint. The count is incremented
100100 every time the same executor hits a breakpoint again (cyclic
101101 graphs, GroupChat orchestrators).
102+
103+ checkpoint_id may be None when the breakpoint fired before any
104+ new checkpoint was created (e.g. the very first executor of a
105+ fresh turn). In that case the resume path will replay from
106+ original_input with skip counts instead of restoring a checkpoint.
102107 """
103108 if not self ._resumable_storage :
104109 return
105- if checkpoint_id is None :
106- checkpoint_id = await self ._get_latest_checkpoint_id ()
107110 state = {
108111 "skip_nodes" : dict (self ._breakpoint_skip_nodes ),
109112 "last_breakpoint_node" : self ._last_breakpoint_node ,
@@ -308,13 +311,31 @@ async def execute(
308311
309312 workflow = self .agent .workflow
310313
314+ # Capture the latest checkpoint BEFORE workflow.run() so we can
315+ # detect whether a NEW checkpoint was created during this execution.
316+ # Without this, breakpoints that fire before any new checkpoint
317+ # (e.g. the first executor of turn 2) would save a stale
318+ # checkpoint from the previous turn, causing the resume to
319+ # restore completed state instead of replaying from input.
320+ baseline_checkpoint_id = await self ._get_latest_checkpoint_id ()
321+
311322 if is_resuming and input is not None :
312323 # HITL resume: checkpoint restores executor state (including session)
313324 self ._resume_responses = await self ._convert_resume_responses (input )
314325
315- # Inject breakpoints (no skip needed for HITL resume)
326+ # Inject breakpoints with accumulated skip counts so that
327+ # breakpoints don't re-fire on the same executor after HITL
328+ # approval (prevents breakpoint→HITL→breakpoint loop).
316329 if options and options .breakpoints :
317- inject_breakpoint_middleware (self .agent , options .breakpoints )
330+ await self ._load_breakpoint_state ()
331+ inject_breakpoint_middleware (
332+ self .agent ,
333+ options .breakpoints ,
334+ self ._get_breakpoint_skip (),
335+ )
336+ # _load_breakpoint_state sets _last_checkpoint_id as a
337+ # side effect. Clear it so it doesn't contaminate later runs.
338+ self ._last_checkpoint_id = None
318339
319340 if self ._resume_responses :
320341 checkpoint_id = await self ._get_latest_checkpoint_id ()
@@ -419,8 +440,21 @@ async def execute(
419440 )
420441 self ._last_breakpoint_node = node_id
421442 original_input = self ._prepare_input (input ) if not is_resuming else ""
443+ # Only save checkpoint_id if it was created during THIS run.
444+ # If latest == baseline, no new checkpoint was created (e.g.
445+ # breakpoint on the first executor of a fresh turn) — save
446+ # the checkpoint we resumed from (if any) so we don't lose
447+ # it and replay from scratch on the next resume.
448+ # For fresh turns _resumed_from_checkpoint_id is None, which
449+ # correctly prevents using a stale checkpoint from the
450+ # previous turn.
451+ effective_checkpoint = (
452+ latest_checkpoint
453+ if latest_checkpoint != baseline_checkpoint_id
454+ else self ._resumed_from_checkpoint_id
455+ )
422456 await self ._save_breakpoint_state (
423- original_input , checkpoint_id = latest_checkpoint
457+ original_input , checkpoint_id = effective_checkpoint
424458 )
425459 return create_breakpoint_result (e )
426460 return self ._create_suspended_result (e )
@@ -444,9 +478,20 @@ async def stream(
444478 self ._resume_responses = await self ._convert_resume_responses (input )
445479 user_input = self ._prepare_input (None )
446480
447- # Inject breakpoints (no skip needed for HITL resume)
481+ # Inject breakpoints with accumulated skip counts so that
482+ # breakpoints don't re-fire on the same executor after HITL
483+ # approval (prevents breakpoint→HITL→breakpoint loop).
448484 if options and options .breakpoints :
449- inject_breakpoint_middleware (self .agent , options .breakpoints )
485+ await self ._load_breakpoint_state ()
486+ inject_breakpoint_middleware (
487+ self .agent ,
488+ options .breakpoints ,
489+ self ._get_breakpoint_skip (),
490+ )
491+ # _load_breakpoint_state sets _last_checkpoint_id as a
492+ # side effect. Clear it so _stream_workflow doesn't
493+ # mistake a subsequent fresh run for a breakpoint resume.
494+ self ._last_checkpoint_id = None
450495
451496 elif is_resuming :
452497 # Breakpoint resume: restore original_input and session
@@ -465,8 +510,9 @@ async def stream(
465510 )
466511
467512 else :
468- # Fresh run
513+ # Fresh run — clear stale resume state from previous turns
469514 self ._resume_responses = None
515+ self ._last_checkpoint_id = None
470516 user_input = self ._prepare_input (input )
471517
472518 # Load session for multi-turn conversation history
@@ -519,6 +565,10 @@ async def _stream_workflow(
519565 )
520566 self ._pending_tool_nodes .clear ()
521567
568+ # Capture the latest checkpoint BEFORE workflow.run() so we can
569+ # detect whether a NEW checkpoint was created during this execution.
570+ baseline_checkpoint_id = await self ._get_latest_checkpoint_id ()
571+
522572 # Choose workflow.run() mode based on resume type
523573 if self ._resume_responses :
524574 # HITL resume: pass responses to workflow with checkpoint
@@ -609,13 +659,8 @@ async def _stream_workflow(
609659 ):
610660 if tool_event .phase not in emitted_phases :
611661 # Track pending tool nodes
612- if (
613- tool_event .phase
614- == UiPathRuntimeStatePhase .STARTED
615- ):
616- self ._pending_tool_nodes .add (
617- tool_event .node_name
618- )
662+ if tool_event .phase == UiPathRuntimeStatePhase .STARTED :
663+ self ._pending_tool_nodes .add (tool_event .node_name )
619664 elif (
620665 tool_event .phase
621666 == UiPathRuntimeStatePhase .COMPLETED
@@ -637,9 +682,9 @@ async def _stream_workflow(
637682 event .data , executor_id
638683 )
639684 for tool_event in tool_events :
640- executor_tool_phases .setdefault (
641- executor_id , set ()
642- ). add ( tool_event . phase )
685+ executor_tool_phases .setdefault (executor_id , set ()). add (
686+ tool_event . phase
687+ )
643688 # Track pending tool nodes across stream iterations
644689 if tool_event .phase == UiPathRuntimeStatePhase .STARTED :
645690 self ._pending_tool_nodes .add (tool_event .node_name )
@@ -691,8 +736,16 @@ async def _stream_workflow(
691736 self ._breakpoint_skip_nodes .get (node_id , 0 ) + 1
692737 )
693738 self ._last_breakpoint_node = node_id
739+ # Only save checkpoint_id if it was created during THIS run.
740+ # Fall back to the checkpoint we resumed from (if any) to
741+ # avoid replaying from scratch on the next resume.
742+ effective_checkpoint = (
743+ latest_checkpoint
744+ if latest_checkpoint != baseline_checkpoint_id
745+ else self ._resumed_from_checkpoint_id
746+ )
694747 await self ._save_breakpoint_state (
695- user_input , checkpoint_id = latest_checkpoint
748+ user_input , checkpoint_id = effective_checkpoint
696749 )
697750 yield create_breakpoint_result (e )
698751 else :
0 commit comments