diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 379d6d893..637022bf3 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -13,6 +13,7 @@ CLINotFoundError, ProcessError, ) +from ._internal.sessions import get_session_messages, list_sessions from ._internal.transport import Transport from ._version import __version__ from .client import ClaudeSDKClient @@ -52,6 +53,8 @@ SandboxSettings, SdkBeta, SdkPluginConfig, + SDKSessionInfo, + SessionMessage, SettingSource, StopHookInput, SubagentStartHookInput, @@ -378,6 +381,11 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any: "SettingSource", # Plugin support "SdkPluginConfig", + # Session listing + "list_sessions", + "get_session_messages", + "SDKSessionInfo", + "SessionMessage", # Beta support "SdkBeta", # Sandbox support diff --git a/src/claude_agent_sdk/_internal/sessions.py b/src/claude_agent_sdk/_internal/sessions.py new file mode 100644 index 000000000..a49d2c63c --- /dev/null +++ b/src/claude_agent_sdk/_internal/sessions.py @@ -0,0 +1,926 @@ +"""Session listing implementation. + +Ported from TypeScript SDK (listSessionsImpl.ts + sessionStoragePortable.ts). +Scans ~/.claude/projects// for .jsonl session files and +extracts metadata from stat + head/tail reads without full JSONL parsing. +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +import sys +import unicodedata +from pathlib import Path +from typing import Any + +from ..types import SDKSessionInfo, SessionMessage + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# Size of the head/tail buffer for lite metadata reads. +LITE_READ_BUF_SIZE = 65536 + +# Maximum length for a single filesystem path component. Most filesystems +# limit individual components to 255 bytes. 
def _simple_hash(s: str) -> str:
    """Port of the JS simpleHash function (32-bit integer hash, base36).

    Uses the same algorithm as the TS fallback so directory names match
    when the CLI was running under Node.js (not Bun).

    JavaScript strings are sequences of UTF-16 code units, and the JS hash
    consumes one code unit per step. A character outside the Basic
    Multilingual Plane therefore contributes *two* values (its surrogate
    pair). Iterating Python code points would feed a single large value
    instead and yield a different hash, so the input is expanded to UTF-16
    code units before hashing.

    Args:
        s: The string to hash.

    Returns:
        The absolute value of the signed 32-bit hash rendered in base 36
        using lowercase digits; ``"0"`` for an empty string.
    """
    # "surrogatepass" lets lone surrogates (legal in a Python str, e.g. from
    # os.fsdecode) through unchanged, exactly as JS would see them.
    units = s.encode("utf-16-le", "surrogatepass")

    h = 0
    for pos in range(0, len(units), 2):
        code_unit = units[pos] | (units[pos + 1] << 8)
        # hash * 31 + unit, kept modulo 2**32 (JS `hash |= 0` wraps the
        # same way; the sign is only an interpretation of the top bit).
        h = ((h << 5) - h + code_unit) & 0xFFFFFFFF

    # Reinterpret the 32-bit pattern as a signed int, then take abs() like
    # the JS implementation does before stringifying.
    if h >= 0x80000000:
        h -= 0x100000000
    h = abs(h)

    # Equivalent of JS Number.prototype.toString(36).
    if h == 0:
        return "0"
    digits = "0123456789abcdefghijklmnopqrstuvwxyz"
    out: list[str] = []
    while h > 0:
        h, rem = divmod(h, 36)
        out.append(digits[rem])
    return "".join(reversed(out))
def _canonicalize_path(d: str) -> str:
    """Return the canonical form of a directory path.

    The path is resolved with ``os.path.realpath`` and the result is
    NFC-normalized. If resolution raises ``OSError`` the unresolved input
    is NFC-normalized and returned instead.
    """
    try:
        target = os.path.realpath(d)
    except OSError:
        # Best effort: keep going with the path exactly as given.
        target = d
    return unicodedata.normalize("NFC", target)
def _unescape_json_string(raw: str) -> str:
    """Decode JSON escape sequences in a raw, unquoted string body.

    ``raw`` is the text found between the quotes of a JSON string. Input
    without a backslash is returned untouched. Otherwise the text is
    re-quoted and handed to the JSON decoder; if decoding fails, or does
    not produce a ``str``, the original text is returned unchanged.
    """
    if "\\" in raw:
        try:
            decoded = json.loads('"' + raw + '"')
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            decoded = None
        if isinstance(decoded, str):
            return decoded
    return raw
def _extract_last_json_string_field(text: str, key: str) -> str | None:
    """Like _extract_json_string_field but finds the LAST occurrence.

    Both ``"key":"value"`` and ``"key": "value"`` spellings are recognised.
    The match that starts latest in ``text`` wins regardless of which
    spelling it uses; scanning the spellings one after the other and
    keeping whatever was seen last would let an early ``"key": "…"`` match
    override a later ``"key":"…"`` one.

    Args:
        text: Raw text to search (may be a truncated chunk of JSONL).
        key: Field name to look for, without quotes.

    Returns:
        The unescaped value of the last complete match, or ``None`` when
        the key is absent or no match has a closing quote inside ``text``.
    """
    text_len = len(text)
    best_idx = -1  # start offset of the latest complete match so far
    best_start = 0  # first character of that match's value
    best_end = 0  # offset of that match's closing quote

    for pattern in (f'"{key}":"', f'"{key}": "'):
        search_from = 0
        while True:
            idx = text.find(pattern, search_from)
            if idx < 0:
                break

            value_start = idx + len(pattern)
            i = value_start
            while i < text_len:
                ch = text[i]
                if ch == "\\":
                    # Skip the escaped character so \" does not end the value.
                    i += 2
                    continue
                if ch == '"':
                    if idx > best_idx:
                        best_idx, best_start, best_end = idx, value_start, i
                    break
                i += 1
            # Resume after the closing quote; if the value was unterminated
            # this lands past the end of text and the next find() fails.
            search_from = i + 1

    if best_idx < 0:
        return None
    # Unescape only the winning slice instead of every intermediate match.
    return _unescape_json_string(text[best_start:best_end])
+ """ + start = 0 + command_fallback = "" + head_len = len(head) + + while start < head_len: + newline_idx = head.find("\n", start) + if newline_idx >= 0: + line = head[start:newline_idx] + start = newline_idx + 1 + else: + line = head[start:] + start = head_len + + if '"type":"user"' not in line and '"type": "user"' not in line: + continue + if '"tool_result"' in line: + continue + if '"isMeta":true' in line or '"isMeta": true' in line: + continue + if '"isCompactSummary":true' in line or '"isCompactSummary": true' in line: + continue + + try: + entry = json.loads(line) + except (json.JSONDecodeError, ValueError): + continue + + if not isinstance(entry, dict) or entry.get("type") != "user": + continue + + message = entry.get("message") + if not isinstance(message, dict): + continue + + content = message.get("content") + texts: list[str] = [] + if isinstance(content, str): + texts.append(content) + elif isinstance(content, list): + for block in content: + if ( + isinstance(block, dict) + and block.get("type") == "text" + and isinstance(block.get("text"), str) + ): + texts.append(block["text"]) + + for raw in texts: + result = raw.replace("\n", " ").strip() + if not result: + continue + + # Skip slash-command messages but remember first as fallback + cmd_match = _COMMAND_NAME_RE.search(result) + if cmd_match: + if not command_fallback: + command_fallback = cmd_match.group(1) + continue + + if _SKIP_FIRST_PROMPT_PATTERN.match(result): + continue + + if len(result) > 200: + result = result[:200].rstrip() + "\u2026" + return result + + if command_fallback: + return command_fallback + return "" + + +# --------------------------------------------------------------------------- +# File I/O — read head and tail of a file +# --------------------------------------------------------------------------- + + +class _LiteSessionFile: + """Result of reading a session file's head, tail, mtime and size.""" + + __slots__ = ("mtime", "size", "head", "tail") + + def __init__(self, 
def _read_session_lite(file_path: Path) -> _LiteSessionFile | None:
    """Read a session file's size, mtime, and its leading/trailing chunks.

    At most ``LITE_READ_BUF_SIZE`` bytes are read from each end and decoded
    as UTF-8 with undecodable bytes replaced. When the whole file fits in
    one buffer the tail is the same string as the head. The mtime is
    reported in whole milliseconds.

    Returns:
        A ``_LiteSessionFile``, or ``None`` if the file is empty or any
        ``OSError`` occurs while opening, stat-ing, or reading it.
    """
    try:
        with file_path.open("rb") as fh:
            info = os.fstat(fh.fileno())

            first_chunk = fh.read(LITE_READ_BUF_SIZE)
            if not first_chunk:
                return None
            head_text = first_chunk.decode("utf-8", errors="replace")

            if info.st_size > LITE_READ_BUF_SIZE:
                # File is larger than one buffer: fetch the final chunk.
                fh.seek(info.st_size - LITE_READ_BUF_SIZE)
                last_chunk = fh.read(LITE_READ_BUF_SIZE)
                tail_text = last_chunk.decode("utf-8", errors="replace")
            else:
                tail_text = head_text

            return _LiteSessionFile(
                mtime=int(info.st_mtime * 1000),
                size=info.st_size,
                head=head_text,
                tail=tail_text,
            )
    except OSError:
        return None
+ """ + try: + result = subprocess.run( + ["git", "worktree", "list", "--porcelain"], + cwd=cwd, + capture_output=True, + text=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.SubprocessError): + return [] + + if result.returncode != 0 or not result.stdout: + return [] + + paths = [] + for line in result.stdout.split("\n"): + if line.startswith("worktree "): + path = unicodedata.normalize("NFC", line[len("worktree ") :]) + paths.append(path) + return paths + + +# --------------------------------------------------------------------------- +# Core implementation +# --------------------------------------------------------------------------- + + +def _read_sessions_from_dir( + project_dir: Path, project_path: str | None = None +) -> list[SDKSessionInfo]: + """Reads session files from a single project directory. + + Each file gets a stat + head/tail read. Filters out sidechain sessions + and metadata-only sessions (no title/summary/prompt). + """ + try: + entries = list(project_dir.iterdir()) + except OSError: + return [] + + results: list[SDKSessionInfo] = [] + + for entry in entries: + name = entry.name + if not name.endswith(".jsonl"): + continue + session_id = _validate_uuid(name[:-6]) + if not session_id: + continue + + lite = _read_session_lite(entry) + if lite is None: + continue + + head, tail, mtime, size = lite.head, lite.tail, lite.mtime, lite.size + + # Check first line for sidechain sessions + first_newline = head.find("\n") + first_line = head[:first_newline] if first_newline >= 0 else head + if '"isSidechain":true' in first_line or '"isSidechain": true' in first_line: + continue + + custom_title = _extract_last_json_string_field(tail, "customTitle") or None + first_prompt = _extract_first_prompt_from_head(head) or None + summary = ( + custom_title + or _extract_last_json_string_field(tail, "summary") + or first_prompt + ) + + # Skip metadata-only sessions (no title, no summary, no prompt) + if not summary: + continue + + git_branch = ( 
def _apply_sort_and_limit(
    sessions: list[SDKSessionInfo], limit: int | None
) -> list[SDKSessionInfo]:
    """Sorts sessions by last_modified descending and applies optional limit.

    The input list is left untouched; a new list is always returned, so
    callers never observe their argument being reordered.

    Args:
        sessions: Sessions to order. Only ``last_modified`` is inspected.
        limit: Maximum number of sessions to keep. ``None``, zero, or a
            negative value means "no limit".

    Returns:
        A new list, newest first, truncated to ``limit`` when it is positive.
    """
    ordered = sorted(sessions, key=lambda s: s.last_modified, reverse=True)
    if limit is not None and limit > 0:
        return ordered[:limit]
    return ordered
projects_dir = _get_projects_dir() + case_insensitive = sys.platform == "win32" + + # Sort worktree paths by sanitized prefix length (longest first) so + # more specific matches take priority over shorter ones + indexed = [] + for wt in worktree_paths: + sanitized = _sanitize_path(wt) + prefix = sanitized.lower() if case_insensitive else sanitized + indexed.append((wt, prefix)) + indexed.sort(key=lambda x: len(x[1]), reverse=True) + + try: + all_dirents = [e for e in projects_dir.iterdir() if e.is_dir()] + except OSError: + # Fall back to single project dir + project_dir = _find_project_dir(canonical_dir) + if project_dir is None: + return _apply_sort_and_limit([], limit) + sessions = _read_sessions_from_dir(project_dir, canonical_dir) + return _apply_sort_and_limit(sessions, limit) + + all_sessions: list[SDKSessionInfo] = [] + seen_dirs: set[str] = set() + + # Always include the user's actual directory (handles subdirectories + # like /repo/packages/my-app that won't match worktree root prefixes) + canonical_project_dir = _find_project_dir(canonical_dir) + if canonical_project_dir is not None: + dir_base = canonical_project_dir.name + seen_dirs.add(dir_base.lower() if case_insensitive else dir_base) + sessions = _read_sessions_from_dir(canonical_project_dir, canonical_dir) + all_sessions.extend(sessions) + + for entry in all_dirents: + dir_name = entry.name.lower() if case_insensitive else entry.name + if dir_name in seen_dirs: + continue + + for wt_path, prefix in indexed: + # Only use startswith for truncated paths (>MAX_SANITIZED_LENGTH) + # where a hash suffix follows. For short paths, require exact match + # to avoid /root/project matching /root/project-foo. 
def _list_all_sessions(limit: int | None) -> list[SDKSessionInfo]:
    """Collect sessions from every project directory under the projects root.

    Each sub-directory is scanned, the combined result is de-duplicated by
    session id, then sorted and limited. An unreadable projects root
    yields an empty list.
    """
    root = _get_projects_dir()
    try:
        subdirs = [child for child in root.iterdir() if child.is_dir()]
    except OSError:
        return []

    collected: list[SDKSessionInfo] = [
        info for subdir in subdirs for info in _read_sessions_from_dir(subdir)
    ]
    return _apply_sort_and_limit(_deduplicate_by_session_id(collected), limit)
def _try_read_session_file(project_dir: Path, file_name: str) -> str | None:
    """Tries to read a session JSONL file from a project directory.

    The file is decoded as UTF-8 with undecodable bytes replaced by U+FFFD,
    the same policy ``_read_session_lite`` applies to its head/tail reads.
    Strict decoding would raise ``UnicodeDecodeError`` (not an ``OSError``)
    on a single corrupt byte and abort the whole lookup.

    Args:
        project_dir: Directory expected to contain the session file.
        file_name: File name of the session transcript within that directory.

    Returns:
        The file's text, or ``None`` if it cannot be opened or read.
    """
    try:
        return (project_dir / file_name).read_text(
            encoding="utf-8", errors="replace"
        )
    except OSError:
        return None
def _parse_transcript_entries(content: str) -> list[_TranscriptEntry]:
    """Turn JSONL text into a list of transcript entry dicts.

    The text is split on ``"\\n"``; blank lines and lines that are not
    valid JSON are dropped. A parsed value is kept only when it is a dict
    whose ``type`` is one of ``_TRANSCRIPT_ENTRY_TYPES`` and whose ``uuid``
    is a string. File order is preserved.
    """
    kept: list[_TranscriptEntry] = []

    for raw_line in content.split("\n"):
        candidate = raw_line.strip()
        if not candidate:
            continue

        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue

        if not isinstance(parsed, dict):
            continue
        if parsed.get("type") in _TRANSCRIPT_ENTRY_TYPES and isinstance(
            parsed.get("uuid"), str
        ):
            kept.append(parsed)

    return kept
+ """ + if not entries: + return [] + + # Index by uuid for O(1) parent lookup + by_uuid: dict[str, _TranscriptEntry] = {} + for entry in entries: + by_uuid[entry["uuid"]] = entry + + # Build index of entry positions (file order) for tie-breaking + entry_index: dict[str, int] = {} + for i, entry in enumerate(entries): + entry_index[entry["uuid"]] = i + + # Find terminal messages (no children point to them via parentUuid) + parent_uuids: set[str] = set() + for entry in entries: + parent = entry.get("parentUuid") + if parent: + parent_uuids.add(parent) + + terminals = [e for e in entries if e["uuid"] not in parent_uuids] + + # From each terminal, walk back to find the nearest user/assistant leaf + leaves: list[_TranscriptEntry] = [] + for terminal in terminals: + walk_cur: _TranscriptEntry | None = terminal + walk_seen: set[str] = set() + while walk_cur is not None: + uid = walk_cur["uuid"] + if uid in walk_seen: + break + walk_seen.add(uid) + if walk_cur.get("type") in ("user", "assistant"): + leaves.append(walk_cur) + break + parent = walk_cur.get("parentUuid") + walk_cur = by_uuid.get(parent) if parent else None + + if not leaves: + return [] + + # Pick the leaf from the main chain (not sidechain/team/meta), preferring + # the highest position in the entries array (most recent in file) + main_leaves = [ + leaf + for leaf in leaves + if not leaf.get("isSidechain") + and not leaf.get("teamName") + and not leaf.get("isMeta") + ] + + def _pick_best(candidates: list[_TranscriptEntry]) -> _TranscriptEntry: + best = candidates[0] + best_idx = entry_index.get(best["uuid"], -1) + for cur in candidates[1:]: + cur_idx = entry_index.get(cur["uuid"], -1) + if cur_idx > best_idx: + best = cur + best_idx = cur_idx + return best + + leaf = _pick_best(main_leaves) if main_leaves else _pick_best(leaves) + + # Walk from leaf to root via parentUuid + chain: list[_TranscriptEntry] = [] + chain_seen: set[str] = set() + chain_cur: _TranscriptEntry | None = leaf + while chain_cur is not 
def _is_visible_message(entry: _TranscriptEntry) -> bool:
    """Decide whether a transcript entry belongs in the returned messages.

    Only ``user`` and ``assistant`` entries qualify, and only when none of
    ``isMeta``, ``isSidechain`` or ``teamName`` is truthy.

    ``isCompactSummary`` is deliberately not checked: those entries hold
    the summarized content of a compacted conversation and are its only
    representation afterwards, so they stay visible.
    """
    if entry.get("type") not in ("user", "assistant"):
        return False
    hidden = (
        entry.get("isMeta") or entry.get("isSidechain") or entry.get("teamName")
    )
    return not hidden
@dataclass
class SDKSessionInfo:
    """Session metadata returned by ``list_sessions()``.

    Contains only data extractable from stat + head/tail reads — no full
    JSONL parsing required.

    Attributes:
        session_id: Unique session identifier (UUID). Taken from the
            session file's name (the ``.jsonl`` stem).
        summary: Display title for the session — custom title, auto-generated
            summary, or first prompt, in that order of preference. Never
            empty: sessions with none of the three are not listed.
        last_modified: Last modified time in milliseconds since epoch
            (the session file's mtime, truncated to an integer).
        file_size: Session file size in bytes.
        custom_title: User-set session title via /rename. ``None`` when the
            tail of the file carries no ``customTitle`` field.
        first_prompt: First meaningful user prompt in the session, read
            from the head of the file and capped at 200 characters plus
            an ellipsis. ``None`` when no such prompt is found.
        git_branch: Git branch at the end of the session — the last
            ``gitBranch`` in the file's tail, else the first one in its head.
        cwd: Working directory for the session — the first ``cwd`` field in
            the file's head, else the project path the listing was run for.
    """

    # Required fields: always populated by list_sessions().
    session_id: str
    summary: str
    last_modified: int
    file_size: int
    # Optional fields: None when the value could not be extracted.
    custom_title: str | None = None
    first_prompt: str | None = None
    git_branch: str | None = None
    cwd: str | None = None
@pytest.fixture
def claude_config_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
    """Provide an isolated Claude config directory for a test.

    Builds ``<tmp_path>/.claude/projects`` and sets ``CLAUDE_CONFIG_DIR``
    to the ``.claude`` directory for the duration of the test.

    Returns:
        Path of the temporary ``.claude`` directory.
    """
    root = tmp_path / ".claude"
    for folder in (root, root / "projects"):
        folder.mkdir()
    monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(root))
    return root
+ """ + sid = session_id or str(uuid.uuid4()) + file_path = project_dir / f"{sid}.jsonl" + + lines: list[str] = [] + + # First line: user message (or meta/sidechain) + first_entry: dict = { + "type": "user", + "message": {"role": "user", "content": first_prompt}, + } + if cwd is not None: + first_entry["cwd"] = cwd + if git_branch is not None: + first_entry["gitBranch"] = git_branch + if is_sidechain: + first_entry["isSidechain"] = True + if is_meta_only: + first_entry["isMeta"] = True + lines.append(json.dumps(first_entry)) + + # Assistant response + lines.append( + json.dumps( + { + "type": "assistant", + "message": {"role": "assistant", "content": "Hi there!"}, + } + ) + ) + + # Tail metadata + tail_entry: dict = {"type": "summary"} + if summary is not None: + tail_entry["summary"] = summary + if custom_title is not None: + tail_entry["customTitle"] = custom_title + if git_branch is not None: + tail_entry["gitBranch"] = git_branch + lines.append(json.dumps(tail_entry)) + + file_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + if mtime is not None: + os.utime(file_path, (mtime, mtime)) + + return sid, file_path + + +def _make_project_dir(config_dir: Path, project_path: str) -> Path: + """Creates a sanitized project directory for the given path.""" + sanitized = _sanitize_path(project_path) + project_dir = config_dir / "projects" / sanitized + project_dir.mkdir(parents=True, exist_ok=True) + return project_dir + + +# --------------------------------------------------------------------------- +# Helper function tests +# --------------------------------------------------------------------------- + + +class TestHelpers: + """Tests for internal helper functions.""" + + def test_validate_uuid_valid(self): + assert _validate_uuid("550e8400-e29b-41d4-a716-446655440000") + assert _validate_uuid("550E8400-E29B-41D4-A716-446655440000") + + def test_validate_uuid_invalid(self): + assert _validate_uuid("not-a-uuid") is None + assert _validate_uuid("") is None + 
assert _validate_uuid("550e8400-e29b-41d4-a716") is None + + def test_sanitize_path_basic(self): + assert _sanitize_path("/Users/foo/my-project") == "-Users-foo-my-project" + assert _sanitize_path("plugin:name:server") == "plugin-name-server" + + def test_sanitize_path_long(self): + """Long paths get truncated with a hash suffix.""" + long_path = "/x" * 150 # 300 chars + result = _sanitize_path(long_path) + assert len(result) > 200 # truncated + hash + assert result.startswith("-x-x") + # The hash suffix is appended after the 200-char prefix + assert "-" in result[200:] + + def test_simple_hash_deterministic(self): + assert _simple_hash("hello") == _simple_hash("hello") + assert _simple_hash("hello") != _simple_hash("world") + + def test_simple_hash_zero(self): + # Empty string should produce "0" + assert _simple_hash("") == "0" + + def test_extract_json_string_field_simple(self): + text = '{"foo":"bar","baz":"qux"}' + assert _extract_json_string_field(text, "foo") == "bar" + assert _extract_json_string_field(text, "baz") == "qux" + assert _extract_json_string_field(text, "missing") is None + + def test_extract_json_string_field_with_space(self): + text = '{"foo": "bar"}' + assert _extract_json_string_field(text, "foo") == "bar" + + def test_extract_json_string_field_escaped(self): + text = '{"foo":"bar\\"baz"}' + result = _extract_json_string_field(text, "foo") + assert result == 'bar"baz' + + def test_extract_last_json_string_field(self): + text = '{"summary":"first"}\n{"summary":"second"}\n{"summary":"third"}' + assert _extract_last_json_string_field(text, "summary") == "third" + + def test_extract_first_prompt_simple(self): + head = json.dumps({"type": "user", "message": {"content": "Hello!"}}) + "\n" + assert _extract_first_prompt_from_head(head) == "Hello!" 
+ + def test_extract_first_prompt_skips_meta(self): + head = ( + json.dumps({"type": "user", "isMeta": True, "message": {"content": "meta"}}) + + "\n" + + json.dumps({"type": "user", "message": {"content": "real prompt"}}) + + "\n" + ) + assert _extract_first_prompt_from_head(head) == "real prompt" + + def test_extract_first_prompt_skips_tool_result(self): + head = ( + json.dumps( + { + "type": "user", + "message": {"content": [{"type": "tool_result", "content": "x"}]}, + } + ) + + "\n" + + json.dumps({"type": "user", "message": {"content": "actual prompt"}}) + + "\n" + ) + assert _extract_first_prompt_from_head(head) == "actual prompt" + + def test_extract_first_prompt_content_blocks(self): + head = ( + json.dumps( + { + "type": "user", + "message": {"content": [{"type": "text", "text": "block prompt"}]}, + } + ) + + "\n" + ) + assert _extract_first_prompt_from_head(head) == "block prompt" + + def test_extract_first_prompt_truncates(self): + long_prompt = "x" * 300 + head = json.dumps({"type": "user", "message": {"content": long_prompt}}) + "\n" + result = _extract_first_prompt_from_head(head) + assert len(result) <= 201 # 200 chars + ellipsis + assert result.endswith("\u2026") + + def test_extract_first_prompt_command_fallback(self): + """If only slash-commands are found, use first command name.""" + head = ( + json.dumps( + { + "type": "user", + "message": {"content": "/helpstuff"}, + } + ) + + "\n" + ) + assert _extract_first_prompt_from_head(head) == "/help" + + def test_extract_first_prompt_empty(self): + assert _extract_first_prompt_from_head("") == "" + assert _extract_first_prompt_from_head('{"type":"assistant"}\n') == "" + + +# --------------------------------------------------------------------------- +# list_sessions() integration tests +# --------------------------------------------------------------------------- + + +class TestListSessions: + """Tests for the list_sessions() function.""" + + def test_empty_projects_dir(self, claude_config_dir: Path): 
+ """No sessions when projects dir is empty.""" + assert list_sessions() == [] + + def test_no_config_dir(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Gracefully handles missing config dir.""" + monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(tmp_path / "nonexistent")) + assert list_sessions() == [] + + def test_single_session(self, claude_config_dir: Path, tmp_path: Path): + """Single session with basic metadata.""" + project_path = str(tmp_path / "my-project") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid, _ = _make_session_file( + project_dir, + first_prompt="What is 2+2?", + git_branch="main", + cwd=project_path, + ) + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + s = sessions[0] + assert isinstance(s, SDKSessionInfo) + assert s.session_id == sid + assert s.first_prompt == "What is 2+2?" + assert s.summary == "What is 2+2?" 
# no custom title or summary → first prompt + assert s.git_branch == "main" + assert s.cwd == project_path + assert s.file_size > 0 + assert s.last_modified > 0 + assert s.custom_title is None + + def test_custom_title_wins_summary(self, claude_config_dir: Path, tmp_path: Path): + """custom_title takes precedence over summary and first_prompt.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + _make_session_file( + project_dir, + first_prompt="original question", + summary="auto summary", + custom_title="My Custom Title", + ) + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].summary == "My Custom Title" + assert sessions[0].custom_title == "My Custom Title" + assert sessions[0].first_prompt == "original question" + + def test_summary_wins_first_prompt(self, claude_config_dir: Path, tmp_path: Path): + """Explicit summary takes precedence over first_prompt when no custom_title.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + _make_session_file( + project_dir, first_prompt="question", summary="better summary" + ) + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].summary == "better summary" + assert sessions[0].custom_title is None + + def test_multiple_sessions_sorted_by_mtime( + self, claude_config_dir: Path, tmp_path: Path + ): + """Sessions are sorted by last_modified descending.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + sid_old, _ = _make_session_file(project_dir, first_prompt="old", mtime=1000.0) + sid_new, _ = 
_make_session_file(project_dir, first_prompt="new", mtime=3000.0) + sid_mid, _ = _make_session_file(project_dir, first_prompt="mid", mtime=2000.0) + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 3 + assert [s.session_id for s in sessions] == [sid_new, sid_mid, sid_old] + # Verify mtime conversion to milliseconds + assert sessions[0].last_modified == 3_000_000 + assert sessions[1].last_modified == 2_000_000 + assert sessions[2].last_modified == 1_000_000 + + def test_limit(self, claude_config_dir: Path, tmp_path: Path): + """Limit option restricts number of results.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + for i in range(5): + _make_session_file( + project_dir, first_prompt=f"prompt {i}", mtime=1000.0 + i + ) + + sessions = list_sessions( + directory=project_path, limit=2, include_worktrees=False + ) + assert len(sessions) == 2 + # Should be the 2 newest + assert sessions[0].last_modified >= sessions[1].last_modified + + def test_filters_sidechain_sessions(self, claude_config_dir: Path, tmp_path: Path): + """Sessions with isSidechain:true are filtered out.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + _make_session_file(project_dir, first_prompt="normal") + _make_session_file(project_dir, first_prompt="sidechain", is_sidechain=True) + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].first_prompt == "normal" + + def test_filters_empty_sessions(self, claude_config_dir: Path, tmp_path: Path): + """Sessions with no summary/title/prompt are filtered (no '(session)' placeholder).""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir 
= _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + # A session with only meta messages → no first_prompt, no summary + _make_session_file(project_dir, first_prompt="ignored meta", is_meta_only=True) + _make_session_file(project_dir, first_prompt="real content") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].first_prompt == "real content" + + def test_filters_non_uuid_filenames(self, claude_config_dir: Path, tmp_path: Path): + """Non-UUID .jsonl files are ignored.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + # Create a non-UUID .jsonl file + (project_dir / "not-a-uuid.jsonl").write_text( + json.dumps({"type": "user", "message": {"content": "x"}}) + "\n" + ) + _make_session_file(project_dir, first_prompt="valid session") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].first_prompt == "valid session" + + def test_ignores_non_jsonl_files(self, claude_config_dir: Path, tmp_path: Path): + """Files not ending in .jsonl are ignored.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + (project_dir / "README.md").write_text("not a session") + _make_session_file(project_dir, first_prompt="session") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + + def test_list_all_sessions(self, claude_config_dir: Path): + """When no directory is given, lists across all projects.""" + proj1 = _make_project_dir(claude_config_dir, "/some/path/one") + proj2 = _make_project_dir(claude_config_dir, "/some/path/two") + + _make_session_file(proj1, first_prompt="from proj1", mtime=1000.0) + 
_make_session_file(proj2, first_prompt="from proj2", mtime=2000.0) + + sessions = list_sessions() + assert len(sessions) == 2 + # Sorted newest first + assert sessions[0].first_prompt == "from proj2" + assert sessions[1].first_prompt == "from proj1" + + def test_list_all_sessions_dedupes(self, claude_config_dir: Path): + """Duplicate session IDs across projects keep the newest.""" + proj1 = _make_project_dir(claude_config_dir, "/path/one") + proj2 = _make_project_dir(claude_config_dir, "/path/two") + + shared_sid = str(uuid.uuid4()) + _make_session_file( + proj1, session_id=shared_sid, first_prompt="older", mtime=1000.0 + ) + _make_session_file( + proj2, session_id=shared_sid, first_prompt="newer", mtime=2000.0 + ) + + sessions = list_sessions() + assert len(sessions) == 1 + assert sessions[0].first_prompt == "newer" + assert sessions[0].last_modified == 2_000_000 + + def test_nonexistent_project_dir(self, claude_config_dir: Path, tmp_path: Path): + """Returns empty list when project has no session directory.""" + project_path = str(tmp_path / "never-used") + Path(project_path).mkdir(parents=True) + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert sessions == [] + + def test_empty_file_filtered(self, claude_config_dir: Path, tmp_path: Path): + """Empty session files are filtered out.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + sid = str(uuid.uuid4()) + (project_dir / f"{sid}.jsonl").write_text("") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert sessions == [] + + def test_include_worktrees_disabled(self, claude_config_dir: Path, tmp_path: Path): + """include_worktrees=False only scans the given directory.""" + # Create a real directory so realpath works + project_path = str(tmp_path / "main-proj") + Path(project_path).mkdir(parents=True) + canonical = 
os.path.realpath(project_path) + + main_dir = _make_project_dir(claude_config_dir, canonical) + _make_session_file(main_dir, first_prompt="main session") + + # Create another "worktree-like" project dir that should NOT be scanned + other_dir = _make_project_dir(claude_config_dir, canonical + "-worktree") + _make_session_file(other_dir, first_prompt="worktree session") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].first_prompt == "main session" + + def test_limit_zero_returns_all(self, claude_config_dir: Path, tmp_path: Path): + """limit=0 or negative returns all sessions (TS: limit > 0 check).""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + for i in range(3): + _make_session_file(project_dir, first_prompt=f"p{i}") + + sessions = list_sessions( + directory=project_path, limit=0, include_worktrees=False + ) + assert len(sessions) == 3 + + def test_cwd_from_head_fallback_to_project_path( + self, claude_config_dir: Path, tmp_path: Path + ): + """cwd falls back to project path when not in head.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + canonical = os.path.realpath(project_path) + project_dir = _make_project_dir(claude_config_dir, canonical) + + # Session without cwd field + _make_session_file(project_dir, first_prompt="no cwd field") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].cwd == canonical + + def test_git_branch_from_tail_preferred( + self, claude_config_dir: Path, tmp_path: Path + ): + """gitBranch from tail is preferred over head.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + + sid = str(uuid.uuid4()) 
+ file_path = project_dir / f"{sid}.jsonl" + + lines = [ + json.dumps( + { + "type": "user", + "message": {"content": "hello"}, + "gitBranch": "old-branch", + } + ), + json.dumps({"type": "summary", "gitBranch": "new-branch"}), + ] + file_path.write_text("\n".join(lines) + "\n") + + sessions = list_sessions(directory=project_path, include_worktrees=False) + assert len(sessions) == 1 + assert sessions[0].git_branch == "new-branch" + + +class TestSDKSessionInfoType: + """Tests for the SDKSessionInfo dataclass.""" + + def test_creation_required_fields(self): + info = SDKSessionInfo( + session_id="abc", + summary="test", + last_modified=1000, + file_size=42, + ) + assert info.session_id == "abc" + assert info.summary == "test" + assert info.last_modified == 1000 + assert info.file_size == 42 + assert info.custom_title is None + assert info.first_prompt is None + assert info.git_branch is None + assert info.cwd is None + + def test_creation_all_fields(self): + info = SDKSessionInfo( + session_id="abc", + summary="test", + last_modified=1000, + file_size=42, + custom_title="title", + first_prompt="prompt", + git_branch="main", + cwd="/foo", + ) + assert info.custom_title == "title" + assert info.first_prompt == "prompt" + assert info.git_branch == "main" + assert info.cwd == "/foo" + + +# --------------------------------------------------------------------------- +# get_session_messages() helpers +# --------------------------------------------------------------------------- + + +def _make_transcript_entry( + entry_type: str, + entry_uuid: str, + parent_uuid: str | None, + session_id: str, + content: str | list | None = None, + **extras, +) -> dict: + """Builds a transcript entry dict matching the CLI's JSONL format.""" + entry: dict = { + "type": entry_type, + "uuid": entry_uuid, + "parentUuid": parent_uuid, + "sessionId": session_id, + } + if content is not None: + role = entry_type if entry_type in ("user", "assistant") else "user" + entry["message"] = {"role": role, 
"content": content} + entry.update(extras) + return entry + + +def _write_transcript(project_dir: Path, session_id: str, entries: list[dict]) -> Path: + """Writes a JSONL transcript file.""" + file_path = project_dir / f"{session_id}.jsonl" + lines = [json.dumps(e) for e in entries] + file_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return file_path + + +# --------------------------------------------------------------------------- +# get_session_messages() tests +# --------------------------------------------------------------------------- + + +class TestGetSessionMessages: + """Tests for get_session_messages().""" + + def test_invalid_session_id(self, claude_config_dir: Path): + """Non-UUID session_id returns empty list.""" + assert get_session_messages("not-a-uuid") == [] + assert get_session_messages("") == [] + + def test_nonexistent_session(self, claude_config_dir: Path): + """Session file not found returns empty list.""" + sid = str(uuid.uuid4()) + assert get_session_messages(sid) == [] + + def test_no_config_dir(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Missing config dir returns empty list.""" + monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(tmp_path / "nonexistent")) + sid = str(uuid.uuid4()) + assert get_session_messages(sid) == [] + + def test_simple_chain(self, claude_config_dir: Path, tmp_path: Path): + """Basic user → assistant → user → assistant chain.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + u2 = str(uuid.uuid4()) + a2 = str(uuid.uuid4()) + + entries = [ + _make_transcript_entry("user", u1, None, sid, content="hello"), + _make_transcript_entry("assistant", a1, u1, sid, content="hi!"), + _make_transcript_entry("user", u2, a1, sid, content="thanks"), + _make_transcript_entry("assistant", a2, u2, sid, 
content="welcome"), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 4 + + # Chronological order: root → leaf + assert messages[0].type == "user" + assert messages[0].uuid == u1 + assert messages[0].session_id == sid + assert messages[0].message == {"role": "user", "content": "hello"} + assert messages[0].parent_tool_use_id is None + + assert messages[1].type == "assistant" + assert messages[1].uuid == a1 + assert messages[1].message == {"role": "assistant", "content": "hi!"} + + assert messages[2].type == "user" + assert messages[2].uuid == u2 + + assert messages[3].type == "assistant" + assert messages[3].uuid == a2 + + # All SessionMessage instances + assert all(isinstance(m, SessionMessage) for m in messages) + + def test_filters_meta_messages(self, claude_config_dir: Path, tmp_path: Path): + """isMeta entries in the chain are filtered from output.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + meta = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + entries = [ + _make_transcript_entry("user", u1, None, sid, content="hello"), + # Meta user message in the chain — should be walked through but + # filtered from output + _make_transcript_entry("user", meta, u1, sid, content="meta", isMeta=True), + _make_transcript_entry("assistant", a1, meta, sid, content="hi"), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + # Only u1 and a1 visible (meta filtered out) + assert len(messages) == 2 + assert messages[0].uuid == u1 + assert messages[1].uuid == a1 + + def test_filters_non_user_assistant_from_chain( + self, claude_config_dir: Path, tmp_path: Path + ): + """Progress/system entries in chain are filtered from output.""" + project_path 
= str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + prog = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + entries = [ + _make_transcript_entry("user", u1, None, sid, content="hello"), + # Progress entry in the chain + _make_transcript_entry("progress", prog, u1, sid), + _make_transcript_entry("assistant", a1, prog, sid, content="hi"), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + # progress is walked through the chain but filtered from output + assert len(messages) == 2 + assert messages[0].uuid == u1 + assert messages[1].uuid == a1 + + def test_keeps_compact_summary(self, claude_config_dir: Path, tmp_path: Path): + """isCompactSummary messages are kept (they represent compacted content).""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + entries = [ + _make_transcript_entry( + "user", + u1, + None, + sid, + content="compact summary", + isCompactSummary=True, + ), + _make_transcript_entry("assistant", a1, u1, sid, content="hi"), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 2 + assert messages[0].uuid == u1 # compact summary kept + + def test_limit_and_offset(self, claude_config_dir: Path, tmp_path: Path): + """Limit and offset pagination.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + # Build a chain of 6 messages: u→a→u→a→u→a + uuids = [str(uuid.uuid4()) for _ in range(6)] + entries = 
[] + for i, uid in enumerate(uuids): + parent = uuids[i - 1] if i > 0 else None + entry_type = "user" if i % 2 == 0 else "assistant" + entries.append( + _make_transcript_entry(entry_type, uid, parent, sid, content=f"m{i}") + ) + _write_transcript(project_dir, sid, entries) + + # No limit/offset + all_msgs = get_session_messages(sid, directory=project_path) + assert len(all_msgs) == 6 + + # limit=2 + page = get_session_messages(sid, directory=project_path, limit=2) + assert len(page) == 2 + assert page[0].uuid == uuids[0] + assert page[1].uuid == uuids[1] + + # offset=2, limit=2 + page = get_session_messages(sid, directory=project_path, limit=2, offset=2) + assert len(page) == 2 + assert page[0].uuid == uuids[2] + assert page[1].uuid == uuids[3] + + # offset only (no limit) + page = get_session_messages(sid, directory=project_path, offset=4) + assert len(page) == 2 + assert page[0].uuid == uuids[4] + assert page[1].uuid == uuids[5] + + # limit=0 returns all (TS: limit > 0 check) + page = get_session_messages(sid, directory=project_path, limit=0) + assert len(page) == 6 + + # offset beyond end + page = get_session_messages(sid, directory=project_path, offset=100) + assert page == [] + + def test_picks_main_chain_over_sidechain( + self, claude_config_dir: Path, tmp_path: Path + ): + """When multiple leaves exist, prefers non-sidechain main leaf.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + root = str(uuid.uuid4()) + main_leaf = str(uuid.uuid4()) + side_leaf = str(uuid.uuid4()) + + entries = [ + _make_transcript_entry("user", root, None, sid, content="root"), + # Main chain continuation + _make_transcript_entry("assistant", main_leaf, root, sid, content="main"), + # Sidechain branch (also from root) — should be ignored as leaf + _make_transcript_entry( + "assistant", + side_leaf, + root, + sid, + content="side", 
+ isSidechain=True, + ), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 2 + assert messages[0].uuid == root + assert messages[1].uuid == main_leaf # main leaf chosen, not sidechain + + def test_picks_latest_leaf_by_file_position( + self, claude_config_dir: Path, tmp_path: Path + ): + """When multiple main leaves exist, picks the one latest in the file.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + root = str(uuid.uuid4()) + old_leaf = str(uuid.uuid4()) + new_leaf = str(uuid.uuid4()) + + # Both leaves branch from root; new_leaf appears later in file + entries = [ + _make_transcript_entry("user", root, None, sid, content="root"), + _make_transcript_entry("assistant", old_leaf, root, sid, content="old"), + _make_transcript_entry("assistant", new_leaf, root, sid, content="new"), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 2 + assert messages[0].uuid == root + # new_leaf has higher file position → chosen + assert messages[1].uuid == new_leaf + + def test_terminal_non_message_walked_back( + self, claude_config_dir: Path, tmp_path: Path + ): + """A terminal progress entry is walked back to find user/assistant leaf.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + prog = str(uuid.uuid4()) # terminal progress entry + + entries = [ + _make_transcript_entry("user", u1, None, sid, content="hi"), + _make_transcript_entry("assistant", a1, u1, sid, content="hello"), + # Terminal entry is progress type — should walk back to a1 + 
_make_transcript_entry("progress", prog, a1, sid), + ] + _write_transcript(project_dir, sid, entries) + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 2 + assert messages[0].uuid == u1 + assert messages[1].uuid == a1 + + def test_corrupt_lines_skipped(self, claude_config_dir: Path, tmp_path: Path): + """Corrupt JSON lines are skipped without failing.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + lines = [ + json.dumps(_make_transcript_entry("user", u1, None, sid, content="hi")), + "not valid json {{{", + "", + json.dumps( + _make_transcript_entry("assistant", a1, u1, sid, content="hello") + ), + ] + (project_dir / f"{sid}.jsonl").write_text("\n".join(lines) + "\n") + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 2 + + def test_search_all_projects_when_no_dir(self, claude_config_dir: Path): + """When no directory given, searches all project directories.""" + proj1 = _make_project_dir(claude_config_dir, "/path/one") + proj2 = _make_project_dir(claude_config_dir, "/path/two") + + sid = str(uuid.uuid4()) + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + # Session lives only in proj2 + entries = [ + _make_transcript_entry("user", u1, None, sid, content="hi"), + _make_transcript_entry("assistant", a1, u1, sid, content="hello"), + ] + _write_transcript(proj2, sid, entries) + + # proj1 exists but doesn't have this session + _ = proj1 # noqa: F841 + + messages = get_session_messages(sid) # no directory + assert len(messages) == 2 + assert messages[0].uuid == u1 + + def test_cycle_detection(self, claude_config_dir: Path, tmp_path: Path): + """Cyclic parentUuid references don't cause infinite loop.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) 
+ project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + # a1 → u1 → a1 (cycle!) + entries = [ + _make_transcript_entry("user", u1, a1, sid, content="hi"), + _make_transcript_entry("assistant", a1, u1, sid, content="hello"), + ] + _write_transcript(project_dir, sid, entries) + + # Should terminate without hanging. Both entries are parents of + # each other → no terminals → empty chain. + messages = get_session_messages(sid, directory=project_path) + # No terminals found (both are parents) → returns empty + assert messages == [] + + def test_empty_transcript_file(self, claude_config_dir: Path, tmp_path: Path): + """Empty file returns empty list.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + (project_dir / f"{sid}.jsonl").write_text("") + assert get_session_messages(sid, directory=project_path) == [] + + def test_ignores_non_transcript_types( + self, claude_config_dir: Path, tmp_path: Path + ): + """Lines with type=summary (no uuid/chain) are ignored during parsing.""" + project_path = str(tmp_path / "proj") + Path(project_path).mkdir(parents=True) + project_dir = _make_project_dir( + claude_config_dir, os.path.realpath(project_path) + ) + sid = str(uuid.uuid4()) + + u1 = str(uuid.uuid4()) + a1 = str(uuid.uuid4()) + + lines = [ + json.dumps(_make_transcript_entry("user", u1, None, sid, content="hi")), + json.dumps({"type": "summary", "summary": "A nice chat"}), + json.dumps( + _make_transcript_entry("assistant", a1, u1, sid, content="hello") + ), + ] + (project_dir / f"{sid}.jsonl").write_text("\n".join(lines) + "\n") + + messages = get_session_messages(sid, directory=project_path) + assert len(messages) == 2 + + +class TestBuildConversationChain: + """Unit tests for the _build_conversation_chain 
helper.""" + + def test_empty_input(self): + assert _build_conversation_chain([]) == [] + + def test_single_entry(self): + entry = {"type": "user", "uuid": "a", "parentUuid": None} + result = _build_conversation_chain([entry]) + assert result == [entry] + + def test_linear_chain(self): + entries = [ + {"type": "user", "uuid": "a", "parentUuid": None}, + {"type": "assistant", "uuid": "b", "parentUuid": "a"}, + {"type": "user", "uuid": "c", "parentUuid": "b"}, + ] + result = _build_conversation_chain(entries) + assert [e["uuid"] for e in result] == ["a", "b", "c"] + + def test_only_progress_entries_returns_empty(self): + """If no user/assistant entries, no leaves found → empty.""" + entries = [ + {"type": "progress", "uuid": "a", "parentUuid": None}, + {"type": "progress", "uuid": "b", "parentUuid": "a"}, + ] + result = _build_conversation_chain(entries) + assert result == [] + + +class TestSessionMessageType: + """Tests for the SessionMessage dataclass.""" + + def test_creation(self): + msg = SessionMessage( + type="user", + uuid="abc", + session_id="sess", + message={"role": "user", "content": "hi"}, + ) + assert msg.type == "user" + assert msg.uuid == "abc" + assert msg.session_id == "sess" + assert msg.message == {"role": "user", "content": "hi"} + assert msg.parent_tool_use_id is None