Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,36 +89,69 @@ def parse_chunk(self, chunk: tuple) -> tuple[tuple[str, ...], str, Any]:

return (namespace, mode, data)

@staticmethod
def _extract_task_tool_calls(task_input: Any) -> list[dict]:
    """Normalize the ``input`` of a ``tasks``-mode chunk to a list of tool_calls.

    Two payload shapes are recognized, matching what different LangGraph
    releases have emitted:

    - **langgraph < 1.2** (e.g. 1.0.x): a dict such as
      ``{"__type": "tool_call_with_context", "tool_call": {...}, "state": {...}}``
      where the single tool_call sits under the ``tool_call`` key.
    - **langgraph >= 1.2** (e.g. 1.2.x): a *list* of tool_call dicts, e.g.
      ``[{"name": "task", "args": {...}, "id": "...", "type": "tool_call"}]``.

    Either way the caller gets a list it can iterate uniformly. Anything
    else (``None``, strings, a dict whose ``tool_call`` is not a dict, ...)
    produces an empty list, and non-dict entries of a list are dropped.

    Note: an empty result for a genuine subagent invocation leaves the
    namespace mapping unpopulated, so ``correlate_namespace`` treats the
    subagent's events as the parent agent's and subagent output leaks into
    the parent's response. Keep this tolerant of the shapes langgraph emits.
    """
    # Newer shape: keep only the entries that are actually dicts.
    if isinstance(task_input, list):
        return [entry for entry in task_input if isinstance(entry, dict)]

    # Neither list nor dict: nothing we know how to read.
    if not isinstance(task_input, dict):
        return []

    # Legacy shape: exactly one tool_call nested under "tool_call".
    nested_call = task_input.get("tool_call")
    if isinstance(nested_call, dict):
        return [nested_call]
    return []

def _handle_tasks_chunk(self, data: Any) -> None:
    """Extract namespace UUID -> tool_call_id mapping from tasks events.

    LangGraph's ``tasks`` stream mode emits task metadata when a tool is
    invoked. For the ``task`` tool (subagent invocation), this contains:
    - id: The task UUID (used in namespace as "tools:{id}")
    - a tool_call carrying the original ``tool_call_id`` and name

    We build this mapping so subagent events can be correlated to their
    ``tool_start`` events, which clients already have. The shape of the
    ``input`` field varies across langgraph versions — see
    ``_extract_task_tool_calls``.

    Chunks that are not dicts, have no ``id``, or carry no ``task`` tool
    call with an ``id`` are ignored. An existing entry for the same
    namespace key is never overwritten.
    """
    # Tasks data comes as a single dict per event, not a list
    if not isinstance(data, dict):
        return

    task_id = data.get("id")
    if not task_id:
        return

    # The key depends only on the task id, so compute it once up front.
    namespace_key = f"tools:{task_id}"

    # Only create mapping for "task" tool calls (subagent invocations).
    # Other tools don't spawn subgraphs with their own namespace.
    for tool_call in self._extract_task_tool_calls(data.get("input")):
        if tool_call.get("name") != "task":
            continue
        tool_call_id = tool_call.get("id")
        if not tool_call_id:
            continue
        if namespace_key not in self._namespace_mapping:
            self._namespace_mapping[namespace_key] = tool_call_id
            logger.debug("[sse:tasks] Mapped %s → %s", namespace_key, tool_call_id)
        # One usable ``task`` tool call settles this namespace; later
        # entries could not change the mapping anyway.
        break

def correlate_namespace(self, namespace: tuple[str, ...]) -> tuple[str, ...]:
"""Correlate using internal namespace_mapping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,91 @@ def test_consecutive_same_namespace_no_redundant_context(self):

frames_2 = enc._handle_messages((chunk, {}), ("agent-A",))
assert _namespace_values(frames_2) == [] # no redundant emit


class TestTasksChunkNamespaceMapping:
    """Check that ``_handle_tasks_chunk`` fills the subagent namespace mapping
    for every ``tasks``-mode ``input`` shape langgraph has emitted.

    Regression: langgraph >= 1.2 changed ``input`` from a
    ``{"tool_call": {...}}`` dict to a bare list of tool_call dicts. The old
    parser only read the dict shape, so the mapping silently stayed empty,
    ``correlate_namespace`` fell back to treating subagent events as the parent,
    and subagent output leaked into the parent response. Both shapes must work.
    """

    TASK_ID = "abc123-def456"
    TOOL_CALL_ID = "toolu_subagent_1"

    def _task_tool_call(self) -> dict:
        """Build a fresh ``task`` tool_call dict shared by both chunk shapes."""
        return {
            "name": "task",
            "args": {"subagent_type": "worker", "description": "do work"},
            "id": self.TOOL_CALL_ID,
            "type": "tool_call",
        }

    def _expected_mapping(self) -> dict:
        """The mapping a successfully parsed subagent chunk must produce."""
        return {f"tools:{self.TASK_ID}": self.TOOL_CALL_ID}

    def _legacy_dict_chunk(self) -> dict:
        """langgraph < 1.2: input is a tool_call_with_context dict."""
        wrapped_input = {
            "__type": "tool_call_with_context",
            "tool_call": self._task_tool_call(),
            "state": {"messages": []},
        }
        return {"id": self.TASK_ID, "name": "tools", "input": wrapped_input}

    def _list_chunk(self) -> dict:
        """langgraph >= 1.2: input is a list of tool_call dicts."""
        return {
            "id": self.TASK_ID,
            "name": "tools",
            "input": [self._task_tool_call()],
        }

    def test_legacy_dict_shape_populates_mapping(self):
        encoder = AGUIStreamEncoder()
        helper = encoder._helper
        helper._handle_tasks_chunk(self._legacy_dict_chunk())
        assert helper._namespace_mapping == self._expected_mapping()

    def test_list_shape_populates_mapping(self):
        encoder = AGUIStreamEncoder()
        helper = encoder._helper
        helper._handle_tasks_chunk(self._list_chunk())
        assert helper._namespace_mapping == self._expected_mapping()

    def test_subagent_namespace_correlates_after_list_chunk(self):
        """End-to-end: once a list-shape tasks chunk has been handled, the
        langgraph internal namespace resolves to the tool_call_id rather than
        the empty/parent tuple."""
        encoder = AGUIStreamEncoder()
        helper = encoder._helper
        helper._handle_tasks_chunk(self._list_chunk())
        internal_namespace = (f"tools:{self.TASK_ID}",)
        assert helper.correlate_namespace(internal_namespace) == (self.TOOL_CALL_ID,)

    def test_uncorrelated_subagent_namespace_falls_back_to_parent(self):
        """With no mapping entry the namespace collapses to parent () — the
        failure mode the fix prevents, asserted here as a guard."""
        encoder = AGUIStreamEncoder()
        internal_namespace = (f"tools:{self.TASK_ID}",)
        assert encoder._helper.correlate_namespace(internal_namespace) == ()

    def test_non_task_tool_calls_ignored(self):
        """Only ``task`` tool calls spawn subgraphs; other tools must not map."""
        encoder = AGUIStreamEncoder()
        helper = encoder._helper
        chunk = self._list_chunk()
        only_call = chunk["input"][0]
        only_call["name"] = "get_weather"
        helper._handle_tasks_chunk(chunk)
        assert helper._namespace_mapping == {}

    def test_result_chunk_without_tool_call_is_ignored(self):
        """A tasks result chunk (no input tool_call) must not error or map."""
        encoder = AGUIStreamEncoder()
        helper = encoder._helper
        result_chunk = {
            "id": self.TASK_ID,
            "name": "tools",
            "result": {"messages": []},
            "interrupts": [],
        }
        helper._handle_tasks_chunk(result_chunk)
        assert helper._namespace_mapping == {}