Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions codeframe/cli/engines_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@
Usage:
codeframe engines list # Show available engines
codeframe engines check <name> # Check engine requirements
codeframe engines stats # Show engine performance stats
codeframe engines compare # Compare engine performance
"""

import json as _json
import logging
from pathlib import Path
from typing import Optional

import typer
from rich.console import Console
from rich.table import Table

from codeframe.core import engine_stats
from codeframe.core.workspace import get_workspace

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Shared Rich console that the commands in this module print through.
console = Console()

engines_app = typer.Typer(
Expand Down Expand Up @@ -82,3 +94,112 @@ def engines_check(
else:
console.print(f"\n[red]Engine '{name}' has unmet requirements.[/red]")
raise typer.Exit(1)


def _get_current_workspace():
    """Resolve the workspace for the current working directory.

    Returns:
        Whatever ``get_workspace`` returns for ``Path.cwd()``.

    Raises:
        typer.Exit: With exit code 1, after printing an error message, when
            the lookup raises ``FileNotFoundError`` or ``ValueError``.
    """
    try:
        workspace = get_workspace(Path.cwd())
    except (FileNotFoundError, ValueError):
        # Translate lookup failures into a user-facing message and exit code.
        console.print("[red]Error:[/red] No workspace found. Run 'cf init' first.")
        raise typer.Exit(1)
    else:
        return workspace


def _compute_success_rate(metrics: dict[str, float]) -> float:
"""Compute success rate from engine metrics."""
attempted = metrics.get("tasks_attempted", 0)
completed = metrics.get("tasks_completed", 0)
if attempted == 0:
return 0.0
return round(100.0 * completed / attempted, 1)


def _format_duration(ms: float) -> str:
"""Format duration in human-readable form."""
if ms < 1000:
return f"{ms:.0f}ms"
return f"{ms / 1000:.1f}s"


def _build_stats_table(stats: dict[str, dict[str, float]], title: str = "Engine Stats") -> Table:
    """Render per-engine metrics as a Rich table.

    Args:
        stats: Mapping of engine name to its metric dict.
        title: Title shown above the table.

    Returns:
        A ``Table`` with one row per engine, ordered by success rate
        (highest first).
    """
    # (header, add_column keyword options) for every column, in display order.
    column_specs = (
        ("Engine", {"style": "cyan"}),
        ("Tasks", {"justify": "right"}),
        ("Success %", {"justify": "right", "style": "green"}),
        ("Gate Pass %", {"justify": "right"}),
        ("Avg Duration", {"justify": "right"}),
        ("Total Tokens", {"justify": "right"}),
        ("Avg Tokens/Task", {"justify": "right"}),
    )

    table = Table(title=title)
    for header, options in column_specs:
        table.add_column(header, **options)

    def _rate_of(entry):
        # entry is an (engine_name, metrics) pair from stats.items()
        return _compute_success_rate(entry[1])

    # Best-performing engines first.
    ranked = sorted(stats.items(), key=_rate_of, reverse=True)

    for name, values in ranked:
        task_count = int(values.get("tasks_attempted", 0))
        success_pct = _compute_success_rate(values)
        gate_pct = values.get("gate_pass_rate", 0.0)
        mean_duration = values.get("avg_duration_ms", 0.0)
        tokens_total = values.get("total_tokens", 0.0)
        tokens_mean = values.get("avg_tokens_per_task", 0.0)

        cells = [
            name,
            str(task_count),
            f"{success_pct}%",
            f"{gate_pct}%",
            _format_duration(mean_duration),
            f"{int(tokens_total):,}",
            f"{int(tokens_mean):,}",
        ]
        table.add_row(*cells)

    return table


@engines_app.command("stats")
def stats(
    engine: Optional[str] = typer.Option(None, "--engine", "-e", help="Filter by engine name"),
    output_format: str = typer.Option("text", "--format", "-f", help="Output format: text or json"),
) -> None:
    """Show engine performance statistics."""
    # NOTE: the docstring above is kept to one line on purpose, because Typer
    # surfaces it as the command's help text.
    workspace = _get_current_workspace()
    data = engine_stats.get_engine_stats(workspace, engine=engine)

    if output_format == "json":
        # JSON mode is for machine consumption, so always emit valid JSON,
        # including "{}" when nothing has been recorded. Rich markup,
        # highlighting and line wrapping are turned off so that bracketed
        # text (e.g. in engine names) is not read as style tags and the
        # payload is printed verbatim.
        console.print(
            _json.dumps(data, indent=2),
            markup=False,
            highlight=False,
            soft_wrap=True,
        )
        return

    if not data:
        console.print("[yellow]No engine stats recorded yet.[/yellow]")
        return

    table = _build_stats_table(data, title="Engine Performance Stats")
    console.print(table)


@engines_app.command("compare")
def compare() -> None:
    """Compare performance across all engines."""
    # Load stats for every engine in the current workspace (no engine filter).
    all_stats = engine_stats.get_engine_stats(_get_current_workspace())

    if all_stats:
        comparison = _build_stats_table(
            all_stats, title="Engine Comparison (sorted by success rate)"
        )
        console.print(comparison)
        return

    # Nothing recorded yet: tell the user how to produce data to compare.
    console.print(
        "[yellow]No engine stats recorded yet. "
        "Run tasks with different engines to see comparison.[/yellow]"
    )
231 changes: 231 additions & 0 deletions codeframe/core/engine_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""Engine performance tracking for CodeFRAME.

Records per-run engine metrics and computes aggregate statistics
for comparing engine performance (react vs plan vs external adapters).

This module is headless - no FastAPI or HTTP dependencies.
"""

from typing import Optional

from codeframe.core.workspace import Workspace, get_db_connection, _utc_now


def record_run(
    workspace: Workspace,
    run_id: str,
    engine: str,
    task_id: str,
    status: str,
    duration_ms: Optional[int] = None,
    tokens_used: int = 0,
    gates_passed: Optional[int] = None,
    self_corrections: int = 0,
) -> None:
    """Insert one row into run_engine_log and refresh the engine's aggregates.

    The insert and the aggregate recomputation share one connection and are
    committed together; the connection is closed whether or not they succeed.

    Args:
        workspace: Active workspace; its ``id`` is stored with the row.
        run_id: Unique run identifier.
        engine: Engine name (e.g. "react", "plan").
        task_id: Task that was executed.
        status: Final run status (COMPLETED, FAILED, BLOCKED).
        duration_ms: Execution duration in milliseconds.
        tokens_used: Total LLM tokens consumed.
        gates_passed: 1 if all gates passed, 0 if not, None if no gate data.
        self_corrections: Number of self-correction attempts.
    """
    insert_sql = (
        "INSERT INTO run_engine_log "
        "(run_id, engine, task_id, workspace_id, status, duration_ms, "
        "tokens_used, gates_passed, self_corrections, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    created_at = _utc_now().isoformat()

    db = get_db_connection(workspace)
    try:
        # Values in the same order as the column list in insert_sql.
        row_values = (
            run_id,
            engine,
            task_id,
            workspace.id,
            status,
            duration_ms,
            tokens_used,
            gates_passed,
            self_corrections,
            created_at,
        )
        db.execute(insert_sql, row_values)
        # Refresh aggregates on the same connection, before the commit, so
        # the new row and the updated stats are committed together.
        _update_aggregate_stats_conn(db, workspace.id, engine)
        db.commit()
    finally:
        db.close()


def _update_aggregate_stats(workspace: Workspace, engine: str) -> None:
    """Recompute one engine's aggregate metrics on a fresh connection.

    Opens a connection for *workspace*, delegates to
    ``_update_aggregate_stats_conn``, commits, and always closes the
    connection. Intended as a convenience wrapper for external callers
    (e.g., data seeding).

    Args:
        workspace: Workspace whose stats are recomputed.
        engine: Engine name to recompute.
    """
    db = get_db_connection(workspace)
    try:
        _update_aggregate_stats_conn(db, workspace.id, engine)
        # The helper leaves committing to its caller.
        db.commit()
    finally:
        db.close()


def _update_aggregate_stats_conn(conn, ws_id: str, engine: str) -> None:
    """Recompute aggregate metrics using an existing connection.

    Aggregates every run_engine_log row for (*engine*, *ws_id*) with a single
    SELECT and upserts one engine_stats row per metric.

    Does NOT commit — caller is responsible for committing.

    Args:
        conn: Open database connection to read from and write to.
        ws_id: Workspace id used to scope both the query and the upserts.
        engine: Engine name whose metrics are recomputed.

    Metrics written (all stored as floats):
        tasks_attempted: Number of logged runs.
        tasks_completed: Runs with status COMPLETED.
        tasks_failed: Runs with status FAILED.
        gate_pass_rate: Percent of runs with gates_passed = 1 among runs that
            have gate data (gates_passed not NULL); 0.0 when none do.
        self_correction_rate: Percent of all runs with self_corrections > 0.
        avg_duration_ms: Mean duration over runs that recorded one; 0.0 when
            none did.
        total_tokens: Sum of tokens_used over all runs.
        avg_tokens_per_task: Tokens used by COMPLETED runs divided by the
            number of COMPLETED runs; 0.0 when there are none.
    """
    now = _utc_now().isoformat()

    cur = conn.cursor()

    # One pass over the log produces every raw aggregate. The COMPLETED count
    # is selected once and reused for avg_tokens_per_task.
    (
        total,
        completed,
        failed,
        gates_pass_count,
        gates_total,
        self_corr_count,
        avg_duration,
        total_tokens,
        completed_tokens,
    ) = cur.execute(
        "SELECT "
        " COUNT(*), "
        " COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END), "
        " COUNT(CASE WHEN status = 'FAILED' THEN 1 END), "
        " COUNT(CASE WHEN gates_passed = 1 THEN 1 END), "
        " COUNT(CASE WHEN gates_passed IS NOT NULL THEN 1 END), "
        " COUNT(CASE WHEN self_corrections > 0 THEN 1 END), "
        " AVG(CASE WHEN duration_ms IS NOT NULL THEN duration_ms END), "
        " SUM(tokens_used), "
        " SUM(CASE WHEN status = 'COMPLETED' THEN tokens_used ELSE 0 END) "
        "FROM run_engine_log "
        "WHERE engine = ? AND workspace_id = ?",
        (engine, ws_id),
    ).fetchone()

    # SUM() yields NULL when no rows match; treat that as zero tokens.
    total_tokens = total_tokens or 0
    completed_tokens = completed_tokens or 0

    # Each ratio falls back to 0.0 when its denominator is zero.
    gate_pass_rate = (
        100.0 * gates_pass_count / gates_total if gates_total > 0 else 0.0
    )
    self_correction_rate = (
        100.0 * self_corr_count / total if total > 0 else 0.0
    )
    avg_tokens_per_task = (
        completed_tokens / completed if completed > 0 else 0.0
    )

    metrics = {
        "tasks_attempted": float(total),
        "tasks_completed": float(completed),
        "tasks_failed": float(failed),
        "gate_pass_rate": round(gate_pass_rate, 2),
        "self_correction_rate": round(self_correction_rate, 2),
        "avg_duration_ms": round(avg_duration, 2) if avg_duration is not None else 0.0,
        "total_tokens": float(total_tokens),
        "avg_tokens_per_task": round(avg_tokens_per_task, 2),
    }

    # Upsert all metrics in one batched statement.
    cur.executemany(
        "INSERT OR REPLACE INTO engine_stats "
        "(workspace_id, engine, metric, value, updated_at) "
        "VALUES (?, ?, ?, ?, ?)",
        [(ws_id, engine, metric, value, now) for metric, value in metrics.items()],
    )


def get_engine_stats(
    workspace: Workspace, engine: Optional[str] = None
) -> dict[str, dict[str, float]]:
    """Read aggregate metrics from the engine_stats table.

    Args:
        workspace: Active workspace; rows are filtered by its ``id``.
        engine: Restrict the result to this engine. ``None`` returns
            every engine in the workspace.

    Returns:
        ``{engine_name: {metric_name: value}}``; an empty dict when no
        rows match.
    """
    db = get_db_connection(workspace)
    try:
        query = (
            "SELECT engine, metric, value FROM engine_stats "
            "WHERE workspace_id = ?"
        )
        params = (workspace.id,)
        if engine is not None:
            # Narrow the query to a single engine.
            query += " AND engine = ?"
            params += (engine,)
        records = db.execute(query, params).fetchall()
    finally:
        db.close()

    # Fold the flat (engine, metric, value) rows into a nested mapping.
    grouped: dict[str, dict[str, float]] = {}
    for name, metric_name, metric_value in records:
        grouped.setdefault(name, {})[metric_name] = metric_value
    return grouped


def get_run_log(
    workspace: Workspace, engine: Optional[str] = None, limit: int = 100
) -> list[dict]:
    """Fetch raw per-run rows from the run_engine_log table.

    Args:
        workspace: Active workspace; rows are filtered by its ``id``.
        engine: Restrict the result to this engine when given.
        limit: Value bound to the query's LIMIT clause (default 100).

    Returns:
        One dict per run, keyed by column name, ordered by
        ``created_at`` descending.
    """
    # Column order is shared by the SELECT list and the dict keys below.
    fields = (
        "run_id", "engine", "task_id", "workspace_id", "status",
        "duration_ms", "tokens_used", "gates_passed", "self_corrections",
        "created_at",
    )

    db = get_db_connection(workspace)
    try:
        query = (
            "SELECT " + ", ".join(fields) + " FROM run_engine_log "
            "WHERE workspace_id = ? "
        )
        params = (workspace.id,)
        if engine is not None:
            # Narrow the query to a single engine.
            query += "AND engine = ? "
            params += (engine,)
        query += "ORDER BY created_at DESC LIMIT ?"
        params += (limit,)
        records = db.execute(query, params).fetchall()
    finally:
        db.close()

    return [dict(zip(fields, record)) for record in records]
Loading
Loading