Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 119 additions & 14 deletions scripts/introspection_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
raw content. The skill feeds ONLY this digest to the model. Raw private text
never enters the context (complements the PII redaction gate #82).

Two on-disk session formats are scanned (#238): the upstream ``*.jsonl``
transcripts AND ``request_dump_*.json`` snapshots, which some installs persist
instead. A request dump carries the same role-tagged messages at
``request.body.messages`` plus a provider ``error`` object; ignoring it left
those installs reporting ``sessions_scanned: 0`` and blinded the whole
self-improvement loop.
Three on-disk session formats are scanned: the upstream ``*.jsonl``
transcripts, ``request_dump_*.json`` snapshots (#238), and the SQLite
SessionDB ``state.db`` messages table (#399). A request dump carries the same
role-tagged messages at ``request.body.messages`` plus a provider ``error``
object; ignoring it left those installs reporting ``sessions_scanned: 0`` and
blinded the whole self-improvement loop. The SessionDB is where >90% of real
sessions live, so the messages table is read, grouped by session_id and ordered
by id, then passed through the same scan_messages path.

Signals extracted:
* tool_failures — tool results that look like failures, attributed to the
Expand All @@ -37,11 +39,12 @@
import json
import os
import re
import sqlite3
import sys
import time
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, Iterable, List, Optional

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

Expand Down Expand Up @@ -118,10 +121,92 @@ def _iter_lines(path: Path):
return


# Keep the id on DB-derived message dicts so _state_db_session_signals can re-sort.
_MESSAGE_ROW_ID_KEY = "_db_id"


def _message_row_to_dict(row: sqlite3.Row) -> Optional[Dict[str, Any]]:
    """Convert a SessionDB ``messages`` row into a role-tagged message dict (#399).

    Only the columns the scanner understands are copied (``role``,
    ``content``, ``tool_call_id``, ``tool_calls``, ``tool_name``); any other
    column on the row is ignored. The row's ``id`` is stored under
    ``_MESSAGE_ROW_ID_KEY`` so callers can restore insertion order later.

    Args:
        row: A ``sqlite3.Row`` that contains at least an ``id`` column. The
            remaining columns are optional and copied only when present.

    Returns:
        The message dict, or ``None`` when the row has no truthy ``role``.
        ``tool_calls`` is included only when the stored value decodes to a
        JSON list; anything else is dropped rather than aborting the scan.
    """
    # row.keys() builds a fresh list on every call; look it up once.
    columns = set(row.keys())
    obj: Dict[str, Any] = {_MESSAGE_ROW_ID_KEY: row["id"]}
    for name in ("role", "content", "tool_call_id"):
        if name in columns:
            obj[name] = row[name]
    if "tool_calls" in columns and row["tool_calls"] is not None:
        try:
            parsed = json.loads(row["tool_calls"])
        except (TypeError, ValueError):
            # SQLite columns are dynamically typed: a non-text value (e.g. an
            # INTEGER) makes json.loads raise TypeError, malformed text raises
            # ValueError. Either way the row is kept without tool_calls.
            parsed = None
        if isinstance(parsed, list):
            obj["tool_calls"] = parsed
    if "tool_name" in columns:
        obj["tool_name"] = row["tool_name"]
    return obj if obj.get("role") else None


def _iter_state_db(db_path: Path) -> Iterable[tuple[str, List[Dict[str, Any]]]]:
    """Yield ``(session_id, messages)`` pairs from a SQLite ``messages`` table.

    Rows are read ordered by ``session_id`` then ``id`` and grouped into one
    list per session, so each yielded list is in ``id`` order. Every row is
    converted with ``_message_row_to_dict``; rows it rejects are skipped.

    The scan is best-effort and never raises ``sqlite3.Error``:

    * If the file cannot be opened or queried, or the ``messages`` table is
      missing, nothing is yielded.
    * ``session_id``, ``id`` and ``role`` are required columns. The optional
      ``content``, ``tool_call_id``, ``tool_calls`` and ``tool_name`` columns
      are selected only when the table actually has them.
    * Rows whose ``session_id`` is NULL are ignored.
    * If reading fails part-way through, iteration stops and the messages
      gathered so far for the current session are still yielded.

    Args:
        db_path: Path of the SQLite database file to read.

    Yields:
        ``(session_id, messages)`` where ``messages`` is a non-empty list of
        message dicts for that session.
    """
    # Fixed whitelist: these literals are the only identifiers ever
    # interpolated into the SELECT below, so database contents cannot
    # influence the SQL text.
    known_columns = (
        "session_id",
        "id",
        "role",
        "content",
        "tool_call_id",
        "tool_calls",
        "tool_name",
    )
    required_columns = {"session_id", "id", "role"}
    try:
        conn = sqlite3.connect(str(db_path))
    except sqlite3.Error:
        return
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()
        try:
            # Probe the schema. table_info returns no rows for a missing
            # table, which leaves `present` empty and ends the scan below.
            cur.execute("PRAGMA table_info(messages)")
            present = {str(info["name"]).lower() for info in cur.fetchall()}
            if not required_columns <= present:
                return
            selected = ", ".join(c for c in known_columns if c in present)
            cur.execute(
                f"SELECT {selected} FROM messages "
                "WHERE session_id IS NOT NULL ORDER BY session_id, id"
            )
        except sqlite3.Error:
            return
        current_session: Optional[str] = None
        current_messages: List[Dict[str, Any]] = []
        try:
            for row in cur:
                msg = _message_row_to_dict(row)
                if msg is None:
                    continue
                sid = row["session_id"]
                if sid != current_session:
                    # Session boundary: flush the finished group first.
                    if current_session is not None:
                        yield current_session, current_messages
                    current_session = sid
                    current_messages = []
                current_messages.append(msg)
        except sqlite3.Error:
            # Fetching can fail lazily (e.g. a damaged page or a lock). This
            # is a deliberate best-effort scan: stop reading, keep what we
            # already have.
            pass
        if current_session is not None:
            yield current_session, current_messages
    finally:
        conn.close()


def _state_db_session_signals(msgs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Scan one SessionDB session and return its signal counts.

    The messages are put into ascending order of the row id stored under
    ``_MESSAGE_ROW_ID_KEY`` (a missing id sorts as 0), even though the caller
    is expected to pass them in that order already. The id entry is then
    removed from every message dict, in place, so it is not visible to
    ``scan_messages``, whose result is returned unchanged.
    """

    def _row_id(message: Dict[str, Any]) -> Any:
        return message.get(_MESSAGE_ROW_ID_KEY, 0)

    in_id_order = list(msgs)
    in_id_order.sort(key=_row_id)
    for message in in_id_order:
        # Strip the bookkeeping key from the caller's dicts before scanning.
        if _MESSAGE_ROW_ID_KEY in message:
            del message[_MESSAGE_ROW_ID_KEY]
    return scan_messages(in_id_order)


def scan_messages(messages) -> Dict[str, Any]:
"""Return per-session signal counts (no raw text) from an iterable of
role-tagged message dicts. Shared by the JSONL transcript path and the
request_dump_*.json path (#238) so both formats yield the identical digest.
role-tagged message dicts. Shared by the JSONL transcript path, the
request_dump_*.json path (#238), and the SessionDB state.db path (#399) so
all formats yield the identical digest.
"""
tool_failures: Counter = Counter()
timeouts = 0
Expand Down Expand Up @@ -161,9 +246,10 @@ def scan_messages(messages) -> Dict[str, Any]:
elif role == "tool":
content = obj.get("content")
tool = id_to_tool.get(obj.get("tool_call_id"), "unknown")
if _tool_result_failed(content):
failed = _tool_result_failed(content)
if failed:
tool_failures[tool] += 1
if isinstance(content, str) and _TIMEOUT_RE.search(content):
if failed and isinstance(content, str) and _TIMEOUT_RE.search(content):
timeouts += 1

repeated = {t: n for t, n in max_runs.items() if n >= _REPEAT_THRESHOLD}
Expand Down Expand Up @@ -210,7 +296,11 @@ def scan_request_dump(obj: Dict[str, Any]) -> Dict[str, Any]:
label = err.get("failure_category") or err.get("type") or "error"
provider_errors[f"{status}:{label}" if status else str(label)] += 1
s["provider_errors"] = dict(provider_errors)
body = obj.get("request", {}).get("body") if isinstance(obj.get("request"), dict) else None
body = (
obj.get("request", {}).get("body")
if isinstance(obj.get("request"), dict)
else None
)
model = body.get("model") if isinstance(body, dict) else None
s["models"] = {model: 1} if isinstance(model, str) and model else {}
return s
Expand All @@ -223,7 +313,9 @@ def _fresh(path: Path, cutoff: float) -> bool:
return False


def build_digest(sessions_dir: Path, window_days: int = 7, now: float | None = None) -> Dict[str, Any]:
def build_digest(
sessions_dir: Path, window_days: int = 7, now: float | None = None
) -> Dict[str, Any]:
now = now if now is not None else time.time()
cutoff = now - window_days * 86400
failures: Counter = Counter()
Expand Down Expand Up @@ -278,6 +370,17 @@ def _aggregate(s: Dict[str, Any]) -> None:
scanned += 1
_aggregate(scan_request_dump(obj))

# 3. SQLite SessionDB messages table (#399) — canonical store for real
# sessions. NOTE: no freshness check is applied on this path, so every
# session found in state.db is counted regardless of window_days.
# TODO: filter DB sessions against the cutoff as well.
db_path = sessions_dir / "state.db"
if db_path.is_file():
for _sid, msgs in _iter_state_db(db_path):
if not msgs:
continue
scanned += 1
_aggregate(_state_db_session_signals(msgs))

return {
"window_days": window_days,
"sessions_scanned": scanned,
Expand All @@ -293,7 +396,9 @@ def _aggregate(s: Dict[str, Any]) -> None:


def _sessions_dir() -> Path:
    """Return the ``sessions`` directory under the Hermes home directory.

    The home directory is taken from the ``HERMES_HOME`` environment variable
    when it is set, and otherwise defaults to ``.hermes`` inside the current
    user's home directory.
    """
    default_home = str(Path.home() / ".hermes")
    hermes_home = os.environ.get("HERMES_HOME", default_home)
    return Path(hermes_home, "sessions")


def main(argv: List[str]) -> int:
Expand Down
Loading
Loading