diff --git a/docs/design/tiered-backtest-storage.md b/docs/design/tiered-backtest-storage.md
index 283b7a2e..198df086 100644
--- a/docs/design/tiered-backtest-storage.md
+++ b/docs/design/tiered-backtest-storage.md
@@ -82,6 +82,14 @@ Row size: ~1–2 KB. 12,500 rows ≈ 25 MB. Fits comfortably in SQLite for local
 > existing `BacktestIndex` Parquet sidecar is now built on top of this
 > typed row (`BacktestIndexRow.to_flat_dict()`), making the wire
 > shape and the in-memory shape a single source of truth.
+>
+> **Status (epic #540 phase 2, v8.10):** the SQLite implementation
+> ships as `investing_algorithm_framework.services.backtest_index
+> .SqliteBacktestIndex`, with `iaf index <directory>` as the CLI
+> entry point. Every scalar field of `BacktestSummaryMetrics` is
+> promoted to its own SQL column (`summary_<name>`), so analysts can
+> filter without opening any bundle, e.g. `SELECT bundle_path FROM
+> backtest_index WHERE summary_sharpe_ratio > 1.0`.
 
 ### 3.2 Tier 2 schemas (Parquet, long format)
 
diff --git a/investing_algorithm_framework/cli/cli.py b/investing_algorithm_framework/cli/cli.py
index a25daf67..cc83f41e 100644
--- a/investing_algorithm_framework/cli/cli.py
+++ b/investing_algorithm_framework/cli/cli.py
@@ -320,3 +320,52 @@ def migrate_backtests_cmd(
 
 
 cli.add_command(migrate_backtests_cmd)
+
+
+@click.command(name="index")
+@click.argument(
+    "directory",
+    type=click.Path(exists=True, file_okay=False, dir_okay=True),
+)
+@click.option(
+    "--output", "-o",
+    type=click.Path(file_okay=True, dir_okay=False),
+    default=None,
+    help="Path to the SQLite index file (default: <directory>/index.sqlite).",
+)
+@click.option(
+    "--absolute-paths", is_flag=True, default=False,
+    help="Store absolute bundle paths in the index "
+         "(default: paths relative to <directory>, so the index stays "
+         "portable when the folder is moved).",
+)
+@click.option(
+    "--no-progress", is_flag=True, default=False,
+    help="Suppress the progress bar.",
+)
+def index_cmd(directory, output, absolute_paths, no_progress):
+    """Build a SQLite Tier-1 index over a folder of ``.iafbt`` bundles.
+
+    The resulting ``index.sqlite`` file holds one row per bundle with
+    identity / provenance / config columns and every scalar
+    ``BacktestSummaryMetrics`` field promoted to its own column, so
+    analysts can run ad-hoc SQL queries (e.g.
+    ``SELECT bundle_path FROM backtest_index
+    WHERE summary_sharpe_ratio > 1.0``) without opening any bundle.
+
+    Each bundle is opened with ``summary_only=True`` so no Parquet
+    metric blobs are decoded — indexing 12,500 bundles is bounded by
+    msgpack header parsing, not metric reconstruction.
+    """
+    from .index_command import build_index
+
+    out = build_index(
+        directory=directory,
+        output=output,
+        relative_paths=not absolute_paths,
+        show_progress=not no_progress,
+    )
+    click.echo(f"Wrote SQLite index to {out}")
+
+
+cli.add_command(index_cmd)
diff --git a/investing_algorithm_framework/cli/index_command.py b/investing_algorithm_framework/cli/index_command.py
new file mode 100644
index 00000000..224e079f
--- /dev/null
+++ b/investing_algorithm_framework/cli/index_command.py
@@ -0,0 +1,98 @@
+"""``iaf index`` CLI — build a SQLite Tier-1 index over a folder of
+``.iafbt`` bundles (epic #540 phase 2).
+
+Walks the directory, opens each bundle with ``summary_only=True`` (no
+Parquet metric-blob decode), derives a :class:`BacktestIndexRow` via
+:meth:`Backtest.index_row`, and upserts into a
+:class:`SqliteBacktestIndex`.
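+
+A typical invocation looks like this (illustrative sketch; the folder
+name is just an example)::
+
+    iaf index backtests/ --output backtests/index.sqlite
+
+On success the command echoes ``Wrote SQLite index to <output>``.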
+""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Iterable, List, Optional + +from investing_algorithm_framework.domain import ( + Backtest, + BUNDLE_EXT, +) +from investing_algorithm_framework.services.backtest_index import ( + SqliteBacktestIndex, +) + +logger = logging.getLogger(__name__) + + +DEFAULT_INDEX_NAME = "index.sqlite" + + +def _iter_bundle_paths(directory: Path) -> Iterable[Path]: + """Yield every ``*.iafbt`` file under *directory* (sorted).""" + return sorted(directory.rglob(f"*{BUNDLE_EXT}")) + + +def build_index( + directory: str, + output: Optional[str] = None, + relative_paths: bool = True, + show_progress: bool = False, +) -> str: + """Build (or refresh) a SQLite Tier-1 index over *directory*. + + Args: + directory: Folder to scan for ``.iafbt`` bundles. + output: Path to the SQLite file. Defaults to + ``/index.sqlite``. + relative_paths: if True, store ``bundle_path`` relative to + *directory* so the index file stays portable when the + folder is moved/renamed. + show_progress: emit a tqdm progress bar. + + Returns: + Absolute path of the SQLite file that was written. + """ + src = Path(directory).resolve() + if not src.is_dir(): + raise NotADirectoryError(f"Not a directory: {src}") + + out = Path(output).resolve() if output else src / DEFAULT_INDEX_NAME + paths: List[Path] = list(_iter_bundle_paths(src)) + + pbar = None + if show_progress: + try: + from tqdm import tqdm + pbar = tqdm(total=len(paths), desc="Indexing bundles") + except ImportError: # pragma: no cover - tqdm is a dep + pbar = None + + index = SqliteBacktestIndex.create(out) + n_ok = 0 + n_err = 0 + try: + for path in paths: + try: + bt = Backtest.open(str(path), summary_only=True) + bundle_path = ( + str(path.relative_to(src)) if relative_paths + else str(path) + ) + row = bt.index_row(bundle_path=bundle_path) + index.upsert(row) + n_ok += 1 + except Exception as exc: # noqa: BLE001 \u2014 best-effort scan + logger.warning("failed to index %s: %s", path, exc) + n_err += 1 + finally: + if pbar is not None: + pbar.update(1) + finally: + if pbar is not None: + pbar.close() + index.close() + + logger.info( + "Indexed %d bundle(s) into %s (%d failed)", n_ok, out, n_err, + ) + return str(out) diff --git a/investing_algorithm_framework/services/backtest_index/__init__.py b/investing_algorithm_framework/services/backtest_index/__init__.py new file mode 100644 index 00000000..a492c132 --- /dev/null +++ b/investing_algorithm_framework/services/backtest_index/__init__.py @@ -0,0 +1,3 @@ +from .sqlite_index import SqliteBacktestIndex, SCHEMA_VERSION + +__all__ = ["SqliteBacktestIndex", "SCHEMA_VERSION"] diff --git a/investing_algorithm_framework/services/backtest_index/sqlite_index.py b/investing_algorithm_framework/services/backtest_index/sqlite_index.py new file mode 100644 index 00000000..c6a93709 --- /dev/null +++ b/investing_algorithm_framework/services/backtest_index/sqlite_index.py @@ -0,0 +1,385 @@ +"""SQLite-backed Tier-1 backtest index (epic #540 phase 2). + +A :class:`SqliteBacktestIndex` is a single-file SQLite database that +holds one row per backtest bundle, derived from +:class:`BacktestIndexRow`. It is the local-disk implementation of the +Tier-1 store described in +``docs/design/tiered-backtest-storage.md`` \u00a73.1. + +Schema +------ +The schema is generated from two sources of truth: + +* The canonical *identity / provenance / config* columns of + :class:`BacktestIndexRow`. 
+* All numeric / string fields of :class:`BacktestSummaryMetrics`,
+  promoted as ``summary_<name>`` columns so analysts can filter on
+  e.g. ``WHERE summary_sharpe_ratio > 1.0``.
+
+Anything that doesn't fit those is round-tripped opaquely in the
+``extras_json`` and ``summary_extras_json`` columns. ``parameters``
+and ``strategy_ids`` are stored as JSON text.
+
+The file carries ``PRAGMA user_version = SCHEMA_VERSION`` so future
+migrations can detect and upgrade older index files additively.
+
+Concurrency
+-----------
+Writes go through a single connection in ``WAL`` mode; multiple
+readers from other processes are safe.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import sqlite3
+from dataclasses import fields as dc_fields
+from pathlib import Path
+from typing import (
+    Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union,
+)
+
+from investing_algorithm_framework.domain.backtesting.backtest_index_row \
+    import BacktestIndexRow
+from investing_algorithm_framework.domain.backtesting \
+    .backtest_summary_metrics import BacktestSummaryMetrics
+
+
+logger = logging.getLogger(__name__)
+
+
+# Bumped on any additive schema change. Old files are upgraded
+# in-place by :meth:`SqliteBacktestIndex._migrate`.
+SCHEMA_VERSION = 1
+
+# Columns of BacktestIndexRow that map 1:1 to typed SQL columns.
+# (parameters / strategy_ids are emitted as JSON text columns; the
+# scalar metrics are promoted from BacktestSummaryMetrics below.)
+_IDENTITY_COLUMNS: Tuple[Tuple[str, str], ...] = (
+    ("bundle_path", "TEXT PRIMARY KEY"),
+    ("algorithm_id", "TEXT"),
+    ("tag", "TEXT"),
+    ("framework_version", "TEXT"),
+    ("engine_type", "TEXT"),
+    ("risk_free_rate", "REAL"),
+    ("number_of_runs", "INTEGER"),
+    ("parameters_json", "TEXT"),
+    ("strategy_ids_json", "TEXT"),
+    ("extras_json", "TEXT"),
+    ("summary_extras_json", "TEXT"),
+)
+
+
+def _summary_columns() -> List[Tuple[str, str]]:
+    """Promote BacktestSummaryMetrics fields to ``summary_<name>`` cols.
+
+    Numeric fields become ``REAL`` (or ``INTEGER`` if annotated ``int``);
+    everything else degrades to ``TEXT``.
+    """
+    cols: List[Tuple[str, str]] = []
+    for f in dc_fields(BacktestSummaryMetrics):
+        ann = f.type
+        if ann is int or ann == "int":
+            sql_type = "INTEGER"
+        elif ann is float or ann == "float":
+            sql_type = "REAL"
+        elif ann is bool or ann == "bool":
+            sql_type = "INTEGER"
+        else:
+            sql_type = "TEXT"
+        cols.append((f"summary_{f.name}", sql_type))
+    return cols
+
+
+_SUMMARY_COLUMNS: Tuple[Tuple[str, str], ...] = tuple(_summary_columns())
+_SUMMARY_FIELD_NAMES: frozenset = frozenset(
+    f.name for f in dc_fields(BacktestSummaryMetrics)
+)
+
+
+def _all_columns() -> List[Tuple[str, str]]:
+    return list(_IDENTITY_COLUMNS) + list(_SUMMARY_COLUMNS)
+
+
+_TABLE = "backtest_index"
+
+
+class SqliteBacktestIndex:
+    """Single-file SQLite index over a directory of ``.iafbt`` bundles.
+
+    Use :meth:`create` to make a fresh file (overwrites if exists),
+    :meth:`open` to connect to an existing one (creating tables if
+    needed), :meth:`upsert` to add/replace a row, and
+    :meth:`iter_rows` / :meth:`query` for read access.
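+
+    Example (a minimal sketch; the file name and threshold are
+    illustrative)::
+
+        with SqliteBacktestIndex.open("backtests/index.sqlite") as idx:
+            winners = idx.query("summary_sharpe_ratio > ?", (1.0,))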
+ """ + + def __init__(self, path: Union[str, Path], conn: sqlite3.Connection): + self.path = Path(path) + self._conn = conn + + # ------------------------------------------------------------------ + # Construction + # ------------------------------------------------------------------ + @classmethod + def create(cls, path: Union[str, Path]) -> "SqliteBacktestIndex": + """Create a fresh index file (overwrites any existing file).""" + p = Path(path) + if p.exists(): + p.unlink() + p.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(p)) + conn.row_factory = sqlite3.Row + cls._init_schema(conn) + return cls(p, conn) + + @classmethod + def open(cls, path: Union[str, Path]) -> "SqliteBacktestIndex": + """Open an existing index file, creating tables on first use.""" + p = Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(p)) + conn.row_factory = sqlite3.Row + cls._init_schema(conn) + cls._migrate(conn) + return cls(p, conn) + + # ------------------------------------------------------------------ + # Schema + # ------------------------------------------------------------------ + @staticmethod + def _init_schema(conn: sqlite3.Connection) -> None: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + cols = ", ".join( + f'"{name}" {sql_type}' for name, sql_type in _all_columns() + ) + conn.execute(f'CREATE TABLE IF NOT EXISTS "{_TABLE}" ({cols})') + conn.execute( + f'CREATE INDEX IF NOT EXISTS idx_{_TABLE}_algorithm_id ' + f'ON "{_TABLE}"(algorithm_id)' + ) + conn.execute( + f'CREATE INDEX IF NOT EXISTS idx_{_TABLE}_tag ' + f'ON "{_TABLE}"(tag)' + ) + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.commit() + + @staticmethod + def _migrate(conn: sqlite3.Connection) -> None: + """Additive forward-only migration based on PRAGMA user_version. + + Adds any columns that the current code knows about but the + on-disk file is missing. Never drops or rewrites existing + columns. + """ + existing = { + row["name"] + for row in conn.execute(f'PRAGMA table_info("{_TABLE}")') + } + for name, sql_type in _all_columns(): + if name not in existing: + conn.execute( + f'ALTER TABLE "{_TABLE}" ' + f'ADD COLUMN "{name}" {sql_type}' + ) + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.commit() + + # ------------------------------------------------------------------ + # Writes + # ------------------------------------------------------------------ + def upsert(self, row: BacktestIndexRow) -> None: + """Insert or replace a single row, keyed by ``bundle_path``. + + Raises: + ValueError: if ``row.bundle_path`` is None (it is the PK). + """ + if not row.bundle_path: + raise ValueError( + "BacktestIndexRow.bundle_path is required for SQLite " + "upsert (used as the primary key)." + ) + record = self._row_to_record(row) + cols = list(record.keys()) + placeholders = ", ".join("?" for _ in cols) + col_list = ", ".join(f'"{c}"' for c in cols) + self._conn.execute( + f'INSERT OR REPLACE INTO "{_TABLE}" ({col_list}) ' + f'VALUES ({placeholders})', + [record[c] for c in cols], + ) + self._conn.commit() + + def upsert_many(self, rows: Iterable[BacktestIndexRow]) -> int: + """Bulk insert/replace; returns the number of rows written.""" + rows = list(rows) + if not rows: + return 0 + # Use the first row to fix the column set; record-builder is + # deterministic so all rows produce the same keys. + first = self._row_to_record(rows[0]) + cols = list(first.keys()) + placeholders = ", ".join("?" 
for _ in cols) + col_list = ", ".join(f'"{c}"' for c in cols) + sql = ( + f'INSERT OR REPLACE INTO "{_TABLE}" ({col_list}) ' + f'VALUES ({placeholders})' + ) + payload = [first] + [self._row_to_record(r) for r in rows[1:]] + for r in payload: + if not r.get("bundle_path"): + raise ValueError( + "BacktestIndexRow.bundle_path is required for SQLite " + "upsert (used as the primary key)." + ) + self._conn.executemany(sql, [[r[c] for c in cols] for r in payload]) + self._conn.commit() + return len(rows) + + @staticmethod + def _row_to_record(row: BacktestIndexRow) -> Dict[str, Any]: + """Map a typed row onto a flat dict ready for SQL binding.""" + record: Dict[str, Any] = { + "bundle_path": row.bundle_path, + "algorithm_id": row.algorithm_id, + "tag": row.tag, + "framework_version": row.framework_version, + "engine_type": row.engine_type, + "risk_free_rate": row.risk_free_rate, + "number_of_runs": row.number_of_runs, + "parameters_json": ( + _safe_json(row.parameters) if row.parameters else None + ), + "strategy_ids_json": ( + _safe_json(row.strategy_ids) if row.strategy_ids else None + ), + "extras_json": ( + _safe_json(row.extras) if row.extras else None + ), + } + + summary_extras: Dict[str, Any] = {} + if row.summary_metrics is not None: + summary_dict = row.summary_metrics.to_dict() + for k, v in summary_dict.items(): + if k in _SUMMARY_FIELD_NAMES: + record[f"summary_{k}"] = _coerce_scalar(v) + else: + summary_extras[k] = v + + record["summary_extras_json"] = ( + _safe_json(summary_extras) if summary_extras else None + ) + return record + + # ------------------------------------------------------------------ + # Reads + # ------------------------------------------------------------------ + def __len__(self) -> int: + cur = self._conn.execute(f'SELECT COUNT(*) AS n FROM "{_TABLE}"') + return int(cur.fetchone()["n"]) + + def iter_rows(self) -> Iterator[BacktestIndexRow]: + """Yield every row as a :class:`BacktestIndexRow`.""" + for sql_row in self._conn.execute(f'SELECT * FROM "{_TABLE}"'): + yield self._record_to_row(sql_row) + + def query( + self, where: Optional[str] = None, + params: Optional[Tuple[Any, ...]] = None, + ) -> List[BacktestIndexRow]: + """Run a parameterised ``SELECT`` and return typed rows. + + Args: + where: optional SQL fragment (without the ``WHERE`` keyword). + params: positional bind values for ``where``. 
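+
+        Example (illustrative values; assumes an open index ``idx``)::
+
+            hits = idx.query(
+                "summary_sharpe_ratio > ? AND tag = ?", (1.0, "demo"),
+            )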
+ """ + sql = f'SELECT * FROM "{_TABLE}"' + if where: + sql += f" WHERE {where}" + cur = self._conn.execute(sql, params or ()) + return [self._record_to_row(r) for r in cur] + + @staticmethod + def _record_to_row(sql_row: sqlite3.Row) -> BacktestIndexRow: + d = dict(sql_row) + + params_json = d.pop("parameters_json", None) + strat_json = d.pop("strategy_ids_json", None) + extras_json = d.pop("extras_json", None) + summary_extras_json = d.pop("summary_extras_json", None) + + summary_dict: Dict[str, Any] = {} + for name in list(d.keys()): + if name.startswith("summary_"): + value = d.pop(name) + if value is not None: + summary_dict[name[len("summary_"):]] = value + if summary_extras_json: + try: + summary_dict.update(json.loads(summary_extras_json)) + except (TypeError, ValueError): + pass + + kwargs: Dict[str, Any] = { + "algorithm_id": d.get("algorithm_id"), + "tag": d.get("tag"), + "bundle_path": d.get("bundle_path"), + "framework_version": d.get("framework_version"), + "engine_type": d.get("engine_type"), + "risk_free_rate": d.get("risk_free_rate"), + "number_of_runs": d.get("number_of_runs") or 0, + "parameters": _safe_loads(params_json) or {}, + "strategy_ids": _safe_loads(strat_json) or [], + "extras": _safe_loads(extras_json) or {}, + "summary_metrics": ( + BacktestSummaryMetrics.from_dict(summary_dict) + if summary_dict else None + ), + } + return BacktestIndexRow(**kwargs) + + # ------------------------------------------------------------------ + # House-keeping + # ------------------------------------------------------------------ + def close(self) -> None: + try: + self._conn.close() + except Exception: # pragma: no cover - best-effort + pass + + def __enter__(self) -> "SqliteBacktestIndex": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- +def _safe_json(obj: Any) -> Optional[str]: + try: + return json.dumps(obj, default=str) + except (TypeError, ValueError): + return None + + +def _safe_loads(text: Optional[str]) -> Any: + if not text: + return None + try: + return json.loads(text) + except (TypeError, ValueError): + return None + + +def _coerce_scalar(v: Any) -> Any: + """Bind helper: SQLite accepts None / int / float / str / bytes only.""" + if v is None or isinstance(v, (int, float, str, bytes)): + return v + if isinstance(v, bool): + return int(v) + return str(v) diff --git a/tests/cli/test_index_command.py b/tests/cli/test_index_command.py new file mode 100644 index 00000000..f2bdc493 --- /dev/null +++ b/tests/cli/test_index_command.py @@ -0,0 +1,93 @@ +"""Integration tests for the ``iaf index`` CLI (epic #540 phase 2).""" +import os +import shutil +import tempfile +from unittest import TestCase + +from click.testing import CliRunner + +from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) +from investing_algorithm_framework.cli.cli import index_cmd +from investing_algorithm_framework.cli.index_command import build_index +from investing_algorithm_framework.services.backtest_index import ( + SqliteBacktestIndex, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +class TestIndexCommand(TestCase): + + @classmethod + def setUpClass(cls): + cls.fixture = 
Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + # Drop a few bundles into the temp dir. + for i in range(3): + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.algorithm_id = f"algo_{i}" + bt.tag = "demo" + save_bundle( + bt, os.path.join(self.tmp, f"algo_{i}{BUNDLE_EXT}"), + ) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + # ------------------------------------------------------------------ + # Builder API + # ------------------------------------------------------------------ + def test_build_index_writes_one_row_per_bundle(self): + out = build_index(self.tmp, show_progress=False) + self.assertTrue(os.path.isfile(out)) + + with SqliteBacktestIndex.open(out) as idx: + self.assertEqual(len(idx), 3) + algos = sorted(r.algorithm_id for r in idx.iter_rows()) + self.assertEqual(algos, ["algo_0", "algo_1", "algo_2"]) + + def test_build_index_uses_relative_paths_by_default(self): + out = build_index(self.tmp, show_progress=False) + with SqliteBacktestIndex.open(out) as idx: + for row in idx.iter_rows(): + self.assertFalse( + os.path.isabs(row.bundle_path), + f"expected relative path, got {row.bundle_path}", + ) + + def test_build_index_absolute_paths_when_requested(self): + out = build_index( + self.tmp, show_progress=False, relative_paths=False, + ) + with SqliteBacktestIndex.open(out) as idx: + for row in idx.iter_rows(): + self.assertTrue(os.path.isabs(row.bundle_path)) + + # ------------------------------------------------------------------ + # Click CLI surface + # ------------------------------------------------------------------ + def test_cli_invocation(self): + runner = CliRunner() + out = os.path.join(self.tmp, "custom.sqlite") + result = runner.invoke( + index_cmd, + [self.tmp, "--output", out, "--no-progress"], + ) + self.assertEqual( + result.exit_code, 0, + msg=f"stdout={result.output!r} exc={result.exception!r}", + ) + self.assertTrue(os.path.isfile(out)) + with SqliteBacktestIndex.open(out) as idx: + self.assertEqual(len(idx), 3) diff --git a/tests/services/backtest_index/__init__.py b/tests/services/backtest_index/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/services/backtest_index/test_sqlite_index.py b/tests/services/backtest_index/test_sqlite_index.py new file mode 100644 index 00000000..7f238bc1 --- /dev/null +++ b/tests/services/backtest_index/test_sqlite_index.py @@ -0,0 +1,155 @@ +"""Tests for :class:`SqliteBacktestIndex` (epic #540 phase 2).""" +import os +import shutil +import tempfile +from unittest import TestCase + +from investing_algorithm_framework.domain import ( + Backtest, + BacktestIndexRow, + BUNDLE_EXT, +) +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) +from investing_algorithm_framework.services.backtest_index import ( + SqliteBacktestIndex, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +class TestSqliteBacktestIndex(TestCase): + + @classmethod + def setUpClass(cls): + cls.fixture = Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.index_path = os.path.join(self.tmp, "index.sqlite") + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + # ------------------------------------------------------------------ + # Schema / lifecycle + # ------------------------------------------------------------------ + def test_create_initialises_schema(self): + 
+        idx = SqliteBacktestIndex.create(self.index_path)
+        try:
+            self.assertTrue(os.path.isfile(self.index_path))
+            self.assertEqual(len(idx), 0)
+        finally:
+            idx.close()
+
+    def test_open_creates_file_when_missing(self):
+        idx = SqliteBacktestIndex.open(self.index_path)
+        try:
+            self.assertTrue(os.path.isfile(self.index_path))
+        finally:
+            idx.close()
+
+    def test_upsert_requires_bundle_path(self):
+        with SqliteBacktestIndex.create(self.index_path) as idx:
+            row = self.fixture.index_row(bundle_path=None)
+            with self.assertRaises(ValueError):
+                idx.upsert(row)
+
+    # ------------------------------------------------------------------
+    # Round-trip
+    # ------------------------------------------------------------------
+    def test_round_trip_preserves_identity_and_metrics(self):
+        row = self.fixture.index_row(bundle_path="bundle.iafbt")
+
+        with SqliteBacktestIndex.create(self.index_path) as idx:
+            idx.upsert(row)
+            self.assertEqual(len(idx), 1)
+            (loaded,) = list(idx.iter_rows())
+
+        self.assertIsInstance(loaded, BacktestIndexRow)
+        self.assertEqual(loaded.algorithm_id, row.algorithm_id)
+        self.assertEqual(loaded.tag, row.tag)
+        self.assertEqual(loaded.bundle_path, row.bundle_path)
+        self.assertEqual(loaded.number_of_runs, row.number_of_runs)
+        self.assertEqual(loaded.parameters, row.parameters)
+
+        # If the fixture has scalar metrics, key ones must round-trip.
+        # SQLite stores NaN as NULL, so treat NaN/None as equivalent.
+        if row.summary_metrics is not None:
+            import math
+
+            self.assertIsNotNone(loaded.summary_metrics)
+            for name in ("sharpe_ratio", "total_net_gain"):
+                got = getattr(loaded.summary_metrics, name, None)
+                exp = getattr(row.summary_metrics, name, None)
+                if isinstance(exp, float) and math.isnan(exp):
+                    self.assertIsNone(got)
+                else:
+                    self.assertEqual(got, exp)
+
+    def test_upsert_replaces_on_duplicate_bundle_path(self):
+        row = self.fixture.index_row(bundle_path="dup.iafbt")
+        with SqliteBacktestIndex.create(self.index_path) as idx:
+            idx.upsert(row)
+
+            # Mutate algorithm_id and re-upsert — should not duplicate.
+            row.algorithm_id = "new_algo"
+            idx.upsert(row)
+
+            self.assertEqual(len(idx), 1)
+            (loaded,) = list(idx.iter_rows())
+            self.assertEqual(loaded.algorithm_id, "new_algo")
+
+    def test_upsert_many_writes_all(self):
+        rows = []
+        for i in range(3):
+            r = self.fixture.index_row(bundle_path=f"b{i}.iafbt")
+            r.algorithm_id = f"algo_{i}"
+            rows.append(r)
+
+        with SqliteBacktestIndex.create(self.index_path) as idx:
+            n = idx.upsert_many(rows)
+            self.assertEqual(n, 3)
+            self.assertEqual(len(idx), 3)
+
+    # ------------------------------------------------------------------
+    # Query
+    # ------------------------------------------------------------------
+    def test_query_with_where_clause(self):
+        rows = []
+        for i in range(3):
+            r = self.fixture.index_row(bundle_path=f"q{i}.iafbt")
+            r.algorithm_id = "alpha" if i == 0 else "beta"
+            rows.append(r)
+
+        with SqliteBacktestIndex.create(self.index_path) as idx:
+            idx.upsert_many(rows)
+            hits = idx.query("algorithm_id = ?", ("alpha",))
+            self.assertEqual(len(hits), 1)
+            self.assertEqual(hits[0].bundle_path, "q0.iafbt")
+
+    # ------------------------------------------------------------------
+    # Build from real bundle on disk
+    # ------------------------------------------------------------------
+    def test_index_built_from_bundle_uses_summary_only_path(self):
+        bundle_path = os.path.join(self.tmp, "report" + BUNDLE_EXT)
+        save_bundle(self.fixture, bundle_path)
+
+        # Open the bundle in summary_only mode — mirrors what the CLI
+        # does.
+        bt = Backtest.open(bundle_path, summary_only=True)
+        row = bt.index_row(bundle_path=bundle_path)
+
+        with SqliteBacktestIndex.create(self.index_path) as idx:
+            idx.upsert(row)
+            (loaded,) = list(idx.iter_rows())
+
+        self.assertEqual(loaded.bundle_path, bundle_path)
+        self.assertEqual(loaded.algorithm_id, self.fixture.algorithm_id)