Skip to content

Commit 7a0212f

Browse files
fix(groupchat): detect participant loops via executor_completed events
In agent-framework 1.3.0 the GroupChat orchestrator agent (Coordinator) is invoked directly inside the framework's internal _invoke_agent_helper (agent_framework_orchestrations/_group_chat.py:484) rather than through an AgentExecutor. The Coordinator therefore never surfaces as a workflow event, which makes our existing Coordinator-JSON-based loop detector in _complete_agent_response permanently dead in 1.3.0. Symptom in production: workflow loops with the Coordinator latched onto the same participant (e.g., Chief Architect repeatedly asked to produce an Evidence Pack that never satisfies the next reviewer). The loop runs until the framework's max_rounds ceiling fires (~17 min at default 100) instead of being caught early. Fix: * Track participant turn completions from WorkflowEvent.executor_completed, the one observable signal that does NOT depend on Coordinator visibility (participants ARE wrapped in AgentExecutor and so do emit these events). * Force-terminate (hard_loop) after 3 consecutive completions of the same participant. * Force-terminate (hard_timeout) when total participant completions reach max_rounds; independent of len(agent_responses) which only grows on agent switch and so can never reach max_rounds during a same-participant loop. * Flush per-participant streaming buffer on each executor_completed so back-to-back same-agent turns produce one AgentResponse per turn instead of accumulating across turns. * Move forced-termination break check to top of the streaming loop so any branch (timeout, participant loop, Coordinator finish=true) takes effect on the very next event rather than waiting for the next output event. Adds 3 regression tests covering the streak trigger, the alternation reset, and the round-budget enforcement. 836 tests pass (833 -> 836). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 42902ab commit 7a0212f

2 files changed

Lines changed: 264 additions & 9 deletions

File tree

src/processor/src/libs/agent_framework/groupchat_orchestrator.py

Lines changed: 154 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,33 @@ def __init__(
313313
# Snapshot of progress_counter at the time we last saw _last_coordinator_selection.
314314
self._last_coordinator_selection_progress: int = 0
315315

316+
# Per-participant turn tracking driven by ``WorkflowEvent.executor_completed``.
317+
#
318+
# In agent-framework 1.3.0 the GroupChat orchestrator agent (the
319+
# Coordinator) is invoked directly inside the framework's internal
320+
# ``_invoke_agent_helper`` (see
321+
# ``agent_framework_orchestrations/_group_chat.py:484``). It is NOT
322+
# wrapped in an ``AgentExecutor`` and therefore never surfaces as a
323+
# workflow event - which makes the Coordinator-JSON-based loop
324+
# detection in ``_complete_agent_response`` permanently dead in 1.3.0.
325+
#
326+
# The only observable "the conversation is moving" pulse we have is
327+
# ``executor_completed`` events for the *participants* (which DO go
328+
# through ``AgentExecutor``). We track:
329+
# - the most recently completed participant,
330+
# - the streak of consecutive completions of that participant,
331+
# - the total number of participant turns,
332+
# and use these for two safety nets in the streaming loop:
333+
# * 3+ consecutive same-participant turns => hard_loop termination
334+
# * total turns >= ``max_rounds`` => hard_timeout termination
335+
# (independent of ``len(self.agent_responses)`` which only grows on
336+
# agent switch and so cannot reach ``max_rounds`` during a same-
337+
# participant loop).
338+
self._participant_completions_total: int = 0
339+
self._last_completed_participant: str | None = None
340+
self._participant_completion_streak: int = 0
341+
self._participant_consecutive_loop_threshold: int = 3
342+
316343
def _request_forced_termination(
317344
self, *, reason: str, termination_type: str
318345
) -> None:
@@ -543,6 +570,15 @@ async def run_stream(
543570
termination_type="hard_timeout",
544571
)
545572

573+
# Honor any pending termination request at the *top* of each
574+
# iteration so that branches which set the flags (timeout,
575+
# participant loop detection, Coordinator finish=true) take
576+
# effect immediately on the next event - rather than being
577+
# gated on the next ``output`` event arriving (which during a
578+
# slow loop can be many seconds away).
579+
if self._forced_termination_requested or self._termination_requested:
580+
break
581+
546582
# In agent-framework 1.3.0, ``workflow.run(stream=True)`` yields
547583
# only ``WorkflowEvent`` instances; ``AgentResponseUpdate`` is
548584
# wrapped inside ``WorkflowEvent.data`` for ``type=="output"``
@@ -552,7 +588,46 @@ async def run_stream(
552588
# ``WorkflowEvent.type`` and inspect ``event.data`` /
553589
# ``event.executor_id`` to route per-participant streaming
554590
# chunks vs the orchestrator's final output.
555-
if not isinstance(event, WorkflowEvent) or event.type != "output":
591+
if not isinstance(event, WorkflowEvent):
592+
continue
593+
594+
# Participant turn completion. Used for loop / max_rounds
595+
# safety nets that work even when the Coordinator is
596+
# invisible to the streaming loop (which it is in 1.3.0 -
597+
# the Coordinator runs inside the framework's internal
598+
# ``_invoke_agent_helper`` and never surfaces as an executor
599+
# event). See ``_track_participant_completion`` for details.
600+
if event.type == "executor_completed":
601+
src_executor = self._normalize_executor_id(
602+
event.executor_id or ""
603+
)
604+
if (
605+
src_executor in self.agents
606+
and src_executor != self.coordinator_name
607+
and src_executor != self.get_result_generator_name()
608+
):
609+
# Flush this participant's streaming buffer into a
610+
# discrete per-turn ``AgentResponse`` before we track
611+
# the completion. Without this, when the framework's
612+
# Coordinator picks the same participant back-to-back
613+
# (the loop pattern we're trying to detect),
614+
# ``_start_agent_if_needed`` sees no agent switch on
615+
# the NEXT turn's chunks and the buffer would grow
616+
# across turns - producing one merged response rather
617+
# than one response per turn.
618+
if (
619+
self._last_executor_id == src_executor
620+
and self._current_agent_response
621+
):
622+
await self._complete_agent_response(
623+
src_executor, on_agent_response
624+
)
625+
self._current_agent_response = []
626+
self._last_executor_id = None
627+
self._track_participant_completion(src_executor)
628+
continue
629+
630+
if event.type != "output":
556631
continue
557632

558633
data = event.data
@@ -573,7 +648,12 @@ async def run_stream(
573648
callback=on_agent_response,
574649
)
575650

576-
# Enforce max rounds as a safety guard.
651+
# Secondary max_rounds safety net based on agent switches.
652+
# The primary check lives in ``_track_participant_completion``
653+
# (driven by ``executor_completed`` events) and works even
654+
# when the same agent runs back-to-back. This switch-based
655+
# check is kept as defense-in-depth for sessions with
656+
# normal alternation.
577657
if self.max_rounds and len(self.agent_responses) >= self.max_rounds:
578658
self._request_forced_termination(
579659
reason=(
@@ -582,13 +662,9 @@ async def run_stream(
582662
termination_type="hard_timeout",
583663
)
584664

585-
if self._forced_termination_requested:
586-
break
587-
588-
# If the Coordinator requested finish=true, stop immediately.
589-
if self._termination_requested:
590-
break
591-
665+
# Termination flags are honored at the top of the next
666+
# iteration so any branch can request termination
667+
# uniformly without duplicating break logic here.
592668
continue
593669

594670
# Final orchestrator output: complete any buffered agent
@@ -777,6 +853,75 @@ def _normalize_executor_id(self, executor_id: str) -> str:
777853
"""
778854
return executor_id.split(":")[-1]
779855

856+
def _track_participant_completion(self, src_executor: str) -> None:
857+
"""Track a participant turn completion for loop / max_rounds detection.
858+
859+
Called from the streaming loop on every ``WorkflowEvent.type ==
860+
"executor_completed"`` event whose ``executor_id`` matches one of our
861+
registered non-Coordinator, non-ResultGenerator participants.
862+
863+
Why this exists (agent-framework 1.3.0 design constraint):
864+
The framework's ``GroupChatBuilder.orchestrator_agent`` (our
865+
Coordinator) is invoked directly via ``self._agent.run(...)``
866+
inside ``agent_framework_orchestrations/_group_chat.py:484``. It
867+
is NOT wrapped in an ``AgentExecutor`` and therefore never
868+
surfaces as a workflow event. Our existing Coordinator-JSON-based
869+
loop detector in ``_complete_agent_response`` (lines ~1118-1181)
870+
is consequently permanently dead in 1.3.0. We need an independent
871+
loop signal that does NOT rely on Coordinator visibility.
872+
873+
Two safety nets enforced here:
874+
875+
1. Same-participant streak (``_participant_consecutive_loop_threshold``,
876+
default 3): if the Coordinator keeps selecting the same participant
877+
(e.g., the Chief Architect latched on producing an Evidence Pack
878+
that never satisfies the next reviewer), 3+ consecutive completions
879+
of the same participant force-terminate with ``hard_loop``.
880+
881+
2. Total round budget: each participant turn counts as one round.
882+
Once total completions reach ``self.max_rounds`` the workflow
883+
force-terminates with ``hard_timeout``. This is independent of
884+
``len(self.agent_responses)`` (which only grows on agent switch
885+
via ``_start_agent_if_needed`` and therefore cannot reach
886+
``max_rounds`` during a same-participant loop).
887+
"""
888+
if src_executor == self._last_completed_participant:
889+
self._participant_completion_streak += 1
890+
else:
891+
self._last_completed_participant = src_executor
892+
self._participant_completion_streak = 1
893+
self._participant_completions_total += 1
894+
895+
if (
896+
self._participant_completion_streak
897+
>= self._participant_consecutive_loop_threshold
898+
):
899+
self._request_forced_termination(
900+
reason=(
901+
f"Loop detected: participant '{src_executor}' completed "
902+
f"{self._participant_completion_streak} consecutive turns "
903+
"with no other participant in between (Coordinator is "
904+
"stuck on the same selection; in agent-framework 1.3.0 "
905+
"the Coordinator runs inside the framework and is "
906+
"invisible to the streaming loop, so we infer this from "
907+
"executor_completed events)"
908+
),
909+
termination_type="hard_loop",
910+
)
911+
return
912+
913+
if (
914+
self.max_rounds
915+
and self._participant_completions_total >= self.max_rounds
916+
):
917+
self._request_forced_termination(
918+
reason=(
919+
f"Workflow exceeded max_rounds={self.max_rounds} "
920+
"participant turns; terminating to avoid infinite loop"
921+
),
922+
termination_type="hard_timeout",
923+
)
924+
780925
async def _start_agent_if_needed(
781926
self,
782927
agent_name: str,

src/processor/src/tests/unit/libs/agent_framework/test_groupchat_orchestrator_termination.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,3 +293,113 @@ async def _run():
293293
)
294294

295295
asyncio.run(_run())
296+
297+
298+
def test_participant_completion_streak_triggers_forced_termination():
299+
"""In agent-framework 1.3.0 the GroupChat orchestrator agent (Coordinator)
300+
is invoked directly inside the framework's ``_invoke_agent_helper`` and
301+
is NOT wrapped in an ``AgentExecutor``, so it never surfaces as a
302+
workflow event. The Coordinator-JSON loop detector in
303+
``_complete_agent_response`` is therefore permanently dead in 1.3.0.
304+
305+
The only observable loop signal we have is consecutive
306+
``executor_completed`` events for the same participant. After
307+
``_participant_consecutive_loop_threshold`` (default 3) same-participant
308+
completions, the orchestrator must force-terminate with ``hard_loop``
309+
so the workflow halts cleanly instead of running until the framework's
310+
own max_rounds ceiling (which at default 100 is ~17 min).
311+
"""
312+
313+
async def _run():
314+
orch = _make_orchestrator()
315+
# Register a participant so the tracker recognizes it.
316+
orch.agents = {"Coordinator": object(), "Chief Architect": object()}
317+
318+
for _ in range(3):
319+
orch._track_participant_completion("Chief Architect")
320+
321+
assert orch._forced_termination_requested is True, (
322+
"Three consecutive completions of the same participant must "
323+
"trigger the participant-streak loop breaker; otherwise the "
324+
"Chief-Architect-only loop observed in production (with the "
325+
"Coordinator invisible to our streaming loop in 1.3.0) can "
326+
"never be detected and the workflow runs until the framework's "
327+
"own max_rounds ceiling fires."
328+
)
329+
assert orch._forced_termination_type == "hard_loop"
330+
assert "Chief Architect" in (orch._forced_termination_reason or "")
331+
assert "3 consecutive" in (orch._forced_termination_reason or "")
332+
333+
asyncio.run(_run())
334+
335+
336+
def test_participant_completion_streak_resets_on_different_participant():
337+
"""If a different participant runs in between, the same-participant
338+
streak counter resets. This prevents false-positive loop detection
339+
when participants alternate normally.
340+
"""
341+
342+
async def _run():
343+
orch = _make_orchestrator()
344+
orch.agents = {
345+
"Coordinator": object(),
346+
"Chief Architect": object(),
347+
"AKS Expert": object(),
348+
}
349+
350+
orch._track_participant_completion("Chief Architect")
351+
orch._track_participant_completion("Chief Architect")
352+
# A different participant runs -> streak resets.
353+
orch._track_participant_completion("AKS Expert")
354+
orch._track_participant_completion("Chief Architect")
355+
orch._track_participant_completion("Chief Architect") # streak=2 only
356+
357+
assert orch._forced_termination_requested is False, (
358+
"Alternating participants must not trigger the loop breaker; "
359+
"the streak should reset whenever a different participant runs."
360+
)
361+
assert orch._participant_completion_streak == 2
362+
assert orch._last_completed_participant == "Chief Architect"
363+
364+
asyncio.run(_run())
365+
366+
367+
def test_participant_completions_total_enforces_max_rounds_under_alternation():
368+
"""``max_rounds`` must be enforced from the per-participant total count
369+
(which grows on EVERY completion) - not from ``len(agent_responses)``
370+
(which only grows on agent switch in ``_start_agent_if_needed`` and
371+
therefore can never reach ``max_rounds`` during a same-agent loop).
372+
373+
This test exercises the alternation case where the streak detector
374+
never fires, ensuring the round-budget guard still halts the workflow.
375+
"""
376+
377+
async def _run():
378+
orch = GroupChatOrchestrator(
379+
name="t",
380+
process_id="p1",
381+
participants={
382+
"Coordinator": object(),
383+
"A": object(),
384+
"B": object(),
385+
},
386+
memory_client=None,
387+
coordinator_name="Coordinator",
388+
max_rounds=4,
389+
result_output_format=None,
390+
)
391+
392+
# Alternate A and B to keep the streak below threshold.
393+
orch._track_participant_completion("A")
394+
orch._track_participant_completion("B")
395+
orch._track_participant_completion("A")
396+
# Streak detector hasn't fired yet (max streak = 1 because of perfect
397+
# alternation). The 4th turn must trip the max_rounds budget.
398+
assert orch._forced_termination_requested is False
399+
orch._track_participant_completion("B")
400+
401+
assert orch._forced_termination_requested is True
402+
assert orch._forced_termination_type == "hard_timeout"
403+
assert "max_rounds=4" in (orch._forced_termination_reason or "")
404+
405+
asyncio.run(_run())

0 commit comments

Comments
 (0)