diff --git a/investing_algorithm_framework/services/backtest_store/local_tiered_store.py b/investing_algorithm_framework/services/backtest_store/local_tiered_store.py
index 3b9744a8..9865b155 100644
--- a/investing_algorithm_framework/services/backtest_store/local_tiered_store.py
+++ b/investing_algorithm_framework/services/backtest_store/local_tiered_store.py
@@ -1,5 +1,6 @@
-"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + canonical
-``.iafbt`` bundles on a local filesystem (epic #540 phase 3b).
+"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + Tier-3
+content-addressed OHLCV chunks + canonical ``.iafbt`` bundles on a
+local filesystem (epic #540 phases 3b + 3c).
 
 Layout under *root*::
 
@@ -10,6 +11,8 @@
     portfolio_snapshots/run_id=<run_id>/part-0.parquet
     trades/run_id=<run_id>/part-0.parquet
     orders/run_id=<run_id>/part-0.parquet
+    ohlcv/                  # Tier-3, content-addressed by SHA-256
+        <sha256>.parquet    # shared across every bundle in the store
 
 The bundle file is the canonical representation. The Tier-1 index
 and the Tier-2 Parquet datasets are derived, eagerly maintained, and
@@ -61,6 +64,7 @@
 INDEX_FILENAME = "index.sqlite"
 BUNDLES_SUBDIR = "bundles"
 PARQUET_SUBDIR = "parquet"
+OHLCV_SUBDIR = "ohlcv"
 
 
 def _records_to_table(records: List[dict]) -> Optional[Any]:
@@ -122,9 +126,12 @@ def __init__(self, root: Union[str, Path]) -> None:
         self.root.mkdir(parents=True, exist_ok=True)
         self._bundles_dir = self.root / BUNDLES_SUBDIR
         self._parquet_dir = self.root / PARQUET_SUBDIR
+        self._ohlcv_dir = self.root / OHLCV_SUBDIR
         self._index_path = self.root / INDEX_FILENAME
         self._bundles_dir.mkdir(parents=True, exist_ok=True)
         self._parquet_dir.mkdir(parents=True, exist_ok=True)
+        # Tier-3 dir is created lazily on the first OHLCV-bearing write
+        # so bundles without attached price data leave no empty subdir.
 
     # ------------------------------------------------------------------
     # Internal helpers
@@ -180,8 +187,19 @@ def write(
         bundle_path = self._bundle_path(bare)
         bundle_path.parent.mkdir(parents=True, exist_ok=True)
 
-        # 1. Canonical bytes — bundle.
-        backtest.save_bundle(bundle_path)
+        # 1. Canonical bytes — bundle. When the backtest has OHLCV
+        # attached, route the side-store to the shared Tier-3 chunk
+        # directory so identical (symbol, timeframe) Parquet bytes are
+        # written once and referenced by every bundle that needs them.
+        has_ohlcv = bool(getattr(backtest, "ohlcv", None))
+        if has_ohlcv:
+            backtest.save_bundle(
+                bundle_path,
+                include_ohlcv=True,
+                ohlcv_store=self._ohlcv_dir,
+            )
+        else:
+            backtest.save_bundle(bundle_path)
 
         # 2. Tier-1 — SQLite row.
         stat = bundle_path.stat()
@@ -242,7 +260,16 @@ def open(
         path = self._bundle_path(handle)
         if not path.is_file():
             raise StoreHandleNotFoundError(handle)
-        return Backtest.open(str(path), summary_only=summary_only)
+        # Route OHLCV resolution through the shared Tier-3 chunk
+        # directory regardless of what path the bundle was written
+        # with. Bundles without OHLCV ignore the override.
+        from investing_algorithm_framework.domain.backtesting.bundle \
+            import open_bundle as _open_bundle
+        return _open_bundle(
+            str(path),
+            ohlcv_store=self._ohlcv_dir,
+            summary_only=summary_only,
+        )
 
     def exists(self, handle: StoreHandle) -> bool:
         try:
@@ -380,6 +407,10 @@ def rebuild_index(self) -> int:
         Useful when sidecars were edited out-of-band or after a
         software upgrade adds new index columns. Returns the number
         of rows written.
+
+        Tier-3 OHLCV chunks are *not* touched — they are
+        content-addressed and globally shared. Use
+        :meth:`garbage_collect_ohlcv` to reclaim orphans.
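+
+        A minimal usage sketch (illustrative only; the store root is a
+        hypothetical path)::
+
+            store = LocalTieredStore("backtests")
+            rows = store.rebuild_index()     # re-derive Tier-1 rows
+            store.garbage_collect_ohlcv()    # then reclaim Tier-3 orphans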
""" if self._index_path.is_file(): self._index_path.unlink() @@ -408,3 +439,118 @@ def rebuild_index(self) -> int: def __repr__(self) -> str: return f"LocalTieredStore(root={str(self.root)!r})" + + # ------------------------------------------------------------------ + # Tier-3 — content-addressed OHLCV chunks + # ------------------------------------------------------------------ + def _read_ohlcv_manifest(self, handle: StoreHandle) -> dict: + """Return the ``{key: .parquet}`` manifest stored in the + bundle envelope, or an empty dict if the bundle has no OHLCV. + + Decodes the envelope directly so we avoid the cost of + instantiating a full :class:`Backtest`. + """ + from investing_algorithm_framework.domain.backtesting.bundle \ + import _decode_payload # noqa: WPS437 — internal helper + path = self._bundle_path(handle) + if not path.is_file(): + raise StoreHandleNotFoundError(handle) + try: + _, doc = _decode_payload(path.read_bytes()) + except Exception as exc: + logger.warning( + "Tier-3 manifest read failed for %s: %s", handle, exc, + ) + return {} + meta = doc.get("ohlcv") or {} + return dict(meta.get("manifest") or {}) + + def iter_ohlcv_hashes(self) -> Iterator[str]: + """Yield every OHLCV chunk hash referenced by any bundle. + + Hashes are emitted with possible duplicates (one per + ``(handle, key)`` reference); de-duplicate in the caller if + needed. Useful as the input for the dedup-upload negotiate + step in ``docs/design/ohlcv-dedup-protocol.md``. + """ + for handle in self.iter_handles(): + for rel in self._read_ohlcv_manifest(handle).values(): + if rel.endswith(".parquet"): + yield rel[: -len(".parquet")] + else: + yield rel + + def ohlcv_referenced_hashes(self) -> set: + """Return the de-duplicated set of OHLCV hashes referenced + across all bundles in the store. + """ + return set(self.iter_ohlcv_hashes()) + + def ohlcv_stored_hashes(self) -> set: + """Return the set of OHLCV hashes physically present on disk + under ``/ohlcv/``. + """ + if not self._ohlcv_dir.is_dir(): + return set() + out: set = set() + for p in self._ohlcv_dir.iterdir(): + if p.is_file() and p.suffix == ".parquet": + out.add(p.stem) + return out + + def ohlcv_stats(self) -> dict: + """Return a snapshot of the Tier-3 store. + + Keys: ``stored_blobs`` (int), ``stored_bytes`` (int), + ``referenced_blobs`` (int), ``orphan_blobs`` (int), + ``missing_blobs`` (int — referenced but absent on disk, + should be 0 in a healthy store). + """ + stored = self.ohlcv_stored_hashes() + referenced = self.ohlcv_referenced_hashes() + stored_bytes = 0 + if self._ohlcv_dir.is_dir(): + for p in self._ohlcv_dir.iterdir(): + if p.is_file() and p.suffix == ".parquet": + try: + stored_bytes += p.stat().st_size + except OSError: # pragma: no cover + pass + return { + "stored_blobs": len(stored), + "stored_bytes": stored_bytes, + "referenced_blobs": len(referenced), + "orphan_blobs": len(stored - referenced), + "missing_blobs": len(referenced - stored), + } + + def garbage_collect_ohlcv( + self, *, dry_run: bool = False, + ) -> List[str]: + """Remove OHLCV chunks that no bundle in the store references. + + Args: + dry_run: When True, no files are deleted; the list of + orphans is returned so callers can audit before + committing. + + Returns: + The list of hashes that were (or would be) removed. 
+ """ + stored = self.ohlcv_stored_hashes() + referenced = self.ohlcv_referenced_hashes() + orphans = sorted(stored - referenced) + if dry_run: + return orphans + for digest in orphans: + path = self._ohlcv_dir / f"{digest}.parquet" + try: + path.unlink() + except FileNotFoundError: # pragma: no cover - race + pass + except OSError as exc: # pragma: no cover - logged + logger.warning( + "garbage_collect_ohlcv: failed to remove %s: %s", + path, exc, + ) + return orphans diff --git a/tests/services/backtest_store/test_local_tiered_store_ohlcv.py b/tests/services/backtest_store/test_local_tiered_store_ohlcv.py new file mode 100644 index 00000000..18adfdad --- /dev/null +++ b/tests/services/backtest_store/test_local_tiered_store_ohlcv.py @@ -0,0 +1,182 @@ +"""Tests for Tier-3 content-addressed OHLCV chunks in +:class:`LocalTieredStore` (epic #540 phase 3c).""" +from __future__ import annotations + +import os +import shutil +import tempfile +from copy import deepcopy +from pathlib import Path +from unittest import TestCase + +import pandas as pd + +from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT +from investing_algorithm_framework.services.backtest_store import ( + LocalTieredStore, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +def _make_ohlcv(symbol: str, *, n: int = 4, base: float = 100.0) -> dict: + idx = pd.date_range("2024-01-01", periods=n, freq="h") + df = pd.DataFrame( + { + "Datetime": idx, + "Open": [base + i for i in range(n)], + "High": [base + i + 0.5 for i in range(n)], + "Low": [base + i - 0.5 for i in range(n)], + "Close": [base + i + 0.25 for i in range(n)], + "Volume": [1000 + i for i in range(n)], + } + ) + return {f"{symbol}:1h": df} + + +class TestLocalTieredStoreOhlcv(TestCase): + """OHLCV content-addressed chunk store (Tier-3).""" + + @classmethod + def setUpClass(cls): + cls.fixture = Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.store = LocalTieredStore(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _backtest_with_ohlcv(self, symbol: str) -> Backtest: + # Deep-copy the fixture and attach a synthetic OHLCV blob. + bt = deepcopy(self.fixture) + bt.ohlcv = _make_ohlcv(symbol) + return bt + + # ------------------------------------------------------------------ + # Bundles without OHLCV must not create the chunk dir. + # ------------------------------------------------------------------ + def test_no_ohlcv_no_chunk_dir(self): + self.store.write(self.fixture, handle="plain") + self.assertFalse((Path(self.tmp) / "ohlcv").is_dir()) + self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0) + + # ------------------------------------------------------------------ + # Tier-3 layout & dedup + # ------------------------------------------------------------------ + def test_ohlcv_chunks_written_to_shared_store(self): + bt = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt, handle="r1") + chunks = list((Path(self.tmp) / "ohlcv").iterdir()) + self.assertEqual(len(chunks), 1) + # Filename is .parquet — 64 hex chars + .parquet. + name = chunks[0].name + self.assertTrue(name.endswith(".parquet")) + self.assertEqual(len(name) - len(".parquet"), 64) + + def test_identical_ohlcv_is_deduplicated_across_handles(self): + bt1 = self._backtest_with_ohlcv("BTC/EUR") + # Different algorithm_id but identical OHLCV bytes. 
+        bt2 = self._backtest_with_ohlcv("BTC/EUR")
+        self.store.write(bt1, handle="a")
+        self.store.write(bt2, handle="b")
+        chunks = sorted((Path(self.tmp) / "ohlcv").iterdir())
+        self.assertEqual(
+            len(chunks), 1,
+            "identical OHLCV must be stored exactly once",
+        )
+        stats = self.store.ohlcv_stats()
+        self.assertEqual(stats["stored_blobs"], 1)
+        self.assertEqual(stats["referenced_blobs"], 1)
+        self.assertEqual(stats["orphan_blobs"], 0)
+        self.assertEqual(stats["missing_blobs"], 0)
+
+    def test_different_ohlcv_yields_separate_chunks(self):
+        self.store.write(
+            self._backtest_with_ohlcv("BTC/EUR"), handle="btc",
+        )
+        # Different base price -> different bytes -> different hash.
+        bt2 = self._backtest_with_ohlcv("ETH/EUR")
+        bt2.ohlcv = _make_ohlcv("ETH/EUR", base=2000.0)
+        self.store.write(bt2, handle="eth")
+        chunks = sorted((Path(self.tmp) / "ohlcv").iterdir())
+        self.assertEqual(len(chunks), 2)
+        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 2)
+
+    # ------------------------------------------------------------------
+    # Round-trip via canonical bundle still resolves OHLCV.
+    # ------------------------------------------------------------------
+    def test_open_resolves_ohlcv_from_shared_store(self):
+        bt = self._backtest_with_ohlcv("BTC/EUR")
+        self.store.write(bt, handle="rt")
+        loaded = self.store.open("rt")
+        self.assertIn("BTC/EUR:1h", loaded.ohlcv)
+        df = loaded.ohlcv["BTC/EUR:1h"]
+        self.assertEqual(len(df), 4)
+
+    # ------------------------------------------------------------------
+    # Garbage collection
+    # ------------------------------------------------------------------
+    def test_delete_keeps_shared_ohlcv_until_gc(self):
+        bt1 = self._backtest_with_ohlcv("BTC/EUR")
+        bt2 = self._backtest_with_ohlcv("BTC/EUR")
+        self.store.write(bt1, handle="a")
+        self.store.write(bt2, handle="b")
+        self.store.delete("a")
+        # The chunk is still referenced by handle 'b' -> must stay.
+        self.assertEqual(
+            self.store.ohlcv_stats()["stored_blobs"], 1,
+        )
+        self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0)
+
+    def test_garbage_collect_ohlcv_removes_orphans(self):
+        bt = self._backtest_with_ohlcv("BTC/EUR")
+        self.store.write(bt, handle="solo")
+        # Delete the only referencing bundle.
+        self.store.delete("solo")
+        stats = self.store.ohlcv_stats()
+        self.assertEqual(stats["stored_blobs"], 1)
+        self.assertEqual(stats["orphan_blobs"], 1)
+
+        # Dry-run lists but does not delete.
+        orphans = self.store.garbage_collect_ohlcv(dry_run=True)
+        self.assertEqual(len(orphans), 1)
+        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 1)
+
+        removed = self.store.garbage_collect_ohlcv()
+        self.assertEqual(len(removed), 1)
+        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0)
+        self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0)
+
+    # ------------------------------------------------------------------
+    # iter_ohlcv_hashes / ohlcv_referenced_hashes — input for the
+    # dedup-upload negotiate step (docs/design/ohlcv-dedup-protocol.md).
+    # ------------------------------------------------------------------
+    def test_iter_ohlcv_hashes_lists_referenced_chunks(self):
+        self.store.write(
+            self._backtest_with_ohlcv("BTC/EUR"), handle="a",
+        )
+        self.store.write(
+            self._backtest_with_ohlcv("BTC/EUR"), handle="b",
+        )
+        hashes = list(self.store.iter_ohlcv_hashes())
+        # Two references to the same hash (one per bundle).
+        self.assertEqual(len(hashes), 2)
+        self.assertEqual(len(set(hashes)), 1)
+        # Hashes are bare hex digests.
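+        # (iter_ohlcv_hashes strips the ".parquet" suffix from the
+        # manifest values, so only the digest remains.)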
+        for h in hashes:
+            self.assertEqual(len(h), 64)
+            self.assertTrue(all(c in "0123456789abcdef" for c in h))
+
+    def test_referenced_hashes_set_dedups(self):
+        bt = self._backtest_with_ohlcv("BTC/EUR")
+        self.store.write(bt, handle="a")
+        self.store.write(bt, handle="b")
+        self.assertEqual(len(self.store.ohlcv_referenced_hashes()), 1)
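+
+    # ------------------------------------------------------------------
+    # Illustrative sketch: the negotiate step of the dedup-upload
+    # protocol reduces to set arithmetic over hash inventories. Only
+    # APIs from this change are used; a second local store stands in
+    # for the (hypothetical) server-side inventory.
+    # ------------------------------------------------------------------
+    def test_negotiate_step_set_arithmetic_sketch(self):
+        self.store.write(
+            self._backtest_with_ohlcv("BTC/EUR"), handle="a",
+        )
+        local = self.store.ohlcv_referenced_hashes()
+        remote_root = tempfile.mkdtemp()
+        try:
+            # The "remote" already holds a byte-identical chunk, so
+            # the client has nothing left to upload.
+            remote = LocalTieredStore(remote_root)
+            remote.write(
+                self._backtest_with_ohlcv("BTC/EUR"), handle="mirror",
+            )
+            to_upload = local - remote.ohlcv_stored_hashes()
+            self.assertEqual(to_upload, set())
+        finally:
+            shutil.rmtree(remote_root, ignore_errors=True)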