Skip to content

Commit 42902ab

Browse files
fix(groupchat): route WorkflowEvent payloads + enforce framework max_rounds for af 1.3.0
In agent-framework 1.3.0, `workflow.run(stream=True)` only yields `WorkflowEvent` instances. `AgentResponseUpdate` is wrapped inside `event.data` for `type=="output"` events. The two types are unrelated (verified by MRO), so the previous `isinstance(event, AgentResponseUpdate)` gate from the b260107 era was permanently dead in 1.3.0. As a result every orchestrator-side safety guard inside that branch silently no-opped: * per-agent loop detection * Coordinator finish=true detection * max_rounds enforcement * streaming callback dispatch * manager-instruction extraction That is why production runs hit the framework's own 100-iteration runner cap as `RuntimeError("Runner did not converge after 100 iterations")` even after the recent identity-resolution patch (which only touched code that never executed). Three coordinated fixes: 1. Replace the dead `isinstance(event, AgentResponseUpdate)` gate with `isinstance(event, WorkflowEvent) and event.type == "output"` and inspect `event.data` / `event.executor_id` to distinguish per- participant streaming chunks (executor_id matches one of self.agents and data is AgentResponseUpdate) from the framework orchestrator's final output (list[Message] or custom result object). 2. Add `executor_id` parameter to `_handle_agent_update` so identity resolves from the WorkflowEvent wrapper's executor_id (always populated from `AgentExecutor.id` = the agent's name) first, then falls back to `event.author_name`, then legacy `event.agent_id`. Matches the approach already used by Content Processing Solution. 3. Pass `max_rounds=self.max_rounds` and `intermediate_outputs=True` to `GroupChatBuilder`: - `max_rounds` gives the framework itself a clean termination ceiling so even if our orchestrator-side guards miss, the workflow halts cleanly instead of crashing at the runner's 100-iteration cap. - `intermediate_outputs=True` is required for each participant's `yield_output(AgentResponseUpdate)` call to surface as a workflow `output` event. Without this, only the orchestrator's final yield reaches our streaming loop and the per-agent guards above never run. Tests: * Existing termination/loop-detection tests still pass (handler now has 3-tier identity resolution with backward-compat for `author_name`). * Added `test_handle_agent_update_prefers_executor_id_over_author_name` to lock in the new precedence. * Added `test_handle_agent_update_strips_executor_id_prefix` to cover the `groupchat_agent:Coordinator` framework prefix. * Full suite: 833 passed (was 831; +2 new tests). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent f74629b commit 42902ab

2 files changed

Lines changed: 135 additions & 34 deletions

File tree

src/processor/src/libs/agent_framework/groupchat_orchestrator.py

Lines changed: 70 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,32 @@ async def run_stream(
543543
termination_type="hard_timeout",
544544
)
545545

546-
if isinstance(event, AgentResponseUpdate):
546+
# In agent-framework 1.3.0, ``workflow.run(stream=True)`` yields
547+
# only ``WorkflowEvent`` instances; ``AgentResponseUpdate`` is
548+
# wrapped inside ``WorkflowEvent.data`` for ``type=="output"``
549+
# events. The previous ``isinstance(event, AgentResponseUpdate)``
550+
# check from the b260107 era is permanently dead in 1.3.0
551+
# because the two types are unrelated. We now dispatch on
552+
# ``WorkflowEvent.type`` and inspect ``event.data`` /
553+
# ``event.executor_id`` to route per-participant streaming
554+
# chunks vs the orchestrator's final output.
555+
if not isinstance(event, WorkflowEvent) or event.type != "output":
556+
continue
557+
558+
data = event.data
559+
src_executor = self._normalize_executor_id(event.executor_id or "")
560+
561+
# Per-participant streaming chunk. Requires
562+
# ``intermediate_outputs=True`` on the GroupChatBuilder so the
563+
# underlying executors' ``yield_output(AgentResponseUpdate)``
564+
# calls surface as workflow events rather than being swallowed.
565+
if (
566+
isinstance(data, AgentResponseUpdate)
567+
and src_executor in self.agents
568+
):
547569
await self._handle_agent_update(
548-
event,
570+
data,
571+
executor_id=event.executor_id,
549572
stream_callback=on_agent_response_stream,
550573
callback=on_agent_response,
551574
)
@@ -565,22 +588,23 @@ async def run_stream(
565588
# If the Coordinator requested finish=true, stop immediately.
566589
if self._termination_requested:
567590
break
568-
elif event.type == "output":
569-
event: WorkflowEvent
570-
# Complete last agent's response before finishing
571-
if self._last_executor_id and self._current_agent_response:
572-
await self._complete_agent_response(
573-
self._last_executor_id, on_agent_response
574-
)
575591

576-
# Extract final conversation from output
577-
if isinstance(event.data, list):
578-
conversation = event.data
579-
self._conversation = conversation # Update instance variable
580-
else:
581-
# Handle custom result objects with conversation attribute
582-
conversation = getattr(event.data, "conversation", [])
583-
self._conversation = conversation # Update instance variable
592+
continue
593+
594+
# Final orchestrator output: complete any buffered agent
595+
# response and capture the conversation.
596+
if self._last_executor_id and self._current_agent_response:
597+
await self._complete_agent_response(
598+
self._last_executor_id, on_agent_response
599+
)
600+
601+
if isinstance(data, list):
602+
conversation = data
603+
self._conversation = conversation # Update instance variable
604+
else:
605+
# Handle custom result objects with conversation attribute
606+
conversation = getattr(data, "conversation", [])
607+
self._conversation = conversation # Update instance variable
584608

585609
# Backfill tool usage from the final conversation (more reliable than streaming updates)
586610
# AgentResponseUpdate may stream text only; tool calls are represented as FunctionCallContent
@@ -715,6 +739,7 @@ async def run_stream(
715739
async def _handle_agent_update(
716740
self,
717741
event: AgentResponseUpdate,
742+
executor_id: str | None = None,
718743
stream_callback: AgentResponseStreamCallback | None = None,
719744
callback: AgentResponseCallback | None = None,
720745
) -> None:
@@ -726,19 +751,21 @@ async def _handle_agent_update(
726751
2. On agent switch, complete previous agent's response
727752
3. Trigger callback with complete response
728753
4. Handle tool calls separately from text streaming
754+
755+
Agent identity resolution priority:
756+
1. ``executor_id`` from the wrapping ``WorkflowEvent`` (always
757+
populated by the workflow runner from ``AgentExecutor.id`` which
758+
is the agent's name). This is the primary source in 1.3.0.
759+
2. ``event.author_name`` (set by 1.3.0's ``map_chat_to_agent_update``).
760+
3. ``event.agent_id`` (legacy; not populated in 1.3.0).
729761
"""
730-
# NOTE: In agent-framework 1.3.0, ``AgentResponseUpdate.agent_id`` is no
731-
# longer populated by ``map_chat_to_agent_update`` (only ``author_name``
732-
# is set, from the agent's name). Reading ``event.agent_id`` alone
733-
# silently yielded an empty string, which made every downstream identity
734-
# check (loop detection, coordinator termination signal extraction,
735-
# manager-instruction parsing) silently no-op. Prefer ``author_name``
736-
# and fall back to ``agent_id`` only for older shapes. Use ``getattr``
737-
# so older event types without ``author_name`` still work.
738-
author_name = getattr(event, "author_name", None)
739-
agent_name = author_name or self._normalize_executor_id(
740-
getattr(event, "agent_id", None) or ""
741-
)
762+
if executor_id:
763+
agent_name = self._normalize_executor_id(executor_id)
764+
else:
765+
author_name = getattr(event, "author_name", None)
766+
agent_name = author_name or self._normalize_executor_id(
767+
getattr(event, "agent_id", None) or ""
768+
)
742769
await self._start_agent_if_needed(agent_name, stream_callback, callback)
743770
self._append_text_chunk(event)
744771
await self._process_tool_calls(event, agent_name, stream_callback)
@@ -1237,10 +1264,24 @@ async def _build_groupchat(self) -> Workflow:
12371264
and name != self.get_result_generator_name()
12381265
]
12391266

1267+
# ``max_rounds`` is enforced at the framework level so the workflow
1268+
# halts cleanly even if our orchestrator-side guards miss an event
1269+
# shape. Without this, the framework's default behavior is "continue
1270+
# indefinitely" (see GroupChatBuilder docstring) until the workflow
1271+
# runner hits its own 100-iteration cap and raises
1272+
# ``RuntimeError("Runner did not converge after 100 iterations")``.
1273+
#
1274+
# ``intermediate_outputs=True`` surfaces each participant's
1275+
# ``yield_output(AgentResponseUpdate)`` call as a workflow ``output``
1276+
# event. Without this, only the orchestrator's final yield reaches
1277+
# our streaming loop, which means per-agent loop detection, finish
1278+
# signal extraction, and streaming callbacks all silently no-op.
12401279
return (
12411280
GroupChatBuilder(
12421281
orchestrator_agent=coordinator,
12431282
participants=participants,
1283+
max_rounds=self.max_rounds,
1284+
intermediate_outputs=True,
12441285
)
12451286
.build()
12461287
)

src/processor/src/tests/unit/libs/agent_framework/test_groupchat_orchestrator_termination.py

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,69 @@ async def _run():
227227
await orch._complete_agent_response("Chief Architect", callback=None)
228228

229229
assert orch._forced_termination_requested is True, (
230-
"Loop detection failed to fire after 3 identical Coordinator "
231-
"selections via _handle_agent_update; agent identity resolution "
232-
"is broken."
233-
)
234-
230+
"Loop detection failed to fire after 3 identical Coordinator "
231+
"selections via _handle_agent_update; agent identity resolution "
232+
"is broken."
233+
)
234+
235+
asyncio.run(_run())
236+
237+
238+
def test_handle_agent_update_prefers_executor_id_over_author_name():
239+
"""In agent-framework 1.3.0, the workflow runner always wraps payloads in
240+
a ``WorkflowEvent`` whose ``executor_id`` is the ``AgentExecutor.id``
241+
(= the agent's name). This is the most reliable identity source - more
242+
reliable than ``author_name`` which may differ if the agent runtime
243+
rewrites the chat author. The handler must prefer ``executor_id`` when
244+
provided.
245+
"""
246+
247+
async def _run():
248+
orch = _make_orchestrator()
249+
250+
# author_name disagrees with the framework executor_id on purpose.
251+
event = _AgentResponseUpdateStub(
252+
author_name="SomethingElse",
253+
agent_id=None,
254+
)
255+
256+
await orch._handle_agent_update(
257+
event,
258+
executor_id="Coordinator",
259+
stream_callback=None,
260+
callback=None,
261+
) # type: ignore[arg-type]
262+
263+
assert orch._last_executor_id == "Coordinator", (
264+
"executor_id from the WorkflowEvent wrapper must take precedence "
265+
"over event.author_name; otherwise downstream coordinator checks "
266+
"may resolve to the wrong agent."
267+
)
268+
269+
asyncio.run(_run())
270+
271+
272+
def test_handle_agent_update_strips_executor_id_prefix():
273+
"""``GroupChatBuilder`` may wrap executor ids with a
274+
``groupchat_agent:Coordinator`` prefix. ``_normalize_executor_id`` must
275+
strip it so the agent name compares cleanly against ``coordinator_name``.
276+
"""
277+
278+
async def _run():
279+
orch = _make_orchestrator()
280+
281+
event = _AgentResponseUpdateStub(author_name=None, agent_id=None)
282+
283+
await orch._handle_agent_update(
284+
event,
285+
executor_id="groupchat_agent:Coordinator",
286+
stream_callback=None,
287+
callback=None,
288+
) # type: ignore[arg-type]
289+
290+
assert orch._last_executor_id == "Coordinator", (
291+
"_normalize_executor_id must strip the framework prefix so "
292+
"agent identity matches the configured coordinator_name."
293+
)
294+
235295
asyncio.run(_run())

0 commit comments

Comments
 (0)