From de280019fda767e091597d344461198bd0997337 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 14 Mar 2026 12:23:30 -0700 Subject: [PATCH 1/3] feat(core): add engine performance tracking schema and stats module (#419) Add run_engine_log and engine_stats tables for tracking per-run engine metrics and computing aggregate performance statistics. Implements record_run, get_engine_stats, and get_run_log functions with 11 tests. --- codeframe/core/engine_stats.py | 225 +++++++++++++++++++++++++++++ codeframe/core/workspace.py | 68 +++++++++ tests/core/test_engine_stats.py | 241 ++++++++++++++++++++++++++++++++ 3 files changed, 534 insertions(+) create mode 100644 codeframe/core/engine_stats.py create mode 100644 tests/core/test_engine_stats.py diff --git a/codeframe/core/engine_stats.py b/codeframe/core/engine_stats.py new file mode 100644 index 00000000..e5f33c37 --- /dev/null +++ b/codeframe/core/engine_stats.py @@ -0,0 +1,225 @@ +"""Engine performance tracking for CodeFRAME. + +Records per-run engine metrics and computes aggregate statistics +for comparing engine performance (react vs plan vs external adapters). + +This module is headless - no FastAPI or HTTP dependencies. +""" + +from typing import Optional + +from codeframe.core.workspace import Workspace, get_db_connection, _utc_now + + +def record_run( + workspace: Workspace, + run_id: str, + engine: str, + task_id: str, + status: str, + duration_ms: Optional[int] = None, + tokens_used: int = 0, + gates_passed: Optional[int] = None, + self_corrections: int = 0, +) -> None: + """Record an engine run in the run_engine_log table. + + After inserting, recomputes aggregate stats for the engine. + + Args: + workspace: Active workspace. + run_id: Unique run identifier. + engine: Engine name (e.g. "react", "plan"). + task_id: Task that was executed. + status: Final run status (COMPLETED, FAILED, BLOCKED). + duration_ms: Execution duration in milliseconds. + tokens_used: Total LLM tokens consumed. + gates_passed: 1 if all gates passed, 0 if not, None if no gate data. + self_corrections: Number of self-correction attempts. + """ + now = _utc_now().isoformat() + + conn = get_db_connection(workspace) + try: + conn.execute( + "INSERT INTO run_engine_log " + "(run_id, engine, task_id, workspace_id, status, duration_ms, " + "tokens_used, gates_passed, self_corrections, created_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + run_id, + engine, + task_id, + workspace.id, + status, + duration_ms, + tokens_used, + gates_passed, + self_corrections, + now, + ), + ) + conn.commit() + finally: + conn.close() + + _update_aggregate_stats(workspace, engine) + + +def _update_aggregate_stats(workspace: Workspace, engine: str) -> None: + """Recompute aggregate metrics for an engine from run_engine_log. + + Upserts each metric into the engine_stats table. + """ + now = _utc_now().isoformat() + ws_id = workspace.id + + conn = get_db_connection(workspace) + try: + cur = conn.cursor() + + # Compute all metrics in one pass where possible + row = cur.execute( + "SELECT " + " COUNT(*), " + " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END), " + " COUNT(CASE WHEN status = 'FAILED' THEN 1 END), " + " COUNT(CASE WHEN gates_passed = 1 THEN 1 END), " + " COUNT(CASE WHEN gates_passed IS NOT NULL THEN 1 END), " + " COUNT(CASE WHEN self_corrections > 0 THEN 1 END), " + " AVG(CASE WHEN duration_ms IS NOT NULL THEN duration_ms END), " + " SUM(tokens_used), " + " SUM(CASE WHEN status = 'COMPLETED' THEN tokens_used ELSE 0 END), " + " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) " + "FROM run_engine_log " + "WHERE engine = ? AND workspace_id = ?", + (engine, ws_id), + ).fetchone() + + total = row[0] + completed = row[1] + failed = row[2] + gates_pass_count = row[3] + gates_total = row[4] + self_corr_count = row[5] + avg_duration = row[6] + total_tokens = row[7] or 0 + completed_tokens = row[8] or 0 + completed_count = row[9] + + gate_pass_rate = ( + 100.0 * gates_pass_count / gates_total if gates_total > 0 else 0.0 + ) + self_correction_rate = ( + 100.0 * self_corr_count / total if total > 0 else 0.0 + ) + avg_tokens_per_task = ( + completed_tokens / completed_count if completed_count > 0 else 0.0 + ) + + metrics = { + "tasks_attempted": float(total), + "tasks_completed": float(completed), + "tasks_failed": float(failed), + "gate_pass_rate": round(gate_pass_rate, 2), + "self_correction_rate": round(self_correction_rate, 2), + "avg_duration_ms": round(avg_duration, 2) if avg_duration is not None else 0.0, + "total_tokens": float(total_tokens), + "avg_tokens_per_task": round(avg_tokens_per_task, 2), + } + + for metric, value in metrics.items(): + cur.execute( + "INSERT OR REPLACE INTO engine_stats " + "(workspace_id, engine, metric, value, updated_at) " + "VALUES (?, ?, ?, ?, ?)", + (ws_id, engine, metric, value, now), + ) + + conn.commit() + finally: + conn.close() + + +def get_engine_stats( + workspace: Workspace, engine: Optional[str] = None +) -> dict[str, dict[str, float]]: + """Get aggregate engine statistics. + + Args: + workspace: Active workspace. + engine: Optional engine filter. If None, returns all engines. + + Returns: + Dict keyed by engine name, each value is a dict of metric -> value. + Empty dict if no stats exist. + """ + conn = get_db_connection(workspace) + try: + if engine is not None: + rows = conn.execute( + "SELECT engine, metric, value FROM engine_stats " + "WHERE workspace_id = ? AND engine = ?", + (workspace.id, engine), + ).fetchall() + else: + rows = conn.execute( + "SELECT engine, metric, value FROM engine_stats " + "WHERE workspace_id = ?", + (workspace.id,), + ).fetchall() + finally: + conn.close() + + result: dict[str, dict[str, float]] = {} + for eng, metric, value in rows: + if eng not in result: + result[eng] = {} + result[eng][metric] = value + + return result + + +def get_run_log( + workspace: Workspace, engine: Optional[str] = None, limit: int = 100 +) -> list[dict]: + """Get raw per-run records from the run_engine_log table. + + Args: + workspace: Active workspace. + engine: Optional engine filter. + limit: Maximum records to return (default 100). + + Returns: + List of dicts, each representing a run record. + Ordered by created_at DESC. + """ + conn = get_db_connection(workspace) + try: + if engine is not None: + rows = conn.execute( + "SELECT run_id, engine, task_id, workspace_id, status, " + "duration_ms, tokens_used, gates_passed, self_corrections, " + "created_at FROM run_engine_log " + "WHERE workspace_id = ? AND engine = ? " + "ORDER BY created_at DESC LIMIT ?", + (workspace.id, engine, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT run_id, engine, task_id, workspace_id, status, " + "duration_ms, tokens_used, gates_passed, self_corrections, " + "created_at FROM run_engine_log " + "WHERE workspace_id = ? " + "ORDER BY created_at DESC LIMIT ?", + (workspace.id, limit), + ).fetchall() + finally: + conn.close() + + columns = [ + "run_id", "engine", "task_id", "workspace_id", "status", + "duration_ms", "tokens_used", "gates_passed", "self_corrections", + "created_at", + ] + return [dict(zip(columns, row)) for row in rows] diff --git a/codeframe/core/workspace.py b/codeframe/core/workspace.py index a4659b52..a517609b 100644 --- a/codeframe/core/workspace.py +++ b/codeframe/core/workspace.py @@ -246,6 +246,34 @@ def _init_database(db_path: Path) -> None: ) """) + # Engine performance tracking: per-run log + cursor.execute(""" + CREATE TABLE IF NOT EXISTS run_engine_log ( + run_id TEXT PRIMARY KEY, + engine TEXT NOT NULL, + task_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, + status TEXT NOT NULL, + duration_ms INTEGER, + tokens_used INTEGER DEFAULT 0, + gates_passed INTEGER, + self_corrections INTEGER DEFAULT 0, + created_at TEXT NOT NULL + ) + """) + + # Engine performance tracking: aggregate stats + cursor.execute(""" + CREATE TABLE IF NOT EXISTS engine_stats ( + workspace_id TEXT NOT NULL, + engine TEXT NOT NULL, + metric TEXT NOT NULL, + value REAL NOT NULL DEFAULT 0.0, + updated_at TEXT NOT NULL, + PRIMARY KEY (workspace_id, engine, metric) + ) + """) + # Create indexes for common queries cursor.execute("CREATE INDEX IF NOT EXISTS idx_tasks_workspace ON tasks(workspace_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)") @@ -261,6 +289,8 @@ def _init_database(db_path: Path) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_run_logs_task ON run_logs(task_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_diagnostic_reports_task ON diagnostic_reports(task_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_diagnostic_reports_run ON diagnostic_reports(run_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_run_engine_log_ws_engine ON run_engine_log(workspace_id, engine)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_engine_stats_ws ON engine_stats(workspace_id, engine)") conn.commit() conn.close() @@ -449,6 +479,44 @@ def _ensure_schema_upgrades(db_path: Path) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_diagnostic_reports_run ON diagnostic_reports(run_id)") conn.commit() + # Add run_engine_log table for engine performance tracking + cursor.execute(""" + CREATE TABLE IF NOT EXISTS run_engine_log ( + run_id TEXT PRIMARY KEY, + engine TEXT NOT NULL, + task_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, + status TEXT NOT NULL, + duration_ms INTEGER, + tokens_used INTEGER DEFAULT 0, + gates_passed INTEGER, + self_corrections INTEGER DEFAULT 0, + created_at TEXT NOT NULL + ) + """) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_run_engine_log_ws_engine " + "ON run_engine_log(workspace_id, engine)" + ) + conn.commit() + + # Add engine_stats table for aggregate engine metrics + cursor.execute(""" + CREATE TABLE IF NOT EXISTS engine_stats ( + workspace_id TEXT NOT NULL, + engine TEXT NOT NULL, + metric TEXT NOT NULL, + value REAL NOT NULL DEFAULT 0.0, + updated_at TEXT NOT NULL, + PRIMARY KEY (workspace_id, engine, metric) + ) + """) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_engine_stats_ws " + "ON engine_stats(workspace_id, engine)" + ) + conn.commit() + conn.close() diff --git a/tests/core/test_engine_stats.py b/tests/core/test_engine_stats.py new file mode 100644 index 00000000..977f5245 --- /dev/null +++ b/tests/core/test_engine_stats.py @@ -0,0 +1,241 @@ +"""Tests for engine_stats module. + +Tests engine performance tracking: recording runs, computing aggregate stats, +and retrieving run logs. +""" + +import uuid + +import pytest + +from codeframe.core.workspace import create_or_load_workspace, get_db_connection + + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path): + """Create a temporary workspace for testing.""" + repo_path = tmp_path / "test_repo" + repo_path.mkdir() + return create_or_load_workspace(repo_path) + + +@pytest.fixture +def run_id(): + return str(uuid.uuid4()) + + +@pytest.fixture +def task_id(): + return str(uuid.uuid4()) + + +def _record(workspace, engine, status, duration_ms=None, tokens_used=0, + gates_passed=None, self_corrections=0): + """Helper to record a run with a fresh run_id.""" + from codeframe.core.engine_stats import record_run + + rid = str(uuid.uuid4()) + tid = str(uuid.uuid4()) + record_run( + workspace=workspace, + run_id=rid, + engine=engine, + task_id=tid, + status=status, + duration_ms=duration_ms, + tokens_used=tokens_used, + gates_passed=gates_passed, + self_corrections=self_corrections, + ) + return rid + + +class TestRecordRun: + """Tests for record_run function.""" + + def test_record_run_inserts_row(self, workspace, run_id, task_id): + """Verify a row exists in run_engine_log after recording.""" + from codeframe.core.engine_stats import record_run + + record_run( + workspace=workspace, + run_id=run_id, + engine="react", + task_id=task_id, + status="COMPLETED", + duration_ms=5000, + tokens_used=1200, + gates_passed=1, + self_corrections=0, + ) + + conn = get_db_connection(workspace) + row = conn.execute( + "SELECT run_id, engine, task_id, status, duration_ms, tokens_used, " + "gates_passed, self_corrections FROM run_engine_log WHERE run_id = ?", + (run_id,), + ).fetchone() + conn.close() + + assert row is not None + assert row[0] == run_id + assert row[1] == "react" + assert row[2] == task_id + assert row[3] == "COMPLETED" + assert row[4] == 5000 + assert row[5] == 1200 + assert row[6] == 1 + assert row[7] == 0 + + def test_record_run_updates_aggregates(self, workspace, run_id, task_id): + """Verify engine_stats table is updated after recording a run.""" + from codeframe.core.engine_stats import record_run + + record_run( + workspace=workspace, + run_id=run_id, + engine="react", + task_id=task_id, + status="COMPLETED", + duration_ms=5000, + tokens_used=1200, + gates_passed=1, + self_corrections=0, + ) + + conn = get_db_connection(workspace) + rows = conn.execute( + "SELECT metric, value FROM engine_stats " + "WHERE workspace_id = ? AND engine = ?", + (workspace.id, "react"), + ).fetchall() + conn.close() + + metrics = {r[0]: r[1] for r in rows} + assert metrics["tasks_attempted"] == 1.0 + assert metrics["tasks_completed"] == 1.0 + assert metrics["tasks_failed"] == 0.0 + + +class TestGetEngineStats: + """Tests for get_engine_stats function.""" + + def test_get_engine_stats_single_engine(self, workspace): + """Verify stats structure for a single engine.""" + from codeframe.core.engine_stats import get_engine_stats + + _record(workspace, "react", "COMPLETED", duration_ms=3000, tokens_used=500) + + stats = get_engine_stats(workspace) + + assert "react" in stats + assert "tasks_attempted" in stats["react"] + assert "tasks_completed" in stats["react"] + assert stats["react"]["tasks_attempted"] == 1.0 + assert stats["react"]["tasks_completed"] == 1.0 + + def test_get_engine_stats_multiple_engines(self, workspace): + """Verify stats returned for multiple engines.""" + from codeframe.core.engine_stats import get_engine_stats + + _record(workspace, "react", "COMPLETED", duration_ms=3000, tokens_used=500) + _record(workspace, "plan", "COMPLETED", duration_ms=4000, tokens_used=800) + + stats = get_engine_stats(workspace) + + assert "react" in stats + assert "plan" in stats + + def test_get_engine_stats_filter_by_engine(self, workspace): + """Verify filtering to a specific engine.""" + from codeframe.core.engine_stats import get_engine_stats + + _record(workspace, "react", "COMPLETED", duration_ms=3000, tokens_used=500) + _record(workspace, "plan", "COMPLETED", duration_ms=4000, tokens_used=800) + + stats = get_engine_stats(workspace, engine="react") + + assert "react" in stats + assert "plan" not in stats + + def test_get_engine_stats_empty(self, workspace): + """Returns empty dict when no data exists.""" + from codeframe.core.engine_stats import get_engine_stats + + stats = get_engine_stats(workspace) + + assert stats == {} + + +class TestGetRunLog: + """Tests for get_run_log function.""" + + def test_get_run_log(self, workspace): + """Verify run log returns correct records.""" + from codeframe.core.engine_stats import get_run_log + + rid1 = _record(workspace, "react", "COMPLETED", duration_ms=3000) + rid2 = _record(workspace, "react", "FAILED", duration_ms=1000) + + logs = get_run_log(workspace) + + assert len(logs) == 2 + run_ids = {log["run_id"] for log in logs} + assert rid1 in run_ids + assert rid2 in run_ids + + def test_get_run_log_with_limit(self, workspace): + """Verify limit works.""" + from codeframe.core.engine_stats import get_run_log + + for _ in range(5): + _record(workspace, "react", "COMPLETED", duration_ms=1000) + + logs = get_run_log(workspace, limit=3) + + assert len(logs) == 3 + + +class TestAggregateCalculations: + """Tests for aggregate metric calculations.""" + + def test_aggregate_gate_pass_rate(self, workspace): + """Verify correct gate pass rate calculation.""" + from codeframe.core.engine_stats import get_engine_stats + + # 2 passed, 1 failed out of 3 with gate data + _record(workspace, "react", "COMPLETED", gates_passed=1) + _record(workspace, "react", "COMPLETED", gates_passed=1) + _record(workspace, "react", "FAILED", gates_passed=0) + + stats = get_engine_stats(workspace) + + # 2/3 * 100 = ~66.67 + assert abs(stats["react"]["gate_pass_rate"] - 66.67) < 0.1 + + def test_aggregate_self_correction_rate(self, workspace): + """Verify correct self-correction rate calculation.""" + from codeframe.core.engine_stats import get_engine_stats + + _record(workspace, "react", "COMPLETED", self_corrections=2) + _record(workspace, "react", "COMPLETED", self_corrections=0) + _record(workspace, "react", "COMPLETED", self_corrections=1) + + stats = get_engine_stats(workspace) + + # 2 out of 3 had self_corrections > 0 = 66.67% + assert abs(stats["react"]["self_correction_rate"] - 66.67) < 0.1 + + def test_aggregate_handles_zero_completed(self, workspace): + """No division by zero when no tasks completed.""" + from codeframe.core.engine_stats import get_engine_stats + + _record(workspace, "react", "FAILED", tokens_used=500) + + stats = get_engine_stats(workspace) + + assert stats["react"]["tasks_completed"] == 0.0 + assert stats["react"]["avg_tokens_per_task"] == 0.0 From 16fe673f081a8b87dc295ccfbd863619d8103bdd Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 14 Mar 2026 12:26:32 -0700 Subject: [PATCH 2/3] feat(core): wire engine stats recording into runtime and add CLI commands (#419 steps 3-4) - Record engine performance metrics (duration, tokens, status) after each execute_agent() call via engine_stats.record_run(), with graceful failure handling so stats errors never crash execution - Add `cf engines stats` command with text/JSON output and engine filter - Add `cf engines compare` command showing all engines sorted by success rate - 12 new tests: 4 for runtime integration, 8 for CLI commands --- codeframe/cli/engines_commands.py | 121 +++++++++++++++++ codeframe/core/runtime.py | 25 ++++ tests/cli/test_engines_stats_commands.py | 143 ++++++++++++++++++++ tests/core/test_runtime_engine_stats.py | 164 +++++++++++++++++++++++ 4 files changed, 453 insertions(+) create mode 100644 tests/cli/test_engines_stats_commands.py create mode 100644 tests/core/test_runtime_engine_stats.py diff --git a/codeframe/cli/engines_commands.py b/codeframe/cli/engines_commands.py index 8fa989e4..df6424b8 100644 --- a/codeframe/cli/engines_commands.py +++ b/codeframe/cli/engines_commands.py @@ -3,12 +3,24 @@ Usage: codeframe engines list # Show available engines codeframe engines check # Check engine requirements + codeframe engines stats # Show engine performance stats + codeframe engines compare # Compare engine performance """ +import json as _json +import logging +from pathlib import Path +from typing import Optional + import typer from rich.console import Console from rich.table import Table +from codeframe.core import engine_stats +from codeframe.core.workspace import get_workspace + +logger = logging.getLogger(__name__) + console = Console() engines_app = typer.Typer( @@ -82,3 +94,112 @@ def engines_check( else: console.print(f"\n[red]Engine '{name}' has unmet requirements.[/red]") raise typer.Exit(1) + + +def _get_current_workspace(): + """Get workspace from current working directory. + + Returns: + Workspace object. + + Raises: + typer.Exit: If no workspace is found. + """ + try: + return get_workspace(Path.cwd()) + except (FileNotFoundError, ValueError): + console.print("[red]Error:[/red] No workspace found. Run 'cf init' first.") + raise typer.Exit(1) + + +def _compute_success_rate(metrics: dict[str, float]) -> float: + """Compute success rate from engine metrics.""" + attempted = metrics.get("tasks_attempted", 0) + completed = metrics.get("tasks_completed", 0) + if attempted == 0: + return 0.0 + return round(100.0 * completed / attempted, 1) + + +def _format_duration(ms: float) -> str: + """Format duration in human-readable form.""" + if ms < 1000: + return f"{ms:.0f}ms" + return f"{ms / 1000:.1f}s" + + +def _build_stats_table(stats: dict[str, dict[str, float]], title: str = "Engine Stats") -> Table: + """Build a Rich table from engine stats.""" + table = Table(title=title) + table.add_column("Engine", style="cyan") + table.add_column("Tasks", justify="right") + table.add_column("Success %", justify="right", style="green") + table.add_column("Gate Pass %", justify="right") + table.add_column("Avg Duration", justify="right") + table.add_column("Total Tokens", justify="right") + table.add_column("Avg Tokens/Task", justify="right") + + # Sort by success rate descending + sorted_engines = sorted( + stats.items(), + key=lambda item: _compute_success_rate(item[1]), + reverse=True, + ) + + for eng, metrics in sorted_engines: + success_rate = _compute_success_rate(metrics) + gate_rate = metrics.get("gate_pass_rate", 0.0) + avg_dur = metrics.get("avg_duration_ms", 0.0) + total_tok = metrics.get("total_tokens", 0.0) + avg_tok = metrics.get("avg_tokens_per_task", 0.0) + attempted = int(metrics.get("tasks_attempted", 0)) + + table.add_row( + eng, + str(attempted), + f"{success_rate}%", + f"{gate_rate}%", + _format_duration(avg_dur), + f"{int(total_tok):,}", + f"{int(avg_tok):,}", + ) + + return table + + +@engines_app.command("stats") +def stats( + engine: Optional[str] = typer.Option(None, "--engine", "-e", help="Filter by engine name"), + format: str = typer.Option("text", "--format", "-f", help="Output format: text or json"), +) -> None: + """Show engine performance statistics.""" + workspace = _get_current_workspace() + data = engine_stats.get_engine_stats(workspace, engine=engine) + + if not data: + console.print("[yellow]No engine stats recorded yet.[/yellow]") + return + + if format == "json": + console.print(_json.dumps(data, indent=2)) + return + + table = _build_stats_table(data, title="Engine Performance Stats") + console.print(table) + + +@engines_app.command("compare") +def compare() -> None: + """Compare performance across all engines.""" + workspace = _get_current_workspace() + data = engine_stats.get_engine_stats(workspace) + + if not data: + console.print( + "[yellow]No engine stats recorded yet. " + "Run tasks with different engines to see comparison.[/yellow]" + ) + return + + table = _build_stats_table(data, title="Engine Comparison (sorted by success rate)") + console.print(table) diff --git a/codeframe/core/runtime.py b/codeframe/core/runtime.py index d55607b3..6f2ee471 100644 --- a/codeframe/core/runtime.py +++ b/codeframe/core/runtime.py @@ -672,6 +672,9 @@ def execute_agent( workspace_path=str(workspace.repo_path), ) + import time as _time_mod + _perf_start_ms = int(_time_mod.monotonic() * 1000) + try: # Execute before_task hook (aborts on failure) if env_config and hook_ctx: @@ -802,6 +805,28 @@ def on_adapter_event(event: AdapterEvent) -> None: elif state.status == AgentStatus.FAILED: fail_run(workspace, run.id) + # Record engine performance metrics + try: + from codeframe.core import engine_stats + _perf_duration_ms = int(_time_mod.monotonic() * 1000) - _perf_start_ms + _perf_tokens = 0 + if hasattr(result, 'token_usage') and result.token_usage: + _perf_tokens = result.token_usage.total_tokens + + engine_stats.record_run( + workspace=workspace, + run_id=run.id, + engine=engine, + task_id=run.task_id, + status=agent_status.value.upper(), + duration_ms=_perf_duration_ms, + tokens_used=_perf_tokens, + gates_passed=None, + self_corrections=0, + ) + except Exception: + logger.debug("Engine stats recording failed", exc_info=True) + # Execute after_task hooks (non-blocking, after state is persisted) if env_config and hook_ctx: after_hook = None diff --git a/tests/cli/test_engines_stats_commands.py b/tests/cli/test_engines_stats_commands.py new file mode 100644 index 00000000..8a141e86 --- /dev/null +++ b/tests/cli/test_engines_stats_commands.py @@ -0,0 +1,143 @@ +"""Tests for CLI engine stats and compare commands. + +Tests the `cf engines stats` and `cf engines compare` commands. +""" + +import json +from unittest.mock import patch + +import pytest +from typer.testing import CliRunner + +from codeframe.cli.app import app + +pytestmark = pytest.mark.v2 + +runner = CliRunner() + + +SAMPLE_STATS = { + "react": { + "tasks_attempted": 10.0, + "tasks_completed": 8.0, + "tasks_failed": 2.0, + "gate_pass_rate": 75.0, + "self_correction_rate": 30.0, + "avg_duration_ms": 5000.0, + "total_tokens": 12000.0, + "avg_tokens_per_task": 1500.0, + }, + "plan": { + "tasks_attempted": 5.0, + "tasks_completed": 3.0, + "tasks_failed": 2.0, + "gate_pass_rate": 60.0, + "self_correction_rate": 40.0, + "avg_duration_ms": 8000.0, + "total_tokens": 9000.0, + "avg_tokens_per_task": 3000.0, + }, +} + + +class TestEnginesStats: + """Tests for 'cf engines stats' command.""" + + def test_engines_stats_no_workspace(self, tmp_path, monkeypatch): + """Should show error when no workspace exists.""" + monkeypatch.chdir(tmp_path) + result = runner.invoke(app, ["engines", "stats"]) + assert result.exit_code == 1 + assert "No workspace found" in result.output or "Error" in result.output + + @patch("codeframe.cli.engines_commands.engine_stats") + @patch("codeframe.cli.engines_commands.get_workspace") + def test_engines_stats_empty(self, mock_get_ws, mock_es, tmp_path, monkeypatch): + """Should show 'no stats' message when empty.""" + monkeypatch.chdir(tmp_path) + mock_get_ws.return_value = "fake-ws" + mock_es.get_engine_stats.return_value = {} + + result = runner.invoke(app, ["engines", "stats"]) + assert result.exit_code == 0 + assert "No engine stats recorded yet" in result.output + + @patch("codeframe.cli.engines_commands.engine_stats") + @patch("codeframe.cli.engines_commands.get_workspace") + def test_engines_stats_with_data(self, mock_get_ws, mock_es, tmp_path, monkeypatch): + """Should show Rich table with engine data.""" + monkeypatch.chdir(tmp_path) + mock_get_ws.return_value = "fake-ws" + mock_es.get_engine_stats.return_value = SAMPLE_STATS + + result = runner.invoke(app, ["engines", "stats"]) + assert result.exit_code == 0 + assert "react" in result.output + assert "80.0" in result.output # success rate: 8/10 * 100 + + @patch("codeframe.cli.engines_commands.engine_stats") + @patch("codeframe.cli.engines_commands.get_workspace") + def test_engines_stats_json_format(self, mock_get_ws, mock_es, tmp_path, monkeypatch): + """Should output valid JSON when --format json.""" + monkeypatch.chdir(tmp_path) + mock_get_ws.return_value = "fake-ws" + mock_es.get_engine_stats.return_value = SAMPLE_STATS + + result = runner.invoke(app, ["engines", "stats", "--format", "json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert "react" in data + assert "plan" in data + + @patch("codeframe.cli.engines_commands.engine_stats") + @patch("codeframe.cli.engines_commands.get_workspace") + def test_engines_stats_filter_engine(self, mock_get_ws, mock_es, tmp_path, monkeypatch): + """Should pass engine filter to get_engine_stats.""" + monkeypatch.chdir(tmp_path) + mock_get_ws.return_value = "fake-ws" + mock_es.get_engine_stats.return_value = { + "react": SAMPLE_STATS["react"], + } + + result = runner.invoke(app, ["engines", "stats", "--engine", "react"]) + assert result.exit_code == 0 + mock_es.get_engine_stats.assert_called_once_with("fake-ws", engine="react") + + +class TestEnginesCompare: + """Tests for 'cf engines compare' command.""" + + def test_engines_compare_no_workspace(self, tmp_path, monkeypatch): + """Should show error when no workspace exists.""" + monkeypatch.chdir(tmp_path) + result = runner.invoke(app, ["engines", "compare"]) + assert result.exit_code == 1 + + @patch("codeframe.cli.engines_commands.engine_stats") + @patch("codeframe.cli.engines_commands.get_workspace") + def test_engines_compare_empty(self, mock_get_ws, mock_es, tmp_path, monkeypatch): + """Should show message when no stats exist.""" + monkeypatch.chdir(tmp_path) + mock_get_ws.return_value = "fake-ws" + mock_es.get_engine_stats.return_value = {} + + result = runner.invoke(app, ["engines", "compare"]) + assert result.exit_code == 0 + assert "No engine stats recorded yet" in result.output + + @patch("codeframe.cli.engines_commands.engine_stats") + @patch("codeframe.cli.engines_commands.get_workspace") + def test_engines_compare_with_data(self, mock_get_ws, mock_es, tmp_path, monkeypatch): + """Should show comparison table sorted by success rate.""" + monkeypatch.chdir(tmp_path) + mock_get_ws.return_value = "fake-ws" + mock_es.get_engine_stats.return_value = SAMPLE_STATS + + result = runner.invoke(app, ["engines", "compare"]) + assert result.exit_code == 0 + assert "react" in result.output + assert "plan" in result.output + # react (80%) should appear before plan (60%) in sorted output + react_pos = result.output.index("react") + plan_pos = result.output.index("plan") + assert react_pos < plan_pos diff --git a/tests/core/test_runtime_engine_stats.py b/tests/core/test_runtime_engine_stats.py new file mode 100644 index 00000000..b91507a5 --- /dev/null +++ b/tests/core/test_runtime_engine_stats.py @@ -0,0 +1,164 @@ +"""Tests for engine stats recording in execute_agent. + +Verifies that runtime.execute_agent records engine performance metrics +via engine_stats.record_run after each agent execution. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +pytestmark = pytest.mark.v2 + + +def _make_agent_result(status="completed", tokens=1200, duration_ms=5000): + """Create a mock AgentResult with the given parameters.""" + from codeframe.core.adapters.agent_adapter import AdapterTokenUsage, AgentResult + + token_usage = AdapterTokenUsage(input_tokens=tokens // 2, output_tokens=tokens // 2) + return AgentResult( + status=status, + output="test output", + token_usage=token_usage, + duration_ms=duration_ms, + ) + + +def _make_workspace_and_run(tmp_path): + """Create a real workspace and a mock Run for testing.""" + from codeframe.core.workspace import create_or_load_workspace + from codeframe.core.runtime import Run, RunStatus + + repo = tmp_path / "test_repo" + repo.mkdir() + workspace = create_or_load_workspace(repo) + + run = Run( + id="run-001", + workspace_id=workspace.id, + task_id="task-001", + status=RunStatus.RUNNING, + started_at="2026-03-14T00:00:00Z", + completed_at=None, + ) + return workspace, run + + +class TestExecuteAgentRecordsEngineStats: + """Verify that execute_agent calls engine_stats.record_run.""" + + @patch("codeframe.core.engine_stats.record_run") + def test_execute_agent_records_engine_stats(self, mock_record_run, tmp_path): + """record_run should be called with correct args after a successful run.""" + from codeframe.core.runtime import execute_agent + + workspace, run = _make_workspace_and_run(tmp_path) + result = _make_agent_result(status="completed", tokens=1200, duration_ms=5000) + + # Patch the adapter to return our controlled result + with ( + patch("codeframe.core.engine_registry.get_builtin_adapter") as mock_adapter_factory, + patch("codeframe.core.runtime.complete_run"), + patch("codeframe.core.runtime.fail_run"), + patch("codeframe.core.runtime.block_run"), + patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-test"}), + ): + mock_adapter = MagicMock() + mock_adapter.run.return_value = result + mock_adapter_factory.return_value = mock_adapter + + execute_agent(workspace, run, engine="react") + + # Verify record_run was called + mock_record_run.assert_called_once() + call_kwargs = mock_record_run.call_args[1] + + assert call_kwargs["workspace"] is workspace + assert call_kwargs["run_id"] == "run-001" + assert call_kwargs["engine"] == "react" + assert call_kwargs["task_id"] == "task-001" + assert call_kwargs["status"] == "COMPLETED" + assert call_kwargs["tokens_used"] == 1200 + assert isinstance(call_kwargs["duration_ms"], int) + assert call_kwargs["duration_ms"] > 0 + + @patch("codeframe.core.engine_stats.record_run") + def test_execute_agent_records_stats_on_failure(self, mock_record_run, tmp_path): + """record_run should be called even when the agent fails.""" + from codeframe.core.runtime import execute_agent + + workspace, run = _make_workspace_and_run(tmp_path) + result = _make_agent_result(status="failed", tokens=800, duration_ms=3000) + + with ( + patch("codeframe.core.engine_registry.get_builtin_adapter") as mock_adapter_factory, + patch("codeframe.core.runtime.complete_run"), + patch("codeframe.core.runtime.fail_run"), + patch("codeframe.core.runtime.block_run"), + patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-test"}), + ): + mock_adapter = MagicMock() + mock_adapter.run.return_value = result + mock_adapter_factory.return_value = mock_adapter + + execute_agent(workspace, run, engine="react") + + mock_record_run.assert_called_once() + call_kwargs = mock_record_run.call_args[1] + assert call_kwargs["status"] == "FAILED" + + @patch("codeframe.core.engine_stats.record_run") + def test_execute_agent_handles_no_token_usage(self, mock_record_run, tmp_path): + """record_run should handle result with no token_usage gracefully.""" + from codeframe.core.adapters.agent_adapter import AgentResult + from codeframe.core.runtime import execute_agent + + workspace, run = _make_workspace_and_run(tmp_path) + result = AgentResult(status="completed", output="done", token_usage=None) + + with ( + patch("codeframe.core.engine_registry.get_builtin_adapter") as mock_adapter_factory, + patch("codeframe.core.runtime.complete_run"), + patch("codeframe.core.runtime.fail_run"), + patch("codeframe.core.runtime.block_run"), + patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-test"}), + ): + mock_adapter = MagicMock() + mock_adapter.run.return_value = result + mock_adapter_factory.return_value = mock_adapter + + execute_agent(workspace, run, engine="react") + + mock_record_run.assert_called_once() + call_kwargs = mock_record_run.call_args[1] + assert call_kwargs["tokens_used"] == 0 + + +class TestExecuteAgentStatsFailureGraceful: + """Verify that engine_stats failures don't crash execute_agent.""" + + @patch("codeframe.core.engine_stats.record_run", side_effect=Exception("DB error")) + def test_execute_agent_stats_failure_doesnt_crash(self, mock_record_run, tmp_path): + """execute_agent should complete normally even if stats recording fails.""" + from codeframe.core.agent import AgentStatus + from codeframe.core.runtime import execute_agent + + workspace, run = _make_workspace_and_run(tmp_path) + result = _make_agent_result(status="completed", tokens=1200) + + with ( + patch("codeframe.core.engine_registry.get_builtin_adapter") as mock_adapter_factory, + patch("codeframe.core.runtime.complete_run"), + patch("codeframe.core.runtime.fail_run"), + patch("codeframe.core.runtime.block_run"), + patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-test"}), + ): + mock_adapter = MagicMock() + mock_adapter.run.return_value = result + mock_adapter_factory.return_value = mock_adapter + + state = execute_agent(workspace, run, engine="react") + + # Should still return COMPLETED despite stats failure + assert state.status == AgentStatus.COMPLETED + mock_record_run.assert_called_once() From 7eb3e132ff1261fac4d22d30eeb9f03dc27eb70c Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 14 Mar 2026 12:56:56 -0700 Subject: [PATCH 3/3] fix: address review feedback on engine performance tracking (#419) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use single DB connection in record_run (fixes TOCTOU/double connection) - Move engine_stats import to module level in runtime.py - Upgrade stats failure logging from debug to warning - Rename format→output_format to avoid shadowing builtin - Keep .upper() on status values for SQL convention consistency --- codeframe/cli/engines_commands.py | 4 +- codeframe/core/engine_stats.py | 140 ++++++++++++++++-------------- codeframe/core/runtime.py | 5 +- 3 files changed, 77 insertions(+), 72 deletions(-) diff --git a/codeframe/cli/engines_commands.py b/codeframe/cli/engines_commands.py index df6424b8..f1f9ac15 100644 --- a/codeframe/cli/engines_commands.py +++ b/codeframe/cli/engines_commands.py @@ -170,7 +170,7 @@ def _build_stats_table(stats: dict[str, dict[str, float]], title: str = "Engine @engines_app.command("stats") def stats( engine: Optional[str] = typer.Option(None, "--engine", "-e", help="Filter by engine name"), - format: str = typer.Option("text", "--format", "-f", help="Output format: text or json"), + output_format: str = typer.Option("text", "--format", "-f", help="Output format: text or json"), ) -> None: """Show engine performance statistics.""" workspace = _get_current_workspace() @@ -180,7 +180,7 @@ def stats( console.print("[yellow]No engine stats recorded yet.[/yellow]") return - if format == "json": + if output_format == "json": console.print(_json.dumps(data, indent=2)) return diff --git a/codeframe/core/engine_stats.py b/codeframe/core/engine_stats.py index e5f33c37..2564f987 100644 --- a/codeframe/core/engine_stats.py +++ b/codeframe/core/engine_stats.py @@ -59,88 +59,94 @@ def record_run( now, ), ) + # Recompute aggregates in the same connection to avoid TOCTOU issues + _update_aggregate_stats_conn(conn, workspace.id, engine) conn.commit() finally: conn.close() - _update_aggregate_stats(workspace, engine) - def _update_aggregate_stats(workspace: Workspace, engine: str) -> None: - """Recompute aggregate metrics for an engine from run_engine_log. + """Recompute aggregate metrics for an engine (opens its own connection). - Upserts each metric into the engine_stats table. + Convenience wrapper for external callers (e.g., data seeding). """ - now = _utc_now().isoformat() - ws_id = workspace.id - conn = get_db_connection(workspace) try: - cur = conn.cursor() - - # Compute all metrics in one pass where possible - row = cur.execute( - "SELECT " - " COUNT(*), " - " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END), " - " COUNT(CASE WHEN status = 'FAILED' THEN 1 END), " - " COUNT(CASE WHEN gates_passed = 1 THEN 1 END), " - " COUNT(CASE WHEN gates_passed IS NOT NULL THEN 1 END), " - " COUNT(CASE WHEN self_corrections > 0 THEN 1 END), " - " AVG(CASE WHEN duration_ms IS NOT NULL THEN duration_ms END), " - " SUM(tokens_used), " - " SUM(CASE WHEN status = 'COMPLETED' THEN tokens_used ELSE 0 END), " - " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) " - "FROM run_engine_log " - "WHERE engine = ? AND workspace_id = ?", - (engine, ws_id), - ).fetchone() - - total = row[0] - completed = row[1] - failed = row[2] - gates_pass_count = row[3] - gates_total = row[4] - self_corr_count = row[5] - avg_duration = row[6] - total_tokens = row[7] or 0 - completed_tokens = row[8] or 0 - completed_count = row[9] - - gate_pass_rate = ( - 100.0 * gates_pass_count / gates_total if gates_total > 0 else 0.0 - ) - self_correction_rate = ( - 100.0 * self_corr_count / total if total > 0 else 0.0 - ) - avg_tokens_per_task = ( - completed_tokens / completed_count if completed_count > 0 else 0.0 - ) - - metrics = { - "tasks_attempted": float(total), - "tasks_completed": float(completed), - "tasks_failed": float(failed), - "gate_pass_rate": round(gate_pass_rate, 2), - "self_correction_rate": round(self_correction_rate, 2), - "avg_duration_ms": round(avg_duration, 2) if avg_duration is not None else 0.0, - "total_tokens": float(total_tokens), - "avg_tokens_per_task": round(avg_tokens_per_task, 2), - } - - for metric, value in metrics.items(): - cur.execute( - "INSERT OR REPLACE INTO engine_stats " - "(workspace_id, engine, metric, value, updated_at) " - "VALUES (?, ?, ?, ?, ?)", - (ws_id, engine, metric, value, now), - ) - + _update_aggregate_stats_conn(conn, workspace.id, engine) conn.commit() finally: conn.close() +def _update_aggregate_stats_conn(conn, ws_id: str, engine: str) -> None: + """Recompute aggregate metrics using an existing connection. + + Does NOT commit — caller is responsible for committing. + """ + now = _utc_now().isoformat() + + cur = conn.cursor() + + # Compute all metrics in one pass where possible + row = cur.execute( + "SELECT " + " COUNT(*), " + " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END), " + " COUNT(CASE WHEN status = 'FAILED' THEN 1 END), " + " COUNT(CASE WHEN gates_passed = 1 THEN 1 END), " + " COUNT(CASE WHEN gates_passed IS NOT NULL THEN 1 END), " + " COUNT(CASE WHEN self_corrections > 0 THEN 1 END), " + " AVG(CASE WHEN duration_ms IS NOT NULL THEN duration_ms END), " + " SUM(tokens_used), " + " SUM(CASE WHEN status = 'COMPLETED' THEN tokens_used ELSE 0 END), " + " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) " + "FROM run_engine_log " + "WHERE engine = ? AND workspace_id = ?", + (engine, ws_id), + ).fetchone() + + total = row[0] + completed = row[1] + failed = row[2] + gates_pass_count = row[3] + gates_total = row[4] + self_corr_count = row[5] + avg_duration = row[6] + total_tokens = row[7] or 0 + completed_tokens = row[8] or 0 + completed_count = row[9] + + gate_pass_rate = ( + 100.0 * gates_pass_count / gates_total if gates_total > 0 else 0.0 + ) + self_correction_rate = ( + 100.0 * self_corr_count / total if total > 0 else 0.0 + ) + avg_tokens_per_task = ( + completed_tokens / completed_count if completed_count > 0 else 0.0 + ) + + metrics = { + "tasks_attempted": float(total), + "tasks_completed": float(completed), + "tasks_failed": float(failed), + "gate_pass_rate": round(gate_pass_rate, 2), + "self_correction_rate": round(self_correction_rate, 2), + "avg_duration_ms": round(avg_duration, 2) if avg_duration is not None else 0.0, + "total_tokens": float(total_tokens), + "avg_tokens_per_task": round(avg_tokens_per_task, 2), + } + + for metric, value in metrics.items(): + cur.execute( + "INSERT OR REPLACE INTO engine_stats " + "(workspace_id, engine, metric, value, updated_at) " + "VALUES (?, ?, ?, ?, ?)", + (ws_id, engine, metric, value, now), + ) + + def get_engine_stats( workspace: Workspace, engine: Optional[str] = None ) -> dict[str, dict[str, float]]: diff --git a/codeframe/core/runtime.py b/codeframe/core/runtime.py index 6f2ee471..67e340fe 100644 --- a/codeframe/core/runtime.py +++ b/codeframe/core/runtime.py @@ -12,7 +12,7 @@ from enum import Enum from typing import TYPE_CHECKING, Optional -from codeframe.core import events, tasks +from codeframe.core import engine_stats, events, tasks from codeframe.core.state_machine import TaskStatus from codeframe.core.workspace import Workspace, get_db_connection @@ -807,7 +807,6 @@ def on_adapter_event(event: AdapterEvent) -> None: # Record engine performance metrics try: - from codeframe.core import engine_stats _perf_duration_ms = int(_time_mod.monotonic() * 1000) - _perf_start_ms _perf_tokens = 0 if hasattr(result, 'token_usage') and result.token_usage: @@ -825,7 +824,7 @@ def on_adapter_event(event: AdapterEvent) -> None: self_corrections=0, ) except Exception: - logger.debug("Engine stats recording failed", exc_info=True) + logger.warning("Engine stats recording failed", exc_info=True) # Execute after_task hooks (non-blocking, after state is persisted) if env_config and hook_ctx: