|
1 | | -"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + canonical |
2 | | -``.iafbt`` bundles on a local filesystem (epic #540 phase 3b). |
| 1 | +"""``LocalTieredStore`` — Tier-1 SQLite + Tier-2 Parquet + Tier-3 |
| 2 | +content-addressed OHLCV chunks + canonical ``.iafbt`` bundles on a |
| 3 | +local filesystem (epic #540 phases 3b + 3c). |
3 | 4 |
|
4 | 5 | Layout under *root*:: |
5 | 6 |
|
|
10 | 11 | portfolio_snapshots/run_id=<handle>/part-0.parquet |
11 | 12 | trades/run_id=<handle>/part-0.parquet |
12 | 13 | orders/run_id=<handle>/part-0.parquet |
| 14 | + ohlcv/ # Tier-3, content-addressed by SHA-256 |
| 15 | + <sha256>.parquet # shared across every bundle in the store |
13 | 16 |
|
14 | 17 | The bundle file is the canonical representation. The Tier-1 index and |
15 | 18 | the Tier-2 Parquet datasets are derived, eagerly maintained, and |
|
# Filenames / subdirectories composing the on-disk layout under *root*.
INDEX_FILENAME = "index.sqlite"  # Tier-1: derived SQLite index file
BUNDLES_SUBDIR = "bundles"       # canonical .iafbt bundle files
PARQUET_SUBDIR = "parquet"       # Tier-2: derived Parquet datasets
OHLCV_SUBDIR = "ohlcv"           # Tier-3: content-addressed OHLCV chunks (SHA-256 named)
64 | 68 |
|
65 | 69 |
|
66 | 70 | def _records_to_table(records: List[dict]) -> Optional[Any]: |
@@ -122,9 +126,12 @@ def __init__(self, root: Union[str, Path]) -> None: |
122 | 126 | self.root.mkdir(parents=True, exist_ok=True) |
123 | 127 | self._bundles_dir = self.root / BUNDLES_SUBDIR |
124 | 128 | self._parquet_dir = self.root / PARQUET_SUBDIR |
| 129 | + self._ohlcv_dir = self.root / OHLCV_SUBDIR |
125 | 130 | self._index_path = self.root / INDEX_FILENAME |
126 | 131 | self._bundles_dir.mkdir(parents=True, exist_ok=True) |
127 | 132 | self._parquet_dir.mkdir(parents=True, exist_ok=True) |
| 133 | + # Tier-3 dir is created lazily on first OHLCV-bearing write so |
| 134 | + # bundles without attached price data leave no empty subdir. |
128 | 135 |
|
129 | 136 | # ------------------------------------------------------------------ |
130 | 137 | # Internal helpers |
@@ -180,8 +187,19 @@ def write( |
180 | 187 | bundle_path = self._bundle_path(bare) |
181 | 188 | bundle_path.parent.mkdir(parents=True, exist_ok=True) |
182 | 189 |
|
183 | | - # 1. Canonical bytes — bundle. |
184 | | - backtest.save_bundle(bundle_path) |
| 190 | + # 1. Canonical bytes — bundle. When the backtest has OHLCV |
| 191 | + # attached, route the side-store to the shared Tier-3 chunk |
| 192 | + # directory so identical (symbol, timeframe) Parquet bytes are |
| 193 | + # written once and referenced by every bundle that needs them. |
| 194 | + has_ohlcv = bool(getattr(backtest, "ohlcv", None)) |
| 195 | + if has_ohlcv: |
| 196 | + backtest.save_bundle( |
| 197 | + bundle_path, |
| 198 | + include_ohlcv=True, |
| 199 | + ohlcv_store=self._ohlcv_dir, |
| 200 | + ) |
| 201 | + else: |
| 202 | + backtest.save_bundle(bundle_path) |
185 | 203 |
|
186 | 204 | # 2. Tier-1 — SQLite row. |
187 | 205 | stat = bundle_path.stat() |
@@ -242,7 +260,16 @@ def open( |
242 | 260 | path = self._bundle_path(handle) |
243 | 261 | if not path.is_file(): |
244 | 262 | raise StoreHandleNotFoundError(handle) |
245 | | - return Backtest.open(str(path), summary_only=summary_only) |
| 263 | + # Route OHLCV resolution through the shared Tier-3 chunk |
| 264 | + # directory regardless of what path the bundle was written |
| 265 | + # with. Bundles without OHLCV ignore the override. |
| 266 | + from investing_algorithm_framework.domain.backtesting.bundle \ |
| 267 | + import open_bundle as _open_bundle |
| 268 | + return _open_bundle( |
| 269 | + str(path), |
| 270 | + ohlcv_store=self._ohlcv_dir, |
| 271 | + summary_only=summary_only, |
| 272 | + ) |
246 | 273 |
|
247 | 274 | def exists(self, handle: StoreHandle) -> bool: |
248 | 275 | try: |
@@ -380,6 +407,10 @@ def rebuild_index(self) -> int: |
380 | 407 | Useful when sidecars were edited out-of-band or after a |
381 | 408 | software upgrade adds new index columns. Returns the number of |
382 | 409 | rows written. |
| 410 | +
|
| 411 | +    Tier-3 OHLCV chunks are *not* touched — they are
| 412 | + content-addressed and globally shared. Use |
| 413 | + :meth:`garbage_collect_ohlcv` to reclaim orphans. |
383 | 414 | """ |
384 | 415 | if self._index_path.is_file(): |
385 | 416 | self._index_path.unlink() |
@@ -408,3 +439,118 @@ def rebuild_index(self) -> int: |
408 | 439 |
|
def __repr__(self) -> str:
    """Return a debug representation naming the store's root directory."""
    root_text = str(self.root)
    return f"LocalTieredStore(root={root_text!r})"
| 442 | + |
| 443 | + # ------------------------------------------------------------------ |
| 444 | + # Tier-3 — content-addressed OHLCV chunks |
| 445 | + # ------------------------------------------------------------------ |
def _read_ohlcv_manifest(self, handle: StoreHandle) -> dict:
    """Return the ``{key: <sha>.parquet}`` manifest stored in the
    bundle envelope, or an empty dict when the bundle carries no
    OHLCV.

    Reads and decodes the envelope bytes directly, sidestepping the
    cost of constructing a full :class:`Backtest` instance.
    """
    from investing_algorithm_framework.domain.backtesting.bundle \
        import _decode_payload  # noqa: WPS437 — internal helper
    bundle_path = self._bundle_path(handle)
    if not bundle_path.is_file():
        raise StoreHandleNotFoundError(handle)
    try:
        _, envelope = _decode_payload(bundle_path.read_bytes())
    except Exception as exc:
        # Best-effort: a corrupt/unreadable envelope degrades to
        # "no manifest" rather than aborting the caller.
        logger.warning(
            "Tier-3 manifest read failed for %s: %s", handle, exc,
        )
        return {}
    ohlcv_meta = envelope.get("ohlcv") or {}
    return dict(ohlcv_meta.get("manifest") or {})
| 467 | + |
def iter_ohlcv_hashes(self) -> Iterator[str]:
    """Yield every OHLCV chunk hash referenced by any bundle.

    Duplicates are possible (one emission per ``(handle, key)``
    reference); callers needing a unique set should de-duplicate.
    Serves as the input to the dedup-upload negotiate step in
    ``docs/design/ohlcv-dedup-protocol.md``.
    """
    suffix = ".parquet"
    for handle in self.iter_handles():
        for rel in self._read_ohlcv_manifest(handle).values():
            yield rel[: -len(suffix)] if rel.endswith(suffix) else rel
| 482 | + |
def ohlcv_referenced_hashes(self) -> set:
    """Return the de-duplicated set of OHLCV chunk hashes referenced
    by any bundle currently in the store.
    """
    referenced: set = set()
    referenced.update(self.iter_ohlcv_hashes())
    return referenced
| 488 | + |
def ohlcv_stored_hashes(self) -> set:
    """Return the set of OHLCV hashes physically present on disk
    under ``<root>/ohlcv/``.

    Returns an empty set when the Tier-3 directory does not exist
    (it is created lazily on the first OHLCV-bearing write).
    """
    if not self._ohlcv_dir.is_dir():
        return set()
    # Chunk files are named <sha256>.parquet, so the stem is the hash.
    return {
        p.stem
        for p in self._ohlcv_dir.iterdir()
        if p.is_file() and p.suffix == ".parquet"
    }
| 500 | + |
def ohlcv_stats(self) -> dict:
    """Return a snapshot of the Tier-3 store.

    Keys: ``stored_blobs`` (int), ``stored_bytes`` (int),
    ``referenced_blobs`` (int), ``orphan_blobs`` (int),
    ``missing_blobs`` (int — referenced but absent on disk,
    should be 0 in a healthy store).
    """
    stored = self.ohlcv_stored_hashes()
    referenced = self.ohlcv_referenced_hashes()

    def _safe_size(path):
        # A chunk removed between iterdir() and stat() counts as 0.
        try:
            return path.stat().st_size
        except OSError:  # pragma: no cover
            return 0

    chunk_dir = self._ohlcv_dir
    total_bytes = 0
    if chunk_dir.is_dir():
        total_bytes = sum(
            _safe_size(p)
            for p in chunk_dir.iterdir()
            if p.is_file() and p.suffix == ".parquet"
        )
    return {
        "stored_blobs": len(stored),
        "stored_bytes": total_bytes,
        "referenced_blobs": len(referenced),
        "orphan_blobs": len(stored - referenced),
        "missing_blobs": len(referenced - stored),
    }
| 526 | + |
def garbage_collect_ohlcv(
    self, *, dry_run: bool = False,
) -> List[str]:
    """Remove OHLCV chunks that no bundle in the store references.

    Args:
        dry_run: When True, nothing is deleted; the orphan list is
            returned so callers can audit before committing.

    Returns:
        The list of hashes that were (or would be) removed.
    """
    orphans = sorted(
        self.ohlcv_stored_hashes() - self.ohlcv_referenced_hashes()
    )
    if dry_run:
        return orphans
    for digest in orphans:
        blob = self._ohlcv_dir / (digest + ".parquet")
        try:
            blob.unlink()
        except FileNotFoundError:  # pragma: no cover - race
            pass
        except OSError as exc:  # pragma: no cover - logged
            logger.warning(
                "garbage_collect_ohlcv: failed to remove %s: %s",
                blob, exc,
            )
    return orphans
0 commit comments