@@ -1,5 +1,6 @@
-"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + canonical
-``.iafbt`` bundles on a local filesystem (epic #540 phase 3b).
+"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + Tier-3
+content-addressed OHLCV chunks + canonical ``.iafbt`` bundles on a
+local filesystem (epic #540 phases 3b + 3c).

Layout under *root*::

@@ -10,6 +11,8 @@
portfolio_snapshots/run_id=<handle>/part-0.parquet
trades/run_id=<handle>/part-0.parquet
orders/run_id=<handle>/part-0.parquet
+ohlcv/                  # Tier-3, content-addressed by SHA-256
+    <sha256>.parquet    # shared across every bundle in the store

The bundle file is the canonical representation. The Tier-1 index and
the Tier-2 Parquet datasets are derived, eagerly maintained, and
@@ -61,6 +64,7 @@
INDEX_FILENAME = "index.sqlite"
BUNDLES_SUBDIR = "bundles"
PARQUET_SUBDIR = "parquet"
+OHLCV_SUBDIR = "ohlcv"


def _records_to_table(records: List[dict]) -> Optional[Any]:
@@ -122,9 +126,12 @@ def __init__(self, root: Union[str, Path]) -> None:
self.root.mkdir(parents=True, exist_ok=True)
self._bundles_dir = self.root / BUNDLES_SUBDIR
self._parquet_dir = self.root / PARQUET_SUBDIR
+self._ohlcv_dir = self.root / OHLCV_SUBDIR
self._index_path = self.root / INDEX_FILENAME
self._bundles_dir.mkdir(parents=True, exist_ok=True)
self._parquet_dir.mkdir(parents=True, exist_ok=True)
+# Tier-3 dir is created lazily on first OHLCV-bearing write so
+# bundles without attached price data leave no empty subdir.
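#
# For orientation, the Tier-3 naming scheme sketched as code; the
# real hashing lives in the bundle module, so this is illustrative
# only:
#
#     digest = hashlib.sha256(parquet_bytes).hexdigest()
#     chunk = self._ohlcv_dir / f"{digest}.parquet"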

# ------------------------------------------------------------------
# Internal helpers
@@ -180,8 +187,19 @@ def write(
bundle_path = self._bundle_path(bare)
bundle_path.parent.mkdir(parents=True, exist_ok=True)

-# 1. Canonical bytes — bundle.
-backtest.save_bundle(bundle_path)
+# 1. Canonical bytes — bundle. When the backtest has OHLCV
+# attached, route the side-store to the shared Tier-3 chunk
+# directory so identical (symbol, timeframe) Parquet bytes are
+# written once and referenced by every bundle that needs them.
+has_ohlcv = bool(getattr(backtest, "ohlcv", None))
+if has_ohlcv:
+    backtest.save_bundle(
+        bundle_path,
+        include_ohlcv=True,
+        ohlcv_store=self._ohlcv_dir,
+    )
+else:
+    backtest.save_bundle(bundle_path)
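#
# Because chunks are content-addressed, a second write carrying
# byte-identical OHLCV reuses the existing file; a sketch, with
# hypothetical handles:
#
#     store.write(bt_a, handle="run-a")   # writes <sha256>.parquet
#     store.write(bt_b, handle="run-b")   # identical bytes, chunk reused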

# 2. Tier-1 — SQLite row.
stat = bundle_path.stat()
@@ -242,7 +260,16 @@ def open(
path = self._bundle_path(handle)
if not path.is_file():
raise StoreHandleNotFoundError(handle)
-return Backtest.open(str(path), summary_only=summary_only)
+# Route OHLCV resolution through the shared Tier-3 chunk
+# directory regardless of what path the bundle was written
+# with. Bundles without OHLCV ignore the override.
+from investing_algorithm_framework.domain.backtesting.bundle \
+    import open_bundle as _open_bundle
+return _open_bundle(
+    str(path),
+    ohlcv_store=self._ohlcv_dir,
+    summary_only=summary_only,
+)
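#
# Illustrative round trip, mirroring the tests below (OHLCV keys
# follow the "<symbol>:<timeframe>" convention):
#
#     bt.ohlcv = {"BTC/EUR:1h": df}
#     store.write(bt, handle="rt")
#     store.open("rt").ohlcv["BTC/EUR:1h"]   # resolved via <root>/ohlcv/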

def exists(self, handle: StoreHandle) -> bool:
try:
@@ -380,6 +407,10 @@ def rebuild_index(self) -> int:
Useful when sidecars were edited out-of-band or after a
software upgrade adds new index columns. Returns the number of
rows written.

+Tier-3 OHLCV chunks are *not* touched — they are
+content-addressed and globally shared. Use
+:meth:`garbage_collect_ohlcv` to reclaim orphans.
"""
if self._index_path.is_file():
self._index_path.unlink()
@@ -408,3 +439,118 @@ def rebuild_index(self) -> int:

def __repr__(self) -> str:
return f"LocalTieredStore(root={str(self.root)!r})"

# ------------------------------------------------------------------
# Tier-3 — content-addressed OHLCV chunks
# ------------------------------------------------------------------
def _read_ohlcv_manifest(self, handle: StoreHandle) -> dict:
"""Return the ``{key: <sha>.parquet}`` manifest stored in the
bundle envelope, or an empty dict if the bundle has no OHLCV.

Decodes the envelope directly so we avoid the cost of
instantiating a full :class:`Backtest`.
"""
from investing_algorithm_framework.domain.backtesting.bundle \
import _decode_payload # noqa: WPS437 — internal helper
path = self._bundle_path(handle)
if not path.is_file():
raise StoreHandleNotFoundError(handle)
try:
_, doc = _decode_payload(path.read_bytes())
except Exception as exc:
logger.warning(
"Tier-3 manifest read failed for %s: %s", handle, exc,
)
return {}
meta = doc.get("ohlcv") or {}
return dict(meta.get("manifest") or {})
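#
# The envelope fragment this method reads has the following shape
# (the digest shown is made up; only the structure is asserted by
# the code above):
#
#     {"ohlcv": {"manifest": {"BTC/EUR:1h": "3b5a...e9c1.parquet"}}}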

def iter_ohlcv_hashes(self) -> Iterator[str]:
"""Yield every OHLCV chunk hash referenced by any bundle.

Hashes are emitted with possible duplicates (one per
``(handle, key)`` reference); de-duplicate in the caller if
needed. Useful as the input for the dedup-upload negotiate
step in ``docs/design/ohlcv-dedup-protocol.md``.
"""
for handle in self.iter_handles():
for rel in self._read_ohlcv_manifest(handle).values():
if rel.endswith(".parquet"):
yield rel[: -len(".parquet")]
else:
yield rel

def ohlcv_referenced_hashes(self) -> set:
"""Return the de-duplicated set of OHLCV hashes referenced
across all bundles in the store.
"""
return set(self.iter_ohlcv_hashes())
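#
# Sketch of the negotiate step these hashes feed; ``remote`` and
# its methods are hypothetical, the real protocol is specified in
# docs/design/ohlcv-dedup-protocol.md:
#
#     have = store.ohlcv_referenced_hashes()
#     missing = remote.negotiate(sorted(have))   # hashes remote lacks
#     for digest in missing:
#         remote.upload(store._ohlcv_dir / f"{digest}.parquet")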

def ohlcv_stored_hashes(self) -> set:
"""Return the set of OHLCV hashes physically present on disk
under ``<root>/ohlcv/``.
"""
if not self._ohlcv_dir.is_dir():
return set()
out: set = set()
for p in self._ohlcv_dir.iterdir():
if p.is_file() and p.suffix == ".parquet":
out.add(p.stem)
return out

def ohlcv_stats(self) -> dict:
"""Return a snapshot of the Tier-3 store.

Keys: ``stored_blobs`` (int), ``stored_bytes`` (int),
``referenced_blobs`` (int), ``orphan_blobs`` (int),
``missing_blobs`` (int — referenced but absent on disk,
should be 0 in a healthy store).
"""
stored = self.ohlcv_stored_hashes()
referenced = self.ohlcv_referenced_hashes()
stored_bytes = 0
if self._ohlcv_dir.is_dir():
for p in self._ohlcv_dir.iterdir():
if p.is_file() and p.suffix == ".parquet":
try:
stored_bytes += p.stat().st_size
except OSError: # pragma: no cover
pass
return {
"stored_blobs": len(stored),
"stored_bytes": stored_bytes,
"referenced_blobs": len(referenced),
"orphan_blobs": len(stored - referenced),
"missing_blobs": len(referenced - stored),
}
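#
# A healthy store keeps ``missing_blobs`` at 0, so a quick
# integrity check is:
#
#     assert store.ohlcv_stats()["missing_blobs"] == 0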

def garbage_collect_ohlcv(
self, *, dry_run: bool = False,
) -> List[str]:
"""Remove OHLCV chunks that no bundle in the store references.

Args:
dry_run: When True, no files are deleted; the list of
orphans is returned so callers can audit before
committing.

Returns:
The list of hashes that were (or would be) removed.
"""
stored = self.ohlcv_stored_hashes()
referenced = self.ohlcv_referenced_hashes()
orphans = sorted(stored - referenced)
if dry_run:
return orphans
for digest in orphans:
path = self._ohlcv_dir / f"{digest}.parquet"
try:
path.unlink()
except FileNotFoundError: # pragma: no cover - race
pass
except OSError as exc: # pragma: no cover - logged
logger.warning(
"garbage_collect_ohlcv: failed to remove %s: %s",
path, exc,
)
return orphans
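A typical audit-then-commit flow for the collector, as a minimal
sketch (root path and handle names are hypothetical):

    store = LocalTieredStore("/var/backtests")
    store.delete("old-run")                           # bundle gone, chunks stay
    orphans = store.garbage_collect_ohlcv(dry_run=True)   # list, delete nothing
    if orphans:
        store.garbage_collect_ohlcv()                 # now reclaim the orphans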
tests/services/backtest_store/test_local_tiered_store_ohlcv.py (182 additions, 0 deletions)
@@ -0,0 +1,182 @@
"""Tests for Tier-3 content-addressed OHLCV chunks in
:class:`LocalTieredStore` (epic #540 phase 3c)."""
from __future__ import annotations

import os
import shutil
import tempfile
from copy import deepcopy
from pathlib import Path
from unittest import TestCase

import pandas as pd

from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT
from investing_algorithm_framework.services.backtest_store import (
LocalTieredStore,
)


_FIXTURE = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"resources",
"backtest_reports_for_testing",
"test_algorithm_backtest",
)


def _make_ohlcv(symbol: str, *, n: int = 4, base: float = 100.0) -> dict:
idx = pd.date_range("2024-01-01", periods=n, freq="h")
df = pd.DataFrame(
{
"Datetime": idx,
"Open": [base + i for i in range(n)],
"High": [base + i + 0.5 for i in range(n)],
"Low": [base + i - 0.5 for i in range(n)],
"Close": [base + i + 0.25 for i in range(n)],
"Volume": [1000 + i for i in range(n)],
}
)
return {f"{symbol}:1h": df}


class TestLocalTieredStoreOhlcv(TestCase):
"""OHLCV content-addressed chunk store (Tier-3)."""

@classmethod
def setUpClass(cls):
cls.fixture = Backtest.open(_FIXTURE)

def setUp(self):
self.tmp = tempfile.mkdtemp()
self.store = LocalTieredStore(self.tmp)

def tearDown(self):
shutil.rmtree(self.tmp, ignore_errors=True)

def _backtest_with_ohlcv(self, symbol: str) -> Backtest:
# Deep-copy the fixture and attach a synthetic OHLCV blob.
bt = deepcopy(self.fixture)
bt.ohlcv = _make_ohlcv(symbol)
return bt

# ------------------------------------------------------------------
# Bundles without OHLCV must not create the chunk dir.
# ------------------------------------------------------------------
def test_no_ohlcv_no_chunk_dir(self):
self.store.write(self.fixture, handle="plain")
self.assertFalse((Path(self.tmp) / "ohlcv").is_dir())
self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0)

# ------------------------------------------------------------------
# Tier-3 layout & dedup
# ------------------------------------------------------------------
def test_ohlcv_chunks_written_to_shared_store(self):
bt = self._backtest_with_ohlcv("BTC/EUR")
self.store.write(bt, handle="r1")
chunks = list((Path(self.tmp) / "ohlcv").iterdir())
self.assertEqual(len(chunks), 1)
# Filename is <sha256>.parquet — 64 hex chars + .parquet.
name = chunks[0].name
self.assertTrue(name.endswith(".parquet"))
self.assertEqual(len(name) - len(".parquet"), 64)

def test_identical_ohlcv_is_deduplicated_across_handles(self):
bt1 = self._backtest_with_ohlcv("BTC/EUR")
# A second, independent backtest object with identical OHLCV bytes.
bt2 = self._backtest_with_ohlcv("BTC/EUR")
self.store.write(bt1, handle="a")
self.store.write(bt2, handle="b")
chunks = sorted((Path(self.tmp) / "ohlcv").iterdir())
self.assertEqual(
len(chunks), 1,
"identical OHLCV must be stored exactly once",
)
stats = self.store.ohlcv_stats()
self.assertEqual(stats["stored_blobs"], 1)
self.assertEqual(stats["referenced_blobs"], 1)
self.assertEqual(stats["orphan_blobs"], 0)
self.assertEqual(stats["missing_blobs"], 0)

def test_different_ohlcv_yields_separate_chunks(self):
self.store.write(
self._backtest_with_ohlcv("BTC/EUR"), handle="btc",
)
# Different base price -> different bytes -> different hash.
bt2 = self._backtest_with_ohlcv("ETH/EUR")
bt2.ohlcv = _make_ohlcv("ETH/EUR", base=2000.0)
self.store.write(bt2, handle="eth")
chunks = sorted((Path(self.tmp) / "ohlcv").iterdir())
self.assertEqual(len(chunks), 2)
self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 2)

# ------------------------------------------------------------------
# Round-trip via canonical bundle still resolves OHLCV.
# ------------------------------------------------------------------
def test_open_resolves_ohlcv_from_shared_store(self):
bt = self._backtest_with_ohlcv("BTC/EUR")
self.store.write(bt, handle="rt")
loaded = self.store.open("rt")
self.assertIn("BTC/EUR:1h", loaded.ohlcv)
df = loaded.ohlcv["BTC/EUR:1h"]
self.assertEqual(len(df), 4)

# ------------------------------------------------------------------
# Garbage collection
# ------------------------------------------------------------------
def test_delete_keeps_shared_ohlcv_until_gc(self):
bt1 = self._backtest_with_ohlcv("BTC/EUR")
bt2 = self._backtest_with_ohlcv("BTC/EUR")
self.store.write(bt1, handle="a")
self.store.write(bt2, handle="b")
self.store.delete("a")
# The chunk is still referenced by handle 'b' -> must stay.
self.assertEqual(
self.store.ohlcv_stats()["stored_blobs"], 1,
)
self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0)

def test_garbage_collect_ohlcv_removes_orphans(self):
bt = self._backtest_with_ohlcv("BTC/EUR")
self.store.write(bt, handle="solo")
# Delete the only referencing bundle.
self.store.delete("solo")
stats = self.store.ohlcv_stats()
self.assertEqual(stats["stored_blobs"], 1)
self.assertEqual(stats["orphan_blobs"], 1)

# Dry-run lists but does not delete.
orphans = self.store.garbage_collect_ohlcv(dry_run=True)
self.assertEqual(len(orphans), 1)
self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 1)

removed = self.store.garbage_collect_ohlcv()
self.assertEqual(len(removed), 1)
self.assertEqual(self.store.ohlcv_stats()["stored_blobs"], 0)
self.assertEqual(self.store.ohlcv_stats()["orphan_blobs"], 0)

# ------------------------------------------------------------------
# iter_ohlcv_hashes / ohlcv_referenced_hashes — input for the
# dedup-upload negotiate step (docs/design/ohlcv-dedup-protocol.md).
# ------------------------------------------------------------------
def test_iter_ohlcv_hashes_lists_referenced_chunks(self):
self.store.write(
self._backtest_with_ohlcv("BTC/EUR"), handle="a",
)
self.store.write(
self._backtest_with_ohlcv("BTC/EUR"), handle="b",
)
hashes = list(self.store.iter_ohlcv_hashes())
# Two references to the same hash (one per bundle).
self.assertEqual(len(hashes), 2)
self.assertEqual(len(set(hashes)), 1)
# Hashes are bare hex digests.
for h in hashes:
self.assertEqual(len(h), 64)
self.assertTrue(all(c in "0123456789abcdef" for c in h))

def test_referenced_hashes_set_dedups(self):
bt = self._backtest_with_ohlcv("BTC/EUR")
self.store.write(bt, handle="a")
self.store.write(bt, handle="b")
self.assertEqual(len(self.store.ohlcv_referenced_hashes()), 1)