66
77from agent_framework import (
88 AgentExecutor ,
9+ AgentExecutorResponse ,
910 AgentResponse ,
1011 AgentResponseUpdate ,
1112 AgentSession ,
@@ -67,6 +68,12 @@ def __init__(
6768 self ._last_breakpoint_node : str | None = None
6869 self ._last_checkpoint_id : str | None = None
6970 self ._resumed_from_checkpoint_id : str | None = None
71+ # Track tool nodes that emitted STARTED but not yet COMPLETED.
72+ # Persists across _stream_workflow() calls (same runtime instance
73+ # reused by UiPathChatRuntime's while loop), allowing us to emit
74+ # synthetic COMPLETED events on HITL resume when the framework
75+ # doesn't surface function_result in output/executor_completed.
76+ self ._pending_tool_nodes : set [str ] = set ()
7077
7178 # ------------------------------------------------------------------
7279 # Checkpoint helpers
@@ -171,6 +178,37 @@ def _apply_session_to_executors(self, session: AgentSession) -> None:
171178 if isinstance (executor , AgentExecutor ):
172179 executor ._session = session
173180
181+ def _get_session_from_executors (self ) -> AgentSession | None :
182+ """Extract the most complete session from AgentExecutors in the workflow.
183+
184+ After checkpoint restore each executor receives its own independent
185+ session copy (unlike fresh runs where all executors share one object).
186+ Only the executor that processed the HITL/breakpoint response will
187+ have the updated conversation history. We return the session with the
188+ most messages to ensure the complete history is persisted.
189+ """
190+ workflow = self .agent .workflow
191+ best_session : AgentSession | None = None
192+ best_msg_count = - 1
193+ for executor in workflow .executors .values ():
194+ if isinstance (executor , AgentExecutor ) and executor ._session is not None :
195+ msg_count = self ._count_session_messages (executor ._session )
196+ if msg_count > best_msg_count :
197+ best_msg_count = msg_count
198+ best_session = executor ._session
199+ return best_session
200+
201+ @staticmethod
202+ def _count_session_messages (session : AgentSession ) -> int :
203+ """Count total messages across all provider keys in a session's state."""
204+ count = 0
205+ for value in session .state .values ():
206+ if isinstance (value , dict ) and "messages" in value :
207+ messages = value ["messages" ]
208+ if isinstance (messages , list ):
209+ count += len (messages )
210+ return count
211+
174212 # ------------------------------------------------------------------
175213 # HITL helpers (tool approval flow)
176214 # ------------------------------------------------------------------
@@ -332,6 +370,12 @@ async def execute(
332370 checkpoint_storage = self ._checkpoint_storage ,
333371 )
334372
373+ # After resume paths the checkpoint restores the session into
374+ # executors directly, so the local ``session`` is still None.
375+ # Extract it so it can be persisted after completion.
376+ if session is None :
377+ session = self ._get_session_from_executors ()
378+
335379 # Check for HITL suspension (framework's request_info mechanism)
336380 request_info_events = result .get_request_info_events ()
337381 hitl_requests = {
@@ -462,6 +506,19 @@ async def _stream_workflow(
462506 phase = UiPathRuntimeStatePhase .STARTED ,
463507 )
464508
509+ # On HITL resume, emit COMPLETED for tool nodes that were left
510+ # pending when the previous stream suspended. The framework
511+ # doesn't surface function_result in output/executor_completed
512+ # for handoff workflows, so we synthesize these events here.
513+ if is_resuming and self ._pending_tool_nodes :
514+ for tool_node in list (self ._pending_tool_nodes ):
515+ yield UiPathRuntimeStateEvent (
516+ payload = {},
517+ node_name = tool_node ,
518+ phase = UiPathRuntimeStatePhase .COMPLETED ,
519+ )
520+ self ._pending_tool_nodes .clear ()
521+
465522 # Choose workflow.run() mode based on resume type
466523 if self ._resume_responses :
467524 # HITL resume: pass responses to workflow with checkpoint
@@ -495,10 +552,13 @@ async def _stream_workflow(
495552
496553 request_info_map : dict [str , Any ] = {}
497554 is_suspended = False
498- # Track executors whose tool events were emitted via output events.
499- # When the workflow filters output events (e.g. GroupChat), tool events
500- # are instead extracted from executor_completed data as a fallback.
501- executors_with_tool_outputs : set [str ] = set ()
555+ # Track which tool event phases were emitted per executor via output
556+ # events. When the workflow filters output events (e.g. GroupChat),
557+ # tool events are extracted from executor_completed data as a fallback.
558+ # Tracking phases (not just executor_ids) lets us handle HITL resume
559+ # where function_call (STARTED) is in output but function_result
560+ # (COMPLETED) is only in executor_completed.
561+ executor_tool_phases : dict [str , set [UiPathRuntimeStatePhase ]] = {}
502562
503563 # Emit an early STARTED event for the start executor so the graph
504564 # visualization shows it immediately rather than after it finishes.
@@ -534,17 +594,36 @@ async def _stream_workflow(
534594 phase = UiPathRuntimeStatePhase .STARTED ,
535595 )
536596 elif event .type == "executor_completed" :
537- # When output events were filtered by the workflow (e.g.
538- # GroupChat where participants are not output executors),
539- # extract tool state events from the completed data instead.
540- if (
541- event .executor_id
542- and event .executor_id not in executors_with_tool_outputs
543- ):
597+ # Extract tool state events from executor_completed data,
598+ # skipping phases already emitted via output events.
599+ # This handles three scenarios:
600+ # 1. GroupChat (no output events): emit all from completed
601+ # 2. Normal (both in output): skip all from completed
602+ # 3. HITL resume (only STARTED in output): emit COMPLETED
603+ if event .executor_id :
604+ emitted_phases = executor_tool_phases .get (
605+ event .executor_id , set ()
606+ )
544607 for tool_event in self ._extract_tool_state_events (
545608 event .data , event .executor_id
546609 ):
547- yield tool_event
610+ if tool_event .phase not in emitted_phases :
611+ # Track pending tool nodes
612+ if (
613+ tool_event .phase
614+ == UiPathRuntimeStatePhase .STARTED
615+ ):
616+ self ._pending_tool_nodes .add (
617+ tool_event .node_name
618+ )
619+ elif (
620+ tool_event .phase
621+ == UiPathRuntimeStatePhase .COMPLETED
622+ ):
623+ self ._pending_tool_nodes .discard (
624+ tool_event .node_name
625+ )
626+ yield tool_event
548627 yield UiPathRuntimeStateEvent (
549628 payload = self ._serialize_event_data (
550629 self ._filter_completed_data (event .data )
@@ -557,9 +636,15 @@ async def _stream_workflow(
557636 tool_events = self ._extract_tool_state_events (
558637 event .data , executor_id
559638 )
560- if tool_events :
561- executors_with_tool_outputs .add (executor_id )
562639 for tool_event in tool_events :
640+ executor_tool_phases .setdefault (
641+ executor_id , set ()
642+ ).add (tool_event .phase )
643+ # Track pending tool nodes across stream iterations
644+ if tool_event .phase == UiPathRuntimeStatePhase .STARTED :
645+ self ._pending_tool_nodes .add (tool_event .node_name )
646+ elif tool_event .phase == UiPathRuntimeStatePhase .COMPLETED :
647+ self ._pending_tool_nodes .discard (tool_event .node_name )
563648 yield tool_event
564649 for msg_event in self ._extract_workflow_messages (event .data ):
565650 yield UiPathRuntimeMessageEvent (payload = msg_event )
@@ -581,6 +666,10 @@ async def _stream_workflow(
581666 for msg_event in self .chat .close_message ():
582667 yield UiPathRuntimeMessageEvent (payload = msg_event )
583668
669+ # After resume paths the checkpoint restores the session into
670+ # executors directly, so the local ``session`` may still be None.
671+ if session is None :
672+ session = self ._get_session_from_executors ()
584673 if session is not None :
585674 await self ._save_session (session )
586675
@@ -619,6 +708,10 @@ async def _stream_workflow(
619708 for msg_event in self .chat .close_message ():
620709 yield UiPathRuntimeMessageEvent (payload = msg_event )
621710
711+ # After resume paths the checkpoint restores the session into
712+ # executors directly, so the local ``session`` may still be None.
713+ if session is None :
714+ session = self ._get_session_from_executors ()
622715 if session is not None :
623716 await self ._save_session (session )
624717
@@ -681,7 +774,11 @@ def _extract_tool_state_events(
681774 """
682775 contents : list [Any ] = []
683776
684- if isinstance (data , AgentResponseUpdate ):
777+ if isinstance (data , AgentExecutorResponse ):
778+ return UiPathAgentFrameworkRuntime ._extract_tool_state_events (
779+ data .agent_response , executor_id
780+ )
781+ elif isinstance (data , AgentResponseUpdate ):
685782 contents = list (data .contents or [])
686783 elif isinstance (data , AgentResponse ):
687784 for message in data .messages or []:
@@ -724,7 +821,9 @@ def _extract_tool_state_events(
724821 def _extract_contents (data : Any ) -> list [Any ]:
725822 """Extract Content objects from any workflow data type."""
726823 contents : list [Any ] = []
727- if isinstance (data , AgentResponseUpdate ):
824+ if isinstance (data , AgentExecutorResponse ):
825+ return UiPathAgentFrameworkRuntime ._extract_contents (data .agent_response )
826+ elif isinstance (data , AgentResponseUpdate ):
728827 contents = list (data .contents or [])
729828 elif isinstance (data , AgentResponse ):
730829 for message in data .messages or []:
0 commit comments