From 70f62d0480321c6ad6a8fbf5ddf3014f3a3c8927 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:16:29 -0700 Subject: [PATCH 1/8] feat(replay): add execution trace schema and data models (#315) Add three new database tables (execution_steps, llm_interactions, file_operations) and corresponding dataclasses + CRUD operations for recording complete execution traces during agent runs. --- codeframe/core/replay.py | 413 ++++++++++++++++++++++++++++++++++++ codeframe/core/workspace.py | 111 ++++++++++ tests/core/test_replay.py | 363 +++++++++++++++++++++++++++++++ 3 files changed, 887 insertions(+) create mode 100644 codeframe/core/replay.py create mode 100644 tests/core/test_replay.py diff --git a/codeframe/core/replay.py b/codeframe/core/replay.py new file mode 100644 index 00000000..ddad6b88 --- /dev/null +++ b/codeframe/core/replay.py @@ -0,0 +1,413 @@ +"""Execution trace recording and replay for CodeFRAME. + +Provides data models and CRUD operations for capturing complete +execution traces (steps, LLM interactions, file operations) and +replaying them for debugging and learning. + +This module is headless - no FastAPI or HTTP dependencies. +""" + +import json +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Optional + +from codeframe.core.workspace import Workspace, get_db_connection + + +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +# ============================================================================= +# Data Models +# ============================================================================= + + +@dataclass +class ExecutionStep: + """A single step in an execution trace. + + Each iteration of the ReactAgent loop or verification gate + is recorded as one step. + """ + + id: str + run_id: str + step_number: int + step_type: str # "tool_call", "verification", "planning", "gate" + description: str + started_at: datetime + completed_at: Optional[datetime] = None + status: str = "started" # "started", "completed", "failed" + input_context: Optional[str] = None + output_result: Optional[str] = None + metadata: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class LLMInteraction: + """A single LLM prompt/response pair.""" + + id: str + run_id: str + step_id: str + prompt: str + response: str + model: str + tokens_used: int + timestamp: datetime + purpose: str # "execution", "planning", "review", "verification" + + +@dataclass +class FileOperation: + """A file create/edit/delete recorded during execution.""" + + id: str + run_id: str + step_id: str + operation_type: str # "create", "edit", "delete" + file_path: str + content_before: Optional[str] + content_after: Optional[str] + timestamp: datetime + + +@dataclass +class ExecutionTrace: + """Complete trace of a single run, assembled from the three tables.""" + + run_id: str + task_id: str + started_at: datetime + status: str + steps: list[ExecutionStep] + llm_interactions: list[LLMInteraction] + file_operations: list[FileOperation] + completed_at: Optional[datetime] = None + + def summary(self) -> dict[str, Any]: + unique_files = {op.file_path for op in self.file_operations} + return { + "total_steps": len(self.steps), + "llm_calls": len(self.llm_interactions), + "total_tokens": sum(i.tokens_used for i in self.llm_interactions), + "files_modified": len(unique_files), + } + + +# ============================================================================= +# CRUD: ExecutionStep +# ============================================================================= + + +def save_execution_step(workspace: Workspace, step: ExecutionStep) -> None: + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + """ + INSERT OR REPLACE INTO execution_steps + (id, run_id, step_number, step_type, description, started_at, + completed_at, status, input_context, output_result, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + step.id, + step.run_id, + step.step_number, + step.step_type, + step.description, + step.started_at.isoformat(), + step.completed_at.isoformat() if step.completed_at else None, + step.status, + step.input_context, + step.output_result, + json.dumps(step.metadata) if step.metadata else None, + ), + ) + conn.commit() + finally: + conn.close() + + +def get_execution_steps( + workspace: Workspace, run_id: str +) -> list[ExecutionStep]: + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + """ + SELECT id, run_id, step_number, step_type, description, started_at, + completed_at, status, input_context, output_result, metadata + FROM execution_steps + WHERE run_id = ? + ORDER BY step_number ASC + """, + (run_id,), + ) + return [_row_to_step(row) for row in cursor.fetchall()] + finally: + conn.close() + + +# ============================================================================= +# CRUD: LLMInteraction +# ============================================================================= + + +def save_llm_interaction(workspace: Workspace, interaction: LLMInteraction) -> None: + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + """ + INSERT OR REPLACE INTO llm_interactions + (id, run_id, step_id, prompt, response, model, tokens_used, + timestamp, purpose) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + interaction.id, + interaction.run_id, + interaction.step_id, + interaction.prompt, + interaction.response, + interaction.model, + interaction.tokens_used, + interaction.timestamp.isoformat(), + interaction.purpose, + ), + ) + conn.commit() + finally: + conn.close() + + +def get_llm_interactions( + workspace: Workspace, run_id: str, step_id: Optional[str] = None +) -> list[LLMInteraction]: + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + query = """ + SELECT id, run_id, step_id, prompt, response, model, tokens_used, + timestamp, purpose + FROM llm_interactions + WHERE run_id = ? + """ + params: list = [run_id] + if step_id: + query += " AND step_id = ?" + params.append(step_id) + query += " ORDER BY timestamp ASC" + cursor.execute(query, params) + return [_row_to_llm_interaction(row) for row in cursor.fetchall()] + finally: + conn.close() + + +# ============================================================================= +# CRUD: FileOperation +# ============================================================================= + + +def save_file_operation(workspace: Workspace, op: FileOperation) -> None: + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + """ + INSERT OR REPLACE INTO file_operations + (id, run_id, step_id, operation_type, file_path, + content_before, content_after, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + op.id, + op.run_id, + op.step_id, + op.operation_type, + op.file_path, + op.content_before, + op.content_after, + op.timestamp.isoformat(), + ), + ) + conn.commit() + finally: + conn.close() + + +def get_file_operations( + workspace: Workspace, run_id: str, step_id: Optional[str] = None +) -> list[FileOperation]: + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + query = """ + SELECT id, run_id, step_id, operation_type, file_path, + content_before, content_after, timestamp + FROM file_operations + WHERE run_id = ? + """ + params: list = [run_id] + if step_id: + query += " AND step_id = ?" + params.append(step_id) + query += " ORDER BY timestamp ASC" + cursor.execute(query, params) + return [_row_to_file_operation(row) for row in cursor.fetchall()] + finally: + conn.close() + + +# ============================================================================= +# Trace Loading +# ============================================================================= + + +def load_execution_trace(workspace: Workspace, run_id: str) -> Optional[ExecutionTrace]: + """Load a complete execution trace for a run. + + Assembles steps, LLM interactions, and file operations into + a single ExecutionTrace object. + + Returns None if no steps are found for the run. + """ + steps = get_execution_steps(workspace, run_id) + if not steps: + return None + + llm_interactions = get_llm_interactions(workspace, run_id) + file_operations = get_file_operations(workspace, run_id) + + # Get run metadata from the runs table + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + "SELECT task_id, status, started_at, completed_at FROM runs WHERE id = ?", + (run_id,), + ) + row = cursor.fetchone() + if not row: + # Build trace from steps alone (run record may not exist in tests) + return ExecutionTrace( + run_id=run_id, + task_id="unknown", + started_at=steps[0].started_at, + status="UNKNOWN", + steps=steps, + llm_interactions=llm_interactions, + file_operations=file_operations, + ) + + return ExecutionTrace( + run_id=run_id, + task_id=row[0], + started_at=datetime.fromisoformat(row[2]), + status=row[1], + steps=steps, + llm_interactions=llm_interactions, + file_operations=file_operations, + completed_at=datetime.fromisoformat(row[3]) if row[3] else None, + ) + finally: + conn.close() + + +def get_step_snapshot( + workspace: Workspace, run_id: str, step_number: int +) -> dict[str, Any]: + """Reconstruct the file state at a given step. + + Replays file operations from step 1 through step_number to + build a dict mapping file_path -> content at that point. + """ + steps = get_execution_steps(workspace, run_id) + step_ids = {s.id for s in steps if s.step_number <= step_number} + + ops = get_file_operations(workspace, run_id) + relevant_ops = [op for op in ops if op.step_id in step_ids] + + file_state: dict[str, Optional[str]] = {} + for op in relevant_ops: + if op.operation_type == "delete": + file_state[op.file_path] = None + else: + file_state[op.file_path] = op.content_after + + # Remove deleted files + return {k: v for k, v in file_state.items() if v is not None} + + +def compare_steps( + workspace: Workspace, run_id: str, step_a: int, step_b: int +) -> dict[str, dict[str, Optional[str]]]: + """Compare file state between two steps. + + Returns a dict of changed files: {file_path: {"before": content_a, "after": content_b}} + """ + state_a = get_step_snapshot(workspace, run_id, step_a) + state_b = get_step_snapshot(workspace, run_id, step_b) + + all_files = set(state_a.keys()) | set(state_b.keys()) + changes = {} + for f in sorted(all_files): + before = state_a.get(f) + after = state_b.get(f) + if before != after: + changes[f] = {"before": before, "after": after} + return changes + + +# ============================================================================= +# Row Converters +# ============================================================================= + + +def _row_to_step(row: tuple) -> ExecutionStep: + return ExecutionStep( + id=row[0], + run_id=row[1], + step_number=row[2], + step_type=row[3], + description=row[4], + started_at=datetime.fromisoformat(row[5]), + completed_at=datetime.fromisoformat(row[6]) if row[6] else None, + status=row[7], + input_context=row[8], + output_result=row[9], + metadata=json.loads(row[10]) if row[10] else {}, + ) + + +def _row_to_llm_interaction(row: tuple) -> LLMInteraction: + return LLMInteraction( + id=row[0], + run_id=row[1], + step_id=row[2], + prompt=row[3], + response=row[4], + model=row[5], + tokens_used=row[6], + timestamp=datetime.fromisoformat(row[7]), + purpose=row[8], + ) + + +def _row_to_file_operation(row: tuple) -> FileOperation: + return FileOperation( + id=row[0], + run_id=row[1], + step_id=row[2], + operation_type=row[3], + file_path=row[4], + content_before=row[5], + content_after=row[6], + timestamp=datetime.fromisoformat(row[7]), + ) diff --git a/codeframe/core/workspace.py b/codeframe/core/workspace.py index 95e2b82f..b0d92c66 100644 --- a/codeframe/core/workspace.py +++ b/codeframe/core/workspace.py @@ -286,6 +286,56 @@ def _init_database(db_path: Path) -> None: ) """) + # Execution trace tables (for debug/replay mode) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS execution_steps ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + step_number INTEGER NOT NULL, + step_type TEXT NOT NULL, + description TEXT NOT NULL, + started_at TEXT NOT NULL, + completed_at TEXT, + status TEXT NOT NULL DEFAULT 'started', + input_context TEXT, + output_result TEXT, + metadata TEXT, + FOREIGN KEY (run_id) REFERENCES runs(id) + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS llm_interactions ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + step_id TEXT NOT NULL, + prompt TEXT NOT NULL, + response TEXT NOT NULL, + model TEXT NOT NULL, + tokens_used INTEGER NOT NULL DEFAULT 0, + timestamp TEXT NOT NULL, + purpose TEXT NOT NULL DEFAULT 'execution', + FOREIGN KEY (run_id) REFERENCES runs(id), + FOREIGN KEY (step_id) REFERENCES execution_steps(id) + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS file_operations ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + step_id TEXT NOT NULL, + operation_type TEXT NOT NULL, + file_path TEXT NOT NULL, + content_before TEXT, + content_after TEXT, + timestamp TEXT NOT NULL, + FOREIGN KEY (run_id) REFERENCES runs(id), + FOREIGN KEY (step_id) REFERENCES execution_steps(id), + CHECK (operation_type IN ('create', 'edit', 'delete')) + ) + """) + # Create indexes for common queries cursor.execute("CREATE INDEX IF NOT EXISTS idx_tasks_workspace ON tasks(workspace_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)") @@ -303,6 +353,12 @@ def _init_database(db_path: Path) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_diagnostic_reports_run ON diagnostic_reports(run_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_run_engine_log_ws_engine ON run_engine_log(workspace_id, engine)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_engine_stats_ws ON engine_stats(workspace_id, engine)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_execution_steps_run ON execution_steps(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_execution_steps_run_step ON execution_steps(run_id, step_number)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_llm_interactions_run ON llm_interactions(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_llm_interactions_step ON llm_interactions(step_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_operations_run ON file_operations(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_operations_step ON file_operations(step_id)") conn.commit() conn.close() @@ -541,6 +597,61 @@ def _ensure_schema_upgrades(db_path: Path) -> None: ) conn.commit() + # Add execution trace tables for debug/replay mode + cursor.execute(""" + CREATE TABLE IF NOT EXISTS execution_steps ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + step_number INTEGER NOT NULL, + step_type TEXT NOT NULL, + description TEXT NOT NULL, + started_at TEXT NOT NULL, + completed_at TEXT, + status TEXT NOT NULL DEFAULT 'started', + input_context TEXT, + output_result TEXT, + metadata TEXT, + FOREIGN KEY (run_id) REFERENCES runs(id) + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS llm_interactions ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + step_id TEXT NOT NULL, + prompt TEXT NOT NULL, + response TEXT NOT NULL, + model TEXT NOT NULL, + tokens_used INTEGER NOT NULL DEFAULT 0, + timestamp TEXT NOT NULL, + purpose TEXT NOT NULL DEFAULT 'execution', + FOREIGN KEY (run_id) REFERENCES runs(id), + FOREIGN KEY (step_id) REFERENCES execution_steps(id) + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS file_operations ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + step_id TEXT NOT NULL, + operation_type TEXT NOT NULL, + file_path TEXT NOT NULL, + content_before TEXT, + content_after TEXT, + timestamp TEXT NOT NULL, + FOREIGN KEY (run_id) REFERENCES runs(id), + FOREIGN KEY (step_id) REFERENCES execution_steps(id), + CHECK (operation_type IN ('create', 'edit', 'delete')) + ) + """) + cursor.execute("CREATE INDEX IF NOT EXISTS idx_execution_steps_run ON execution_steps(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_execution_steps_run_step ON execution_steps(run_id, step_number)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_llm_interactions_run ON llm_interactions(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_llm_interactions_step ON llm_interactions(step_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_operations_run ON file_operations(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_operations_step ON file_operations(step_id)") + conn.commit() + conn.close() diff --git a/tests/core/test_replay.py b/tests/core/test_replay.py new file mode 100644 index 00000000..572181df --- /dev/null +++ b/tests/core/test_replay.py @@ -0,0 +1,363 @@ +"""Tests for the replay module — execution trace recording and replay. + +Tests cover: +- Data model creation (ExecutionStep, LLMInteraction, FileOperation, ExecutionTrace) +- Database CRUD operations for execution trace tables +- Trace loading and state reconstruction +- Step snapshot generation +- Diff computation between steps +""" + +import json +import uuid +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from codeframe.core.workspace import create_or_load_workspace, get_db_connection + + +@pytest.fixture +def workspace(tmp_path: Path): + """Create a temporary workspace for testing.""" + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +@pytest.fixture +def run_id(): + return str(uuid.uuid4()) + + +@pytest.fixture +def task_id(): + return str(uuid.uuid4()) + + +# ============================================================================= +# Step 1: Data model tests +# ============================================================================= + + +class TestExecutionStepModel: + """Tests for ExecutionStep dataclass.""" + + def test_create_step(self): + from codeframe.core.replay import ExecutionStep + + step = ExecutionStep( + id="step-1", + run_id="run-1", + step_number=1, + step_type="tool_call", + description="Read file main.py", + started_at=datetime.now(timezone.utc), + ) + assert step.step_number == 1 + assert step.step_type == "tool_call" + assert step.completed_at is None + assert step.status == "started" + assert step.metadata == {} + + def test_step_with_all_fields(self): + from codeframe.core.replay import ExecutionStep + + now = datetime.now(timezone.utc) + step = ExecutionStep( + id="step-2", + run_id="run-1", + step_number=2, + step_type="verification", + description="Run pytest", + started_at=now, + completed_at=now, + status="completed", + input_context="pytest tests/", + output_result="5 passed", + metadata={"gate": "pytest"}, + ) + assert step.status == "completed" + assert step.output_result == "5 passed" + assert step.metadata["gate"] == "pytest" + + +class TestLLMInteractionModel: + """Tests for LLMInteraction dataclass.""" + + def test_create_interaction(self): + from codeframe.core.replay import LLMInteraction + + interaction = LLMInteraction( + id="llm-1", + run_id="run-1", + step_id="step-1", + prompt="Implement the feature", + response="I'll start by reading the file...", + model="claude-sonnet-4-20250514", + tokens_used=1500, + timestamp=datetime.now(timezone.utc), + purpose="execution", + ) + assert interaction.model == "claude-sonnet-4-20250514" + assert interaction.tokens_used == 1500 + assert interaction.purpose == "execution" + + +class TestFileOperationModel: + """Tests for FileOperation dataclass.""" + + def test_create_operation(self): + from codeframe.core.replay import FileOperation + + op = FileOperation( + id="fop-1", + run_id="run-1", + step_id="step-1", + operation_type="create", + file_path="src/main.py", + content_before=None, + content_after="print('hello')", + timestamp=datetime.now(timezone.utc), + ) + assert op.operation_type == "create" + assert op.content_before is None + assert op.content_after == "print('hello')" + + +class TestExecutionTraceModel: + """Tests for ExecutionTrace dataclass.""" + + def test_create_trace(self): + from codeframe.core.replay import ExecutionTrace + + trace = ExecutionTrace( + run_id="run-1", + task_id="task-1", + started_at=datetime.now(timezone.utc), + status="COMPLETED", + steps=[], + llm_interactions=[], + file_operations=[], + ) + assert trace.run_id == "run-1" + assert trace.steps == [] + assert trace.completed_at is None + + def test_trace_summary(self): + from codeframe.core.replay import ExecutionStep, ExecutionTrace, LLMInteraction + + now = datetime.now(timezone.utc) + trace = ExecutionTrace( + run_id="run-1", + task_id="task-1", + started_at=now, + status="COMPLETED", + steps=[ + ExecutionStep( + id="s1", run_id="run-1", step_number=1, + step_type="tool_call", description="read", started_at=now, + ), + ExecutionStep( + id="s2", run_id="run-1", step_number=2, + step_type="tool_call", description="edit", started_at=now, + ), + ], + llm_interactions=[ + LLMInteraction( + id="l1", run_id="run-1", step_id="s1", + prompt="p", response="r", model="claude", + tokens_used=100, timestamp=now, purpose="execution", + ), + ], + file_operations=[], + ) + summary = trace.summary() + assert summary["total_steps"] == 2 + assert summary["llm_calls"] == 1 + assert summary["total_tokens"] == 100 + assert summary["files_modified"] == 0 + + +# ============================================================================= +# Step 1: Database schema tests +# ============================================================================= + + +class TestReplaySchemaCreation: + """Tests that replay tables are created during workspace init.""" + + def test_execution_steps_table_exists(self, workspace): + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='execution_steps'" + ) + assert cursor.fetchone() is not None + finally: + conn.close() + + def test_llm_interactions_table_exists(self, workspace): + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='llm_interactions'" + ) + assert cursor.fetchone() is not None + finally: + conn.close() + + def test_file_operations_table_exists(self, workspace): + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='file_operations'" + ) + assert cursor.fetchone() is not None + finally: + conn.close() + + +# ============================================================================= +# Step 1: Database CRUD tests +# ============================================================================= + + +class TestExecutionStepCRUD: + """Tests for saving and loading execution steps.""" + + def test_save_and_load_step(self, workspace, run_id, task_id): + from codeframe.core.replay import ( + ExecutionStep, + save_execution_step, + get_execution_steps, + ) + + now = datetime.now(timezone.utc) + step = ExecutionStep( + id="step-1", + run_id=run_id, + step_number=1, + step_type="tool_call", + description="Read main.py", + started_at=now, + status="completed", + output_result="file contents here", + ) + save_execution_step(workspace, step) + steps = get_execution_steps(workspace, run_id) + assert len(steps) == 1 + assert steps[0].id == "step-1" + assert steps[0].step_type == "tool_call" + assert steps[0].description == "Read main.py" + + def test_steps_ordered_by_step_number(self, workspace, run_id): + from codeframe.core.replay import ( + ExecutionStep, + save_execution_step, + get_execution_steps, + ) + + now = datetime.now(timezone.utc) + for i in [3, 1, 2]: + save_execution_step( + workspace, + ExecutionStep( + id=f"step-{i}", + run_id=run_id, + step_number=i, + step_type="tool_call", + description=f"Step {i}", + started_at=now, + ), + ) + steps = get_execution_steps(workspace, run_id) + assert [s.step_number for s in steps] == [1, 2, 3] + + +class TestLLMInteractionCRUD: + """Tests for saving and loading LLM interactions.""" + + def test_save_and_load_interaction(self, workspace, run_id): + from codeframe.core.replay import ( + LLMInteraction, + save_llm_interaction, + get_llm_interactions, + ) + + now = datetime.now(timezone.utc) + interaction = LLMInteraction( + id="llm-1", + run_id=run_id, + step_id="step-1", + prompt="Implement feature X", + response="I'll read the file first...", + model="claude-sonnet-4-20250514", + tokens_used=2000, + timestamp=now, + purpose="execution", + ) + save_llm_interaction(workspace, interaction) + interactions = get_llm_interactions(workspace, run_id) + assert len(interactions) == 1 + assert interactions[0].prompt == "Implement feature X" + assert interactions[0].tokens_used == 2000 + + +class TestFileOperationCRUD: + """Tests for saving and loading file operations.""" + + def test_save_and_load_file_op(self, workspace, run_id): + from codeframe.core.replay import ( + FileOperation, + save_file_operation, + get_file_operations, + ) + + now = datetime.now(timezone.utc) + op = FileOperation( + id="fop-1", + run_id=run_id, + step_id="step-1", + operation_type="create", + file_path="src/main.py", + content_before=None, + content_after="print('hello')", + timestamp=now, + ) + save_file_operation(workspace, op) + ops = get_file_operations(workspace, run_id) + assert len(ops) == 1 + assert ops[0].file_path == "src/main.py" + assert ops[0].content_after == "print('hello')" + + def test_file_ops_ordered_by_timestamp(self, workspace, run_id): + from codeframe.core.replay import ( + FileOperation, + save_file_operation, + get_file_operations, + ) + + from datetime import timedelta + + base = datetime.now(timezone.utc) + for i in [2, 0, 1]: + save_file_operation( + workspace, + FileOperation( + id=f"fop-{i}", + run_id=run_id, + step_id=f"step-{i}", + operation_type="edit", + file_path=f"file{i}.py", + content_before="old", + content_after="new", + timestamp=base + timedelta(seconds=i), + ), + ) + ops = get_file_operations(workspace, run_id) + assert [op.file_path for op in ops] == ["file0.py", "file1.py", "file2.py"] From a45c005f7e67d4dfef35b2889a56baf00bc29e0c Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:20:03 -0700 Subject: [PATCH 2/8] feat(replay): add trace loading, snapshots, and export (#315) --- codeframe/core/replay.py | 267 +++++++++++++++++++++++++++++ tests/core/test_replay.py | 348 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 615 insertions(+) diff --git a/codeframe/core/replay.py b/codeframe/core/replay.py index ddad6b88..c36de1b9 100644 --- a/codeframe/core/replay.py +++ b/codeframe/core/replay.py @@ -8,6 +8,7 @@ """ import json +import logging import uuid from dataclasses import dataclass, field from datetime import datetime, timezone @@ -15,6 +16,8 @@ from codeframe.core.workspace import Workspace, get_db_connection +logger = logging.getLogger(__name__) + def _utc_now() -> datetime: return datetime.now(timezone.utc) @@ -98,6 +101,169 @@ def summary(self) -> dict[str, Any]: } +# ============================================================================= +# ExecutionRecorder — buffered recording for ReactAgent integration +# ============================================================================= + + +class ExecutionRecorder: + """Buffered execution trace recorder for ReactAgent. + + Collects execution steps, LLM interactions, and file operations + in memory and flushes them to the database periodically or on demand. + + Args: + workspace: Target workspace (for DB access). + run_id: Run identifier to associate all records with. + flush_interval: Number of records to buffer before auto-flushing. + """ + + def __init__( + self, + workspace: Workspace, + run_id: str, + flush_interval: int = 10, + ) -> None: + self.workspace = workspace + self.run_id = run_id + self._flush_interval = flush_interval + self._step_buffer: list[ExecutionStep] = [] + self._llm_buffer: list[LLMInteraction] = [] + self._file_op_buffer: list[FileOperation] = [] + + def record_iteration( + self, + step_number: int, + tool_names: list[str], + llm_response_summary: str, + ) -> str: + """Record one iteration of the react loop as an ExecutionStep. + + Args: + step_number: 1-based iteration number. + tool_names: Names of tools called in this iteration. + llm_response_summary: Short summary of the LLM response. + + Returns: + The generated step ID. + """ + step_id = str(uuid.uuid4()) + now = _utc_now() + description = ( + f"Tools: {', '.join(tool_names)}" if tool_names else llm_response_summary + ) + step = ExecutionStep( + id=step_id, + run_id=self.run_id, + step_number=step_number, + step_type="tool_call", + description=description, + started_at=now, + completed_at=now, + status="completed", + output_result=llm_response_summary[:500] if llm_response_summary else None, + metadata={"tool_names": tool_names}, + ) + self._step_buffer.append(step) + self._maybe_flush() + return step_id + + def record_llm_call( + self, + step_id: str, + prompt_summary: str, + response_summary: str, + model: str, + tokens_used: int, + purpose: str, + ) -> str: + """Record a single LLM prompt/response pair. + + Args: + step_id: ID of the parent execution step. + prompt_summary: Condensed prompt (system summary + last user message). + response_summary: Content or tool calls summary. + model: Model identifier. + tokens_used: Total tokens consumed. + purpose: Purpose label (execution, planning, etc.). + + Returns: + The generated interaction ID. + """ + interaction_id = str(uuid.uuid4()) + interaction = LLMInteraction( + id=interaction_id, + run_id=self.run_id, + step_id=step_id, + prompt=prompt_summary[:2000] if prompt_summary else "", + response=response_summary[:2000] if response_summary else "", + model=model or "", + tokens_used=tokens_used, + timestamp=_utc_now(), + purpose=purpose, + ) + self._llm_buffer.append(interaction) + self._maybe_flush() + return interaction_id + + def record_file_operation( + self, + step_id: str, + op_type: str, + path: str, + before: Optional[str], + after: Optional[str], + ) -> str: + """Record a file create/edit/delete operation. + + Args: + step_id: ID of the parent execution step. + op_type: Operation type (create, edit, delete). + path: Relative file path. + before: Content before the operation (None for create). + after: Content after the operation (None for delete). + + Returns: + The generated file operation ID. + """ + op_id = str(uuid.uuid4()) + op = FileOperation( + id=op_id, + run_id=self.run_id, + step_id=step_id, + operation_type=op_type, + file_path=path, + content_before=before, + content_after=after, + timestamp=_utc_now(), + ) + self._file_op_buffer.append(op) + self._maybe_flush() + return op_id + + def flush(self) -> None: + """Write all buffered records to the database.""" + try: + for step in self._step_buffer: + save_execution_step(self.workspace, step) + for interaction in self._llm_buffer: + save_llm_interaction(self.workspace, interaction) + for op in self._file_op_buffer: + save_file_operation(self.workspace, op) + except Exception: + logger.debug("ExecutionRecorder flush failed", exc_info=True) + finally: + self._step_buffer.clear() + self._llm_buffer.clear() + self._file_op_buffer.clear() + + def _maybe_flush(self) -> None: + """Auto-flush when buffer reaches threshold.""" + total = len(self._step_buffer) + len(self._llm_buffer) + len(self._file_op_buffer) + if total >= self._flush_interval: + self.flush() + + # ============================================================================= # CRUD: ExecutionStep # ============================================================================= @@ -365,6 +531,107 @@ def compare_steps( return changes +# ============================================================================= +# Export Functions +# ============================================================================= + + +def export_trace_json(trace: ExecutionTrace) -> dict[str, Any]: + """Export an ExecutionTrace as a JSON-serializable dict. + + Returns a dict with run metadata, step details, and summary stats. + """ + # Build a lookup of file operations by step_id + ops_by_step: dict[str, list[FileOperation]] = {} + for op in trace.file_operations: + ops_by_step.setdefault(op.step_id, []).append(op) + + steps = [] + for step in trace.steps: + step_ops = ops_by_step.get(step.id, []) + step_dict: dict[str, Any] = { + "step_number": step.step_number, + "step_type": step.step_type, + "description": step.description, + "status": step.status, + "started_at": step.started_at.isoformat(), + "completed_at": step.completed_at.isoformat() if step.completed_at else None, + } + if step_ops: + step_dict["file_changes"] = [ + { + "operation": op.operation_type, + "file_path": op.file_path, + } + for op in step_ops + ] + steps.append(step_dict) + + return { + "run_id": trace.run_id, + "task_id": trace.task_id, + "started_at": trace.started_at.isoformat(), + "completed_at": trace.completed_at.isoformat() if trace.completed_at else None, + "status": trace.status, + "steps": steps, + "summary": trace.summary(), + } + + +def export_trace_markdown(trace: ExecutionTrace) -> str: + """Export an ExecutionTrace as a Markdown report. + + Produces a human-readable report with header, summary stats, + and a step-by-step timeline including file changes. + """ + summary = trace.summary() + + # Build file operations lookup by step_id + ops_by_step: dict[str, list[FileOperation]] = {} + for op in trace.file_operations: + ops_by_step.setdefault(op.step_id, []).append(op) + + lines = [ + f"# Execution Trace: {trace.run_id}", + "", + f"- **Task**: {trace.task_id}", + f"- **Status**: {trace.status}", + f"- **Started**: {trace.started_at.isoformat()}", + ] + if trace.completed_at: + lines.append(f"- **Completed**: {trace.completed_at.isoformat()}") + lines.append("") + + lines.append("## Summary") + lines.append("") + lines.append("| Metric | Value |") + lines.append("|--------|-------|") + lines.append(f"| Total steps | {summary['total_steps']} |") + lines.append(f"| LLM calls | {summary['llm_calls']} |") + lines.append(f"| Total tokens | {summary['total_tokens']} |") + lines.append(f"| Files modified | {summary['files_modified']} |") + lines.append("") + + lines.append("## Steps") + lines.append("") + for step in trace.steps: + status_icon = {"completed": "[OK]", "failed": "[FAIL]", "started": "[...]"}.get( + step.status, f"[{step.status}]" + ) + lines.append(f"### Step {step.step_number}: {step.description}") + lines.append(f"- **Type**: {step.step_type}") + lines.append(f"- **Status**: {status_icon} {step.status}") + + step_ops = ops_by_step.get(step.id, []) + if step_ops: + lines.append("- **File changes**:") + for op in step_ops: + lines.append(f" - {op.operation_type}: `{op.file_path}`") + lines.append("") + + return "\n".join(lines) + + # ============================================================================= # Row Converters # ============================================================================= diff --git a/tests/core/test_replay.py b/tests/core/test_replay.py index 572181df..26fc1d36 100644 --- a/tests/core/test_replay.py +++ b/tests/core/test_replay.py @@ -361,3 +361,351 @@ def test_file_ops_ordered_by_timestamp(self, workspace, run_id): ) ops = get_file_operations(workspace, run_id) assert [op.file_path for op in ops] == ["file0.py", "file1.py", "file2.py"] + + +# ============================================================================= +# Step 3: Trace loading and state reconstruction tests +# ============================================================================= + + +def _insert_run(workspace, run_id, task_id, status="COMPLETED"): + """Helper to insert a run record directly into the database.""" + conn = get_db_connection(workspace) + try: + conn.execute( + "INSERT INTO runs (id, workspace_id, task_id, status, started_at) " + "VALUES (?, ?, ?, ?, ?)", + (run_id, workspace.id, task_id, status, datetime.now(timezone.utc).isoformat()), + ) + conn.commit() + finally: + conn.close() + + +def _seed_three_step_trace(workspace, run_id): + """Seed a 3-step trace: step1 creates A, step2 edits A, step3 creates B. + + Returns the step ids as a tuple (step1_id, step2_id, step3_id). + """ + from codeframe.core.replay import ( + ExecutionStep, + FileOperation, + LLMInteraction, + save_execution_step, + save_file_operation, + save_llm_interaction, + ) + from datetime import timedelta + + base = datetime.now(timezone.utc) + step_ids = [str(uuid.uuid4()) for _ in range(3)] + + # Step 1: create file A + save_execution_step( + workspace, + ExecutionStep( + id=step_ids[0], run_id=run_id, step_number=1, step_type="tool_call", + description="Create file A", started_at=base, + completed_at=base + timedelta(seconds=1), status="completed", + ), + ) + save_file_operation( + workspace, + FileOperation( + id=str(uuid.uuid4()), run_id=run_id, step_id=step_ids[0], + operation_type="create", file_path="src/a.py", + content_before=None, content_after="# original A", + timestamp=base + timedelta(seconds=1), + ), + ) + save_llm_interaction( + workspace, + LLMInteraction( + id=str(uuid.uuid4()), run_id=run_id, step_id=step_ids[0], + prompt="Create file A", response="Done", + model="claude-sonnet", tokens_used=500, + timestamp=base + timedelta(seconds=1), purpose="execution", + ), + ) + + # Step 2: edit file A + save_execution_step( + workspace, + ExecutionStep( + id=step_ids[1], run_id=run_id, step_number=2, step_type="tool_call", + description="Edit file A", started_at=base + timedelta(seconds=2), + completed_at=base + timedelta(seconds=3), status="completed", + ), + ) + save_file_operation( + workspace, + FileOperation( + id=str(uuid.uuid4()), run_id=run_id, step_id=step_ids[1], + operation_type="edit", file_path="src/a.py", + content_before="# original A", content_after="# edited A", + timestamp=base + timedelta(seconds=3), + ), + ) + save_llm_interaction( + workspace, + LLMInteraction( + id=str(uuid.uuid4()), run_id=run_id, step_id=step_ids[1], + prompt="Edit file A", response="Done", + model="claude-sonnet", tokens_used=300, + timestamp=base + timedelta(seconds=3), purpose="execution", + ), + ) + + # Step 3: create file B + save_execution_step( + workspace, + ExecutionStep( + id=step_ids[2], run_id=run_id, step_number=3, step_type="tool_call", + description="Create file B", started_at=base + timedelta(seconds=4), + completed_at=base + timedelta(seconds=5), status="completed", + ), + ) + save_file_operation( + workspace, + FileOperation( + id=str(uuid.uuid4()), run_id=run_id, step_id=step_ids[2], + operation_type="create", file_path="src/b.py", + content_before=None, content_after="# file B", + timestamp=base + timedelta(seconds=5), + ), + ) + + return tuple(step_ids) + + +class TestLoadExecutionTrace: + """Tests for load_execution_trace assembling a full trace.""" + + def test_load_trace_assembles_all_data(self, workspace, run_id, task_id): + from codeframe.core.replay import load_execution_trace + + _insert_run(workspace, run_id, task_id, status="COMPLETED") + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + assert trace is not None + assert trace.run_id == run_id + assert trace.task_id == task_id + assert trace.status == "COMPLETED" + assert len(trace.steps) == 3 + assert len(trace.llm_interactions) == 2 + assert len(trace.file_operations) == 3 + + def test_load_trace_nonexistent_run_returns_none(self, workspace): + from codeframe.core.replay import load_execution_trace + + result = load_execution_trace(workspace, "nonexistent-run-id") + assert result is None + + def test_load_trace_without_run_record(self, workspace, run_id): + """Steps exist but no run record - should still return a trace.""" + from codeframe.core.replay import load_execution_trace + + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + assert trace is not None + assert trace.task_id == "unknown" + assert trace.status == "UNKNOWN" + assert len(trace.steps) == 3 + + def test_load_trace_step_order(self, workspace, run_id, task_id): + from codeframe.core.replay import load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + step_numbers = [s.step_number for s in trace.steps] + assert step_numbers == [1, 2, 3] + + +class TestGetStepSnapshot: + """Tests for get_step_snapshot reconstructing file state.""" + + def test_snapshot_at_step_1(self, workspace, run_id): + from codeframe.core.replay import get_step_snapshot + + _seed_three_step_trace(workspace, run_id) + + snapshot = get_step_snapshot(workspace, run_id, 1) + assert "src/a.py" in snapshot + assert snapshot["src/a.py"] == "# original A" + assert "src/b.py" not in snapshot + + def test_snapshot_at_step_2(self, workspace, run_id): + from codeframe.core.replay import get_step_snapshot + + _seed_three_step_trace(workspace, run_id) + + snapshot = get_step_snapshot(workspace, run_id, 2) + assert snapshot["src/a.py"] == "# edited A" + assert "src/b.py" not in snapshot + + def test_snapshot_at_step_3(self, workspace, run_id): + from codeframe.core.replay import get_step_snapshot + + _seed_three_step_trace(workspace, run_id) + + snapshot = get_step_snapshot(workspace, run_id, 3) + assert snapshot["src/a.py"] == "# edited A" + assert snapshot["src/b.py"] == "# file B" + + def test_snapshot_at_step_0_empty(self, workspace, run_id): + from codeframe.core.replay import get_step_snapshot + + _seed_three_step_trace(workspace, run_id) + + snapshot = get_step_snapshot(workspace, run_id, 0) + assert snapshot == {} + + +class TestCompareSteps: + """Tests for compare_steps diffing file state between steps.""" + + def test_compare_step_1_to_3(self, workspace, run_id): + from codeframe.core.replay import compare_steps + + _seed_three_step_trace(workspace, run_id) + + diff = compare_steps(workspace, run_id, 1, 3) + # A was edited + assert "src/a.py" in diff + assert diff["src/a.py"]["before"] == "# original A" + assert diff["src/a.py"]["after"] == "# edited A" + # B was created (didn't exist at step 1) + assert "src/b.py" in diff + assert diff["src/b.py"]["before"] is None + assert diff["src/b.py"]["after"] == "# file B" + + def test_compare_same_step_no_diff(self, workspace, run_id): + from codeframe.core.replay import compare_steps + + _seed_three_step_trace(workspace, run_id) + + diff = compare_steps(workspace, run_id, 2, 2) + assert diff == {} + + def test_compare_step_1_to_2(self, workspace, run_id): + from codeframe.core.replay import compare_steps + + _seed_three_step_trace(workspace, run_id) + + diff = compare_steps(workspace, run_id, 1, 2) + assert "src/a.py" in diff + assert diff["src/a.py"]["before"] == "# original A" + assert diff["src/a.py"]["after"] == "# edited A" + assert "src/b.py" not in diff + + +class TestExportTrace: + """Tests for export_trace_json producing a JSON-serializable dict.""" + + def test_export_produces_valid_structure(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_json, load_execution_trace + + _insert_run(workspace, run_id, task_id, status="COMPLETED") + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + result = export_trace_json(trace) + + assert result["run_id"] == run_id + assert result["task_id"] == task_id + assert result["status"] == "COMPLETED" + assert "started_at" in result + assert "completed_at" in result + assert len(result["steps"]) == 3 + assert result["summary"]["total_steps"] == 3 + assert result["summary"]["llm_calls"] == 2 + assert result["summary"]["total_tokens"] == 800 + assert result["summary"]["files_modified"] == 2 + + def test_export_is_json_serializable(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_json, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + result = export_trace_json(trace) + + # Must not raise + serialized = json.dumps(result) + assert isinstance(serialized, str) + + def test_export_step_fields(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_json, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + result = export_trace_json(trace) + + step = result["steps"][0] + assert step["step_number"] == 1 + assert step["step_type"] == "tool_call" + assert step["description"] == "Create file A" + assert step["status"] == "completed" + + +class TestExportTraceMarkdown: + """Tests for export_trace_markdown producing a Markdown report.""" + + def test_markdown_contains_headers(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_markdown, load_execution_trace + + _insert_run(workspace, run_id, task_id, status="COMPLETED") + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + md = export_trace_markdown(trace) + + assert "# Execution Trace" in md + assert run_id in md + assert task_id in md + assert "COMPLETED" in md + + def test_markdown_contains_summary(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_markdown, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + md = export_trace_markdown(trace) + + assert "## Summary" in md + assert "3" in md # total steps + + def test_markdown_contains_step_descriptions(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_markdown, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + md = export_trace_markdown(trace) + + assert "## Steps" in md + assert "Create file A" in md + assert "Edit file A" in md + assert "Create file B" in md + + def test_markdown_contains_file_changes(self, workspace, run_id, task_id): + from codeframe.core.replay import export_trace_markdown, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + trace = load_execution_trace(workspace, run_id) + md = export_trace_markdown(trace) + + assert "src/a.py" in md + assert "src/b.py" in md From f7997587d17f98089ff8afcaf409d64a1db5793c Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:20:54 -0700 Subject: [PATCH 3/8] feat(replay): add execution recording to ReactAgent (#315) Add ExecutionRecorder class with buffered writes for recording execution traces during ReactAgent runs. Hook into _react_loop to capture iteration steps, LLM interactions, and file operations without affecting agent logic. --- codeframe/core/react_agent.py | 56 ++++ tests/core/test_execution_recording.py | 377 +++++++++++++++++++++++++ 2 files changed, 433 insertions(+) create mode 100644 tests/core/test_execution_recording.py diff --git a/codeframe/core/react_agent.py b/codeframe/core/react_agent.py index 7a3129b6..f60370ca 100644 --- a/codeframe/core/react_agent.py +++ b/codeframe/core/react_agent.py @@ -38,6 +38,7 @@ if TYPE_CHECKING: from codeframe.core.conductor import GlobalFixCoordinator + from codeframe.core.replay import ExecutionRecorder from codeframe.core.streaming import EventPublisher, RunOutputLogger logger = logging.getLogger(__name__) @@ -125,6 +126,7 @@ def __init__( debug: bool = False, output_logger: Optional[RunOutputLogger] = None, fix_coordinator: Optional[GlobalFixCoordinator] = None, + execution_recorder: Optional[ExecutionRecorder] = None, ) -> None: self.workspace = workspace self.llm_provider = llm_provider @@ -139,6 +141,7 @@ def __init__( self.debug = debug self.output_logger = output_logger self.fix_coordinator = fix_coordinator + self.execution_recorder = execution_recorder self.fix_tracker = FixAttemptTracker() self.blocker_id: Optional[str] = None @@ -265,6 +268,15 @@ def run(self, task_id: str) -> AgentStatus: task_id, exc_info=True, ) + if self.execution_recorder is not None: + try: + self.execution_recorder.flush() + except Exception: + logger.debug( + "Failed to flush execution recorder for task %s", + task_id, + exc_info=True, + ) except StallDetectedError: raise # Monitor stopped by finally above; let runtime handle retry except Exception: @@ -450,6 +462,31 @@ def _react_loop(self, system_prompt: str) -> AgentStatus: "iteration": iterations, }) + # --- Execution recording: LLM call --- + _rec_step_id: Optional[str] = None + if self.execution_recorder is not None: + # Build condensed summaries for the trace + _rec_prompt = f"System: {prompt_summary} | Messages: {len(messages)}" + if response.has_tool_calls: + _rec_response = "Tool calls: " + ", ".join( + tc.name for tc in response.tool_calls + ) + else: + _rec_response = (response.content or "")[:200] + _rec_step_id = self.execution_recorder.record_iteration( + step_number=iterations, + tool_names=[tc.name for tc in response.tool_calls], + llm_response_summary=_rec_response, + ) + self.execution_recorder.record_llm_call( + step_id=_rec_step_id, + prompt_summary=_rec_prompt, + response_summary=_rec_response, + model=response.model or "", + tokens_used=response.input_tokens + response.output_tokens, + purpose="execution", + ) + if not response.has_tool_calls: # Text-only response — agent thinks it's done. # Check for blocker patterns before accepting completion. @@ -529,6 +566,25 @@ def _react_loop(self, system_prompt: str) -> AgentStatus: } ) + # --- Execution recording: file operations --- + if ( + self.execution_recorder is not None + and _rec_step_id is not None + and tc.name in ("edit_file", "create_file") + and not result.is_error + ): + _op_type = "create" if tc.name == "create_file" else "edit" + _op_path = tc.input.get("path", "") + _op_after = tc.input.get("content") if tc.name == "create_file" else tc.input.get("new_text") + _op_before = tc.input.get("old_text") if tc.name == "edit_file" else None + self.execution_recorder.record_file_operation( + step_id=_rec_step_id, + op_type=_op_type, + path=_op_path, + before=_op_before, + after=_op_after, + ) + # Check error tool results for immediate blocker patterns if result.is_error: self._failure_count += 1 diff --git a/tests/core/test_execution_recording.py b/tests/core/test_execution_recording.py new file mode 100644 index 00000000..97b8b0c6 --- /dev/null +++ b/tests/core/test_execution_recording.py @@ -0,0 +1,377 @@ +"""Tests for execution recording in ReactAgent. + +Tests that when ReactAgent runs with an ExecutionRecorder: +1. An ExecutionStep is recorded for each react loop iteration +2. An LLMInteraction is recorded for each LLM call +3. A FileOperation is recorded for each file create/edit tool execution +4. ReactAgent works fine without a recorder (backward compat) +""" + +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest + +from codeframe.adapters.llm.base import LLMResponse, ToolCall, ToolResult +from codeframe.adapters.llm.mock import MockProvider +from codeframe.core.agent import AgentStatus +from codeframe.core.context import FileContent, TaskContext +from codeframe.core.gates import GateCheck, GateResult, GateStatus +from codeframe.core.replay import ( + ExecutionRecorder, + get_execution_steps, + get_file_operations, + get_llm_interactions, +) +from codeframe.core.tasks import Task, TaskStatus +from codeframe.core.workspace import Workspace, create_or_load_workspace + +pytestmark = pytest.mark.v2 + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def workspace(tmp_path): + """Create a workspace with DB tables initialized.""" + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +@pytest.fixture +def mock_task(): + """Create a minimal task.""" + _ts = datetime(2026, 1, 1, tzinfo=timezone.utc) + return Task( + id="task-1", + workspace_id="ws-test", + prd_id=None, + title="Add hello function", + description="Create a hello() function that returns 'Hello, World!'", + status=TaskStatus.IN_PROGRESS, + priority=1, + created_at=_ts, + updated_at=_ts, + ) + + +@pytest.fixture +def mock_context(mock_task): + """Create a minimal TaskContext.""" + return TaskContext(task=mock_task) + + +@pytest.fixture +def provider(): + """Create a MockProvider.""" + return MockProvider() + + +def _gate_passed(): + return GateResult( + passed=True, + checks=[GateCheck(name="ruff", status=GateStatus.PASSED)], + ) + + +# --------------------------------------------------------------------------- +# ExecutionRecorder unit tests +# --------------------------------------------------------------------------- + + +class TestExecutionRecorder: + """Tests for the ExecutionRecorder class itself.""" + + def test_record_iteration_saves_step(self, workspace): + recorder = ExecutionRecorder(workspace=workspace, run_id="run-1") + step_id = recorder.record_iteration( + step_number=1, + tool_names=["read_file"], + llm_response_summary="Reading file a.py", + ) + recorder.flush() + + steps = get_execution_steps(workspace, "run-1") + assert len(steps) == 1 + assert steps[0].id == step_id + assert steps[0].step_number == 1 + assert steps[0].step_type == "tool_call" + assert steps[0].status == "completed" + assert "read_file" in steps[0].description + + def test_record_llm_call_saves_interaction(self, workspace): + recorder = ExecutionRecorder(workspace=workspace, run_id="run-1") + recorder.record_llm_call( + step_id="step-1", + prompt_summary="System: CodeFRAME agent | User: implement task", + response_summary="Tool calls: read_file(a.py)", + model="claude-sonnet-4-20250514", + tokens_used=1500, + purpose="execution", + ) + recorder.flush() + + interactions = get_llm_interactions(workspace, "run-1") + assert len(interactions) == 1 + assert interactions[0].step_id == "step-1" + assert interactions[0].tokens_used == 1500 + assert interactions[0].model == "claude-sonnet-4-20250514" + + def test_record_file_operation_saves_op(self, workspace): + recorder = ExecutionRecorder(workspace=workspace, run_id="run-1") + recorder.record_file_operation( + step_id="step-1", + op_type="create", + path="src/main.py", + before=None, + after="print('hello')", + ) + recorder.flush() + + ops = get_file_operations(workspace, "run-1") + assert len(ops) == 1 + assert ops[0].operation_type == "create" + assert ops[0].file_path == "src/main.py" + assert ops[0].content_after == "print('hello')" + + def test_flush_writes_buffered_records(self, workspace): + recorder = ExecutionRecorder(workspace=workspace, run_id="run-1") + # Record multiple items without explicit flush + recorder.record_iteration(step_number=1, tool_names=["read_file"], llm_response_summary="read") + recorder.record_iteration(step_number=2, tool_names=["edit_file"], llm_response_summary="edit") + recorder.record_llm_call("s1", "prompt", "response", "model", 100, "execution") + recorder.record_file_operation("s1", "create", "a.py", None, "content") + + # Nothing written yet (buffered) + assert len(get_execution_steps(workspace, "run-1")) == 0 + + recorder.flush() + + assert len(get_execution_steps(workspace, "run-1")) == 2 + assert len(get_llm_interactions(workspace, "run-1")) == 1 + assert len(get_file_operations(workspace, "run-1")) == 1 + + def test_recorder_is_optional_on_react_agent(self, workspace, provider, mock_context): + """ReactAgent must work without a recorder (backward compat).""" + from codeframe.core.react_agent import ReactAgent + + provider.add_text_response("Task completed.") + + with ( + patch("codeframe.core.react_agent.TaskContextPackager") as mock_loader, + patch("codeframe.core.react_agent.gates") as mock_gates, + ): + mock_loader.return_value.load_context.return_value = mock_context + mock_gates.run.return_value = _gate_passed() + + agent = ReactAgent(workspace=workspace, llm_provider=provider) + status = agent.run("task-1") + + assert status == AgentStatus.COMPLETED + + +# --------------------------------------------------------------------------- +# Integration: ReactAgent + ExecutionRecorder +# --------------------------------------------------------------------------- + + +class TestReactAgentRecording: + """Tests that ReactAgent records execution traces when given a recorder.""" + + @patch("codeframe.core.react_agent.gates") + @patch("codeframe.core.react_agent.execute_tool") + @patch("codeframe.core.react_agent.TaskContextPackager") + def test_records_step_per_iteration( + self, mock_ctx_loader, mock_exec_tool, mock_gates, workspace, provider, mock_context + ): + """Each iteration of the react loop records an ExecutionStep.""" + from codeframe.core.react_agent import ReactAgent + + # Two iterations: tool call then text completion + provider.add_tool_response( + [ToolCall(id="tc1", name="read_file", input={"path": "a.py"})] + ) + provider.add_text_response("Done implementing the task.") + + mock_ctx_loader.return_value.load_context.return_value = mock_context + mock_exec_tool.return_value = ToolResult(tool_call_id="tc1", content="file contents") + mock_gates.run.return_value = _gate_passed() + + recorder = ExecutionRecorder(workspace=workspace, run_id="run-rec-1") + agent = ReactAgent( + workspace=workspace, + llm_provider=provider, + execution_recorder=recorder, + ) + status = agent.run("task-1") + + assert status == AgentStatus.COMPLETED + + steps = get_execution_steps(workspace, "run-rec-1") + # Iteration 1 (tool call) + iteration 2 (text completion) = 2 steps + assert len(steps) == 2 + assert steps[0].step_number == 1 + assert steps[1].step_number == 2 + + @patch("codeframe.core.react_agent.gates") + @patch("codeframe.core.react_agent.execute_tool") + @patch("codeframe.core.react_agent.TaskContextPackager") + def test_records_llm_interaction_per_call( + self, mock_ctx_loader, mock_exec_tool, mock_gates, workspace, provider, mock_context + ): + """Each LLM call records an LLMInteraction.""" + from codeframe.core.react_agent import ReactAgent + + provider.add_tool_response( + [ToolCall(id="tc1", name="read_file", input={"path": "a.py"})], + ) + provider.add_text_response("All done.") + + mock_ctx_loader.return_value.load_context.return_value = mock_context + mock_exec_tool.return_value = ToolResult(tool_call_id="tc1", content="contents") + mock_gates.run.return_value = _gate_passed() + + recorder = ExecutionRecorder(workspace=workspace, run_id="run-llm-1") + agent = ReactAgent( + workspace=workspace, + llm_provider=provider, + execution_recorder=recorder, + ) + agent.run("task-1") + + interactions = get_llm_interactions(workspace, "run-llm-1") + assert len(interactions) == 2 # one per LLM call + assert interactions[0].purpose == "execution" + + @patch("codeframe.core.react_agent.gates") + @patch("codeframe.core.react_agent.execute_tool") + @patch("codeframe.core.react_agent.TaskContextPackager") + def test_records_file_operation_for_create( + self, mock_ctx_loader, mock_exec_tool, mock_gates, workspace, provider, mock_context + ): + """create_file tool execution records a FileOperation.""" + from codeframe.core.react_agent import ReactAgent + + provider.add_tool_response( + [ToolCall(id="tc1", name="create_file", input={"path": "hello.py", "content": "print('hi')"})] + ) + provider.add_text_response("Created the file.") + + mock_ctx_loader.return_value.load_context.return_value = mock_context + mock_exec_tool.return_value = ToolResult(tool_call_id="tc1", content="File created: hello.py") + mock_gates.run.return_value = _gate_passed() + + recorder = ExecutionRecorder(workspace=workspace, run_id="run-fop-1") + agent = ReactAgent( + workspace=workspace, + llm_provider=provider, + execution_recorder=recorder, + ) + agent.run("task-1") + + ops = get_file_operations(workspace, "run-fop-1") + assert len(ops) == 1 + assert ops[0].operation_type == "create" + assert ops[0].file_path == "hello.py" + assert ops[0].content_after == "print('hi')" + + @patch("codeframe.core.react_agent.gates") + @patch("codeframe.core.react_agent.execute_tool") + @patch("codeframe.core.react_agent.TaskContextPackager") + def test_records_file_operation_for_edit( + self, mock_ctx_loader, mock_exec_tool, mock_gates, workspace, provider, mock_context + ): + """edit_file tool execution records a FileOperation.""" + from codeframe.core.react_agent import ReactAgent + + provider.add_tool_response( + [ToolCall(id="tc1", name="edit_file", input={ + "path": "main.py", + "old_text": "old code", + "new_text": "new code", + })] + ) + provider.add_text_response("Edited the file.") + + mock_ctx_loader.return_value.load_context.return_value = mock_context + mock_exec_tool.return_value = ToolResult(tool_call_id="tc1", content="Edit applied") + mock_gates.run.return_value = _gate_passed() + + recorder = ExecutionRecorder(workspace=workspace, run_id="run-edit-1") + agent = ReactAgent( + workspace=workspace, + llm_provider=provider, + execution_recorder=recorder, + ) + agent.run("task-1") + + ops = get_file_operations(workspace, "run-edit-1") + assert len(ops) == 1 + assert ops[0].operation_type == "edit" + assert ops[0].file_path == "main.py" + + @patch("codeframe.core.react_agent.gates") + @patch("codeframe.core.react_agent.execute_tool") + @patch("codeframe.core.react_agent.TaskContextPackager") + def test_no_file_operation_for_read_tool( + self, mock_ctx_loader, mock_exec_tool, mock_gates, workspace, provider, mock_context + ): + """read_file tool does NOT record a FileOperation.""" + from codeframe.core.react_agent import ReactAgent + + provider.add_tool_response( + [ToolCall(id="tc1", name="read_file", input={"path": "a.py"})] + ) + provider.add_text_response("Done.") + + mock_ctx_loader.return_value.load_context.return_value = mock_context + mock_exec_tool.return_value = ToolResult(tool_call_id="tc1", content="contents") + mock_gates.run.return_value = _gate_passed() + + recorder = ExecutionRecorder(workspace=workspace, run_id="run-noop-1") + agent = ReactAgent( + workspace=workspace, + llm_provider=provider, + execution_recorder=recorder, + ) + agent.run("task-1") + + ops = get_file_operations(workspace, "run-noop-1") + assert len(ops) == 0 + + @patch("codeframe.core.react_agent.gates") + @patch("codeframe.core.react_agent.execute_tool") + @patch("codeframe.core.react_agent.TaskContextPackager") + def test_recording_does_not_affect_agent_status( + self, mock_ctx_loader, mock_exec_tool, mock_gates, workspace, provider, mock_context + ): + """Agent returns the same status with or without a recorder.""" + from codeframe.core.react_agent import ReactAgent + + # Setup for a simple completion + def setup_mocks(): + provider.reset() + provider.add_text_response("Task completed successfully.") + mock_ctx_loader.return_value.load_context.return_value = mock_context + mock_gates.run.return_value = _gate_passed() + + # Without recorder + setup_mocks() + agent_no_rec = ReactAgent(workspace=workspace, llm_provider=provider) + status_no_rec = agent_no_rec.run("task-1") + + # With recorder + setup_mocks() + recorder = ExecutionRecorder(workspace=workspace, run_id="run-cmp-1") + agent_with_rec = ReactAgent( + workspace=workspace, + llm_provider=provider, + execution_recorder=recorder, + ) + status_with_rec = agent_with_rec.run("task-1") + + assert status_no_rec == status_with_rec == AgentStatus.COMPLETED From 2a62b4f1ea9f5016fcdade5467172248f14b9f7a Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:27:29 -0700 Subject: [PATCH 4/8] feat(replay): add CLI commands for replay, diff, and export-trace (#315) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - cf work replay — step-through execution with Rich formatting - cf work diff — show file changes with unified diff - cf work export-trace — export to JSON or Markdown --- codeframe/cli/app.py | 279 ++++++++++++++++++++++++++++++ tests/cli/test_replay_commands.py | 246 ++++++++++++++++++++++++++ 2 files changed, 525 insertions(+) create mode 100644 tests/cli/test_replay_commands.py diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index 2bf36d6c..6b2d2237 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -14,6 +14,7 @@ codeframe status """ +import json from pathlib import Path from typing import Optional @@ -3129,6 +3130,284 @@ def work_follow( raise typer.Exit(1) +# ============================================================================= +# Replay / Debug commands (cf work replay, cf work diff, cf work export-trace) +# ============================================================================= + + +@work_app.command("replay") +def work_replay( + run_id: str = typer.Argument(..., help="Run ID to replay"), + workspace_path: Optional[Path] = typer.Option( + None, + "--workspace", + "-w", + help="Workspace path (defaults to current directory)", + ), + step: Optional[int] = typer.Option( + None, + "--step", + "-s", + help="Jump to a specific step number", + ), + show_llm: bool = typer.Option( + False, + "--show-llm", + help="Show LLM prompts and responses", + ), + show_files: bool = typer.Option( + True, + "--show-files/--no-files", + help="Show file changes at each step", + ), +) -> None: + """Replay a past execution step by step. + + Shows what happened during an agent run: which tools were called, + what files were changed, and what the LLM produced at each step. + + Example: + cf work replay + cf work replay --step 3 + cf work replay --show-llm + """ + from rich.panel import Panel + + from codeframe.core.replay import ( + load_execution_trace, + ) + from codeframe.core.workspace import get_workspace + + path = workspace_path or Path.cwd() + + try: + workspace = get_workspace(path) + trace = load_execution_trace(workspace, run_id) + + if not trace: + console.print(f"[red]Error:[/red] No trace found for run '{run_id}'") + raise typer.Exit(1) + + # Header + console.print( + Panel( + f"[bold]Run:[/bold] {trace.run_id}\n" + f"[bold]Task:[/bold] {trace.task_id}\n" + f"[bold]Status:[/bold] {trace.status}\n" + f"[bold]Steps:[/bold] {len(trace.steps)}", + title="Execution Replay", + ) + ) + + # Build lookups + ops_by_step = {} + for op in trace.file_operations: + ops_by_step.setdefault(op.step_id, []).append(op) + + llm_by_step = {} + for llm in trace.llm_interactions: + llm_by_step.setdefault(llm.step_id, []).append(llm) + + # Filter to specific step if requested + steps_to_show = trace.steps + if step is not None: + steps_to_show = [s for s in trace.steps if s.step_number == step] + if not steps_to_show: + console.print(f"[yellow]No step {step} found (max: {len(trace.steps)})[/yellow]") + raise typer.Exit(1) + + for s in steps_to_show: + status_color = {"completed": "green", "failed": "red"}.get(s.status, "yellow") + console.print( + f"\n[bold]Step {s.step_number}:[/bold] {s.description} " + f"[{status_color}][{s.status}][/{status_color}]" + ) + + if show_files: + step_ops = ops_by_step.get(s.id, []) + for op in step_ops: + op_color = {"create": "green", "edit": "yellow", "delete": "red"}.get( + op.operation_type, "white" + ) + console.print(f" [{op_color}]{op.operation_type}[/{op_color}] {op.file_path}") + + if show_llm: + step_llms = llm_by_step.get(s.id, []) + for llm in step_llms: + console.print(f" [dim]LLM ({llm.model}, {llm.tokens_used} tokens):[/dim]") + console.print(f" [cyan]Prompt:[/cyan] {llm.prompt[:200]}") + console.print(f" [cyan]Response:[/cyan] {llm.response[:200]}") + + # Summary + summary = trace.summary() + console.print(f"\n[dim]Total: {summary['total_steps']} steps, " + f"{summary['llm_calls']} LLM calls, " + f"{summary['total_tokens']} tokens, " + f"{summary['files_modified']} files modified[/dim]") + + except FileNotFoundError: + console.print(f"[red]Error:[/red] No workspace found at {path}") + raise typer.Exit(1) + + +@work_app.command("diff") +def work_diff( + run_id: str = typer.Argument(..., help="Run ID to show diffs for"), + workspace_path: Optional[Path] = typer.Option( + None, + "--workspace", + "-w", + help="Workspace path (defaults to current directory)", + ), + from_step: Optional[int] = typer.Option( + None, + "--from-step", + help="Starting step number (default: 0 = before execution)", + ), + to_step: Optional[int] = typer.Option( + None, + "--to-step", + help="Ending step number (default: last step)", + ), +) -> None: + """Show file changes across an execution run. + + Displays unified diffs of all files modified during the run, + or between specific steps. + + Example: + cf work diff + cf work diff --from-step 1 --to-step 3 + """ + import difflib + + from codeframe.core.replay import compare_steps, load_execution_trace + from codeframe.core.workspace import get_workspace + + path = workspace_path or Path.cwd() + + try: + workspace = get_workspace(path) + trace = load_execution_trace(workspace, run_id) + + if not trace: + console.print(f"[red]Error:[/red] No trace found for run '{run_id}'") + raise typer.Exit(1) + + step_a = from_step if from_step is not None else 0 + step_b = to_step if to_step is not None else max(s.step_number for s in trace.steps) + + changes = compare_steps(workspace, run_id, step_a, step_b) + + if not changes: + console.print("[yellow]No file changes between these steps.[/yellow]") + return + + console.print( + f"[bold]File changes:[/bold] step {step_a} → step {step_b} " + f"({len(changes)} file(s))\n" + ) + + for file_path, change in changes.items(): + before = change["before"] or "" + after = change["after"] or "" + + if change["before"] is None: + console.print(f"[green]+++ {file_path}[/green] (created)") + elif change["after"] is None: + console.print(f"[red]--- {file_path}[/red] (deleted)") + else: + console.print(f"[yellow]~~~ {file_path}[/yellow] (modified)") + + diff_lines = list( + difflib.unified_diff( + before.splitlines(keepends=True), + after.splitlines(keepends=True), + fromfile=f"a/{file_path}", + tofile=f"b/{file_path}", + ) + ) + for line in diff_lines: + line = line.rstrip() + if line.startswith("+") and not line.startswith("+++"): + console.print(f"[green]{line}[/green]") + elif line.startswith("-") and not line.startswith("---"): + console.print(f"[red]{line}[/red]") + else: + console.print(f"[dim]{line}[/dim]") + console.print() + + except FileNotFoundError: + console.print(f"[red]Error:[/red] No workspace found at {path}") + raise typer.Exit(1) + + +@work_app.command("export-trace") +def work_export_trace( + run_id: str = typer.Argument(..., help="Run ID to export"), + workspace_path: Optional[Path] = typer.Option( + None, + "--workspace", + "-w", + help="Workspace path (defaults to current directory)", + ), + format: str = typer.Option( + "json", + "--format", + "-f", + help="Export format: json or markdown", + click_type=click.Choice(["json", "markdown"], case_sensitive=False), + ), + output: Optional[Path] = typer.Option( + None, + "--output", + "-o", + help="Write to file instead of stdout", + ), +) -> None: + """Export an execution trace for analysis. + + Produces a complete trace in JSON or Markdown format, + including all steps, LLM interactions, and file changes. + + Example: + cf work export-trace + cf work export-trace --format markdown + cf work export-trace --output trace.json + """ + from codeframe.core.replay import ( + export_trace_json, + export_trace_markdown, + load_execution_trace, + ) + from codeframe.core.workspace import get_workspace + + path = workspace_path or Path.cwd() + + try: + workspace = get_workspace(path) + trace = load_execution_trace(workspace, run_id) + + if not trace: + console.print(f"[red]Error:[/red] No trace found for run '{run_id}'") + raise typer.Exit(1) + + if format == "json": + content = json.dumps(export_trace_json(trace), indent=2) + else: + content = export_trace_markdown(trace) + + if output: + output.write_text(content) + console.print(f"[green]Trace exported to {output}[/green]") + else: + console.print(content, highlight=False) + + except FileNotFoundError: + console.print(f"[red]Error:[/red] No workspace found at {path}") + raise typer.Exit(1) + + # ============================================================================= # Batch execution commands (subcommand group: cf work batch ) # ============================================================================= diff --git a/tests/cli/test_replay_commands.py b/tests/cli/test_replay_commands.py new file mode 100644 index 00000000..1de6a6f5 --- /dev/null +++ b/tests/cli/test_replay_commands.py @@ -0,0 +1,246 @@ +"""Tests for CLI replay commands: cf work replay, cf work diff, cf work export-trace. + +Uses CliRunner to test command output without requiring a real workspace. +""" + +import json +import uuid +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from codeframe.cli.app import app +from codeframe.core.workspace import create_or_load_workspace, get_db_connection + +pytestmark = pytest.mark.v2 + +runner = CliRunner() + + +@pytest.fixture +def workspace(tmp_path: Path): + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +@pytest.fixture +def seeded_workspace(workspace): + """Workspace with a run, task, and 3-step execution trace.""" + from codeframe.core.replay import ( + ExecutionStep, + FileOperation, + LLMInteraction, + save_execution_step, + save_file_operation, + save_llm_interaction, + ) + + task_id = str(uuid.uuid4()) + run_id = str(uuid.uuid4()) + + # Insert a task + conn = get_db_connection(workspace) + try: + now = datetime.now(timezone.utc).isoformat() + conn.execute( + "INSERT INTO tasks (id, workspace_id, title, description, status, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (task_id, workspace.id, "Test task", "A test task", "DONE", now, now), + ) + conn.execute( + "INSERT INTO runs (id, workspace_id, task_id, status, started_at, completed_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (run_id, workspace.id, task_id, "COMPLETED", now, now), + ) + conn.commit() + finally: + conn.close() + + base = datetime.now(timezone.utc) + step_ids = [str(uuid.uuid4()) for _ in range(3)] + + for i, (desc, op_type, path, before, after) in enumerate([ + ("Create main.py", "create", "src/main.py", None, "print('hello')"), + ("Edit main.py", "edit", "src/main.py", "print('hello')", "print('world')"), + ("Create utils.py", "create", "src/utils.py", None, "def helper(): pass"), + ]): + save_execution_step( + workspace, + ExecutionStep( + id=step_ids[i], + run_id=run_id, + step_number=i + 1, + step_type="tool_call", + description=desc, + started_at=base + timedelta(seconds=i * 2), + completed_at=base + timedelta(seconds=i * 2 + 1), + status="completed", + ), + ) + save_file_operation( + workspace, + FileOperation( + id=str(uuid.uuid4()), + run_id=run_id, + step_id=step_ids[i], + operation_type=op_type, + file_path=path, + content_before=before, + content_after=after, + timestamp=base + timedelta(seconds=i * 2 + 1), + ), + ) + if i < 2: # LLM interactions for first two steps + save_llm_interaction( + workspace, + LLMInteraction( + id=str(uuid.uuid4()), + run_id=run_id, + step_id=step_ids[i], + prompt=f"Do step {i + 1}", + response=f"Done with step {i + 1}", + model="claude-sonnet", + tokens_used=500, + timestamp=base + timedelta(seconds=i * 2 + 1), + purpose="execution", + ), + ) + + return workspace, task_id, run_id + + +class TestWorkReplay: + """Tests for cf work replay .""" + + def test_replay_shows_steps(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, ["work", "replay", run_id, "--workspace", str(workspace.repo_path)] + ) + assert result.exit_code == 0 + assert "Create main.py" in result.output + assert "Edit main.py" in result.output + assert "Create utils.py" in result.output + + def test_replay_specific_step(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + ["work", "replay", run_id, "--step", "2", "--workspace", str(workspace.repo_path)], + ) + assert result.exit_code == 0 + assert "Edit main.py" in result.output + + def test_replay_with_show_llm(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + ["work", "replay", run_id, "--show-llm", "--workspace", str(workspace.repo_path)], + ) + assert result.exit_code == 0 + assert "Do step 1" in result.output or "LLM" in result.output + + def test_replay_nonexistent_run(self, workspace): + result = runner.invoke( + app, + ["work", "replay", "nonexistent-id", "--workspace", str(workspace.repo_path)], + ) + assert result.exit_code == 1 + assert "not found" in result.output.lower() or "no trace" in result.output.lower() + + +class TestWorkDiff: + """Tests for cf work diff .""" + + def test_diff_shows_all_changes(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, ["work", "diff", run_id, "--workspace", str(workspace.repo_path)] + ) + assert result.exit_code == 0 + assert "src/main.py" in result.output + assert "src/utils.py" in result.output + + def test_diff_between_steps(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + [ + "work", "diff", run_id, + "--from-step", "1", "--to-step", "3", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 0 + assert "src/main.py" in result.output + + def test_diff_nonexistent_run(self, workspace): + result = runner.invoke( + app, + ["work", "diff", "nonexistent-id", "--workspace", str(workspace.repo_path)], + ) + assert result.exit_code == 1 + + +class TestWorkExportTrace: + """Tests for cf work export-trace .""" + + def test_export_json_to_stdout(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + [ + "work", "export-trace", run_id, + "--format", "json", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 0 + data = json.loads(result.output) + assert data["run_id"] == run_id + assert data["summary"]["total_steps"] == 3 + + def test_export_markdown_to_stdout(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + [ + "work", "export-trace", run_id, + "--format", "markdown", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 0 + assert "# Execution Trace" in result.output + assert run_id in result.output + + def test_export_json_to_file(self, seeded_workspace, tmp_path): + workspace, task_id, run_id = seeded_workspace + output_file = tmp_path / "trace.json" + result = runner.invoke( + app, + [ + "work", "export-trace", run_id, + "--format", "json", + "--output", str(output_file), + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 0 + assert output_file.exists() + data = json.loads(output_file.read_text()) + assert data["run_id"] == run_id + + def test_export_nonexistent_run(self, workspace): + result = runner.invoke( + app, + [ + "work", "export-trace", "nonexistent-id", + "--format", "json", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 1 From 024d59ff17de558f55183a033ff1ffad780379fe Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:30:31 -0700 Subject: [PATCH 5/8] feat(replay): add interactive navigation, rerun, and session management (#315) - ReplaySession class with n/p/j navigation for step-through - prepare_rerun() reconstructs file state at any step - cf work rerun command shows state and remaining steps --- codeframe/cli/app.py | 61 ++++++++++++ codeframe/core/replay.py | 108 ++++++++++++++++++++ tests/cli/test_replay_commands.py | 42 ++++++++ tests/core/test_replay.py | 157 ++++++++++++++++++++++++++++++ 4 files changed, 368 insertions(+) diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index 6b2d2237..4fc0ec2a 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -3408,6 +3408,67 @@ def work_export_trace( raise typer.Exit(1) +@work_app.command("rerun") +def work_rerun( + run_id: str = typer.Argument(..., help="Run ID to re-run from"), + workspace_path: Optional[Path] = typer.Option( + None, + "--workspace", + "-w", + help="Workspace path (defaults to current directory)", + ), + from_step: int = typer.Option( + 1, + "--from-step", + help="Step number to resume from", + ), +) -> None: + """Prepare to re-execute a run from a specific step. + + Reconstructs the file state at step N and shows what + would need to be re-executed. Use this to understand + what happened and plan a manual re-run. + + Example: + cf work rerun --from-step 2 + """ + from codeframe.core.replay import prepare_rerun + from codeframe.core.workspace import get_workspace + + path = workspace_path or Path.cwd() + + try: + workspace = get_workspace(path) + rerun_info = prepare_rerun(workspace, run_id, from_step) + + console.print(f"[bold]Re-run preparation for run {run_id}[/bold]\n") + console.print(f"[bold]Resume from:[/bold] Step {from_step}") + console.print(f"[bold]Task:[/bold] {rerun_info['task_id']}") + + file_state = rerun_info["file_state"] + if file_state: + console.print(f"\n[bold]File state at step {from_step}:[/bold]") + for fp in sorted(file_state.keys()): + console.print(f" {fp}") + else: + console.print(f"\n[yellow]No files modified at step {from_step}[/yellow]") + + remaining = rerun_info["remaining_steps"] + if remaining: + console.print(f"\n[bold]Remaining steps ({len(remaining)}):[/bold]") + for rs in remaining: + console.print(f" Step {rs['step_number']}: {rs['description']}") + else: + console.print("\n[yellow]No remaining steps after this point[/yellow]") + + except FileNotFoundError: + console.print(f"[red]Error:[/red] No workspace found at {path}") + raise typer.Exit(1) + except ValueError as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + + # ============================================================================= # Batch execution commands (subcommand group: cf work batch ) # ============================================================================= diff --git a/codeframe/core/replay.py b/codeframe/core/replay.py index c36de1b9..3a7b4c68 100644 --- a/codeframe/core/replay.py +++ b/codeframe/core/replay.py @@ -632,6 +632,114 @@ def export_trace_markdown(trace: ExecutionTrace) -> str: return "\n".join(lines) +# ============================================================================= +# Interactive Replay Session +# ============================================================================= + + +class ReplaySession: + """Manages interactive step-through of an execution trace. + + Tracks the current position and provides navigation methods. + Display is delegated to the caller (CLI layer). + """ + + def __init__(self, trace: ExecutionTrace) -> None: + self.trace = trace + self._current_index = 0 + + # Build lookups + self.ops_by_step: dict[str, list[FileOperation]] = {} + for op in trace.file_operations: + self.ops_by_step.setdefault(op.step_id, []).append(op) + + self.llm_by_step: dict[str, list[LLMInteraction]] = {} + for llm in trace.llm_interactions: + self.llm_by_step.setdefault(llm.step_id, []).append(llm) + + @property + def current_step(self) -> Optional[ExecutionStep]: + if 0 <= self._current_index < len(self.trace.steps): + return self.trace.steps[self._current_index] + return None + + @property + def current_position(self) -> int: + return self._current_index + 1 + + @property + def total_steps(self) -> int: + return len(self.trace.steps) + + def next(self) -> Optional[ExecutionStep]: + if self._current_index < len(self.trace.steps) - 1: + self._current_index += 1 + return self.current_step + + def previous(self) -> Optional[ExecutionStep]: + if self._current_index > 0: + self._current_index -= 1 + return self.current_step + + def jump(self, step_number: int) -> Optional[ExecutionStep]: + for i, step in enumerate(self.trace.steps): + if step.step_number == step_number: + self._current_index = i + return step + return None + + def get_step_file_ops(self, step: ExecutionStep) -> list[FileOperation]: + return self.ops_by_step.get(step.id, []) + + def get_step_llm_calls(self, step: ExecutionStep) -> list[LLMInteraction]: + return self.llm_by_step.get(step.id, []) + + def list_steps(self) -> list[ExecutionStep]: + return list(self.trace.steps) + + +# ============================================================================= +# Re-run from Step +# ============================================================================= + + +def prepare_rerun( + workspace: Workspace, + run_id: str, + from_step: int, +) -> dict[str, Any]: + """Prepare state for re-executing from a specific step. + + Reconstructs the file state at the given step and returns + metadata needed to create a new run starting from that point. + + Returns a dict with: + - file_state: dict of file_path -> content at step N + - original_run_id: the source run + - from_step: the step number to resume from + - remaining_steps: descriptions of steps that follow + """ + trace = load_execution_trace(workspace, run_id) + if not trace: + raise ValueError(f"No trace found for run '{run_id}'") + + file_state = get_step_snapshot(workspace, run_id, from_step) + + remaining_steps = [ + {"step_number": s.step_number, "description": s.description} + for s in trace.steps + if s.step_number > from_step + ] + + return { + "file_state": file_state, + "original_run_id": run_id, + "from_step": from_step, + "remaining_steps": remaining_steps, + "task_id": trace.task_id, + } + + # ============================================================================= # Row Converters # ============================================================================= diff --git a/tests/cli/test_replay_commands.py b/tests/cli/test_replay_commands.py index 1de6a6f5..49474f38 100644 --- a/tests/cli/test_replay_commands.py +++ b/tests/cli/test_replay_commands.py @@ -244,3 +244,45 @@ def test_export_nonexistent_run(self, workspace): ], ) assert result.exit_code == 1 + + +class TestWorkRerun: + """Tests for cf work rerun .""" + + def test_rerun_shows_file_state(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + [ + "work", "rerun", run_id, + "--from-step", "2", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 0 + assert "src/main.py" in result.output + assert "Step 2" in result.output or "step 2" in result.output.lower() + + def test_rerun_shows_remaining_steps(self, seeded_workspace): + workspace, task_id, run_id = seeded_workspace + result = runner.invoke( + app, + [ + "work", "rerun", run_id, + "--from-step", "1", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 0 + assert "Remaining steps" in result.output + + def test_rerun_nonexistent_run(self, workspace): + result = runner.invoke( + app, + [ + "work", "rerun", "nonexistent-id", + "--from-step", "1", + "--workspace", str(workspace.repo_path), + ], + ) + assert result.exit_code == 1 diff --git a/tests/core/test_replay.py b/tests/core/test_replay.py index 26fc1d36..5623bf4d 100644 --- a/tests/core/test_replay.py +++ b/tests/core/test_replay.py @@ -709,3 +709,160 @@ def test_markdown_contains_file_changes(self, workspace, run_id, task_id): assert "src/a.py" in md assert "src/b.py" in md + + +# ============================================================================= +# Step 5: Interactive replay session tests +# ============================================================================= + + +class TestReplaySession: + """Tests for ReplaySession interactive navigation.""" + + def test_initial_position(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + assert session.current_position == 1 + assert session.total_steps == 3 + assert session.current_step.description == "Create file A" + + def test_next_navigation(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + session.next() + assert session.current_position == 2 + assert session.current_step.description == "Edit file A" + + def test_previous_navigation(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + session.next() + session.next() + session.previous() + assert session.current_position == 2 + + def test_previous_at_start_stays(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + session.previous() + assert session.current_position == 1 + + def test_next_at_end_stays(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + session.next() + session.next() + session.next() # Beyond end + assert session.current_position == 3 + + def test_jump_to_step(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + result = session.jump(3) + assert result is not None + assert session.current_position == 3 + assert session.current_step.description == "Create file B" + + def test_jump_invalid_step_returns_none(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + result = session.jump(99) + assert result is None + assert session.current_position == 1 # Unchanged + + def test_get_step_file_ops(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + ops = session.get_step_file_ops(session.current_step) + assert len(ops) == 1 + assert ops[0].file_path == "src/a.py" + + def test_get_step_llm_calls(self, workspace, run_id, task_id): + from codeframe.core.replay import ReplaySession, load_execution_trace + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + trace = load_execution_trace(workspace, run_id) + + session = ReplaySession(trace) + llms = session.get_step_llm_calls(session.current_step) + assert len(llms) == 1 + assert llms[0].prompt == "Create file A" + + +# ============================================================================= +# Step 6: Re-run preparation tests +# ============================================================================= + + +class TestPrepareRerun: + """Tests for prepare_rerun state reconstruction.""" + + def test_prepare_rerun_from_step_1(self, workspace, run_id, task_id): + from codeframe.core.replay import prepare_rerun + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + result = prepare_rerun(workspace, run_id, from_step=1) + assert result["original_run_id"] == run_id + assert result["from_step"] == 1 + assert result["task_id"] == task_id + assert "src/a.py" in result["file_state"] + assert len(result["remaining_steps"]) == 2 + + def test_prepare_rerun_from_step_2(self, workspace, run_id, task_id): + from codeframe.core.replay import prepare_rerun + + _insert_run(workspace, run_id, task_id) + _seed_three_step_trace(workspace, run_id) + + result = prepare_rerun(workspace, run_id, from_step=2) + assert result["file_state"]["src/a.py"] == "# edited A" + assert len(result["remaining_steps"]) == 1 + + def test_prepare_rerun_nonexistent_run(self, workspace): + from codeframe.core.replay import prepare_rerun + + with pytest.raises(ValueError, match="No trace found"): + prepare_rerun(workspace, "nonexistent-id", from_step=1) From de742c75b6dceb4a5c0dec6d114c99075d9f2e59 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:33:46 -0700 Subject: [PATCH 6/8] test(replay): add integration tests for full record-replay lifecycle (#315) End-to-end tests: ExecutionRecorder records a 3-step agent run, then verifies trace loading, step snapshots, diffs, JSON/Markdown export, ReplaySession navigation, and rerun preparation. --- tests/core/test_replay_integration.py | 247 ++++++++++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 tests/core/test_replay_integration.py diff --git a/tests/core/test_replay_integration.py b/tests/core/test_replay_integration.py new file mode 100644 index 00000000..62605258 --- /dev/null +++ b/tests/core/test_replay_integration.py @@ -0,0 +1,247 @@ +"""Integration tests for the replay system. + +Exercises the full flow: ExecutionRecorder records data during a mock +agent run, then load/replay/diff/export consume that recorded data. +""" + +import json +import uuid +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from codeframe.core.workspace import create_or_load_workspace, get_db_connection + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path: Path): + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +@pytest.fixture +def run_with_trace(workspace): + """Simulate a complete agent run using ExecutionRecorder.""" + from codeframe.core.replay import ExecutionRecorder + + task_id = str(uuid.uuid4()) + run_id = str(uuid.uuid4()) + + # Insert run record + conn = get_db_connection(workspace) + try: + now = datetime.now(timezone.utc).isoformat() + conn.execute( + "INSERT INTO tasks (id, workspace_id, title, description, status, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (task_id, workspace.id, "Integration test task", "Full lifecycle test", "DONE", now, now), + ) + conn.execute( + "INSERT INTO runs (id, workspace_id, task_id, status, started_at, completed_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (run_id, workspace.id, task_id, "COMPLETED", now, now), + ) + conn.commit() + finally: + conn.close() + + # Record execution using the recorder (same way ReactAgent does) + recorder = ExecutionRecorder(workspace, run_id, flush_interval=100) + + # Iteration 1: Create a file + step1_id = recorder.record_iteration( + step_number=1, + tool_names=["create_file"], + llm_response_summary="Creating main.py with hello world", + ) + recorder.record_llm_call( + step_id=step1_id, + prompt_summary="Implement the task: create a hello world script", + response_summary="I'll create main.py", + model="claude-sonnet-4-20250514", + tokens_used=800, + purpose="execution", + ) + recorder.record_file_operation( + step_id=step1_id, + op_type="create", + path="main.py", + before=None, + after="print('hello world')", + ) + + # Iteration 2: Edit the file + step2_id = recorder.record_iteration( + step_number=2, + tool_names=["edit_file"], + llm_response_summary="Adding error handling", + ) + recorder.record_llm_call( + step_id=step2_id, + prompt_summary="The file needs error handling", + response_summary="I'll add try/except", + model="claude-sonnet-4-20250514", + tokens_used=600, + purpose="execution", + ) + recorder.record_file_operation( + step_id=step2_id, + op_type="edit", + path="main.py", + before="print('hello world')", + after="try:\n print('hello world')\nexcept Exception:\n pass", + ) + + # Iteration 3: Run tests (no file changes) + step3_id = recorder.record_iteration( + step_number=3, + tool_names=["run_tests"], + llm_response_summary="All tests pass", + ) + recorder.record_llm_call( + step_id=step3_id, + prompt_summary="Run the test suite", + response_summary="5 tests passed", + model="claude-sonnet-4-20250514", + tokens_used=400, + purpose="verification", + ) + + recorder.flush() + return workspace, task_id, run_id + + +class TestFullLifecycle: + """End-to-end: record → load → replay → diff → export.""" + + def test_load_recorded_trace(self, run_with_trace): + from codeframe.core.replay import load_execution_trace + + workspace, task_id, run_id = run_with_trace + trace = load_execution_trace(workspace, run_id) + + assert trace is not None + assert trace.run_id == run_id + assert trace.task_id == task_id + assert trace.status == "COMPLETED" + assert len(trace.steps) == 3 + assert len(trace.llm_interactions) == 3 + assert len(trace.file_operations) == 2 + + def test_step_snapshots_match_recorded_state(self, run_with_trace): + from codeframe.core.replay import get_step_snapshot + + workspace, _, run_id = run_with_trace + + # After step 1: main.py created + snapshot1 = get_step_snapshot(workspace, run_id, 1) + assert snapshot1 == {"main.py": "print('hello world')"} + + # After step 2: main.py edited + snapshot2 = get_step_snapshot(workspace, run_id, 2) + assert "try:" in snapshot2["main.py"] + + # After step 3: no file changes, same state + snapshot3 = get_step_snapshot(workspace, run_id, 3) + assert snapshot3 == snapshot2 + + def test_diff_between_start_and_end(self, run_with_trace): + from codeframe.core.replay import compare_steps + + workspace, _, run_id = run_with_trace + changes = compare_steps(workspace, run_id, 0, 3) + + assert "main.py" in changes + assert changes["main.py"]["before"] is None + assert "try:" in changes["main.py"]["after"] + + def test_diff_step_1_to_2(self, run_with_trace): + from codeframe.core.replay import compare_steps + + workspace, _, run_id = run_with_trace + changes = compare_steps(workspace, run_id, 1, 2) + + assert "main.py" in changes + assert changes["main.py"]["before"] == "print('hello world')" + assert "try:" in changes["main.py"]["after"] + + def test_export_json_roundtrip(self, run_with_trace): + from codeframe.core.replay import export_trace_json, load_execution_trace + + workspace, task_id, run_id = run_with_trace + trace = load_execution_trace(workspace, run_id) + exported = export_trace_json(trace) + + # Verify JSON serializable + serialized = json.dumps(exported) + roundtripped = json.loads(serialized) + + assert roundtripped["run_id"] == run_id + assert roundtripped["task_id"] == task_id + assert roundtripped["summary"]["total_steps"] == 3 + assert roundtripped["summary"]["llm_calls"] == 3 + assert roundtripped["summary"]["total_tokens"] == 1800 + assert roundtripped["summary"]["files_modified"] == 1 + + def test_export_markdown_content(self, run_with_trace): + from codeframe.core.replay import export_trace_markdown, load_execution_trace + + workspace, _, run_id = run_with_trace + trace = load_execution_trace(workspace, run_id) + md = export_trace_markdown(trace) + + assert "# Execution Trace" in md + assert "COMPLETED" in md + assert "main.py" in md + assert "create_file" in md or "Creating" in md + + def test_replay_session_navigation(self, run_with_trace): + from codeframe.core.replay import ReplaySession, load_execution_trace + + workspace, _, run_id = run_with_trace + trace = load_execution_trace(workspace, run_id) + session = ReplaySession(trace) + + # Start at step 1 + assert session.current_position == 1 + assert "create_file" in session.current_step.description + + # Navigate forward + session.next() + assert session.current_position == 2 + assert "edit_file" in session.current_step.description + + # Jump to step 3 + session.jump(3) + assert session.current_position == 3 + assert "run_tests" in session.current_step.description + + # Go back + session.previous() + assert session.current_position == 2 + + def test_prepare_rerun_from_step(self, run_with_trace): + from codeframe.core.replay import prepare_rerun + + workspace, task_id, run_id = run_with_trace + info = prepare_rerun(workspace, run_id, from_step=1) + + assert info["task_id"] == task_id + assert info["file_state"]["main.py"] == "print('hello world')" + assert len(info["remaining_steps"]) == 2 + + def test_summary_aggregation(self, run_with_trace): + from codeframe.core.replay import load_execution_trace + + workspace, _, run_id = run_with_trace + trace = load_execution_trace(workspace, run_id) + summary = trace.summary() + + assert summary["total_steps"] == 3 + assert summary["llm_calls"] == 3 + assert summary["total_tokens"] == 1800 # 800 + 600 + 400 + assert summary["files_modified"] == 1 # Only main.py From dc2ad33830b87a61304089e4ba209ff7385ddaba Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:44:02 -0700 Subject: [PATCH 7/8] fix: remove unused imports in test_execution_recording (#315) --- tests/core/test_execution_recording.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/core/test_execution_recording.py b/tests/core/test_execution_recording.py index 97b8b0c6..4badf87b 100644 --- a/tests/core/test_execution_recording.py +++ b/tests/core/test_execution_recording.py @@ -12,10 +12,10 @@ import pytest -from codeframe.adapters.llm.base import LLMResponse, ToolCall, ToolResult +from codeframe.adapters.llm.base import ToolCall, ToolResult from codeframe.adapters.llm.mock import MockProvider from codeframe.core.agent import AgentStatus -from codeframe.core.context import FileContent, TaskContext +from codeframe.core.context import TaskContext from codeframe.core.gates import GateCheck, GateResult, GateStatus from codeframe.core.replay import ( ExecutionRecorder, @@ -24,7 +24,7 @@ get_llm_interactions, ) from codeframe.core.tasks import Task, TaskStatus -from codeframe.core.workspace import Workspace, create_or_load_workspace +from codeframe.core.workspace import create_or_load_workspace pytestmark = pytest.mark.v2 From c46c439786ac3d1c1c6dd2459e1e1cd8215cd8a4 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 17 Mar 2026 07:58:30 -0700 Subject: [PATCH 8/8] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20flush=20data=20loss,=20edit=5Ffile=20recording,=20f?= =?UTF-8?q?ormat=20shadow=20(#315)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix data loss in ExecutionRecorder.flush(): only clear buffers on success, retain data for retry on failure, log at WARNING not DEBUG - Fix edit_file recording: read actual file content after edit instead of capturing search/replace snippet (create_file still uses tool input) - Rename format parameter to output_format to avoid shadowing builtin --- codeframe/cli/app.py | 4 ++-- codeframe/core/react_agent.py | 17 ++++++++++++++--- codeframe/core/replay.py | 6 +++--- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index 4fc0ec2a..c1e42934 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -3351,7 +3351,7 @@ def work_export_trace( "-w", help="Workspace path (defaults to current directory)", ), - format: str = typer.Option( + output_format: str = typer.Option( "json", "--format", "-f", @@ -3392,7 +3392,7 @@ def work_export_trace( console.print(f"[red]Error:[/red] No trace found for run '{run_id}'") raise typer.Exit(1) - if format == "json": + if output_format == "json": content = json.dumps(export_trace_json(trace), indent=2) else: content = export_trace_markdown(trace) diff --git a/codeframe/core/react_agent.py b/codeframe/core/react_agent.py index f60370ca..54e67168 100644 --- a/codeframe/core/react_agent.py +++ b/codeframe/core/react_agent.py @@ -575,13 +575,24 @@ def _react_loop(self, system_prompt: str) -> AgentStatus: ): _op_type = "create" if tc.name == "create_file" else "edit" _op_path = tc.input.get("path", "") - _op_after = tc.input.get("content") if tc.name == "create_file" else tc.input.get("new_text") - _op_before = tc.input.get("old_text") if tc.name == "edit_file" else None + if tc.name == "create_file": + # create_file input has the full content + _op_after = tc.input.get("content", "") + else: + # edit_file uses search/replace snippets — read the + # actual file content after the edit for accurate state. + _op_after = None + try: + _full_path = self.workspace.repo_path / _op_path + if _full_path.is_file(): + _op_after = _full_path.read_text(errors="replace") + except OSError: + pass self.execution_recorder.record_file_operation( step_id=_rec_step_id, op_type=_op_type, path=_op_path, - before=_op_before, + before=None, after=_op_after, ) diff --git a/codeframe/core/replay.py b/codeframe/core/replay.py index 3a7b4c68..096bf6ed 100644 --- a/codeframe/core/replay.py +++ b/codeframe/core/replay.py @@ -250,12 +250,12 @@ def flush(self) -> None: save_llm_interaction(self.workspace, interaction) for op in self._file_op_buffer: save_file_operation(self.workspace, op) - except Exception: - logger.debug("ExecutionRecorder flush failed", exc_info=True) - finally: + # Only clear on success — retain data for retry on failure self._step_buffer.clear() self._llm_buffer.clear() self._file_op_buffer.clear() + except Exception: + logger.warning("ExecutionRecorder flush failed — data retained for retry", exc_info=True) def _maybe_flush(self) -> None: """Auto-flush when buffer reaches threshold."""