Skip to content

Commit 0950f1b

Browse files
committed
feat(store): Tier-3 content-addressed OHLCV chunks in LocalTieredStore (epic #540 phase 3c)
Wires LocalTieredStore into the existing OHLCV side-store machinery so identical (symbol, timeframe) Parquet bytes are written exactly once and shared across every bundle that references them. - write() now routes save_bundle's OHLCV writes to <root>/ohlcv/ whenever backtest.ohlcv is non-empty. The bundle envelope keeps its content-addressed manifest unchanged, so old bundles remain readable. - open() forwards the same shared directory to open_bundle so OHLCV lookups resolve regardless of what path the bundle was originally written with. - delete() intentionally does NOT touch ohlcv/. Chunks are globally shared; orphans are reclaimed via garbage_collect_ohlcv(dry_run=…). - Introspection helpers required by the dedup-upload protocol (docs/design/ohlcv-dedup-protocol.md): * iter_ohlcv_hashes() / ohlcv_referenced_hashes() * ohlcv_stored_hashes() * ohlcv_stats() -> stored_blobs / stored_bytes / referenced_blobs / orphan_blobs / missing_blobs * garbage_collect_ohlcv(dry_run=False) Manifests are decoded straight from the bundle envelope (_decode_payload) so the cost is one msgpack read per bundle — no full Backtest instantiation. 9 new tests: - No OHLCV -> no chunk dir created. - Identical OHLCV is stored once across distinct handles (dedup). - Different OHLCV yields separate chunks. - Round-trip via store.open() resolves OHLCV from the shared dir. - delete() keeps still-referenced chunks; orphans only after GC. - garbage_collect_ohlcv(dry_run=True) lists without deleting; the real call removes them. - iter_ohlcv_hashes() emits per-reference; ohlcv_referenced_hashes() dedups. - Hash strings are 64-char lowercase hex (matches the upload protocol spec). Targeted suite (backtest_store + backtest_index + cli): 110 / 110 passing.
1 parent f4072bb commit 0950f1b

2 files changed

Lines changed: 333 additions & 5 deletions

File tree

investing_algorithm_framework/services/backtest_store/local_tiered_store.py

Lines changed: 151 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + canonical
2-
``.iafbt`` bundles on a local filesystem (epic #540 phase 3b).
1+
"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + Tier-3
2+
content-addressed OHLCV chunks + canonical ``.iafbt`` bundles on a
3+
local filesystem (epic #540 phases 3b + 3c).
34
45
Layout under *root*::
56
@@ -10,6 +11,8 @@
1011
portfolio_snapshots/run_id=<handle>/part-0.parquet
1112
trades/run_id=<handle>/part-0.parquet
1213
orders/run_id=<handle>/part-0.parquet
14+
ohlcv/ # Tier-3, content-addressed by SHA-256
15+
<sha256>.parquet # shared across every bundle in the store
1316
1417
The bundle file is the canonical representation. The Tier-1 index and
1518
the Tier-2 Parquet datasets are derived, eagerly maintained, and
@@ -61,6 +64,7 @@
6164
INDEX_FILENAME = "index.sqlite"
6265
BUNDLES_SUBDIR = "bundles"
6366
PARQUET_SUBDIR = "parquet"
67+
OHLCV_SUBDIR = "ohlcv"
6468

6569

6670
def _records_to_table(records: List[dict]) -> Optional[Any]:
@@ -122,9 +126,12 @@ def __init__(self, root: Union[str, Path]) -> None:
122126
self.root.mkdir(parents=True, exist_ok=True)
123127
self._bundles_dir = self.root / BUNDLES_SUBDIR
124128
self._parquet_dir = self.root / PARQUET_SUBDIR
129+
self._ohlcv_dir = self.root / OHLCV_SUBDIR
125130
self._index_path = self.root / INDEX_FILENAME
126131
self._bundles_dir.mkdir(parents=True, exist_ok=True)
127132
self._parquet_dir.mkdir(parents=True, exist_ok=True)
133+
# Tier-3 dir is created lazily on first OHLCV-bearing write so
134+
# bundles without attached price data leave no empty subdir.
128135

129136
# ------------------------------------------------------------------
130137
# Internal helpers
@@ -180,8 +187,19 @@ def write(
180187
bundle_path = self._bundle_path(bare)
181188
bundle_path.parent.mkdir(parents=True, exist_ok=True)
182189

183-
# 1. Canonical bytes — bundle.
184-
backtest.save_bundle(bundle_path)
190+
# 1. Canonical bytes — bundle. When the backtest has OHLCV
191+
# attached, route the side-store to the shared Tier-3 chunk
192+
# directory so identical (symbol, timeframe) Parquet bytes are
193+
# written once and referenced by every bundle that needs them.
194+
has_ohlcv = bool(getattr(backtest, "ohlcv", None))
195+
if has_ohlcv:
196+
backtest.save_bundle(
197+
bundle_path,
198+
include_ohlcv=True,
199+
ohlcv_store=self._ohlcv_dir,
200+
)
201+
else:
202+
backtest.save_bundle(bundle_path)
185203

186204
# 2. Tier-1 — SQLite row.
187205
stat = bundle_path.stat()
@@ -242,7 +260,16 @@ def open(
242260
path = self._bundle_path(handle)
243261
if not path.is_file():
244262
raise StoreHandleNotFoundError(handle)
245-
return Backtest.open(str(path), summary_only=summary_only)
263+
# Route OHLCV resolution through the shared Tier-3 chunk
264+
# directory regardless of what path the bundle was written
265+
# with. Bundles without OHLCV ignore the override.
266+
from investing_algorithm_framework.domain.backtesting.bundle \
267+
import open_bundle as _open_bundle
268+
return _open_bundle(
269+
str(path),
270+
ohlcv_store=self._ohlcv_dir,
271+
summary_only=summary_only,
272+
)
246273

247274
def exists(self, handle: StoreHandle) -> bool:
248275
try:
@@ -380,6 +407,10 @@ def rebuild_index(self) -> int:
380407
Useful when sidecars were edited out-of-band or after a
381408
software upgrade adds new index columns. Returns the number of
382409
rows written.
410+
411+
Tier-3 OHLCV chunks are *not* touched — they are
412+
content-addressed and globally shared. Use
413+
:meth:`garbage_collect_ohlcv` to reclaim orphans.
383414
"""
384415
if self._index_path.is_file():
385416
self._index_path.unlink()
@@ -408,3 +439,118 @@ def rebuild_index(self) -> int:
408439

409440
def __repr__(self) -> str:
410441
return f"LocalTieredStore(root={str(self.root)!r})"
442+
443+
# ------------------------------------------------------------------
444+
# Tier-3 — content-addressed OHLCV chunks
445+
# ------------------------------------------------------------------
446+
def _read_ohlcv_manifest(self, handle: StoreHandle) -> dict:
    """Decode a bundle envelope and return its OHLCV chunk manifest.

    The manifest maps each OHLCV key to its ``<sha>.parquet`` chunk
    filename. An empty dict is returned when the bundle carries no
    OHLCV or when the envelope cannot be decoded (logged, not raised).

    Reading the envelope directly keeps this cheap: one msgpack decode
    instead of instantiating a full :class:`Backtest`.
    """
    from investing_algorithm_framework.domain.backtesting.bundle \
        import _decode_payload  # noqa: WPS437 — internal helper
    bundle_file = self._bundle_path(handle)
    if not bundle_file.is_file():
        raise StoreHandleNotFoundError(handle)
    try:
        _, envelope = _decode_payload(bundle_file.read_bytes())
    except Exception as exc:
        # Best-effort: a corrupt envelope must not break store-wide
        # introspection (stats / GC), so degrade to "no OHLCV".
        logger.warning(
            "Tier-3 manifest read failed for %s: %s", handle, exc,
        )
        return {}
    ohlcv_meta = envelope.get("ohlcv") or {}
    return dict(ohlcv_meta.get("manifest") or {})
467+
468+
def iter_ohlcv_hashes(self) -> Iterator[str]:
    """Yield the hash of every OHLCV chunk referenced by any bundle.

    Duplicates are possible — one hash is emitted per
    ``(handle, key)`` reference — so de-duplicate in the caller when
    a set is needed. This feeds the negotiate step of the
    dedup-upload protocol (``docs/design/ohlcv-dedup-protocol.md``).
    """
    suffix = ".parquet"
    for handle in self.iter_handles():
        for chunk_name in self._read_ohlcv_manifest(handle).values():
            # Manifest values are ``<sha>.parquet``; strip the
            # extension so bare hex digests are emitted.
            if chunk_name.endswith(suffix):
                chunk_name = chunk_name[: -len(suffix)]
            yield chunk_name
482+
483+
def ohlcv_referenced_hashes(self) -> set:
    """Return every distinct OHLCV chunk hash referenced by at least
    one bundle in the store.

    This is :meth:`iter_ohlcv_hashes` collapsed into a set, i.e. the
    per-reference duplicates are removed.
    """
    return set(self.iter_ohlcv_hashes())
488+
489+
def ohlcv_stored_hashes(self) -> set:
    """Return the hashes of the OHLCV chunks physically present on
    disk under ``<root>/ohlcv/``.

    The Tier-3 directory is created lazily, so a store that never
    saw OHLCV simply yields the empty set.
    """
    if not self._ohlcv_dir.is_dir():
        return set()
    return {
        entry.stem
        for entry in self._ohlcv_dir.iterdir()
        if entry.is_file() and entry.suffix == ".parquet"
    }
500+
501+
def ohlcv_stats(self) -> dict:
    """Return a snapshot of the Tier-3 chunk store.

    Keys:
        stored_blobs: number of ``.parquet`` chunks on disk.
        stored_bytes: total size of those chunks in bytes.
        referenced_blobs: distinct hashes referenced by bundles.
        orphan_blobs: stored but unreferenced (GC candidates).
        missing_blobs: referenced but absent on disk — should be 0
            in a healthy store.
    """
    stored = self.ohlcv_stored_hashes()
    referenced = self.ohlcv_referenced_hashes()
    total_bytes = 0
    if self._ohlcv_dir.is_dir():
        for entry in self._ohlcv_dir.iterdir():
            if not (entry.is_file() and entry.suffix == ".parquet"):
                continue
            try:
                total_bytes += entry.stat().st_size
            except OSError:  # pragma: no cover
                # A chunk vanishing mid-scan (concurrent GC) is not
                # worth failing the whole snapshot over.
                pass
    return {
        "stored_blobs": len(stored),
        "stored_bytes": total_bytes,
        "referenced_blobs": len(referenced),
        "orphan_blobs": len(stored - referenced),
        "missing_blobs": len(referenced - stored),
    }
526+
527+
def garbage_collect_ohlcv(
    self, *, dry_run: bool = False,
) -> List[str]:
    """Remove OHLCV chunks that no bundle in the store references.

    Args:
        dry_run: When True, nothing is deleted; the orphan list is
            returned so callers can audit before committing.

    Returns:
        The sorted list of hashes that were (or would be) removed.
    """
    orphans = sorted(
        self.ohlcv_stored_hashes() - self.ohlcv_referenced_hashes()
    )
    if not dry_run:
        for digest in orphans:
            chunk = self._ohlcv_dir / f"{digest}.parquet"
            try:
                chunk.unlink()
            except FileNotFoundError:  # pragma: no cover - race
                # Someone else already removed it; that's the goal.
                pass
            except OSError as exc:  # pragma: no cover - logged
                logger.warning(
                    "garbage_collect_ohlcv: failed to remove %s: %s",
                    chunk, exc,
                )
    return orphans
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
"""Tests for Tier-3 content-addressed OHLCV chunks in
2+
:class:`LocalTieredStore` (epic #540 phase 3c)."""
3+
from __future__ import annotations
4+
5+
import os
6+
import shutil
7+
import tempfile
8+
from copy import deepcopy
9+
from pathlib import Path
10+
from unittest import TestCase
11+
12+
import pandas as pd
13+
14+
from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT
15+
from investing_algorithm_framework.services.backtest_store import (
16+
LocalTieredStore,
17+
)
18+
19+
20+
# Path to the on-disk backtest fixture bundled with the test suite:
# three levels up from this file, under resources/.
_FIXTURE = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
    "resources",
    "backtest_reports_for_testing",
    "test_algorithm_backtest",
)
26+
27+
28+
def _make_ohlcv(symbol: str, *, n: int = 4, base: float = 100.0) -> dict:
29+
idx = pd.date_range("2024-01-01", periods=n, freq="h")
30+
df = pd.DataFrame(
31+
{
32+
"Datetime": idx,
33+
"Open": [base + i for i in range(n)],
34+
"High": [base + i + 0.5 for i in range(n)],
35+
"Low": [base + i - 0.5 for i in range(n)],
36+
"Close": [base + i + 0.25 for i in range(n)],
37+
"Volume": [1000 + i for i in range(n)],
38+
}
39+
)
40+
return {f"{symbol}:1h": df}
41+
42+
43+
class TestLocalTieredStoreOhlcv(TestCase):
    """OHLCV content-addressed chunk store (Tier-3)."""

    @classmethod
    def setUpClass(cls):
        """Load the shared fixture bundle once for the whole class."""
        cls.fixture = Backtest.open(_FIXTURE)

    def setUp(self):
        """Give every test a fresh store rooted in a temp directory."""
        self.tmp = tempfile.mkdtemp()
        self.store = LocalTieredStore(self.tmp)

    def tearDown(self):
        """Remove the per-test store root."""
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _backtest_with_ohlcv(self, symbol: str) -> Backtest:
        """Return a copy of the fixture with synthetic OHLCV attached."""
        # Deep-copy the fixture and attach a synthetic OHLCV blob.
        bt = deepcopy(self.fixture)
        bt.ohlcv = _make_ohlcv(symbol)
        return bt

    # ------------------------------------------------------------------
    # Bundles without OHLCV must not create the chunk dir.
    # ------------------------------------------------------------------
    def test_no_ohlcv_no_chunk_dir(self):
        """A write without OHLCV leaves no ohlcv/ dir and zero blobs."""
        self.store.write(self.fixture, handle="plain")
        self.assertFalse((Path(self.tmp) / "ohlcv").is_dir())
        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0)

    # ------------------------------------------------------------------
    # Tier-3 layout & dedup
    # ------------------------------------------------------------------
    def test_ohlcv_chunks_written_to_shared_store(self):
        """OHLCV lands in the shared dir as one <sha256>.parquet chunk."""
        bt = self._backtest_with_ohlcv("BTC/EUR")
        self.store.write(bt, handle="r1")
        chunks = list((Path(self.tmp) / "ohlcv").iterdir())
        self.assertEqual(len(chunks), 1)
        # Filename is <sha256>.parquet — 64 hex chars + .parquet.
        name = chunks[0].name
        self.assertTrue(name.endswith(".parquet"))
        self.assertEqual(len(name) - len(".parquet"), 64)

    def test_identical_ohlcv_is_deduplicated_across_handles(self):
        """Two bundles with byte-identical OHLCV share a single chunk."""
        bt1 = self._backtest_with_ohlcv("BTC/EUR")
        # Different algorithm_id but identical OHLCV bytes.
        bt2 = self._backtest_with_ohlcv("BTC/EUR")
        self.store.write(bt1, handle="a")
        self.store.write(bt2, handle="b")
        chunks = sorted((Path(self.tmp) / "ohlcv").iterdir())
        self.assertEqual(
            len(chunks), 1,
            "identical OHLCV must be stored exactly once",
        )
        stats = self.store.ohlcv_stats()
        self.assertEqual(stats["stored_blobs"], 1)
        self.assertEqual(stats["referenced_blobs"], 1)
        self.assertEqual(stats["orphan_blobs"], 0)
        self.assertEqual(stats["missing_blobs"], 0)

    def test_different_ohlcv_yields_separate_chunks(self):
        """Distinct OHLCV bytes hash differently -> two chunks on disk."""
        self.store.write(
            self._backtest_with_ohlcv("BTC/EUR"), handle="btc",
        )
        # Different base price -> different bytes -> different hash.
        bt2 = self._backtest_with_ohlcv("ETH/EUR")
        bt2.ohlcv = _make_ohlcv("ETH/EUR", base=2000.0)
        self.store.write(bt2, handle="eth")
        chunks = sorted((Path(self.tmp) / "ohlcv").iterdir())
        self.assertEqual(len(chunks), 2)
        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 2)

    # ------------------------------------------------------------------
    # Round-trip via canonical bundle still resolves OHLCV.
    # ------------------------------------------------------------------
    def test_open_resolves_ohlcv_from_shared_store(self):
        """store.open() resolves OHLCV from the shared Tier-3 dir."""
        bt = self._backtest_with_ohlcv("BTC/EUR")
        self.store.write(bt, handle="rt")
        loaded = self.store.open("rt")
        self.assertIn("BTC/EUR:1h", loaded.ohlcv)
        df = loaded.ohlcv["BTC/EUR:1h"]
        self.assertEqual(len(df), 4)

    # ------------------------------------------------------------------
    # Garbage collection
    # ------------------------------------------------------------------
    def test_delete_keeps_shared_ohlcv_until_gc(self):
        """delete() must not remove chunks another bundle references."""
        bt1 = self._backtest_with_ohlcv("BTC/EUR")
        bt2 = self._backtest_with_ohlcv("BTC/EUR")
        self.store.write(bt1, handle="a")
        self.store.write(bt2, handle="b")
        self.store.delete("a")
        # The chunk is still referenced by handle 'b' -> must stay.
        self.assertEqual(
            self.store.ohlcv_stats()["stored_blobs"], 1,
        )
        self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0)

    def test_garbage_collect_ohlcv_removes_orphans(self):
        """Orphans survive delete() and dry_run; real GC removes them."""
        bt = self._backtest_with_ohlcv("BTC/EUR")
        self.store.write(bt, handle="solo")
        # Delete the only referencing bundle.
        self.store.delete("solo")
        stats = self.store.ohlcv_stats()
        self.assertEqual(stats["stored_blobs"], 1)
        self.assertEqual(stats["orphan_blobs"], 1)

        # Dry-run lists but does not delete.
        orphans = self.store.garbage_collect_ohlcv(dry_run=True)
        self.assertEqual(len(orphans), 1)
        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 1)

        removed = self.store.garbage_collect_ohlcv()
        self.assertEqual(len(removed), 1)
        self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0)
        self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0)

    # ------------------------------------------------------------------
    # iter_ohlcv_hashes / ohlcv_referenced_hashes — input for the
    # dedup-upload negotiate step (docs/design/ohlcv-dedup-protocol.md).
    # ------------------------------------------------------------------
    def test_iter_ohlcv_hashes_lists_referenced_chunks(self):
        """Iterator emits one bare hex digest per bundle reference."""
        self.store.write(
            self._backtest_with_ohlcv("BTC/EUR"), handle="a",
        )
        self.store.write(
            self._backtest_with_ohlcv("BTC/EUR"), handle="b",
        )
        hashes = list(self.store.iter_ohlcv_hashes())
        # Two references to the same hash (one per bundle).
        self.assertEqual(len(hashes), 2)
        self.assertEqual(len(set(hashes)), 1)
        # Hashes are bare hex digests.
        for h in hashes:
            self.assertEqual(len(h), 64)
            self.assertTrue(all(c in "0123456789abcdef" for c in h))

    def test_referenced_hashes_set_dedups(self):
        """ohlcv_referenced_hashes() collapses duplicate references."""
        bt = self._backtest_with_ohlcv("BTC/EUR")
        self.store.write(bt, handle="a")
        self.store.write(bt, handle="b")
        self.assertEqual(len(self.store.ohlcv_referenced_hashes()), 1)

0 commit comments

Comments
 (0)