diff --git a/ai_platform_engineering/dynamic_agents/src/dynamic_agents/services/stream_encoders/langgraph_helpers.py b/ai_platform_engineering/dynamic_agents/src/dynamic_agents/services/stream_encoders/langgraph_helpers.py index 0923050f16..71046ba013 100644 --- a/ai_platform_engineering/dynamic_agents/src/dynamic_agents/services/stream_encoders/langgraph_helpers.py +++ b/ai_platform_engineering/dynamic_agents/src/dynamic_agents/services/stream_encoders/langgraph_helpers.py @@ -89,36 +89,69 @@ def parse_chunk(self, chunk: tuple) -> tuple[tuple[str, ...], str, Any]: return (namespace, mode, data) + @staticmethod + def _extract_task_tool_calls(task_input: Any) -> list[dict]: + """Return the tool_call dicts from a ``tasks``-mode chunk ``input``. + + LangGraph has emitted two different shapes for the ``input`` field of a + ``tasks`` chunk across versions; both must be supported: + + - **langgraph < 1.2** (e.g. 1.0.x): ``input`` is a dict + ``{"__type": "tool_call_with_context", "tool_call": {...}, "state": {...}}`` + — a single tool_call nested under the ``tool_call`` key. + - **langgraph >= 1.2** (e.g. 1.2.x): ``input`` is a *list* of tool_call + dicts, e.g. ``[{"name": "task", "args": {...}, "id": "...", "type": "tool_call"}]``. + + Returning a list normalizes both so the caller can iterate uniformly. + Unrecognized shapes yield an empty list. + + Note: when this returns nothing for a real subagent invocation, the + namespace mapping stays empty and ``correlate_namespace`` falls back to + treating subagent events as the parent agent — which causes subagent + output to leak into the parent's response. Keep this tolerant of the + shapes langgraph emits. + """ + if isinstance(task_input, dict): + tool_call = task_input.get("tool_call") + return [tool_call] if isinstance(tool_call, dict) else [] + if isinstance(task_input, list): + return [tc for tc in task_input if isinstance(tc, dict)] + return [] + def _handle_tasks_chunk(self, data: Any) -> None: """Extract namespace UUID -> tool_call_id mapping from tasks events. LangGraph's ``tasks`` stream mode emits task metadata when a tool is invoked. For the ``task`` tool (subagent invocation), this contains: - id: The task UUID (used in namespace as "tools:{id}") - - input.tool_call.id: The tool_call_id from the original invocation + - a tool_call carrying the original ``tool_call_id`` and name We build this mapping so subagent events can be correlated to their - ``tool_start`` events, which clients already have. + ``tool_start`` events, which clients already have. The shape of the + ``input`` field varies across langgraph versions — see + ``_extract_task_tool_calls``. """ # Tasks data comes as a single dict per event, not a list if not isinstance(data, dict): return task_id = data.get("id") - task_input = data.get("input", {}) - - # The tool_call info is nested under input.tool_call for tool executions - tool_call = task_input.get("tool_call", {}) if isinstance(task_input, dict) else {} - tool_call_id = tool_call.get("id") - tool_name = tool_call.get("name") + if not task_id: + return - # Only create mapping for "task" tool calls (subagent invocations) - # Other tools don't spawn subgraphs with their own namespace - if task_id and tool_call_id and tool_name == "task": + # Only create mapping for "task" tool calls (subagent invocations). + # Other tools don't spawn subgraphs with their own namespace. + for tool_call in self._extract_task_tool_calls(data.get("input")): + if tool_call.get("name") != "task": + continue + tool_call_id = tool_call.get("id") + if not tool_call_id: + continue namespace_key = f"tools:{task_id}" if namespace_key not in self._namespace_mapping: self._namespace_mapping[namespace_key] = tool_call_id logger.debug(f"[sse:tasks] Mapped {namespace_key} → {tool_call_id}") + break def correlate_namespace(self, namespace: tuple[str, ...]) -> tuple[str, ...]: """Correlate using internal namespace_mapping. diff --git a/ai_platform_engineering/dynamic_agents/tests/test_agui_sse_namespace.py b/ai_platform_engineering/dynamic_agents/tests/test_agui_sse_namespace.py index 0132ae3d05..ef1cb76f54 100644 --- a/ai_platform_engineering/dynamic_agents/tests/test_agui_sse_namespace.py +++ b/ai_platform_engineering/dynamic_agents/tests/test_agui_sse_namespace.py @@ -194,3 +194,91 @@ def test_consecutive_same_namespace_no_redundant_context(self): frames_2 = enc._handle_messages((chunk, {}), ("agent-A",)) assert _namespace_values(frames_2) == [] # no redundant emit + + +class TestTasksChunkNamespaceMapping: + """_handle_tasks_chunk must build the subagent namespace mapping across the + differing ``tasks``-mode ``input`` shapes langgraph has emitted. + + Regression: langgraph >= 1.2 changed ``input`` from a + ``{"tool_call": {...}}`` dict to a bare list of tool_call dicts. The old + parser only read the dict shape, so the mapping silently stayed empty, + ``correlate_namespace`` fell back to treating subagent events as the parent, + and subagent output leaked into the parent response. Both shapes must work. + """ + + TASK_ID = "abc123-def456" + TOOL_CALL_ID = "toolu_subagent_1" + + def _legacy_dict_chunk(self) -> dict: + """langgraph < 1.2: input is a tool_call_with_context dict.""" + return { + "id": self.TASK_ID, + "name": "tools", + "input": { + "__type": "tool_call_with_context", + "tool_call": { + "name": "task", + "args": {"subagent_type": "worker", "description": "do work"}, + "id": self.TOOL_CALL_ID, + "type": "tool_call", + }, + "state": {"messages": []}, + }, + } + + def _list_chunk(self) -> dict: + """langgraph >= 1.2: input is a list of tool_call dicts.""" + return { + "id": self.TASK_ID, + "name": "tools", + "input": [ + { + "name": "task", + "args": {"subagent_type": "worker", "description": "do work"}, + "id": self.TOOL_CALL_ID, + "type": "tool_call", + } + ], + } + + def test_legacy_dict_shape_populates_mapping(self): + enc = AGUIStreamEncoder() + enc._helper._handle_tasks_chunk(self._legacy_dict_chunk()) + assert enc._helper._namespace_mapping == {f"tools:{self.TASK_ID}": self.TOOL_CALL_ID} + + def test_list_shape_populates_mapping(self): + enc = AGUIStreamEncoder() + enc._helper._handle_tasks_chunk(self._list_chunk()) + assert enc._helper._namespace_mapping == {f"tools:{self.TASK_ID}": self.TOOL_CALL_ID} + + def test_subagent_namespace_correlates_after_list_chunk(self): + """End-to-end: after a list-shape tasks chunk, the langgraph internal + namespace correlates to the tool_call_id (not the empty/parent tuple).""" + enc = AGUIStreamEncoder() + enc._helper._handle_tasks_chunk(self._list_chunk()) + correlated = enc._helper.correlate_namespace((f"tools:{self.TASK_ID}",)) + assert correlated == (self.TOOL_CALL_ID,) + + def test_uncorrelated_subagent_namespace_falls_back_to_parent(self): + """Without a mapping entry, the namespace collapses to parent () — this + is the failure mode the fix prevents, asserted here as a guard.""" + enc = AGUIStreamEncoder() + correlated = enc._helper.correlate_namespace((f"tools:{self.TASK_ID}",)) + assert correlated == () + + def test_non_task_tool_calls_ignored(self): + """Only ``task`` tool calls spawn subgraphs; other tools must not map.""" + enc = AGUIStreamEncoder() + chunk = self._list_chunk() + chunk["input"][0]["name"] = "get_weather" + enc._helper._handle_tasks_chunk(chunk) + assert enc._helper._namespace_mapping == {} + + def test_result_chunk_without_tool_call_is_ignored(self): + """A tasks result chunk (no input tool_call) must not error or map.""" + enc = AGUIStreamEncoder() + enc._helper._handle_tasks_chunk( + {"id": self.TASK_ID, "name": "tools", "result": {"messages": []}, "interrupts": []} + ) + assert enc._helper._namespace_mapping == {}