|
| 1 | +"""Backtest → flat-records decomposer for Tier-2 Parquet writes |
| 2 | +(epic #540 phase 3b). |
| 3 | +
|
| 4 | +Given a :class:`Backtest`, yields flat record lists for each Tier-2 |
| 5 | +dataset (``portfolio_snapshots`` / ``trades`` / ``orders``). The |
| 6 | +output is shaped for direct ingestion by |
| 7 | +:func:`pyarrow.dataset.write_dataset` with hive partitioning on |
| 8 | +``run_id``. |
| 9 | +
|
| 10 | +The bundle (``.iafbt``) remains the canonical source of truth in |
| 11 | +Phase 3b; these Parquet datasets are *auxiliary* — they make |
| 12 | +DuckDB / Polars analytics work across thousands of runs without |
| 13 | +re-decoding bundles. Round-tripping Tier-2 back to a Backtest is |
| 14 | +Phase 3d territory. |
| 15 | +""" |
| 16 | + |
| 17 | +from __future__ import annotations |
| 18 | + |
| 19 | +from typing import Any, Dict, Iterator, List |
| 20 | + |
| 21 | +from investing_algorithm_framework.domain import Backtest |
| 22 | + |
| 23 | + |
| 24 | +def _windowed_records( |
| 25 | + backtest: Backtest, attr: str, run_id: str, |
| 26 | +) -> Iterator[Dict[str, Any]]: |
| 27 | + """Yield ``to_dict()``-shaped records from every BacktestRun. |
| 28 | +
|
| 29 | + Adds ``run_id`` and ``window_name`` columns so downstream |
| 30 | + columnar tools can group / partition cleanly across a Backtest's |
| 31 | + walk-forward windows. |
| 32 | + """ |
| 33 | + if not backtest.backtest_runs: |
| 34 | + return |
| 35 | + for window in backtest.backtest_runs: |
| 36 | + items = getattr(window, attr, None) or [] |
| 37 | + window_name = getattr(window, "backtest_date_range_name", None) or ( |
| 38 | + window.create_directory_name() |
| 39 | + if hasattr(window, "create_directory_name") else "" |
| 40 | + ) |
| 41 | + for obj in items: |
| 42 | + d = obj.to_dict() if hasattr(obj, "to_dict") else dict(obj) |
| 43 | + d["run_id"] = run_id |
| 44 | + d["window_name"] = window_name |
| 45 | + yield d |
| 46 | + |
| 47 | + |
| 48 | +def snapshots(backtest: Backtest, run_id: str) -> List[Dict[str, Any]]: |
| 49 | + """Flat portfolio_snapshots records, one per BacktestRun timestep.""" |
| 50 | + return list(_windowed_records(backtest, "portfolio_snapshots", run_id)) |
| 51 | + |
| 52 | + |
| 53 | +def trades(backtest: Backtest, run_id: str) -> List[Dict[str, Any]]: |
| 54 | + """Flat trades records (one per trade across all windows).""" |
| 55 | + return list(_windowed_records(backtest, "trades", run_id)) |
| 56 | + |
| 57 | + |
| 58 | +def orders(backtest: Backtest, run_id: str) -> List[Dict[str, Any]]: |
| 59 | + """Flat orders records (one per order across all windows).""" |
| 60 | + return list(_windowed_records(backtest, "orders", run_id)) |
| 61 | + |
| 62 | + |
| 63 | +# Datasets exposed by LocalTieredStore. Each entry is |
| 64 | +# (dataset_name, decomposer_fn). Add new kinds here (e.g. |
| 65 | +# metric_series) once their decomposer is in place. |
| 66 | +DATASETS = ( |
| 67 | + ("portfolio_snapshots", snapshots), |
| 68 | + ("trades", trades), |
| 69 | + ("orders", orders), |
| 70 | +) |
0 commit comments