diff --git a/examples/storage_layer_demo/README.md b/examples/storage_layer_demo/README.md new file mode 100644 index 00000000..32a9bd89 --- /dev/null +++ b/examples/storage_layer_demo/README.md @@ -0,0 +1,91 @@ +# Storage layer demo — `iaf index`, `iaf list`, `iaf rank` + +End-to-end demo of the new tiered backtest storage layer +(epic [#540](https://github.com/coding-kitties/investing-algorithm-framework/issues/540) — phase 2). + +It shows how to: + +1. Save a directory of `.iafbt` backtest bundles. +2. Build a SQLite Tier-1 index over them with `iaf index` (or + `build_index` from Python). +3. Query / sort / filter the index with `iaf list` and `iaf rank` + (or the equivalent `list_index` / `rank_index` Python helpers) + without ever decoding the per-run Parquet metric blobs. +4. Drop into raw SQL when you need a custom report. + +## Why this matters + +Previously, comparing 50 walk-forward backtest variants meant +opening every `.iafbt` bundle (each with multi-MB Parquet metric +blobs) just to read scalar headline metrics like `sharpe_ratio` or +`max_drawdown`. + +The new Tier-1 SQLite index gives you a single file (`index.sqlite`) +with one row per bundle and every scalar from +`BacktestSummaryMetrics` promoted to its own column. Filtering and +ranking 12,500 bundles becomes a sub-100 ms SQL query. + +The `.iafbt` bundles themselves remain the source of truth — the +index can always be rebuilt from them with `iaf index`. + +## Run it + +From the repo root: + +```bash +source .venv/bin/activate +python examples/storage_layer_demo/demo.py +``` + +The script will: + +1. Create a temp directory and write 6 `.iafbt` bundles with + varying synthetic Sharpe / Sortino / drawdown values. +2. Build `index.sqlite` over them. +3. Print the equivalent `iaf` CLI commands you could run by hand. +4. Run `list_index` / `rank_index` / a raw SQL query and print + the formatted tables. +5. Open the top-ranked bundle and print its full backtest report + (this is the only step that decodes per-run Parquet metric blobs). +6. Walk the index in rank order and print a one-line summary per + bundle straight out of the SQLite index — no bundle is opened. +7. Iterate every bundle in rank order and print a full per-bundle + report so you can scan _all_ backtests at a glance. + +## CLI cheatsheet + +```bash +# Build the index +iaf index ./my-backtests/ + +# Top 5 by Sharpe +iaf rank ./my-backtests/ --by sharpe_ratio -n 5 + +# Same, but only among bundles with > 50 trades +iaf rank ./my-backtests/ \ + --by sortino_ratio \ + --where "summary_number_of_trades > 50" \ + -n 10 + +# Full listing with custom columns + JSON output +iaf list ./my-backtests/ \ + --sort calmar_ratio \ + --columns "algorithm_id,tag,summary_calmar_ratio,summary_max_drawdown" \ + --json + +# Raw SQL — anything sqlite3 can do +sqlite3 ./my-backtests/index.sqlite \ + "SELECT algorithm_id, summary_sharpe_ratio + FROM backtest_index + WHERE summary_max_drawdown > -0.1 + ORDER BY summary_sharpe_ratio DESC LIMIT 5;" +``` + +## Where this is going + +Phase 3 of #540 introduces a `BacktestStore` Protocol with two +implementations: `LocalDirStore` (the current behavior) and +`LocalTieredStore` (Tier-1 SQLite + Tier-2 Parquet datasets + +Tier-3 content-addressed chunks). The `iaf list` / `iaf rank` +commands shown here are forward-compatible — they will work +unchanged against any store backing the same Tier-1 schema. diff --git a/examples/storage_layer_demo/demo.py b/examples/storage_layer_demo/demo.py new file mode 100644 index 00000000..d2f38f7e --- /dev/null +++ b/examples/storage_layer_demo/demo.py @@ -0,0 +1,289 @@ +"""End-to-end demo of the new tiered backtest storage layer. + +Epic #540 phase 2: + +* save a directory of ``.iafbt`` bundles +* build a SQLite Tier-1 index over them (``iaf index``) +* query / sort / filter the index (``iaf list`` / ``iaf rank``) + without ever decoding the per-run Parquet metric blobs +* drop into raw SQL when needed + +Run from the repo root:: + + source .venv/bin/activate + python examples/storage_layer_demo/demo.py +""" + +from __future__ import annotations + +import os +import sqlite3 +import sys +import tempfile +from pathlib import Path + +from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) +from investing_algorithm_framework.cli.index_command import ( + build_index, + list_index, + rank_index, + format_table, +) + + +# Directory-format fixture shipped with the test suite. We use it +# only as a *template* — we re-save N copies under different +# algorithm_ids and Sharpe / Sortino / drawdown values so the demo +# has something interesting to rank. +REPO_ROOT = Path(__file__).resolve().parents[2] +TEMPLATE = ( + REPO_ROOT + / "tests" + / "resources" + / "backtest_reports_for_testing" + / "test_algorithm_backtest" +) + + +def _print_section(title: str) -> None: + bar = "=" * 72 + print(f"\n{bar}\n {title}\n{bar}") + + +def _print_backtest_report(bt: Backtest) -> None: + """Render a compact, fixture-tolerant report for a backtest. + + We deliberately read fields that exist on every ``.iafbt`` bundle + (identity, run dates, summary metrics) instead of relying on the + full :func:`pretty_print_backtest` helper, which assumes a + populated :class:`BacktestMetrics` per run. + """ + s = bt.backtest_summary + runs = bt.backtest_runs or [] + print(f" algorithm_id : {bt.algorithm_id}") + print(f" tag : {bt.tag}") + if runs: + first, last = runs[0], runs[-1] + print( + f" date range : {first.backtest_start_date} -> " + f"{last.backtest_end_date}" + ) + print(f" number of runs : {len(runs)}") + + if s is None: + print(" (no summary metrics available)") + return + + def _row(label: str, value, fmt: str = "") -> None: + if value is None: + rendered = "n/a" + elif fmt: + rendered = format(value, fmt) + else: + rendered = str(value) + print(f" {label:<22}: {rendered}") + + print(" --- summary metrics ---") + _row("sharpe_ratio", s.sharpe_ratio, ".4f") + _row("sortino_ratio", s.sortino_ratio, ".4f") + _row("calmar_ratio", s.calmar_ratio, ".4f") + _row( + "total_net_gain_pct", + None if s.total_net_gain_percentage is None + else s.total_net_gain_percentage * 100, + ".2f", + ) + _row( + "max_drawdown_pct", + None if s.max_drawdown is None else s.max_drawdown * 100, + ".2f", + ) + _row("number_of_trades", s.number_of_trades) + _row( + "win_rate_pct", + None if s.win_rate is None else s.win_rate * 100, + ".2f", + ) + + +def _seed_bundles(out_dir: Path, n: int = 6) -> None: + """Write ``n`` synthetic ``.iafbt`` bundles into *out_dir*.""" + if not TEMPLATE.is_dir(): + sys.exit( + f"Could not find the template fixture at {TEMPLATE}. " + "Run this demo from inside a git checkout of the " + "investing-algorithm-framework repository." + ) + + template = Backtest.open(str(TEMPLATE)) + + # Three "strategy families", two variants each, with distinct + # risk-adjusted profiles so ranking is meaningful. + profiles = [ + ("momentum_v1", 1.85, 2.40, -0.08, 145, 0.61), + ("momentum_v2", 1.42, 1.91, -0.12, 132, 0.58), + ("mean_revert_v1", 0.95, 1.20, -0.18, 88, 0.54), + ("mean_revert_v2", 1.10, 1.45, -0.15, 102, 0.55), + ("breakout_v1", 0.42, 0.55, -0.31, 41, 0.48), + ("breakout_v2", -0.20, -0.25, -0.42, 27, 0.39), + ][:n] + + for algo_id, sharpe, sortino, mdd, n_trades, win_rate in profiles: + bt = Backtest.from_dict(template.to_dict()) + bt.algorithm_id = algo_id + bt.tag = "demo" + if bt.backtest_summary is not None: + s = bt.backtest_summary + s.sharpe_ratio = sharpe + s.sortino_ratio = sortino + s.max_drawdown = mdd + s.number_of_trades = n_trades + s.win_rate = win_rate + # Calmar = CAGR / |max_drawdown|; fake it for the demo. + s.calmar_ratio = round(abs(sharpe / mdd), 3) + s.total_net_gain_percentage = round(sharpe * 0.12, 4) + save_bundle( + bt, str(out_dir / f"{algo_id}{BUNDLE_EXT}"), + ) + + +def main() -> None: + work = Path(tempfile.mkdtemp(prefix="iaf-storage-demo-")) + print(f"Working directory: {work}") + + # ------------------------------------------------------------------ + # 1. Save bundles + # ------------------------------------------------------------------ + _print_section("1. Save synthetic .iafbt bundles") + _seed_bundles(work) + for p in sorted(work.glob(f"*{BUNDLE_EXT}")): + print(f" {p.name}") + + # ------------------------------------------------------------------ + # 2. Build the Tier-1 SQLite index + # ------------------------------------------------------------------ + _print_section("2. Build the SQLite Tier-1 index") + print(f"$ iaf index {work}") + index_path = build_index(str(work), show_progress=False) + print(f" -> wrote {index_path}") + print(f" -> file size: {os.path.getsize(index_path)} bytes") + + # ------------------------------------------------------------------ + # 3. iaf list — sort by Sharpe, top 4 + # ------------------------------------------------------------------ + _print_section("3. iaf list — sort by sharpe_ratio (top 4)") + print(f"$ iaf list {work} --sort sharpe_ratio -n 4\n") + rows = list_index( + str(work), sort_by="sharpe_ratio", limit=4, + ) + print(format_table(rows)) + + # ------------------------------------------------------------------ + # 4. iaf rank — risk-adjusted, filtered + # ------------------------------------------------------------------ + _print_section( + "4. iaf rank — by sortino_ratio, with WHERE filter" + ) + print( + f'$ iaf rank {work} --by sortino_ratio ' + f'--where "summary_number_of_trades > 50" -n 5\n' + ) + rows = rank_index( + str(work), + by="sortino_ratio", + where="summary_number_of_trades > 50", + limit=5, + ) + print(format_table(rows)) + + # ------------------------------------------------------------------ + # 5. Raw SQL — anything sqlite3 can do + # ------------------------------------------------------------------ + _print_section("5. Raw SQL — custom report") + sql = """ + SELECT algorithm_id, + summary_sharpe_ratio AS sharpe, + summary_max_drawdown AS max_dd, + summary_calmar_ratio AS calmar + FROM backtest_index + WHERE summary_max_drawdown > -0.20 + ORDER BY summary_calmar_ratio DESC + LIMIT 5 + """ + print(f"$ sqlite3 {index_path} ''\n{sql.strip()}\n") + conn = sqlite3.connect(index_path) + conn.row_factory = sqlite3.Row + cur = conn.execute(sql) + rows = [dict(r) for r in cur.fetchall()] + conn.close() + print(format_table(rows)) + + # ------------------------------------------------------------------ + # 6. Open the winner's full backtest report + # ------------------------------------------------------------------ + _print_section( + "6. Open the top-ranked bundle and print its full report" + ) + top = rank_index(str(work), by="sharpe_ratio", limit=1)[0] + winner_path = work / top["bundle_path"] + print( + f"Top by sharpe_ratio: {top['algorithm_id']} " + f"(sharpe={top['summary_sharpe_ratio']})\n" + f"Loading full bundle: {winner_path}\n" + ) + # The index gave us a scalar-only view; for the full report we + # open the .iafbt bundle (this is the only step that decodes the + # per-run Parquet metric blobs). + winner_bt = Backtest.open(str(winner_path)) + _print_backtest_report(winner_bt) + + # ------------------------------------------------------------------ + # 7. Iterate the index and print a one-line report per backtest + # ------------------------------------------------------------------ + _print_section( + "7. Iterate the index and print a one-line summary per bundle" + ) + print( + "Walking the SQLite index in rank order — no bundle is opened " + "for these summaries.\n" + ) + all_rows = list_index(str(work), sort_by="sharpe_ratio") + for i, r in enumerate(all_rows, start=1): + print( + f" {i}. {r['algorithm_id']:<14} " + f"sharpe={r['summary_sharpe_ratio']:>6.2f} " + f"return={r['summary_total_net_gain_percentage'] * 100:>6.2f}% " + f"max_dd={r['summary_max_drawdown']:>6.2%} " + f"trades={r['summary_number_of_trades']:>3}" + ) + + # ------------------------------------------------------------------ + # 8. Full report for every backtest in rank order + # ------------------------------------------------------------------ + _print_section( + "8. Full report per backtest (open each bundle in rank order)" + ) + for i, r in enumerate(all_rows, start=1): + bundle_path = work / r["bundle_path"] + bt = Backtest.open(str(bundle_path)) + print(f"\n--- [{i}] {r['algorithm_id']} ".ljust(72, "-")) + _print_backtest_report(bt) + + # ------------------------------------------------------------------ + # Wrap up + # ------------------------------------------------------------------ + _print_section("Done") + print( + "Bundles + index left in:\n" + f" {work}\n" + "Try the CLI directly:\n" + f" iaf list {work} --sort calmar_ratio --json\n" + f" iaf rank {work} --by sharpe_ratio -n 3\n" + ) + + +if __name__ == "__main__": + main() diff --git a/investing_algorithm_framework/cli/cli.py b/investing_algorithm_framework/cli/cli.py index cc83f41e..988f7d6c 100644 --- a/investing_algorithm_framework/cli/cli.py +++ b/investing_algorithm_framework/cli/cli.py @@ -343,7 +343,13 @@ def migrate_backtests_cmd( "--no-progress", is_flag=True, default=False, help="Suppress the progress bar.", ) -def index_cmd(directory, output, absolute_paths, no_progress): +@click.option( + "--rebuild", is_flag=True, default=False, + help="Force a full rebuild instead of incremental refresh " + "(default: skip bundles whose mtime+size match an existing " + "row).", +) +def index_cmd(directory, output, absolute_paths, no_progress, rebuild): """Build a SQLite Tier-1 index over a folder of ``.iafbt`` bundles. The resulting ``index.sqlite`` file holds one row per bundle with @@ -364,8 +370,150 @@ def index_cmd(directory, output, absolute_paths, no_progress): output=output, relative_paths=not absolute_paths, show_progress=not no_progress, + incremental=not rebuild, ) click.echo(f"Wrote SQLite index to {out}") cli.add_command(index_cmd) + + +@click.command(name="list") +@click.argument( + "index_path", + type=click.Path(exists=True, file_okay=True, dir_okay=True), +) +@click.option( + "--sort", "sort_by", default=None, + help="Metric / column to sort by (e.g. sharpe_ratio, " + "summary_total_net_gain_percentage, algorithm_id). " + "Bare metric names are auto-prefixed with 'summary_'.", +) +@click.option( + "--asc", "ascending", is_flag=True, default=False, + help="Sort ascending (default: descending / best-first).", +) +@click.option( + "--limit", "-n", type=int, default=None, + help="Maximum number of rows to print.", +) +@click.option( + "--where", default=None, + help='Raw SQL WHERE fragment (no leading WHERE). ' + 'Example: --where "summary_sharpe_ratio > 1.0 AND tag = \'demo\'"', +) +@click.option( + "--columns", default=None, + help="Comma-separated list of columns to print " + "(default: a curated set of identity + summary metrics).", +) +@click.option( + "--json", "as_json", is_flag=True, default=False, + help="Emit JSON instead of a text table.", +) +def list_cmd( + index_path, sort_by, ascending, limit, where, columns, as_json, +): + """List rows from a SQLite Tier-1 index built by ``iaf index``. + + ``INDEX_PATH`` may be either an ``index.sqlite`` file or the + directory it lives in. + + Examples: + + iaf list ./backtests --sort sharpe_ratio -n 20 + + iaf list index.sqlite --where "summary_max_drawdown > -0.1" \\ + --sort sortino_ratio + """ + from .index_command import list_index, format_table + cols = ( + [c.strip() for c in columns.split(",")] if columns else None + ) + rows = list_index( + index_path=index_path, + sort_by=sort_by, + ascending=ascending, + limit=limit, + where=where, + columns=cols, + ) + if as_json: + import json as _json + click.echo(_json.dumps(rows, indent=2, default=str)) + else: + click.echo(format_table(rows, columns=cols)) + + +cli.add_command(list_cmd) + + +@click.command(name="rank") +@click.argument( + "index_path", + type=click.Path(exists=True, file_okay=True, dir_okay=True), +) +@click.option( + "--by", "by", required=True, + help="Metric to rank by (e.g. sharpe_ratio, sortino_ratio, " + "calmar_ratio, profit_factor). Bare metric names are " + "auto-prefixed with 'summary_'.", +) +@click.option( + "--limit", "-n", type=int, default=10, + help="Number of rows to return (default: 10).", +) +@click.option( + "--asc", "ascending", is_flag=True, default=False, + help="Rank ascending (e.g. for max_drawdown where smaller is " + "better the user typically wants ascending order on the " + "magnitude). Default: descending / best-first.", +) +@click.option( + "--where", default=None, + help='Optional SQL WHERE fragment to filter candidates before ' + 'ranking. Example: --where "tag = \'walk-forward\'".', +) +@click.option( + "--columns", default=None, + help="Comma-separated list of columns to print " + "(default: identity + key risk-adjusted metrics).", +) +@click.option( + "--json", "as_json", is_flag=True, default=False, + help="Emit JSON instead of a text table.", +) +def rank_cmd(index_path, by, limit, ascending, where, columns, as_json): + """Rank backtests in a Tier-1 index by a single metric. + + Sugar over ``iaf list --sort --limit `` with a column set + geared toward strategy comparison (Sharpe / Sortino / Calmar / + return / drawdown). + + Examples: + + iaf rank ./backtests --by sharpe_ratio -n 5 + + iaf rank index.sqlite --by profit_factor \\ + --where "summary_number_of_trades > 50" + """ + from .index_command import rank_index, format_table + cols = ( + [c.strip() for c in columns.split(",")] if columns else None + ) + rows = rank_index( + index_path=index_path, + by=by, + limit=limit, + where=where, + columns=cols, + ascending=ascending, + ) + if as_json: + import json as _json + click.echo(_json.dumps(rows, indent=2, default=str)) + else: + click.echo(format_table(rows, columns=cols)) + + +cli.add_command(rank_cmd) diff --git a/investing_algorithm_framework/cli/index_command.py b/investing_algorithm_framework/cli/index_command.py index 224e079f..4553a6e5 100644 --- a/investing_algorithm_framework/cli/index_command.py +++ b/investing_algorithm_framework/cli/index_command.py @@ -10,13 +10,16 @@ from __future__ import annotations import logging +from dataclasses import fields as dc_fields from pathlib import Path -from typing import Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Sequence from investing_algorithm_framework.domain import ( Backtest, BUNDLE_EXT, ) +from investing_algorithm_framework.domain.backtesting \ + .backtest_summary_metrics import BacktestSummaryMetrics from investing_algorithm_framework.services.backtest_index import ( SqliteBacktestIndex, ) @@ -37,6 +40,7 @@ def build_index( output: Optional[str] = None, relative_paths: bool = True, show_progress: bool = False, + incremental: bool = True, ) -> str: """Build (or refresh) a SQLite Tier-1 index over *directory*. @@ -48,6 +52,10 @@ def build_index( *directory* so the index file stays portable when the folder is moved/renamed. show_progress: emit a tqdm progress bar. + incremental: if True (default), open the existing index (if + any) and skip bundles whose ``(mtime, size)`` already + match the on-disk file. Pass ``False`` to force a full + rebuild. Returns: Absolute path of the SQLite file that was written. @@ -67,21 +75,36 @@ def build_index( except ImportError: # pragma: no cover - tqdm is a dep pbar = None - index = SqliteBacktestIndex.create(out) + if incremental and out.is_file(): + index = SqliteBacktestIndex.open(out) + else: + index = SqliteBacktestIndex.create(out) n_ok = 0 n_err = 0 + n_skipped = 0 try: for path in paths: try: - bt = Backtest.open(str(path), summary_only=True) + stat = path.stat() bundle_path = ( str(path.relative_to(src)) if relative_paths else str(path) ) + if incremental and index.is_up_to_date( + bundle_path, stat.st_mtime_ns, stat.st_size, + ): + n_skipped += 1 + continue + + bt = Backtest.open(str(path), summary_only=True) row = bt.index_row(bundle_path=bundle_path) - index.upsert(row) + index.upsert( + row, + bundle_mtime_ns=stat.st_mtime_ns, + bundle_size=stat.st_size, + ) n_ok += 1 - except Exception as exc: # noqa: BLE001 \u2014 best-effort scan + except Exception as exc: # noqa: BLE001 — best-effort scan logger.warning("failed to index %s: %s", path, exc) n_err += 1 finally: @@ -93,6 +116,181 @@ def build_index( index.close() logger.info( - "Indexed %d bundle(s) into %s (%d failed)", n_ok, out, n_err, + "Indexed %d bundle(s) into %s (%d skipped, %d failed)", + n_ok, out, n_skipped, n_err, ) return str(out) + + +# --------------------------------------------------------------------------- +# list / rank helpers +# --------------------------------------------------------------------------- +DEFAULT_LIST_COLUMNS: Sequence[str] = ( + "algorithm_id", + "tag", + "summary_sharpe_ratio", + "summary_total_net_gain_percentage", + "summary_max_drawdown", + "summary_number_of_trades", + "bundle_path", +) + +DEFAULT_RANK_COLUMNS: Sequence[str] = ( + "algorithm_id", + "tag", + "summary_sharpe_ratio", + "summary_sortino_ratio", + "summary_calmar_ratio", + "summary_total_net_gain_percentage", + "summary_max_drawdown", + "bundle_path", +) + +_SUMMARY_FIELD_NAMES = frozenset( + f.name for f in dc_fields(BacktestSummaryMetrics) +) + + +def _resolve_index_path(path: str) -> Path: + """Accept either a directory (look for ``index.sqlite`` inside) or + a SQLite file path; return the resolved file path.""" + p = Path(path) + if p.is_dir(): + candidate = p / DEFAULT_INDEX_NAME + if not candidate.is_file(): + raise FileNotFoundError( + f"No {DEFAULT_INDEX_NAME} in {p}. Run `iaf index {p}` " + f"first or pass the SQLite file directly." + ) + return candidate + if not p.is_file(): + raise FileNotFoundError(f"Index file not found: {p}") + return p + + +def _resolve_metric_column(name: str) -> str: + """Map a user-friendly metric name to a real SQL column. + + Accepts both ``sharpe_ratio`` and ``summary_sharpe_ratio``; bare + column names (``algorithm_id``, ``tag``, ...) are returned as-is. + """ + if name.startswith("summary_"): + return name + if name in _SUMMARY_FIELD_NAMES: + return f"summary_{name}" + return name + + +def _row_to_flat_dict(row, columns: Sequence[str]) -> Dict[str, Any]: + """Project a :class:`BacktestIndexRow` onto the requested columns.""" + out: Dict[str, Any] = {} + summary = row.summary_metrics + for col in columns: + if col.startswith("summary_"): + field = col[len("summary_"):] + out[col] = ( + getattr(summary, field, None) if summary is not None + else None + ) + else: + out[col] = getattr(row, col, None) + return out + + +def list_index( + index_path: str, + sort_by: Optional[str] = None, + ascending: bool = False, + limit: Optional[int] = None, + where: Optional[str] = None, + columns: Optional[Sequence[str]] = None, +) -> List[Dict[str, Any]]: + """Query an index file and return matching rows as plain dicts. + + Args: + index_path: Path to ``index.sqlite`` or a directory holding it. + sort_by: Column to sort by (e.g. ``"sharpe_ratio"`` or + ``"summary_sharpe_ratio"``). ``None`` keeps insertion order. + ascending: Sort direction; default descending (best-first). + limit: Maximum number of rows to return. + where: Optional raw SQL ``WHERE`` fragment (no leading + ``WHERE`` keyword). Use ``?`` placeholders only via + :meth:`SqliteBacktestIndex.query` directly if you need + bind parameters. + columns: Columns to project; defaults to + :data:`DEFAULT_LIST_COLUMNS`. + + Returns: + A list of column-name → value dicts, ready for tabulation. + """ + cols = list(columns) if columns else list(DEFAULT_LIST_COLUMNS) + resolved = _resolve_index_path(index_path) + + base_where = f"({where})" if where else "1=1" + fragment = base_where + if sort_by: + sort_col = _resolve_metric_column(sort_by) + direction = "ASC" if ascending else "DESC" + # NULLs always come last regardless of direction so the table + # is useful even when some bundles are missing the metric. + fragment += ( + f' ORDER BY "{sort_col}" IS NULL, "{sort_col}" {direction}' + ) + if limit is not None: + fragment += f" LIMIT {int(limit)}" + + with SqliteBacktestIndex.open(resolved) as idx: + rows = idx.query(where=fragment) + return [_row_to_flat_dict(r, cols) for r in rows] + + +def rank_index( + index_path: str, + by: str, + limit: int = 10, + where: Optional[str] = None, + columns: Optional[Sequence[str]] = None, + ascending: bool = False, +) -> List[Dict[str, Any]]: + """Rank bundles by a metric. Thin wrapper around :func:`list_index` + with a different default column set and a required ``by`` arg.""" + cols = list(columns) if columns else list(DEFAULT_RANK_COLUMNS) + return list_index( + index_path, + sort_by=by, + ascending=ascending, + limit=limit, + where=where, + columns=cols, + ) + + +def format_table( + rows: List[Dict[str, Any]], + columns: Optional[Sequence[str]] = None, +) -> str: + """Render rows as a fixed-width text table (no external deps).""" + if not rows: + return "(no rows)" + cols = list(columns) if columns else list(rows[0].keys()) + + def _fmt(v: Any) -> str: + if v is None: + return "" + if isinstance(v, float): + return f"{v:.4f}" + return str(v) + + cells = [[_fmt(r.get(c)) for c in cols] for r in rows] + widths = [ + max(len(c), *(len(row[i]) for row in cells)) + for i, c in enumerate(cols) + ] + sep = " " + header = sep.join(c.ljust(widths[i]) for i, c in enumerate(cols)) + rule = sep.join("-" * w for w in widths) + body = "\n".join( + sep.join(row[i].ljust(widths[i]) for i in range(len(cols))) + for row in cells + ) + return f"{header}\n{rule}\n{body}" diff --git a/investing_algorithm_framework/domain/backtesting/backtest.py b/investing_algorithm_framework/domain/backtesting/backtest.py index 09fbb6c2..4a46fedc 100644 --- a/investing_algorithm_framework/domain/backtesting/backtest.py +++ b/investing_algorithm_framework/domain/backtesting/backtest.py @@ -263,6 +263,22 @@ def get_backtest_summary(self) -> Union[BacktestSummaryMetrics, None]: """ return self.backtest_summary + def scalar_summary(self) -> Union[BacktestSummaryMetrics, None]: + """Alias for :meth:`get_backtest_summary` — the typed scalar + roll-up named per epic #540 phase 1. + + The Tier-1 storage layer (``BacktestIndexRow`` and the SQLite + index) builds on this scalar view: it can be obtained from a + bundle opened with ``summary_only=True`` without decoding any + Parquet metric blobs, making list / rank workloads cheap. + + Returns: + Union[BacktestSummaryMetrics, None]: The cross-window + scalar metrics, or ``None`` if they were never + computed for this backtest. + """ + return self.backtest_summary + def to_dict(self) -> dict: """ Convert the Backtest instance to a dictionary. diff --git a/investing_algorithm_framework/services/backtest_index/sqlite_index.py b/investing_algorithm_framework/services/backtest_index/sqlite_index.py index c6a93709..745b836a 100644 --- a/investing_algorithm_framework/services/backtest_index/sqlite_index.py +++ b/investing_algorithm_framework/services/backtest_index/sqlite_index.py @@ -51,7 +51,7 @@ # Bumped on any additive schema change. Old files are upgraded # in-place by :meth:`SqliteBacktestIndex._migrate`. -SCHEMA_VERSION = 1 +SCHEMA_VERSION = 2 # Columns of BacktestIndexRow that map 1:1 to typed SQL columns. # (parameters / strategy_ids are emitted as JSON text columns; the @@ -68,6 +68,10 @@ ("strategy_ids_json", "TEXT"), ("extras_json", "TEXT"), ("summary_extras_json", "TEXT"), + # Provenance for incremental indexing — skip bundles whose + # mtime + size match an existing row (epic #540 phase 2). + ("bundle_mtime_ns", "INTEGER"), + ("bundle_size", "INTEGER"), ) @@ -190,9 +194,22 @@ def _migrate(conn: sqlite3.Connection) -> None: # ------------------------------------------------------------------ # Writes # ------------------------------------------------------------------ - def upsert(self, row: BacktestIndexRow) -> None: + def upsert( + self, + row: BacktestIndexRow, + bundle_mtime_ns: Optional[int] = None, + bundle_size: Optional[int] = None, + ) -> None: """Insert or replace a single row, keyed by ``bundle_path``. + Args: + row: the typed row to write. + bundle_mtime_ns: optional file mtime in nanoseconds; used + by :meth:`is_up_to_date` to support incremental + indexing (epic #540 phase 2). + bundle_size: optional file size in bytes, used together + with ``bundle_mtime_ns`` for the freshness check. + Raises: ValueError: if ``row.bundle_path`` is None (it is the PK). """ @@ -202,6 +219,8 @@ def upsert(self, row: BacktestIndexRow) -> None: "upsert (used as the primary key)." ) record = self._row_to_record(row) + record["bundle_mtime_ns"] = bundle_mtime_ns + record["bundle_size"] = bundle_size cols = list(record.keys()) placeholders = ", ".join("?" for _ in cols) col_list = ", ".join(f'"{c}"' for c in cols) @@ -212,6 +231,28 @@ def upsert(self, row: BacktestIndexRow) -> None: ) self._conn.commit() + def is_up_to_date( + self, bundle_path: str, mtime_ns: int, size: int, + ) -> bool: + """Return True if the index already has a row for *bundle_path* + whose ``(mtime_ns, size)`` matches the on-disk file. + + Used by :func:`build_index` to skip bundles that have not + changed since the last index build (epic #540 phase 2). + """ + cur = self._conn.execute( + f'SELECT bundle_mtime_ns, bundle_size ' + f'FROM "{_TABLE}" WHERE bundle_path = ?', + (bundle_path,), + ) + row = cur.fetchone() + if row is None: + return False + return ( + row["bundle_mtime_ns"] == mtime_ns + and row["bundle_size"] == size + ) + def upsert_many(self, rows: Iterable[BacktestIndexRow]) -> int: """Bulk insert/replace; returns the number of rows written.""" rows = list(rows) diff --git a/scripts/bench_540_phase2.py b/scripts/bench_540_phase2.py new file mode 100644 index 00000000..087868dd --- /dev/null +++ b/scripts/bench_540_phase2.py @@ -0,0 +1,129 @@ +"""Acceptance benchmark for epic #540 phase 2. + +Generates a configurable number of synthetic ``.iafbt`` bundles +based on the test-suite fixture and measures: + +* full index build time (``iaf index``) +* incremental re-index time (mtime/size skip path) +* query latency for ``iaf list --sort sharpe_ratio --limit 20`` + +Run with:: + + source .venv/bin/activate + python scripts/bench_540_phase2.py # default: 1,000 bundles + python scripts/bench_540_phase2.py --count 12500 +""" + +from __future__ import annotations + +import argparse +import os +import shutil +import tempfile +import time +from pathlib import Path + +from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) +from investing_algorithm_framework.cli.index_command import ( + build_index, list_index, +) + + +REPO_ROOT = Path(__file__).resolve().parents[1] +TEMPLATE = ( + REPO_ROOT + / "tests" / "resources" / "backtest_reports_for_testing" + / "test_algorithm_backtest" +) + + +def _seed(out_dir: Path, n: int) -> None: + """Write *n* bundles into *out_dir* by re-saving the template.""" + template = Backtest.open(str(TEMPLATE)) + base = template.to_dict() + digits = max(5, len(str(n))) + for i in range(n): + bt = Backtest.from_dict(base) + bt.algorithm_id = f"algo_{i:0{digits}d}" + bt.tag = "bench" + if bt.backtest_summary is not None: + # Vary sharpe so sorting actually does work. + bt.backtest_summary.sharpe_ratio = (i % 100) / 50.0 - 1.0 + save_bundle( + bt, str(out_dir / f"{bt.algorithm_id}{BUNDLE_EXT}"), + ) + + +def _time(label: str, fn, *a, **kw): + t0 = time.perf_counter() + result = fn(*a, **kw) + dt = time.perf_counter() - t0 + print(f" {label:<40s} {dt * 1000:8.1f} ms") + return result, dt + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--count", type=int, default=1000) + ap.add_argument("--keep", action="store_true") + args = ap.parse_args() + + work = Path(tempfile.mkdtemp(prefix="iaf-bench-540-")) + print(f"Working directory: {work}") + print(f"Seeding {args.count} .iafbt bundles...") + t0 = time.perf_counter() + _seed(work, args.count) + print(f" seeded in {time.perf_counter() - t0:.2f} s") + + print(f"\nBenchmark — {args.count} bundles") + print("=" * 60) + out, dt_build = _time( + "build_index (cold)", + build_index, str(work), show_progress=False, + ) + _, dt_incr = _time( + "build_index (incremental, all unchanged)", + build_index, str(work), show_progress=False, + ) + _, dt_list = _time( + "list_index --sort sharpe_ratio --limit 20", + list_index, str(work), + sort_by="sharpe_ratio", limit=20, + ) + _, dt_full = _time( + "list_index --sort sharpe_ratio (no limit)", + list_index, str(work), sort_by="sharpe_ratio", + ) + + index_path = work / "index.sqlite" + size_mb = os.path.getsize(index_path) / (1024 * 1024) + bundles_size = sum( + p.stat().st_size for p in work.glob(f"*{BUNDLE_EXT}") + ) / (1024 * 1024) + print("\nFootprint") + print("=" * 60) + print(f" bundle directory : {bundles_size:8.2f} MiB") + print(f" index.sqlite : {size_mb:8.2f} MiB") + print(f" per-bundle index : {size_mb * 1024 / args.count:8.2f} KiB") + + print("\nProjection to 12,500 bundles (linear extrapolation)") + print("=" * 60) + scale = 12500 / args.count + print(f" build (cold) : ~{dt_build * scale:.1f} s") + print(f" build (incremental) : ~{dt_incr * scale:.1f} s") + print( + f" list top-20 : {dt_list * 1000:.1f} ms " + "(does not scale with bundle count)" + ) + + if not args.keep: + shutil.rmtree(work, ignore_errors=True) + else: + print(f"\nLeft data in: {work}") + + +if __name__ == "__main__": + main() diff --git a/tests/cli/test_index_command.py b/tests/cli/test_index_command.py index f2bdc493..8079d146 100644 --- a/tests/cli/test_index_command.py +++ b/tests/cli/test_index_command.py @@ -1,4 +1,5 @@ """Integration tests for the ``iaf index`` CLI (epic #540 phase 2).""" +import json import os import shutil import tempfile @@ -10,8 +11,12 @@ from investing_algorithm_framework.domain.backtesting.bundle import ( save_bundle, ) -from investing_algorithm_framework.cli.cli import index_cmd -from investing_algorithm_framework.cli.index_command import build_index +from investing_algorithm_framework.cli.cli import ( + index_cmd, list_cmd, rank_cmd, +) +from investing_algorithm_framework.cli.index_command import ( + build_index, list_index, rank_index, format_table, +) from investing_algorithm_framework.services.backtest_index import ( SqliteBacktestIndex, ) @@ -91,3 +96,189 @@ def test_cli_invocation(self): self.assertTrue(os.path.isfile(out)) with SqliteBacktestIndex.open(out) as idx: self.assertEqual(len(idx), 3) + + # ------------------------------------------------------------------ + # list / rank helpers + # ------------------------------------------------------------------ + def _build_index_with_metrics(self): + """Build an index where each bundle has a distinct sharpe.""" + # Re-save with sharpe ratios 0.5 / 1.5 / 1.0 for ranking tests. + sharpes = [0.5, 1.5, 1.0] + for i, s in enumerate(sharpes): + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.algorithm_id = f"algo_{i}" + bt.tag = "demo" + if bt.backtest_summary is not None: + bt.backtest_summary.sharpe_ratio = s + save_bundle( + bt, os.path.join(self.tmp, f"algo_{i}{BUNDLE_EXT}"), + ) + return build_index(self.tmp, show_progress=False) + + def test_list_index_returns_dicts_with_default_columns(self): + out = self._build_index_with_metrics() + rows = list_index(out) + self.assertEqual(len(rows), 3) + self.assertIn("algorithm_id", rows[0]) + self.assertIn("summary_sharpe_ratio", rows[0]) + + def test_list_index_sorts_descending_by_metric(self): + out = self._build_index_with_metrics() + rows = list_index(out, sort_by="sharpe_ratio") + sharpes = [r["summary_sharpe_ratio"] for r in rows] + self.assertEqual(sharpes, sorted(sharpes, reverse=True)) + + def test_list_index_accepts_summary_prefixed_metric(self): + out = self._build_index_with_metrics() + rows = list_index(out, sort_by="summary_sharpe_ratio", limit=2) + self.assertEqual(len(rows), 2) + self.assertEqual(rows[0]["algorithm_id"], "algo_1") # 1.5 highest + + def test_list_index_accepts_directory_argument(self): + self._build_index_with_metrics() + rows = list_index(self.tmp, sort_by="sharpe_ratio", limit=1) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]["algorithm_id"], "algo_1") + + def test_list_index_where_filter(self): + out = self._build_index_with_metrics() + rows = list_index( + out, + where="summary_sharpe_ratio >= 1.0", + sort_by="sharpe_ratio", + ) + self.assertEqual(len(rows), 2) + self.assertEqual( + [r["algorithm_id"] for r in rows], ["algo_1", "algo_2"], + ) + + def test_rank_index_returns_top_n(self): + out = self._build_index_with_metrics() + rows = rank_index(out, by="sharpe_ratio", limit=2) + self.assertEqual(len(rows), 2) + self.assertEqual(rows[0]["algorithm_id"], "algo_1") + self.assertEqual(rows[1]["algorithm_id"], "algo_2") + + def test_format_table_renders_header_and_rows(self): + out = self._build_index_with_metrics() + rows = rank_index(out, by="sharpe_ratio", limit=2) + text = format_table(rows) + self.assertIn("algorithm_id", text) + self.assertIn("algo_1", text) + # First data line should be algo_1 (highest sharpe). + body = text.splitlines()[2] + self.assertTrue(body.startswith("algo_1")) + + def test_cli_list_invocation(self): + out = self._build_index_with_metrics() + runner = CliRunner() + result = runner.invoke( + list_cmd, [out, "--sort", "sharpe_ratio", "-n", "2"], + ) + self.assertEqual( + result.exit_code, 0, + msg=f"stdout={result.output!r} exc={result.exception!r}", + ) + self.assertIn("algo_1", result.output) + self.assertIn("algorithm_id", result.output) + + def test_cli_list_json_output(self): + out = self._build_index_with_metrics() + runner = CliRunner() + result = runner.invoke( + list_cmd, [out, "--sort", "sharpe_ratio", "--json"], + ) + self.assertEqual(result.exit_code, 0, msg=result.output) + payload = json.loads(result.output) + self.assertEqual(len(payload), 3) + self.assertEqual(payload[0]["algorithm_id"], "algo_1") + + def test_cli_rank_invocation(self): + out = self._build_index_with_metrics() + runner = CliRunner() + result = runner.invoke( + rank_cmd, [out, "--by", "sharpe_ratio", "-n", "1"], + ) + self.assertEqual(result.exit_code, 0, msg=result.output) + self.assertIn("algo_1", result.output) + self.assertNotIn("algo_0", result.output) + + def test_cli_rank_requires_by_flag(self): + out = self._build_index_with_metrics() + runner = CliRunner() + result = runner.invoke(rank_cmd, [out]) + self.assertNotEqual(result.exit_code, 0) + self.assertIn("--by", result.output) + + def test_list_on_directory_without_index_raises(self): + # No index built yet in self.tmp. + with self.assertRaises(FileNotFoundError): + list_index(self.tmp) + + # ------------------------------------------------------------------ + # Incremental indexing (epic #540 phase 2) + # ------------------------------------------------------------------ + def test_incremental_index_skips_unchanged_bundles(self): + # First build — all bundles ingested. + out = build_index(self.tmp, show_progress=False) + with SqliteBacktestIndex.open(out) as idx: + first_rows = list(idx.iter_rows()) + self.assertEqual(len(first_rows), 3) + + # Second build over an untouched directory — should skip + # every bundle (we observe this via logging counters, but + # the row count must stay stable and equal to 3). + out2 = build_index(self.tmp, show_progress=False) + self.assertEqual(out, out2) + with SqliteBacktestIndex.open(out) as idx: + second_rows = list(idx.iter_rows()) + self.assertEqual(len(second_rows), 3) + + def test_incremental_index_reingests_modified_bundle(self): + out = build_index(self.tmp, show_progress=False) + modified = os.path.join(self.tmp, f"algo_1{BUNDLE_EXT}") + + # Rewrite the bundle with a different algorithm_id + bump + # mtime to force reingestion. + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.algorithm_id = "algo_1_modified" + bt.tag = "demo" + save_bundle(bt, modified) + # Ensure mtime actually changes on fast filesystems. + future = os.stat(modified).st_mtime + 5 + os.utime(modified, (future, future)) + + build_index(self.tmp, show_progress=False) + with SqliteBacktestIndex.open(out) as idx: + algos = sorted(r.algorithm_id for r in idx.iter_rows()) + self.assertEqual( + algos, ["algo_0", "algo_1_modified", "algo_2"], + ) + + def test_full_rebuild_flag_disables_incremental(self): + out = build_index(self.tmp, show_progress=False) + # Simulate a stale entry by manually modifying the bundle on + # disk without bumping mtime, then rebuild without incremental. + modified = os.path.join(self.tmp, f"algo_2{BUNDLE_EXT}") + original_stat = os.stat(modified) + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.algorithm_id = "algo_2_changed" + bt.tag = "demo" + save_bundle(bt, modified) + # Restore the original mtime so incremental would skip it. + os.utime( + modified, + (original_stat.st_atime, original_stat.st_mtime), + ) + + # Incremental would (incorrectly) skip; force a rebuild. + build_index(self.tmp, show_progress=False, incremental=False) + with SqliteBacktestIndex.open(out) as idx: + algos = sorted(r.algorithm_id for r in idx.iter_rows()) + self.assertIn("algo_2_changed", algos) + + def test_scalar_summary_alias_matches_get_backtest_summary(self): + self.assertIs( + self.fixture.scalar_summary(), + self.fixture.get_backtest_summary(), + )