Skip to content

Commit 03387c4

Browse files
Revert "fix(groupchat): detect participant loops via executor_completed events"
This reverts commit 7a0212f.
1 parent 7a0212f commit 03387c4

2 files changed

Lines changed: 9 additions & 264 deletions

File tree

src/processor/src/libs/agent_framework/groupchat_orchestrator.py

Lines changed: 9 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -313,33 +313,6 @@ def __init__(
313313
# Snapshot of progress_counter at the time we last saw _last_coordinator_selection.
314314
self._last_coordinator_selection_progress: int = 0
315315

316-
# Per-participant turn tracking driven by ``WorkflowEvent.executor_completed``.
317-
#
318-
# In agent-framework 1.3.0 the GroupChat orchestrator agent (the
319-
# Coordinator) is invoked directly inside the framework's internal
320-
# ``_invoke_agent_helper`` (see
321-
# ``agent_framework_orchestrations/_group_chat.py:484``). It is NOT
322-
# wrapped in an ``AgentExecutor`` and therefore never surfaces as a
323-
# workflow event - which makes the Coordinator-JSON-based loop
324-
# detection in ``_complete_agent_response`` permanently dead in 1.3.0.
325-
#
326-
# The only observable "the conversation is moving" pulse we have is
327-
# ``executor_completed`` events for the *participants* (which DO go
328-
# through ``AgentExecutor``). We track:
329-
# - the most recently completed participant,
330-
# - the streak of consecutive completions of that participant,
331-
# - the total number of participant turns,
332-
# and use these for two safety nets in the streaming loop:
333-
# * 3+ consecutive same-participant turns => hard_loop termination
334-
# * total turns >= ``max_rounds`` => hard_timeout termination
335-
# (independent of ``len(self.agent_responses)`` which only grows on
336-
# agent switch and so cannot reach ``max_rounds`` during a same-
337-
# participant loop).
338-
self._participant_completions_total: int = 0
339-
self._last_completed_participant: str | None = None
340-
self._participant_completion_streak: int = 0
341-
self._participant_consecutive_loop_threshold: int = 3
342-
343316
def _request_forced_termination(
344317
self, *, reason: str, termination_type: str
345318
) -> None:
@@ -570,15 +543,6 @@ async def run_stream(
570543
termination_type="hard_timeout",
571544
)
572545

573-
# Honor any pending termination request at the *top* of each
574-
# iteration so that branches which set the flags (timeout,
575-
# participant loop detection, Coordinator finish=true) take
576-
# effect immediately on the next event - rather than being
577-
# gated on the next ``output`` event arriving (which during a
578-
# slow loop can be many seconds away).
579-
if self._forced_termination_requested or self._termination_requested:
580-
break
581-
582546
# In agent-framework 1.3.0, ``workflow.run(stream=True)`` yields
583547
# only ``WorkflowEvent`` instances; ``AgentResponseUpdate`` is
584548
# wrapped inside ``WorkflowEvent.data`` for ``type=="output"``
@@ -588,46 +552,7 @@ async def run_stream(
588552
# ``WorkflowEvent.type`` and inspect ``event.data`` /
589553
# ``event.executor_id`` to route per-participant streaming
590554
# chunks vs the orchestrator's final output.
591-
if not isinstance(event, WorkflowEvent):
592-
continue
593-
594-
# Participant turn completion. Used for loop / max_rounds
595-
# safety nets that work even when the Coordinator is
596-
# invisible to the streaming loop (which it is in 1.3.0 -
597-
# the Coordinator runs inside the framework's internal
598-
# ``_invoke_agent_helper`` and never surfaces as an executor
599-
# event). See ``_track_participant_completion`` for details.
600-
if event.type == "executor_completed":
601-
src_executor = self._normalize_executor_id(
602-
event.executor_id or ""
603-
)
604-
if (
605-
src_executor in self.agents
606-
and src_executor != self.coordinator_name
607-
and src_executor != self.get_result_generator_name()
608-
):
609-
# Flush this participant's streaming buffer into a
610-
# discrete per-turn ``AgentResponse`` before we track
611-
# the completion. Without this, when the framework's
612-
# Coordinator picks the same participant back-to-back
613-
# (the loop pattern we're trying to detect),
614-
# ``_start_agent_if_needed`` sees no agent switch on
615-
# the NEXT turn's chunks and the buffer would grow
616-
# across turns - producing one merged response rather
617-
# than one response per turn.
618-
if (
619-
self._last_executor_id == src_executor
620-
and self._current_agent_response
621-
):
622-
await self._complete_agent_response(
623-
src_executor, on_agent_response
624-
)
625-
self._current_agent_response = []
626-
self._last_executor_id = None
627-
self._track_participant_completion(src_executor)
628-
continue
629-
630-
if event.type != "output":
555+
if not isinstance(event, WorkflowEvent) or event.type != "output":
631556
continue
632557

633558
data = event.data
@@ -648,12 +573,7 @@ async def run_stream(
648573
callback=on_agent_response,
649574
)
650575

651-
# Secondary max_rounds safety net based on agent switches.
652-
# The primary check lives in ``_track_participant_completion``
653-
# (driven by ``executor_completed`` events) and works even
654-
# when the same agent runs back-to-back. This switch-based
655-
# check is kept as defense-in-depth for sessions with
656-
# normal alternation.
576+
# Enforce max rounds as a safety guard.
657577
if self.max_rounds and len(self.agent_responses) >= self.max_rounds:
658578
self._request_forced_termination(
659579
reason=(
@@ -662,9 +582,13 @@ async def run_stream(
662582
termination_type="hard_timeout",
663583
)
664584

665-
# Termination flags are honored at the top of the next
666-
# iteration so any branch can request termination
667-
# uniformly without duplicating break logic here.
585+
if self._forced_termination_requested:
586+
break
587+
588+
# If the Coordinator requested finish=true, stop immediately.
589+
if self._termination_requested:
590+
break
591+
668592
continue
669593

670594
# Final orchestrator output: complete any buffered agent
@@ -853,75 +777,6 @@ def _normalize_executor_id(self, executor_id: str) -> str:
853777
"""
854778
return executor_id.split(":")[-1]
855779

856-
def _track_participant_completion(self, src_executor: str) -> None:
857-
"""Track a participant turn completion for loop / max_rounds detection.
858-
859-
Called from the streaming loop on every ``WorkflowEvent.type ==
860-
"executor_completed"`` event whose ``executor_id`` matches one of our
861-
registered non-Coordinator, non-ResultGenerator participants.
862-
863-
Why this exists (agent-framework 1.3.0 design constraint):
864-
The framework's ``GroupChatBuilder.orchestrator_agent`` (our
865-
Coordinator) is invoked directly via ``self._agent.run(...)``
866-
inside ``agent_framework_orchestrations/_group_chat.py:484``. It
867-
is NOT wrapped in an ``AgentExecutor`` and therefore never
868-
surfaces as a workflow event. Our existing Coordinator-JSON-based
869-
loop detector in ``_complete_agent_response`` (lines ~1118-1181)
870-
is consequently permanently dead in 1.3.0. We need an independent
871-
loop signal that does NOT rely on Coordinator visibility.
872-
873-
Two safety nets enforced here:
874-
875-
1. Same-participant streak (``_participant_consecutive_loop_threshold``,
876-
default 3): if the Coordinator keeps selecting the same participant
877-
(e.g., the Chief Architect latched on producing an Evidence Pack
878-
that never satisfies the next reviewer), 3+ consecutive completions
879-
of the same participant force-terminate with ``hard_loop``.
880-
881-
2. Total round budget: each participant turn counts as one round.
882-
Once total completions reach ``self.max_rounds`` the workflow
883-
force-terminates with ``hard_timeout``. This is independent of
884-
``len(self.agent_responses)`` (which only grows on agent switch
885-
via ``_start_agent_if_needed`` and therefore cannot reach
886-
``max_rounds`` during a same-participant loop).
887-
"""
888-
if src_executor == self._last_completed_participant:
889-
self._participant_completion_streak += 1
890-
else:
891-
self._last_completed_participant = src_executor
892-
self._participant_completion_streak = 1
893-
self._participant_completions_total += 1
894-
895-
if (
896-
self._participant_completion_streak
897-
>= self._participant_consecutive_loop_threshold
898-
):
899-
self._request_forced_termination(
900-
reason=(
901-
f"Loop detected: participant '{src_executor}' completed "
902-
f"{self._participant_completion_streak} consecutive turns "
903-
"with no other participant in between (Coordinator is "
904-
"stuck on the same selection; in agent-framework 1.3.0 "
905-
"the Coordinator runs inside the framework and is "
906-
"invisible to the streaming loop, so we infer this from "
907-
"executor_completed events)"
908-
),
909-
termination_type="hard_loop",
910-
)
911-
return
912-
913-
if (
914-
self.max_rounds
915-
and self._participant_completions_total >= self.max_rounds
916-
):
917-
self._request_forced_termination(
918-
reason=(
919-
f"Workflow exceeded max_rounds={self.max_rounds} "
920-
"participant turns; terminating to avoid infinite loop"
921-
),
922-
termination_type="hard_timeout",
923-
)
924-
925780
async def _start_agent_if_needed(
926781
self,
927782
agent_name: str,

src/processor/src/tests/unit/libs/agent_framework/test_groupchat_orchestrator_termination.py

Lines changed: 0 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -293,113 +293,3 @@ async def _run():
293293
)
294294

295295
asyncio.run(_run())
296-
297-
298-
def test_participant_completion_streak_triggers_forced_termination():
299-
"""In agent-framework 1.3.0 the GroupChat orchestrator agent (Coordinator)
300-
is invoked directly inside the framework's ``_invoke_agent_helper`` and
301-
is NOT wrapped in an ``AgentExecutor``, so it never surfaces as a
302-
workflow event. The Coordinator-JSON loop detector in
303-
``_complete_agent_response`` is therefore permanently dead in 1.3.0.
304-
305-
The only observable loop signal we have is consecutive
306-
``executor_completed`` events for the same participant. After
307-
``_participant_consecutive_loop_threshold`` (default 3) same-participant
308-
completions, the orchestrator must force-terminate with ``hard_loop``
309-
so the workflow halts cleanly instead of running until the framework's
310-
own max_rounds ceiling (which at default 100 is ~17 min).
311-
"""
312-
313-
async def _run():
314-
orch = _make_orchestrator()
315-
# Register a participant so the tracker recognizes it.
316-
orch.agents = {"Coordinator": object(), "Chief Architect": object()}
317-
318-
for _ in range(3):
319-
orch._track_participant_completion("Chief Architect")
320-
321-
assert orch._forced_termination_requested is True, (
322-
"Three consecutive completions of the same participant must "
323-
"trigger the participant-streak loop breaker; otherwise the "
324-
"Chief-Architect-only loop observed in production (with the "
325-
"Coordinator invisible to our streaming loop in 1.3.0) can "
326-
"never be detected and the workflow runs until the framework's "
327-
"own max_rounds ceiling fires."
328-
)
329-
assert orch._forced_termination_type == "hard_loop"
330-
assert "Chief Architect" in (orch._forced_termination_reason or "")
331-
assert "3 consecutive" in (orch._forced_termination_reason or "")
332-
333-
asyncio.run(_run())
334-
335-
336-
def test_participant_completion_streak_resets_on_different_participant():
337-
"""If a different participant runs in between, the same-participant
338-
streak counter resets. This prevents false-positive loop detection
339-
when participants alternate normally.
340-
"""
341-
342-
async def _run():
343-
orch = _make_orchestrator()
344-
orch.agents = {
345-
"Coordinator": object(),
346-
"Chief Architect": object(),
347-
"AKS Expert": object(),
348-
}
349-
350-
orch._track_participant_completion("Chief Architect")
351-
orch._track_participant_completion("Chief Architect")
352-
# A different participant runs -> streak resets.
353-
orch._track_participant_completion("AKS Expert")
354-
orch._track_participant_completion("Chief Architect")
355-
orch._track_participant_completion("Chief Architect") # streak=2 only
356-
357-
assert orch._forced_termination_requested is False, (
358-
"Alternating participants must not trigger the loop breaker; "
359-
"the streak should reset whenever a different participant runs."
360-
)
361-
assert orch._participant_completion_streak == 2
362-
assert orch._last_completed_participant == "Chief Architect"
363-
364-
asyncio.run(_run())
365-
366-
367-
def test_participant_completions_total_enforces_max_rounds_under_alternation():
368-
"""``max_rounds`` must be enforced from the per-participant total count
369-
(which grows on EVERY completion) - not from ``len(agent_responses)``
370-
(which only grows on agent switch in ``_start_agent_if_needed`` and
371-
therefore can never reach ``max_rounds`` during a same-agent loop).
372-
373-
This test exercises the alternation case where the streak detector
374-
never fires, ensuring the round-budget guard still halts the workflow.
375-
"""
376-
377-
async def _run():
378-
orch = GroupChatOrchestrator(
379-
name="t",
380-
process_id="p1",
381-
participants={
382-
"Coordinator": object(),
383-
"A": object(),
384-
"B": object(),
385-
},
386-
memory_client=None,
387-
coordinator_name="Coordinator",
388-
max_rounds=4,
389-
result_output_format=None,
390-
)
391-
392-
# Alternate A and B to keep the streak below threshold.
393-
orch._track_participant_completion("A")
394-
orch._track_participant_completion("B")
395-
orch._track_participant_completion("A")
396-
# Streak detector hasn't fired yet (max streak = 1 because of perfect
397-
# alternation). The 4th turn must trip the max_rounds budget.
398-
assert orch._forced_termination_requested is False
399-
orch._track_participant_completion("B")
400-
401-
assert orch._forced_termination_requested is True
402-
assert orch._forced_termination_type == "hard_timeout"
403-
assert "max_rounds=4" in (orch._forced_termination_reason or "")
404-
405-
asyncio.run(_run())

0 commit comments

Comments
 (0)