diff --git a/docs/concepts/agent-rollout-ingestion.md b/docs/concepts/agent-rollout-ingestion.md index 8413f56c..268b39b4 100644 --- a/docs/concepts/agent-rollout-ingestion.md +++ b/docs/concepts/agent-rollout-ingestion.md @@ -42,6 +42,18 @@ Use `AgentRolloutSeedSource` when you want to work from existing agent traces in ) ``` +=== "Pi Coding Agent" + + Uses `~/.pi/agent/sessions` and `*.jsonl` by default. Sessions are tree-structured JSONL files; the active conversation path is resolved automatically. + + ```python + import data_designer.config as dd + + seed_source = dd.AgentRolloutSeedSource( + format=dd.AgentRolloutFormat.PI_CODING_AGENT, + ) + ``` + === "ATIF" ATIF requires an explicit `path`. See Harbor's [ATIF documentation](https://harborframework.com/docs/trajectory-format) for the format specification. @@ -63,31 +75,31 @@ You can override `path` and `file_pattern` for any format when your rollout arti All supported rollout formats map into the same seeded row schema. In the table below, `None` means the source artifact does not expose that field directly, and `derived` means Data Designer computes it from normalized `messages`. 
-| Normalized field | ATIF | Claude Code | Codex | Hermes Agent | -|---|---|---|---|---| -| `trace_id` | `session_id` | `sessionId[:agentId]` | `session_meta.id` or file stem | CLI `session_id` or file stem; gateway file stem | -| `source_kind` | `"atif"` | `"claude_code"` | `"codex"` | `"hermes_agent"` | -| `source_path` | Parsed `.json` path | Parsed `.jsonl` trace path | Parsed `rollout-*.jsonl` path | Parsed CLI `.json` or gateway `.jsonl` path | -| `root_session_id` | `session_id` | `sessionId` or file stem | `trace_id` | `trace_id` | -| `agent_id` | `None` | `agentId` | `None` | `None` | -| `is_sidechain` | `False` | `isSidechain` | `False` | `False` | -| `cwd` | `agent.extra.cwd` | First non-null record `cwd` | `session_meta.cwd` | `None` | -| `project_path` | `extra.project_path` or `cwd` | `projectPath` or `cwd` | `cwd` | `None` | -| `git_branch` | `agent.extra.git_branch` | First non-null record `gitBranch` | `session_meta.git_branch` | `None` | -| `started_at` | Earliest step timestamp | Earliest row timestamp | `session_meta.timestamp` or earliest record timestamp | CLI `session_start`; gateway `created_at` | -| `ended_at` | Latest step timestamp | Latest row timestamp | Latest record timestamp | CLI `last_updated`; gateway `updated_at` | -| `messages` | Normalized steps | Normalized trace rows | Normalized response items | Normalized CLI or gateway rows | -| `source_meta` | ATIF metadata | Claude metadata | Codex metadata | Hermes metadata | -| `message_count` | `derived` | `derived` | `derived` | `derived` | -| `tool_call_count` | `derived` | `derived` | `derived` | `derived` | -| `final_assistant_message` | `derived` | `derived` | `derived` | `derived` | +| Normalized field | ATIF | Claude Code | Codex | Hermes Agent | Pi Coding Agent | +|---|---|---|---|---|---| +| `trace_id` | `session_id` | `sessionId[:agentId]` | `session_meta.id` or file stem | CLI `session_id` or file stem; gateway file stem | Session header `id` | +| `source_kind` | `"atif"` | 
`"claude_code"` | `"codex"` | `"hermes_agent"` | `"pi_coding_agent"` | +| `source_path` | Parsed `.json` path | Parsed `.jsonl` trace path | Parsed `rollout-*.jsonl` path | Parsed CLI `.json` or gateway `.jsonl` path | Parsed `.jsonl` session path | +| `root_session_id` | `session_id` | `sessionId` or file stem | `trace_id` | `trace_id` | Session header `id` | +| `agent_id` | `None` | `agentId` | `None` | `None` | `None` | +| `is_sidechain` | `False` | `isSidechain` | `False` | `False` | `False` | +| `cwd` | `agent.extra.cwd` | First non-null record `cwd` | `session_meta.cwd` | `None` | Session header `cwd` | +| `project_path` | `extra.project_path` or `cwd` | `projectPath` or `cwd` | `cwd` | `None` | Session header `cwd` | +| `git_branch` | `agent.extra.git_branch` | First non-null record `gitBranch` | `session_meta.git_branch` | `None` | `None` | +| `started_at` | Earliest step timestamp | Earliest row timestamp | `session_meta.timestamp` or earliest record timestamp | CLI `session_start`; gateway `created_at` | Earliest entry timestamp | +| `ended_at` | Latest step timestamp | Latest row timestamp | Latest record timestamp | CLI `last_updated`; gateway `updated_at` | Latest entry timestamp | +| `messages` | Normalized steps | Normalized trace rows | Normalized response items | Normalized CLI or gateway rows | Normalized active-path messages | +| `source_meta` | ATIF metadata | Claude metadata | Codex metadata | Hermes metadata | Pi session metadata | +| `message_count` | `derived` | `derived` | `derived` | `derived` | `derived` | +| `tool_call_count` | `derived` | `derived` | `derived` | `derived` | `derived` | +| `final_assistant_message` | `derived` | `derived` | `derived` | `derived` | `derived` | ### Notes -- `trace_id`: Claude Code appends `agentId` when present. Hermes uses either the CLI session ID or the gateway transcript file stem. -- `is_sidechain`: ATIF and Hermes currently normalize this to `False`. Claude Code preserves `isSidechain` directly. 
-- `messages`: All formats normalize into the same chat-style message schema. See [Message Traces](traces.md) for the shared block structure. -- `source_meta`: This is where format-specific details live, such as ATIF copied-context metadata, Claude summaries, Codex response-item types, or Hermes tool/session metadata. +- `trace_id`: Claude Code appends `agentId` when present. Hermes uses either the CLI session ID or the gateway transcript file stem. Pi uses the session header `id`. +- `is_sidechain`: ATIF, Hermes, and Pi currently normalize this to `False`. Claude Code preserves `isSidechain` directly. +- `messages`: All formats normalize into the same chat-style message schema. See [Message Traces](traces.md) for the shared block structure. Pi sessions are tree-structured; only the active conversation path (from the last entry back to root) is included. +- `source_meta`: This is where format-specific details live, such as ATIF copied-context metadata, Claude summaries, Codex response-item types, Hermes tool/session metadata, or Pi session version and branch information. 
def get_pi_coding_agent_default_path() -> str:
    """Return the default Pi Coding Agent sessions directory as a string.

    The location is ``~/.pi/agent/sessions`` with the leading ``~`` expanded
    for the current user.
    """
    sessions_dir = Path("~/.pi/agent/sessions")
    return str(sessions_dir.expanduser())
), @@ -238,7 +246,7 @@ class AgentRolloutSeedSource(FileSystemSeedSource): None, description=( "Case-sensitive filename pattern used to match agent rollout files. When omitted, " - "ATIF defaults to '*.json', Claude Code and Codex default to '*.jsonl', " + "ATIF defaults to '*.json', Claude Code, Codex, and Pi Coding Agent default to '*.jsonl', " "and Hermes Agent defaults to '*.json*'." ), ) diff --git a/packages/data-designer-engine/src/data_designer/engine/resources/agent_rollout/hermes_agent.py b/packages/data-designer-engine/src/data_designer/engine/resources/agent_rollout/hermes_agent.py index 6d4ca2e9..c6aced68 100644 --- a/packages/data-designer-engine/src/data_designer/engine/resources/agent_rollout/hermes_agent.py +++ b/packages/data-designer-engine/src/data_designer/engine/resources/agent_rollout/hermes_agent.py @@ -16,6 +16,7 @@ coerce_optional_str, load_json_object, load_jsonl_rows, + normalize_message_content, normalize_message_role, require_string, stringify_json_value, @@ -244,7 +245,7 @@ def normalize_hermes_messages( normalized_messages.append( build_message( role="tool", - content=_normalize_message_content(raw_message.get("content")), + content=normalize_message_content(raw_message.get("content")), tool_call_id=require_string( raw_message.get("tool_call_id"), f"Hermes tool message tool_call_id #{message_index} in {file_path}", @@ -253,7 +254,7 @@ def normalize_hermes_messages( ) continue - content = _normalize_message_content(raw_message.get("content")) + content = normalize_message_content(raw_message.get("content")) reasoning_content = coerce_optional_str(raw_message.get("reasoning")) tool_calls = normalize_hermes_tool_calls( raw_message.get("tool_calls"), @@ -413,22 +414,6 @@ def _require_message_list(raw_messages: Any, *, file_path: Path, context: str) - return raw_messages -def _normalize_message_content(content: Any) -> Any: - """Coerce Hermes message content into the normalized content shape. 
class PiCodingAgentRolloutFormatHandler(AgentRolloutFormatHandler):
    """Format handler that turns Pi Coding Agent session files into rollout seed rows."""

    format: ClassVar[AgentRolloutFormat] = AgentRolloutFormat.PI_CODING_AGENT

    def is_handled_file(self, relative_path: str) -> bool:
        """Tell whether ``relative_path`` names a Pi session file.

        Args:
            relative_path: File path relative to the configured rollout root.

        Returns:
            ``True`` when the path's suffix is exactly ``.jsonl``.
        """
        candidate = Path(relative_path)
        return candidate.suffix == ".jsonl"

    def parse_file(
        self,
        *,
        root_path: Path,
        relative_path: str,
        parse_context: AgentRolloutParseContext | None = None,
    ) -> list[NormalizedAgentRolloutRecord]:
        """Load one Pi session JSONL file and normalize it.

        Args:
            root_path: Root directory configured on the seed source.
            relative_path: Location of the session file relative to ``root_path``.
            parse_context: Accepted for interface compatibility; not used here.

        Returns:
            A one-element list holding the normalized record, or an empty
            list (after logging a warning) when the file has no rows.
        """
        del parse_context
        session_file = root_path / relative_path
        session_rows = list(load_jsonl_rows(session_file))
        if session_rows:
            return [parse_pi_session(file_path=session_file, rows=session_rows)]

        logger.warning("Skipping empty Pi Coding Agent session file %s", session_file)
        return []
def parse_pi_session(
    *,
    file_path: Path,
    rows: list[tuple[int, dict[str, Any]]],
) -> NormalizedAgentRolloutRecord:
    """Build one normalized rollout record from the rows of a Pi session file.

    The first row must be the ``session`` header. The remaining rows form a
    tree linked through ``id``/``parentId``; only entries on the active path
    (walked back from the last entry) contribute messages, entry types, models
    and stop reasons. Timestamps and branch detection look at every entry.

    Args:
        file_path: Path of the session file, used in error text and as ``source_path``.
        rows: Parsed JSONL rows as ``(line_number, payload)`` pairs; must be non-empty.

    Returns:
        The normalized rollout record for the session.

    Raises:
        AgentRolloutSeedParseError: If the first row is not a ``session`` header.
            Errors raised by ``require_string`` for the header ``id`` or a
            tool result's ``toolCallId`` propagate unchanged.
    """
    header = rows[0][1]
    if header.get("type") != "session":
        raise AgentRolloutSeedParseError(f"Pi session at {file_path} is missing a session header as the first entry")

    session_id = require_string(header.get("id"), f"Pi session header id in {file_path}")
    working_dir = coerce_optional_str(header.get("cwd"))
    version = header.get("version")

    all_entries = [payload for _, payload in rows[1:]]

    normalized_messages: list[dict[str, Any]] = []
    observed_types: set[str] = set()
    # Insertion-ordered dicts double as "distinct values in first-seen order".
    models_seen: dict[str, None] = {}
    stop_reasons_seen: dict[str, None] = {}
    bash_runs = 0

    for entry in _resolve_active_path(all_entries):
        kind = coerce_optional_str(entry.get("type")) or "unknown"
        observed_types.add(kind)

        if kind == "model_change":
            switched_to = coerce_optional_str(entry.get("modelId"))
            if switched_to:
                models_seen.setdefault(switched_to, None)

        elif kind in ("compaction", "branch_summary"):
            # Entry-level summaries (compacted context or an abandoned branch)
            # are surfaced as system messages.
            summary_text = coerce_optional_str(entry.get("summary"))
            if summary_text:
                normalized_messages.append(build_message(role="system", content=summary_text))

        elif kind == "custom_message":
            # NOTE(review): entries without a truthy ``display`` are dropped here;
            # confirm ``display`` is meant to gate inclusion and not only UI rendering.
            if entry.get("display"):
                custom_content = normalize_message_content(entry.get("content"))
                normalized_messages.append(build_message(role="system", content=custom_content))

        elif kind == "message" and isinstance(entry.get("message"), dict):
            payload = entry["message"]
            message_entry_id = coerce_optional_str(entry.get("id")) or ""
            role = coerce_optional_str(payload.get("role"))

            if role == "assistant":
                finish = coerce_optional_str(payload.get("stopReason"))
                if finish:
                    stop_reasons_seen.setdefault(finish, None)
                answering_model = coerce_optional_str(payload.get("model"))
                if answering_model:
                    models_seen.setdefault(answering_model, None)
            elif role == "bashExecution":
                # Counted even when the execution is later excluded from context.
                bash_runs += 1

            normalized_messages.extend(
                _normalize_pi_message(payload, file_path=file_path, entry_id=message_entry_id)
            )

    # Header timestamp first, then every entry (on or off the active path).
    raw_stamps = [header.get("timestamp")]
    raw_stamps.extend(entry.get("timestamp") for entry in all_entries)
    stamps = [stamp for stamp in map(coerce_optional_str, raw_stamps) if stamp]

    meta: dict[str, Any] = {
        "record_count": len(rows),
        "entry_types": sorted(observed_types),
        "stop_reasons": list(stop_reasons_seen),
    }
    parent_session = coerce_optional_str(header.get("parentSession"))
    # (key, value, include?) triples; optional keys are only written when meaningful.
    optional_meta = (
        ("session_version", version, version is not None),
        ("models_used", list(models_seen), bool(models_seen)),
        ("has_branches", True, _detect_branches(all_entries)),
        ("bash_execution_count", bash_runs, bool(bash_runs)),
        ("parent_session", parent_session, bool(parent_session)),
    )
    meta.update({key: value for key, value, wanted in optional_meta if wanted})

    return NormalizedAgentRolloutRecord(
        trace_id=session_id,
        source_kind=AgentRolloutFormat.PI_CODING_AGENT.value,
        source_path=str(file_path),
        root_session_id=session_id,
        agent_id=None,
        is_sidechain=False,
        cwd=working_dir,
        project_path=working_dir,
        git_branch=None,
        started_at=min(stamps, default=None),
        ended_at=max(stamps, default=None),
        messages=normalized_messages,
        source_meta=meta,
    )
def _resolve_active_path(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Walk from the last entry back to the root via ``parentId`` to get the active path.

    The walk stops when an entry has no usable parent reference, when the
    referenced parent is not present in ``entries``, or when an ID repeats
    (which would otherwise loop forever on a cyclic file).

    Args:
        entries: All session entries (excluding the session header).

    Returns:
        Entries on the active conversation path in chronological order
        (root first, last entry of the file last). Empty when ``entries`` is empty.
    """
    if not entries:
        return []

    # Index entries by their non-empty string ID; a later duplicate ID wins.
    entries_by_id: dict[str, dict[str, Any]] = {}
    for entry in entries:
        entry_id = entry.get("id")
        if isinstance(entry_id, str) and entry_id:
            entries_by_id[entry_id] = entry

    path: list[dict[str, Any]] = []
    visited: set[str] = set()
    current: dict[str, Any] | None = entries[-1]

    while current is not None:
        current_id = current.get("id")
        if isinstance(current_id, str):
            if current_id in visited:
                break  # cycle guard
            visited.add(current_id)
        path.append(current)

        parent_id = current.get("parentId")
        # Only string IDs are indexed, so any other value (``None``, numbers, or
        # unhashable JSON lists/objects) cannot name a parent. Stopping here also
        # avoids a ``TypeError`` from looking up an unhashable key.
        if not isinstance(parent_id, str):
            break
        current = entries_by_id.get(parent_id)

    path.reverse()
    return path
def _normalize_pi_message(
    raw_message: dict[str, Any],
    *,
    file_path: Path,
    entry_id: str,
) -> list[dict[str, Any]]:
    """Convert one Pi ``message`` payload into zero or more chat-schema messages.

    Args:
        raw_message: The ``message`` payload taken from a Pi ``message`` entry.
        file_path: File being parsed; only used in error text.
        entry_id: ID of the enclosing entry, forwarded to bash-execution
            normalization to synthesize a tool-call ID.

    Returns:
        A list of normalized messages. ``user``, ``assistant`` and
        ``toolResult`` yield one message; ``bashExecution`` is delegated to
        ``_normalize_pi_bash_execution``; ``custom`` yields a system message
        only when ``display`` is truthy; ``compactionSummary``/``branchSummary``
        yield a system message only when a summary is present. Any other role
        yields an empty list.
    """
    role = coerce_optional_str(raw_message.get("role"))

    if role == "assistant":
        return [_normalize_pi_assistant_message(raw_message)]

    if role == "bashExecution":
        return _normalize_pi_bash_execution(raw_message, entry_id=entry_id)

    if role in ("compactionSummary", "branchSummary"):
        summary_text = coerce_optional_str(raw_message.get("summary"))
        return [build_message(role="system", content=summary_text)] if summary_text else []

    if role == "toolResult":
        result_body = normalize_message_content(raw_message.get("content"))
        call_id = require_string(
            raw_message.get("toolCallId"),
            f"Pi toolResult toolCallId in {file_path}",
        )
        return [build_message(role="tool", content=result_body, tool_call_id=call_id)]

    if role == "user":
        user_body = normalize_message_content(raw_message.get("content"))
        return [build_message(role="user", content=user_body)]

    if role == "custom" and raw_message.get("display"):
        custom_body = normalize_message_content(raw_message.get("content"))
        return [build_message(role="system", content=custom_body)]

    # Hidden custom messages and unrecognized roles contribute nothing.
    return []
def _normalize_pi_assistant_message(raw_message: dict[str, Any]) -> dict[str, Any]:
    """Normalize a Pi assistant message with structured content blocks.

    The ``content`` field is either a plain string or a list of blocks. Dict
    blocks are dispatched on ``type``: ``text`` and ``thinking`` blocks are
    collected (empty ones skipped) and ``toolCall`` blocks become
    function-style tool calls. Bare non-empty strings inside the list count as
    text. Blocks of any other type are ignored.

    Args:
        raw_message: Raw Pi assistant message payload.

    Returns:
        A single normalized assistant message. Text parts are joined with a
        blank line; thinking parts are joined the same way into
        ``reasoning_content`` (``None`` when there are none).
    """
    content_blocks = raw_message.get("content")
    text_parts: list[str] = []
    thinking_parts: list[str] = []
    tool_calls: list[dict[str, Any]] = []

    if isinstance(content_blocks, str):
        text_parts.append(content_blocks)
    elif isinstance(content_blocks, list):
        for block in content_blocks:
            if not isinstance(block, dict):
                if isinstance(block, str) and block:
                    text_parts.append(block)
                continue

            block_type = block.get("type")
            if block_type == "text":
                text = coerce_optional_str(block.get("text"))
                if text:
                    text_parts.append(text)
            elif block_type == "thinking":
                thinking = coerce_optional_str(block.get("thinking"))
                if thinking:
                    thinking_parts.append(thinking)
            elif block_type == "toolCall":
                # ``dict.get(key, "")`` only applies its default to *missing* keys,
                # so an explicit JSON ``null`` used to leak through as ``None``.
                # Treat ``null`` the same as a missing key.
                call_id = block.get("id")
                call_name = block.get("name")
                tool_calls.append(
                    {
                        "id": "" if call_id is None else call_id,
                        "type": "function",
                        "function": {
                            "name": "" if call_name is None else call_name,
                            "arguments": stringify_json_value(block.get("arguments")),
                        },
                    }
                )

    content = "\n\n".join(text_parts)
    reasoning_content = "\n\n".join(thinking_parts) if thinking_parts else None

    return build_message(
        role="assistant",
        content=content,
        reasoning_content=reasoning_content,
        tool_calls=tool_calls,
    )
def _normalize_pi_bash_execution(
    raw_message: dict[str, Any],
    *,
    entry_id: str,
) -> list[dict[str, Any]]:
    """Represent a Pi ``bashExecution`` message as a synthetic tool-call pair.

    Args:
        raw_message: Raw Pi bash execution message payload.
        entry_id: ID of the enclosing entry; used to build the synthetic
            tool-call ID (``bash_unknown`` when empty).

    Returns:
        An empty list when ``excludeFromContext`` is truthy. Otherwise an
        assistant message carrying one ``bash`` tool call, followed by a tool
        message whose content is the output and, when an exit code is present,
        a trailing ``[exit code: N]`` line.
    """
    if raw_message.get("excludeFromContext"):
        return []

    call_id = "bash_unknown" if not entry_id else f"bash_{entry_id}"
    command_text = coerce_optional_str(raw_message.get("command")) or ""
    output_text = coerce_optional_str(raw_message.get("output")) or ""
    exit_code = raw_message.get("exitCode")

    result_lines: list[str] = []
    if output_text:
        result_lines.append(output_text)
    if exit_code is not None:
        result_lines.append(f"[exit code: {exit_code}]")

    bash_call = {
        "id": call_id,
        "type": "function",
        "function": {
            "name": "bash",
            "arguments": stringify_json_value({"command": command_text}),
        },
    }
    invocation = build_message(role="assistant", content="", tool_calls=[bash_call])
    # Joining an empty list yields "", matching the no-output / no-exit-code case.
    outcome = build_message(role="tool", content="\n".join(result_lines), tool_call_id=call_id)
    return [invocation, outcome]
def normalize_message_content(content: Any) -> Any:
    """Bring raw message content into the shape used by normalized messages.

    Strings and lists are handed back untouched, ``None`` becomes ``""``,
    and any other value is serialized with :func:`stringify_json_value`.
    """
    if isinstance(content, (str, list)):
        return content
    return "" if content is None else stringify_json_value(content)
def _make_handler() -> PiCodingAgentRolloutFormatHandler:
    """Create a fresh handler instance for a test."""
    handler = PiCodingAgentRolloutFormatHandler()
    return handler


def _make_session_header(
    *,
    session_id: str = "abc-123",
    cwd: str = "/home/user/project",
    version: int = 3,
    timestamp: str = "2026-04-07T10:00:00.000Z",
) -> dict[str, Any]:
    """Build the ``session`` header row that opens a Pi session file."""
    return dict(
        type="session",
        version=version,
        id=session_id,
        timestamp=timestamp,
        cwd=cwd,
    )


def _make_entry(
    *,
    entry_type: str = "message",
    entry_id: str,
    parent_id: str | None,
    timestamp: str = "2026-04-07T10:00:01.000Z",
    message: dict[str, Any] | None = None,
    **extra: Any,
) -> dict[str, Any]:
    """Build one session entry row.

    ``message`` is attached only when given; ``extra`` keyword arguments are
    merged last, so they can add fields or override the base ones.
    """
    base: dict[str, Any] = {
        "type": entry_type,
        "id": entry_id,
        "parentId": parent_id,
        "timestamp": timestamp,
    }
    payload = {} if message is None else {"message": message}
    return {**base, **payload, **extra}
content (real Pi format) + _make_entry( + entry_id="e1", + parent_id="t1", + message={ + "role": "user", + "content": [{"type": "text", "text": "Analyze the project structure."}], + "timestamp": 1712484001000, + }, + ), + # Assistant with thinking + text + tool call + _make_entry( + entry_id="e2", + parent_id="e1", + message={ + "role": "assistant", + "content": [ + {"type": "thinking", "thinking": "Let me explore the repo."}, + {"type": "text", "text": "I'll look at the directory layout."}, + { + "type": "toolCall", + "id": "tooluse_abc123", + "name": "bash", + "arguments": {"command": "find . -type f | head -20"}, + }, + ], + "api": "foot-clan", + "provider": "tcri", + "model": "tcri/donatello-70b", + "stopReason": "toolUse", + "timestamp": 1712484002000, + }, + ), + _make_entry( + entry_id="e3", + parent_id="e2", + message={ + "role": "toolResult", + "toolCallId": "tooluse_abc123", + "toolName": "bash", + "content": [{"type": "text", "text": "./src/main.py\n./README.md"}], + "isError": False, + "timestamp": 1712484003000, + }, + ), + # Parallel tool calls: assistant with 2 ToolCall blocks + _make_entry( + entry_id="e4", + parent_id="e3", + message={ + "role": "assistant", + "content": [ + { + "type": "toolCall", + "id": "tooluse_read1", + "name": "read", + "arguments": {"path": "src/main.py"}, + }, + { + "type": "toolCall", + "id": "tooluse_read2", + "name": "read", + "arguments": {"path": "README.md"}, + }, + ], + "model": "tcri/donatello-70b", + "stopReason": "toolUse", + "timestamp": 1712484004000, + }, + ), + # Two consecutive toolResult messages + _make_entry( + entry_id="e5", + parent_id="e4", + message={ + "role": "toolResult", + "toolCallId": "tooluse_read1", + "toolName": "read", + "content": [{"type": "text", "text": "import sys\nprint('hello')"}], + "isError": False, + "timestamp": 1712484005000, + }, + ), + _make_entry( + entry_id="e6", + parent_id="e5", + message={ + "role": "toolResult", + "toolCallId": "tooluse_read2", + "toolName": "read", + 
"content": [{"type": "text", "text": "# My Project\nA sample project."}], + "isError": False, + "timestamp": 1712484006000, + }, + ), + # Final assistant response (text only, stop) + _make_entry( + entry_id="e7", + parent_id="e6", + timestamp="2026-04-07T10:01:00.000Z", + message={ + "role": "assistant", + "content": [{"type": "text", "text": "The project has a simple Python entry point and a README."}], + "model": "tcri/donatello-70b", + "stopReason": "stop", + "timestamp": 1712484007000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file( + root_path=tmp_path, + relative_path="subdir/20260407_abc123.jsonl", + ) + + assert len(records) == 1 + record = records[0] + assert record.trace_id == "abc-123" + assert record.root_session_id == "abc-123" + assert record.source_kind == "pi_coding_agent" + assert record.cwd == "/home/user/project" + assert record.project_path == "/home/user/project" + assert record.started_at == "2026-04-07T10:00:00.000Z" + assert record.ended_at == "2026-04-07T10:01:00.000Z" + assert record.final_assistant_message == "The project has a simple Python entry point and a README." + + # 7 normalized messages: user, assistant(+tc), tool, assistant(+2tc), tool, tool, assistant + assert record.message_count == 7 + assert record.tool_call_count == 3 + + # Message structure + assert record.messages[0]["role"] == "user" + assert record.messages[0]["content"][0]["text"] == "Analyze the project structure." + + assert record.messages[1]["role"] == "assistant" + assert record.messages[1]["reasoning_content"] == "Let me explore the repo." 
+ assert record.messages[1]["tool_calls"][0]["function"]["name"] == "bash" + + assert record.messages[2]["role"] == "tool" + assert record.messages[2]["tool_call_id"] == "tooluse_abc123" + + # Parallel tool calls: one assistant message → two tool results + assert record.messages[3]["role"] == "assistant" + assert len(record.messages[3]["tool_calls"]) == 2 + assert record.messages[3]["tool_calls"][0]["id"] == "tooluse_read1" + assert record.messages[3]["tool_calls"][1]["id"] == "tooluse_read2" + assert record.messages[4]["role"] == "tool" + assert record.messages[4]["tool_call_id"] == "tooluse_read1" + assert record.messages[5]["role"] == "tool" + assert record.messages[5]["tool_call_id"] == "tooluse_read2" + + assert record.messages[6]["role"] == "assistant" + assert "tool_calls" not in record.messages[6] # no tool calls on final message + + # Tool call IDs link 1:1 with tool results + declared_tc_ids = {tc["id"] for m in record.messages for tc in m.get("tool_calls", [])} + tool_result_ids = {m["tool_call_id"] for m in record.messages if m.get("role") == "tool"} + assert declared_tc_ids == tool_result_ids + + # Source meta + assert record.source_meta["record_count"] == 10 # header + 2 structural + 7 message entries + assert record.source_meta["session_version"] == 3 + assert record.source_meta["stop_reasons"] == ["toolUse", "stop"] + assert record.source_meta["models_used"] == ["tcri/donatello-70b"] + assert "model_change" in record.source_meta["entry_types"] + assert "thinking_level_change" in record.source_meta["entry_types"] + assert "has_branches" not in record.source_meta + + +def test_parse_file_assistant_tool_calls_without_text( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """Real sessions frequently have assistants with only ToolCall blocks and no text.""" + write_jsonl( + tmp_path / "tools_only.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": 
"user", + "content": [{"type": "text", "text": "Read all config files."}], + "timestamp": 1000, + }, + ), + _make_entry( + entry_id="e2", + parent_id="e1", + message={ + "role": "assistant", + "content": [ + {"type": "toolCall", "id": "tc_a", "name": "read", "arguments": {"path": "config.yaml"}}, + {"type": "toolCall", "id": "tc_b", "name": "read", "arguments": {"path": "settings.json"}}, + {"type": "toolCall", "id": "tc_c", "name": "read", "arguments": {"path": ".env"}}, + ], + "model": "tcri/donatello-70b", + "stopReason": "toolUse", + "timestamp": 2000, + }, + ), + _make_entry( + entry_id="e3", + parent_id="e2", + message={ + "role": "toolResult", + "toolCallId": "tc_a", + "toolName": "read", + "content": [{"type": "text", "text": "port: 8080"}], + "isError": False, + "timestamp": 3000, + }, + ), + _make_entry( + entry_id="e4", + parent_id="e3", + message={ + "role": "toolResult", + "toolCallId": "tc_b", + "toolName": "read", + "content": [{"type": "text", "text": '{"debug": true}'}], + "isError": False, + "timestamp": 4000, + }, + ), + _make_entry( + entry_id="e5", + parent_id="e4", + message={ + "role": "toolResult", + "toolCallId": "tc_c", + "toolName": "read", + "content": [{"type": "text", "text": "SECRET=hunter2"}], + "isError": False, + "timestamp": 5000, + }, + ), + _make_entry( + entry_id="e6", + parent_id="e5", + message={ + "role": "assistant", + "content": [{"type": "text", "text": "Found 3 config files with settings."}], + "stopReason": "stop", + "timestamp": 6000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="tools_only.jsonl") + record = records[0] + + assert record.message_count == 6 # user, assistant(3tc), tool, tool, tool, assistant + assert record.tool_call_count == 3 + + # The tools-only assistant still produces a valid message with empty content + assistant_msg = record.messages[1] + assert assistant_msg["role"] == "assistant" + assert len(assistant_msg["tool_calls"]) == 3 
+ tool_names = [tc["function"]["name"] for tc in assistant_msg["tool_calls"]] + assert tool_names == ["read", "read", "read"] + + # All three tool results are present and linked + for i, expected_id in enumerate(["tc_a", "tc_b", "tc_c"]): + assert record.messages[2 + i]["role"] == "tool" + assert record.messages[2 + i]["tool_call_id"] == expected_id + + assert record.final_assistant_message == "Found 3 config files with settings." + + +def test_parse_file_resolves_active_branch( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """When a session has branches, the active path follows the last entry back to root.""" + write_jsonl( + tmp_path / "branched.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={"role": "user", "content": "Hello", "timestamp": 1000}, + ), + # Branch A (abandoned): e2a follows e1 + _make_entry( + entry_id="e2a", + parent_id="e1", + message={ + "role": "assistant", + "content": [{"type": "text", "text": "Branch A"}], + "stopReason": "stop", + "timestamp": 2000, + }, + ), + # Branch B (active): e2b also follows e1, and is the last entry + _make_entry( + entry_id="e2b", + parent_id="e1", + message={ + "role": "assistant", + "content": [{"type": "text", "text": "Branch B"}], + "stopReason": "stop", + "timestamp": 3000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="branched.jsonl") + + assert len(records) == 1 + record = records[0] + # Active path is e1 → e2b (not e2a) + assert record.message_count == 2 + assert record.messages[0]["role"] == "user" + assert record.messages[1]["content"][0]["text"] == "Branch B" + assert record.source_meta["has_branches"] is True + + +def test_parse_file_normalizes_bash_execution( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + write_jsonl( + tmp_path / "bash_session.jsonl", + [ + _make_session_header(), + 
_make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": "bashExecution", + "command": "ls -la", + "output": "total 42\ndrwxr-xr-x 5 user user 160 Apr 7 10:00 .", + "exitCode": 0, + "cancelled": False, + "truncated": False, + "timestamp": 1000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="bash_session.jsonl") + record = records[0] + + # bashExecution normalizes to tool-call pair + assert record.message_count == 2 + assert record.messages[0]["role"] == "assistant" + assert record.messages[0]["tool_calls"][0]["function"]["name"] == "bash" + assert json.loads(record.messages[0]["tool_calls"][0]["function"]["arguments"])["command"] == "ls -la" + assert record.messages[1]["role"] == "tool" + assert record.messages[1]["tool_call_id"] == "bash_e1" + assert "[exit code: 0]" in record.messages[1]["content"][0]["text"] + assert record.source_meta["bash_execution_count"] == 1 + + +def test_parse_file_skips_excluded_bash_execution( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + write_jsonl( + tmp_path / "excluded_bash.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": "bashExecution", + "command": "echo hello", + "output": "hello", + "exitCode": 0, + "cancelled": False, + "truncated": False, + "excludeFromContext": True, + "timestamp": 1000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="excluded_bash.jsonl") + assert records[0].message_count == 0 + + +def test_parse_file_requires_session_header( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + write_jsonl( + tmp_path / "no_header.jsonl", + [ + _make_entry( + entry_id="e1", + parent_id=None, + message={"role": "user", "content": "Hello", "timestamp": 1000}, + ), + ], + ) + + handler = _make_handler() + with 
pytest.raises(AgentRolloutSeedParseError, match="missing a session header"): + handler.parse_file(root_path=tmp_path, relative_path="no_header.jsonl") + + +def test_parse_file_empty_file_returns_empty_list( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + write_jsonl(tmp_path / "empty.jsonl", []) + + handler = _make_handler() + assert handler.parse_file(root_path=tmp_path, relative_path="empty.jsonl") == [] + + +def test_is_handled_file_accepts_jsonl_only() -> None: + handler = _make_handler() + + assert handler.is_handled_file("20260407_abc123.jsonl") is True + assert handler.is_handled_file("subdir/20260407_abc123.jsonl") is True + assert handler.is_handled_file("config.json") is False + assert handler.is_handled_file("notes.txt") is False + + +def test_parse_file_includes_custom_display_messages( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + write_jsonl( + tmp_path / "custom_msg.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": "custom", + "customType": "my-extension", + "content": "Extension context injected.", + "display": True, + "timestamp": 1000, + }, + ), + _make_entry( + entry_id="e2", + parent_id="e1", + message={ + "role": "custom", + "customType": "my-extension", + "content": "Hidden context.", + "display": False, + "timestamp": 2000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="custom_msg.jsonl") + record = records[0] + + # Only display=True custom message is included + assert record.message_count == 1 + assert record.messages[0]["role"] == "system" + + +def test_parse_file_tracks_model_changes( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + write_jsonl( + tmp_path / "model_change.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={"role": 
"user", "content": "Hello", "timestamp": 1000}, + ), + _make_entry( + entry_type="model_change", + entry_id="e2", + parent_id="e1", + provider="dimension-x", + modelId="krang/raphael-405b", + ), + _make_entry( + entry_id="e3", + parent_id="e2", + message={ + "role": "assistant", + "content": [{"type": "text", "text": "Hi!"}], + "model": "krang/raphael-405b", + "stopReason": "stop", + "timestamp": 2000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="model_change.jsonl") + assert records[0].source_meta["models_used"] == ["krang/raphael-405b"] + + +def test_parse_file_includes_compaction_summary_message( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """compactionSummary role inside a message entry is normalized as a system message.""" + write_jsonl( + tmp_path / "compaction_msg.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": "compactionSummary", + "summary": "User discussed project setup and ran several commands.", + "tokensBefore": 50000, + "timestamp": 1000, + }, + ), + _make_entry( + entry_id="e2", + parent_id="e1", + message={"role": "user", "content": "Continue please.", "timestamp": 2000}, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="compaction_msg.jsonl") + record = records[0] + + assert record.message_count == 2 + assert record.messages[0]["role"] == "system" + assert "project setup" in record.messages[0]["content"][0]["text"] + assert record.messages[1]["role"] == "user" + + +def test_parse_file_handles_compaction_entry( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """Entry-level compaction (type=compaction) is normalized as a system message.""" + write_jsonl( + tmp_path / "compaction_entry.jsonl", + [ + _make_session_header(), + _make_entry( + entry_type="compaction", + 
entry_id="e1", + parent_id=None, + summary="Earlier discussion about API design and error handling.", + firstKeptEntryId="e0", + tokensBefore=40000, + ), + _make_entry( + entry_id="e2", + parent_id="e1", + message={"role": "user", "content": "Now let's add tests.", "timestamp": 2000}, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="compaction_entry.jsonl") + record = records[0] + + assert record.message_count == 2 + assert record.messages[0]["role"] == "system" + assert "API design" in record.messages[0]["content"][0]["text"] + assert record.messages[1]["role"] == "user" + + +def test_parse_file_handles_branch_summary_entry( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """Entry-level branch_summary is normalized as a system message on the active path.""" + write_jsonl( + tmp_path / "branch_summary.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={"role": "user", "content": "Try approach A", "timestamp": 1000}, + ), + _make_entry( + entry_id="e2", + parent_id="e1", + message={ + "role": "assistant", + "content": [{"type": "text", "text": "Approach A failed."}], + "stopReason": "stop", + "timestamp": 2000, + }, + ), + # User switched branches; branch summary injected for abandoned branch + _make_entry( + entry_type="branch_summary", + entry_id="e3", + parent_id="e1", + fromId="e2", + summary="Explored approach A which failed due to compatibility issues.", + ), + _make_entry( + entry_id="e4", + parent_id="e3", + message={"role": "user", "content": "Try approach B instead.", "timestamp": 3000}, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="branch_summary.jsonl") + record = records[0] + + # Active path: e1 → e3 (branch_summary) → e4 + assert record.message_count == 3 + assert record.messages[0]["role"] == "user" + assert 
record.messages[0]["content"][0]["text"] == "Try approach A" + assert record.messages[1]["role"] == "system" + assert "approach A" in record.messages[1]["content"][0]["text"] + assert record.messages[2]["role"] == "user" + assert record.messages[2]["content"][0]["text"] == "Try approach B instead." + + +def test_parse_file_handles_custom_message_entry( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """Entry-level custom_message (display=true) is included; display=false is skipped.""" + write_jsonl( + tmp_path / "custom_entry.jsonl", + [ + _make_session_header(), + _make_entry( + entry_type="custom_message", + entry_id="e1", + parent_id=None, + customType="lint-hook", + content="Lint found 3 warnings in src/app.ts.", + display=True, + ), + _make_entry( + entry_type="custom_message", + entry_id="e2", + parent_id="e1", + customType="internal-state", + content="Cache refreshed.", + display=False, + ), + _make_entry( + entry_id="e3", + parent_id="e2", + message={"role": "user", "content": "Fix those warnings.", "timestamp": 2000}, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="custom_entry.jsonl") + record = records[0] + + assert record.message_count == 2 + assert record.messages[0]["role"] == "system" + assert "3 warnings" in record.messages[0]["content"][0]["text"] + assert record.messages[1]["role"] == "user" + + +def test_parse_file_handles_branch_summary_message_role( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """branchSummary role inside a message entry is normalized as a system message.""" + write_jsonl( + tmp_path / "branch_msg.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": "branchSummary", + "summary": "Abandoned branch tried regex-based parsing.", + "fromId": "prev1", + "timestamp": 1000, + }, + ), + _make_entry( + entry_id="e2", + 
parent_id="e1", + message={"role": "user", "content": "Use an AST parser instead.", "timestamp": 2000}, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="branch_msg.jsonl") + record = records[0] + + assert record.message_count == 2 + assert record.messages[0]["role"] == "system" + assert "regex-based parsing" in record.messages[0]["content"][0]["text"] + assert record.messages[1]["role"] == "user" + + +def test_parse_file_includes_parent_session_in_source_meta( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """Forked sessions include the parent session path in source_meta.""" + header = _make_session_header() + header["parentSession"] = "/home/user/.pi/agent/sessions/--project--/original.jsonl" + write_jsonl( + tmp_path / "forked.jsonl", + [ + header, + _make_entry( + entry_id="e1", + parent_id=None, + message={"role": "user", "content": "Continue from where we left off.", "timestamp": 1000}, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="forked.jsonl") + assert records[0].source_meta["parent_session"] == "/home/user/.pi/agent/sessions/--project--/original.jsonl" + + +def test_parse_file_assistant_bare_string_content( + tmp_path: Path, + write_jsonl: Callable[[Path, list[dict[str, Any]]], None], +) -> None: + """Assistant messages with bare string content (not a block list) are normalized.""" + write_jsonl( + tmp_path / "bare_string.jsonl", + [ + _make_session_header(), + _make_entry( + entry_id="e1", + parent_id=None, + message={ + "role": "assistant", + "content": "Plain text response.", + "stopReason": "stop", + "timestamp": 1000, + }, + ), + ], + ) + + handler = _make_handler() + records = handler.parse_file(root_path=tmp_path, relative_path="bare_string.jsonl") + record = records[0] + + assert record.message_count == 1 + assert record.final_assistant_message == "Plain text response."