From 0950f1b078b51d2561d0ac19abef91e01b5ebb40 Mon Sep 17 00:00:00 2001 From: marcvanduyn Date: Mon, 11 May 2026 16:09:03 +0200 Subject: [PATCH] feat(store): Tier-3 content-addressed OHLCV chunks in LocalTieredStore (epic #540 phase 3c) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires LocalTieredStore into the existing OHLCV side-store machinery so identical (symbol, timeframe) Parquet bytes are written exactly once and shared across every bundle that references them. - write() now routes save_bundle's OHLCV writes to <root>/ohlcv/ whenever backtest.ohlcv is non-empty. The bundle envelope keeps its content-addressed manifest unchanged, so old bundles remain readable. - open() forwards the same shared directory to open_bundle so OHLCV lookups resolve regardless of what path the bundle was originally written with. - delete() intentionally does NOT touch ohlcv/. Chunks are globally shared; orphans are reclaimed via garbage_collect_ohlcv(dry_run=…). - Introspection helpers required by the dedup-upload protocol (docs/design/ohlcv-dedup-protocol.md): * iter_ohlcv_hashes() / ohlcv_referenced_hashes() * ohlcv_stored_hashes() * ohlcv_stats() -> stored_blobs / stored_bytes / referenced_blobs / orphan_blobs / missing_blobs * garbage_collect_ohlcv(dry_run=False) Manifests are decoded straight from the bundle envelope (_decode_payload) so the cost is one msgpack read per bundle — no full Backtest instantiation. 9 new tests: - No OHLCV -> no chunk dir created. - Identical OHLCV is stored once across distinct handles (dedup). - Different OHLCV yields separate chunks. - Round-trip via store.open() resolves OHLCV from the shared dir. - delete() keeps still-referenced chunks; orphans are removed only by GC. - garbage_collect_ohlcv(dry_run=True) lists without deleting; the real call removes them. - iter_ohlcv_hashes() emits per-reference; ohlcv_referenced_hashes() dedups. - Hash strings are 64-char lowercase hex (matches the upload protocol spec). 
Targeted suite (backtest_store + backtest_index + cli): 110 / 110 passing. --- .../backtest_store/local_tiered_store.py | 156 ++++++++++++++- .../test_local_tiered_store_ohlcv.py | 182 ++++++++++++++++++ 2 files changed, 333 insertions(+), 5 deletions(-) create mode 100644 tests/services/backtest_store/test_local_tiered_store_ohlcv.py diff --git a/investing_algorithm_framework/services/backtest_store/local_tiered_store.py b/investing_algorithm_framework/services/backtest_store/local_tiered_store.py index 3b9744a8..9865b155 100644 --- a/investing_algorithm_framework/services/backtest_store/local_tiered_store.py +++ b/investing_algorithm_framework/services/backtest_store/local_tiered_store.py @@ -1,5 +1,6 @@ -"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + canonical -``.iafbt`` bundles on a local filesystem (epic #540 phase 3b). +"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + Tier-3 +content-addressed OHLCV chunks + canonical ``.iafbt`` bundles on a +local filesystem (epic #540 phases 3b + 3c). Layout under *root*:: @@ -10,6 +11,8 @@ portfolio_snapshots/run_id=/part-0.parquet trades/run_id=/part-0.parquet orders/run_id=/part-0.parquet + ohlcv/ # Tier-3, content-addressed by SHA-256 + <sha256>.parquet # shared across every bundle in the store The bundle file is the canonical representation. 
The Tier-1 index and the Tier-2 Parquet datasets are derived, eagerly maintained, and @@ -61,6 +64,7 @@ INDEX_FILENAME = "index.sqlite" BUNDLES_SUBDIR = "bundles" PARQUET_SUBDIR = "parquet" +OHLCV_SUBDIR = "ohlcv" def _records_to_table(records: List[dict]) -> Optional[Any]: @@ -122,9 +126,12 @@ def __init__(self, root: Union[str, Path]) -> None: self.root.mkdir(parents=True, exist_ok=True) self._bundles_dir = self.root / BUNDLES_SUBDIR self._parquet_dir = self.root / PARQUET_SUBDIR + self._ohlcv_dir = self.root / OHLCV_SUBDIR self._index_path = self.root / INDEX_FILENAME self._bundles_dir.mkdir(parents=True, exist_ok=True) self._parquet_dir.mkdir(parents=True, exist_ok=True) + # Tier-3 dir is created lazily on first OHLCV-bearing write so + # bundles without attached price data leave no empty subdir. # ------------------------------------------------------------------ # Internal helpers @@ -180,8 +187,19 @@ def write( bundle_path = self._bundle_path(bare) bundle_path.parent.mkdir(parents=True, exist_ok=True) - # 1. Canonical bytes — bundle. - backtest.save_bundle(bundle_path) + # 1. Canonical bytes — bundle. When the backtest has OHLCV + # attached, route the side-store to the shared Tier-3 chunk + # directory so identical (symbol, timeframe) Parquet bytes are + # written once and referenced by every bundle that needs them. + has_ohlcv = bool(getattr(backtest, "ohlcv", None)) + if has_ohlcv: + backtest.save_bundle( + bundle_path, + include_ohlcv=True, + ohlcv_store=self._ohlcv_dir, + ) + else: + backtest.save_bundle(bundle_path) # 2. Tier-1 — SQLite row. stat = bundle_path.stat() @@ -242,7 +260,16 @@ def open( path = self._bundle_path(handle) if not path.is_file(): raise StoreHandleNotFoundError(handle) - return Backtest.open(str(path), summary_only=summary_only) + # Route OHLCV resolution through the shared Tier-3 chunk + # directory regardless of what path the bundle was written + # with. Bundles without OHLCV ignore the override. 
+ from investing_algorithm_framework.domain.backtesting.bundle \ + import open_bundle as _open_bundle + return _open_bundle( + str(path), + ohlcv_store=self._ohlcv_dir, + summary_only=summary_only, + ) def exists(self, handle: StoreHandle) -> bool: try: @@ -380,6 +407,10 @@ def rebuild_index(self) -> int: Useful when sidecars were edited out-of-band or after a software upgrade adds new index columns. Returns the number of rows written. + + Tier-3 OHLCV chunks are *not* touched — they are + content-addressed and globally shared. Use + :meth:`garbage_collect_ohlcv` to reclaim orphans. """ if self._index_path.is_file(): self._index_path.unlink() @@ -408,3 +439,118 @@ def rebuild_index(self) -> int: def __repr__(self) -> str: return f"LocalTieredStore(root={str(self.root)!r})" + + # ------------------------------------------------------------------ + # Tier-3 — content-addressed OHLCV chunks + # ------------------------------------------------------------------ + def _read_ohlcv_manifest(self, handle: StoreHandle) -> dict: + """Return the ``{key: <sha256>.parquet}`` manifest stored in the + bundle envelope; empty if there is no OHLCV or decoding fails. + + Decodes the envelope directly so we avoid the cost of + instantiating a full :class:`Backtest`. + """ + from investing_algorithm_framework.domain.backtesting.bundle \ + import _decode_payload # noqa: WPS437 — internal helper + path = self._bundle_path(handle) + if not path.is_file(): + raise StoreHandleNotFoundError(handle) + try: + _, doc = _decode_payload(path.read_bytes()) + except Exception as exc: + logger.warning( + "Tier-3 manifest read failed for %s: %s", handle, exc, + ) + return {} + meta = doc.get("ohlcv") or {} + return dict(meta.get("manifest") or {}) + + def iter_ohlcv_hashes(self) -> Iterator[str]: + """Yield every OHLCV chunk hash referenced by any bundle. + + Hashes are emitted with possible duplicates (one per + ``(handle, key)`` reference); de-duplicate in the caller if + needed. 
Useful as the input for the dedup-upload negotiate + step in ``docs/design/ohlcv-dedup-protocol.md``. + """ + for handle in self.iter_handles(): + for rel in self._read_ohlcv_manifest(handle).values(): + if rel.endswith(".parquet"): + yield rel[: -len(".parquet")] + else: + yield rel + + def ohlcv_referenced_hashes(self) -> set: + """Return the de-duplicated set of OHLCV hashes referenced + across all bundles in the store. + """ + return set(self.iter_ohlcv_hashes()) + + def ohlcv_stored_hashes(self) -> set: + """Return the set of OHLCV hashes physically present on disk + under ``<root>/ohlcv/``. + """ + if not self._ohlcv_dir.is_dir(): + return set() + out: set = set() + for p in self._ohlcv_dir.iterdir(): + if p.is_file() and p.suffix == ".parquet": + out.add(p.stem) + return out + + def ohlcv_stats(self) -> dict: + """Return a snapshot of the Tier-3 store. + + Keys: ``stored_blobs`` (int), ``stored_bytes`` (int), + ``referenced_blobs`` (int), ``orphan_blobs`` (int), + ``missing_blobs`` (int — referenced but absent on disk, + should be 0 in a healthy store). + """ + stored = self.ohlcv_stored_hashes() + referenced = self.ohlcv_referenced_hashes() + stored_bytes = 0 + if self._ohlcv_dir.is_dir(): + for p in self._ohlcv_dir.iterdir(): + if p.is_file() and p.suffix == ".parquet": + try: + stored_bytes += p.stat().st_size + except OSError: # pragma: no cover + pass + return { + "stored_blobs": len(stored), + "stored_bytes": stored_bytes, + "referenced_blobs": len(referenced), + "orphan_blobs": len(stored - referenced), + "missing_blobs": len(referenced - stored), + } + + def garbage_collect_ohlcv( + self, *, dry_run: bool = False, + ) -> List[str]: + """Remove OHLCV chunks that no bundle in the store references. + + Args: + dry_run: When True, no files are deleted; the list of + orphans is returned so callers can audit before + committing. + + Returns: + The sorted list of orphan hashes targeted for removal. 
+ """ + stored = self.ohlcv_stored_hashes() + referenced = self.ohlcv_referenced_hashes() + orphans = sorted(stored - referenced) + if dry_run: + return orphans + for digest in orphans: + path = self._ohlcv_dir / f"{digest}.parquet" + try: + path.unlink() + except FileNotFoundError: # pragma: no cover - race + pass + except OSError as exc: # pragma: no cover - logged + logger.warning( + "garbage_collect_ohlcv: failed to remove %s: %s", + path, exc, + ) + return orphans diff --git a/tests/services/backtest_store/test_local_tiered_store_ohlcv.py b/tests/services/backtest_store/test_local_tiered_store_ohlcv.py new file mode 100644 index 00000000..18adfdad --- /dev/null +++ b/tests/services/backtest_store/test_local_tiered_store_ohlcv.py @@ -0,0 +1,182 @@ +"""Tests for Tier-3 content-addressed OHLCV chunks in +:class:`LocalTieredStore` (epic #540 phase 3c).""" +from __future__ import annotations + +import os +import shutil +import tempfile +from copy import deepcopy +from pathlib import Path +from unittest import TestCase + +import pandas as pd + +from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT +from investing_algorithm_framework.services.backtest_store import ( + LocalTieredStore, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +def _make_ohlcv(symbol: str, *, n: int = 4, base: float = 100.0) -> dict: + idx = pd.date_range("2024-01-01", periods=n, freq="h") + df = pd.DataFrame( + { + "Datetime": idx, + "Open": [base + i for i in range(n)], + "High": [base + i + 0.5 for i in range(n)], + "Low": [base + i - 0.5 for i in range(n)], + "Close": [base + i + 0.25 for i in range(n)], + "Volume": [1000 + i for i in range(n)], + } + ) + return {f"{symbol}:1h": df} + + +class TestLocalTieredStoreOhlcv(TestCase): + """OHLCV content-addressed chunk store (Tier-3).""" + + @classmethod + def setUpClass(cls): + cls.fixture = 
Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.store = LocalTieredStore(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _backtest_with_ohlcv(self, symbol: str) -> Backtest: + # Deep-copy the fixture and attach a synthetic OHLCV blob. + bt = deepcopy(self.fixture) + bt.ohlcv = _make_ohlcv(symbol) + return bt + + # ------------------------------------------------------------------ + # Bundles without OHLCV must not create the chunk dir. + # ------------------------------------------------------------------ + def test_no_ohlcv_no_chunk_dir(self): + self.store.write(self.fixture, handle="plain") + self.assertFalse((Path(self.tmp) / "ohlcv").is_dir()) + self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0) + + # ------------------------------------------------------------------ + # Tier-3 layout & dedup + # ------------------------------------------------------------------ + def test_ohlcv_chunks_written_to_shared_store(self): + bt = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt, handle="r1") + chunks = list((Path(self.tmp) / "ohlcv").iterdir()) + self.assertEqual(len(chunks), 1) + # Filename is <sha256>.parquet — 64 hex chars + .parquet. + name = chunks[0].name + self.assertTrue(name.endswith(".parquet")) + self.assertEqual(len(name) - len(".parquet"), 64) + + def test_identical_ohlcv_is_deduplicated_across_handles(self): + bt1 = self._backtest_with_ohlcv("BTC/EUR") + # Independent copy of the same fixture with identical OHLCV bytes. 
+ bt2 = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt1, handle="a") + self.store.write(bt2, handle="b") + chunks = sorted((Path(self.tmp) / "ohlcv").iterdir()) + self.assertEqual( + len(chunks), 1, + "identical OHLCV must be stored exactly once", + ) + stats = self.store.ohlcv_stats() + self.assertEqual(stats["stored_blobs"], 1) + self.assertEqual(stats["referenced_blobs"], 1) + self.assertEqual(stats["orphan_blobs"], 0) + self.assertEqual(stats["missing_blobs"], 0) + + def test_different_ohlcv_yields_separate_chunks(self): + self.store.write( + self._backtest_with_ohlcv("BTC/EUR"), handle="btc", + ) + # Different base price -> different bytes -> different hash. + bt2 = self._backtest_with_ohlcv("ETH/EUR") + bt2.ohlcv = _make_ohlcv("ETH/EUR", base=2000.0) + self.store.write(bt2, handle="eth") + chunks = sorted((Path(self.tmp) / "ohlcv").iterdir()) + self.assertEqual(len(chunks), 2) + self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 2) + + # ------------------------------------------------------------------ + # Round-trip via canonical bundle still resolves OHLCV. + # ------------------------------------------------------------------ + def test_open_resolves_ohlcv_from_shared_store(self): + bt = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt, handle="rt") + loaded = self.store.open("rt") + self.assertIn("BTC/EUR:1h", loaded.ohlcv) + df = loaded.ohlcv["BTC/EUR:1h"] + self.assertEqual(len(df), 4) + + # ------------------------------------------------------------------ + # Garbage collection + # ------------------------------------------------------------------ + def test_delete_keeps_shared_ohlcv_until_gc(self): + bt1 = self._backtest_with_ohlcv("BTC/EUR") + bt2 = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt1, handle="a") + self.store.write(bt2, handle="b") + self.store.delete("a") + # The chunk is still referenced by handle 'b' -> must stay. 
+ self.assertEqual( + self.store.ohlcv_stats()["stored_blobs"], 1, + ) + self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0) + + def test_garbage_collect_ohlcv_removes_orphans(self): + bt = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt, handle="solo") + # Delete the only referencing bundle. + self.store.delete("solo") + stats = self.store.ohlcv_stats() + self.assertEqual(stats["stored_blobs"], 1) + self.assertEqual(stats["orphan_blobs"], 1) + + # Dry-run lists but does not delete. + orphans = self.store.garbage_collect_ohlcv(dry_run=True) + self.assertEqual(len(orphans), 1) + self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 1) + + removed = self.store.garbage_collect_ohlcv() + self.assertEqual(len(removed), 1) + self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0) + self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0) + + # ------------------------------------------------------------------ + # iter_ohlcv_hashes / ohlcv_referenced_hashes — input for the + # dedup-upload negotiate step (docs/design/ohlcv-dedup-protocol.md). + # ------------------------------------------------------------------ + def test_iter_ohlcv_hashes_lists_referenced_chunks(self): + self.store.write( + self._backtest_with_ohlcv("BTC/EUR"), handle="a", + ) + self.store.write( + self._backtest_with_ohlcv("BTC/EUR"), handle="b", + ) + hashes = list(self.store.iter_ohlcv_hashes()) + # Two references to the same hash (one per bundle). + self.assertEqual(len(hashes), 2) + self.assertEqual(len(set(hashes)), 1) + # Hashes are bare hex digests. + for h in hashes: + self.assertEqual(len(h), 64) + self.assertTrue(all(c in "0123456789abcdef" for c in h)) + + def test_referenced_hashes_set_dedups(self): + bt = self._backtest_with_ohlcv("BTC/EUR") + self.store.write(bt, handle="a") + self.store.write(bt, handle="b") + self.assertEqual(len(self.store.ohlcv_referenced_hashes()), 1)