diff --git a/docs/design/bundle-format-v2.md b/docs/design/bundle-format-v2.md new file mode 100644 index 00000000..c59a3856 --- /dev/null +++ b/docs/design/bundle-format-v2.md @@ -0,0 +1,241 @@ +# Bundle Format v2 — Public Specification + +**Status:** Stable. Default writer since `v8.9.0` (May 2026). +**File extension:** `.iafbt` +**Backwards compatibility:** v1 bundles remain readable indefinitely. + +This document describes the on-disk binary format produced by +`save_bundle()` and consumed by `open_bundle()` / +`Backtest.open()`. Third-party tools (e.g. the Finterion upload CLI +and ingestion pipeline) can rely on this contract. + +--- + +## Why v2 + +v1 stored the entire `Backtest.to_dict()` graph as a single +zstd-compressed MessagePack document. That was already efficient for +small backtests, but two structural problems became visible at scale +(thousands of bundles per user): + +1. **Heavy time series stored as JSON-ish lists of `(float, + ISO-string)` tuples** — the strings dominate the on-disk size for + long-running backtests (e.g. 10y daily ≈ 2,500 entries × 8 series). + ISO-8601 strings are ~25 bytes each; an `int64` epoch-ms is 8 bytes + and Parquet's columnar dictionary compression collapses repeated + timestamps further. + +2. **No way to distinguish vector from event backtests** in the + on-disk envelope, even though they're produced by separate engines + with subtly different semantics. Reports and analyses had to + guess from filename or metadata. + +v2 fixes both without breaking v1. + +--- + +## Outer envelope (unchanged from v1) + +``` ++-----------+-----------+--------------------------------+ +| 4 bytes | 4 bytes | N bytes | +| "IAFB" | uint32 LE | zstd(level=7, msgpack(doc)) | ++-----------+-----------+--------------------------------+ + magic version compressed body +``` + +The 4-byte little-endian uint32 holds the format version (1 or 2). +The body is always zstd-compressed MessagePack with `use_bin_type=True`. + +Readers MUST reject any version > the highest they support, and SHOULD +inspect the magic before attempting to decompress. + +--- + +## v2 document structure + +```python +{ + "format_version": 2, + "engine_type": "vector" | "event" | None, + + # Engine-agnostic top-level fields (carry across both engines) + "algorithm_id": str, + "metadata": dict, + "risk_free_rate": float | None, + "strategy_ids": list, + "parameters": dict, + "tag": str | None, + "backtest_permutation_tests": list | None, + + # Exactly ONE of these pairs is populated based on engine_type: + "vector_runs": [run_dict, ...], # if engine_type == "vector" + "vector_metrics": summary_dict, # if engine_type == "vector" + + "event_runs": [run_dict, ...], # if engine_type == "event" + "event_metrics": summary_dict, # if engine_type == "event" + + # Fallback for legacy / unknown-engine bundles: + "backtest_runs": [run_dict, ...], # if engine_type is None + "backtest_summary": summary_dict, # if engine_type is None + + # Optional: embedded heavy-series Parquet blobs + "blobs": { + "runs//metrics/.parquet": bytes, + ... 
+    },
+
+    # Optional: OHLCV manifest (unchanged from v1)
+    "ohlcv": {
+        "store_dir": str,             # relative to bundle file
+        "manifest": {key: relative_path},
+    },
+}
+```
+
+### Engine routing
+
+| `engine_type` | Runs key        | Summary key        |
+| ------------- | --------------- | ------------------ |
+| `"vector"`    | `vector_runs`   | `vector_metrics`   |
+| `"event"`     | `event_runs`    | `event_metrics`    |
+| `None`        | `backtest_runs` | `backtest_summary` |
+
+A bundle holds exactly **one** engine's results. Mixing engines in a
+single bundle is not supported in v2 — produce two bundles and store
+them in the same directory.
+
+### Metric blob extraction
+
+Eight `BacktestMetrics` fields are extracted from each run's
+`backtest_metrics` dict and replaced with a `{"@blob": "<key>"}`
+reference; the actual Parquet bytes go into the top-level `blobs` map.
+
+The eight fields are all `List[Tuple[float, datetime|date]]`:
+
+- `equity_curve`
+- `drawdown_series`
+- `cumulative_return_series`
+- `rolling_sharpe_ratio`
+- `monthly_returns`
+- `yearly_returns`
+- `twr_equity_curve`
+- `twr_drawdown_series`
+
+Each blob is a 2-column Parquet file (zstd compression level 5):
+
+| Column  | Type    | Semantics              |
+| ------- | ------- | ---------------------- |
+| `ts`    | int64   | UTC epoch milliseconds |
+| `value` | float64 | The metric value       |
+
+The blob key follows the convention
+`runs/<run_index>/metrics/<field>.parquet`, where `<run_index>` is
+the zero-based offset of the run within `vector_runs` / `event_runs` /
+`backtest_runs` and `<field>` is one of the eight names above.
+
+If a series has fewer than 2 entries, the writer leaves it inline
+(no blob extraction). Readers MUST handle both cases for any field.
+
+### Other fields
+
+Fields that are NOT extracted into Parquet blobs in v2:
+
+- `portfolio_snapshots`, `trades`, `orders`, `positions` — stay as
+  msgpack lists of dicts. Their schemas are unstable across model
+  changes, and msgpack is sufficient for the typical row counts.
+- All scalar metrics (`sharpe_ratio`, `max_drawdown`, etc.) — stay
+  inline. The whole point is keeping these fast to read.
+- `signals`, `signal_events`, `recorded_values`, `data_sources`,
+  `metadata` on each run — stay inline.
+
+A future v2.x revision MAY extract additional fields. Readers MUST
+treat the `blobs` map as authoritative: any key found there
+overrides the inline value (the writer is required to leave the
+inline placeholder as `{"@blob": "<key>"}` to make this unambiguous).
+
+---
+
+## Reader contract
+
+`open_bundle(path)` MUST:
+
+1. Read 8 bytes; verify magic, parse version.
+2. Decompress (zstd) and unpack (msgpack) the body.
+3. If `version == 1`: dispatch through the v1 reader (legacy
+   `{"backtest": <Backtest.to_dict()>}` envelope).
+4. If `version == 2`: route runs/summary based on `engine_type`,
+   resolve blob references against the `blobs` map (replacing each
+   `{"@blob": "<key>"}` with the decoded `[(value, iso_string), ...]`
+   list), and reconstruct a `Backtest` via `Backtest.from_dict`.
+5. Reject any `version > BUNDLE_FORMAT_VERSION`.
+
+### Summary-only mode
+
+`open_bundle(path, summary_only=True)` skips the Parquet decode step.
+Each blob reference is replaced with an empty list (so
+`BacktestMetrics.from_dict` doesn't choke). All scalar summary
+metrics (Sharpe, Sortino, max DD, CAGR, win-rate, …) remain fully
+populated. Use this for bulk listing / ranking pipelines that don't
+draw charts.
+
+---
+
+## Writer contract
+
+`save_bundle(backtest, path)` MUST:
+
+1. Default to `format_version = BUNDLE_FORMAT_VERSION` (currently 2).
+2. 
Accept `format_version=1` for explicit downgrade. +3. Write atomically (write to `.tmp`, then `os.replace`). +4. Set `engine_type` from `backtest.engine_type`. +5. For v2: extract the eight metric series into Parquet blobs only + when the source list has at least one usable `(value, datetime)` + pair; leave malformed or empty series inline. + +### OHLCV float32 quantization + +`save_bundle(..., float32_ohlcv=True)` downcasts float64 OHLCV +columns to float32 before Parquet encoding. Typical reduction is ~2x +on the OHLCV side store; backtest metrics are unaffected for +crypto / equity time series. Off by default to preserve the v1 +exact-round-trip contract — opt in for upload / archive workflows. + +--- + +## Size expectations + +For a 10-year daily backtest with one run, three trades per week, +typical metric-series savings: + +| Item | v1 inline (ISO strings)| v2 Parquet blob | +| ------------------------------ | ----------------------:| ---------------:| +| `equity_curve` (2,500 entries) | ~120 KB | ~25 KB | +| `drawdown_series` (2,500) | ~120 KB | ~22 KB | +| `monthly_returns` (120) | ~6 KB | ~2 KB | +| 8 series total | ~500 KB | ~80 KB | + +Typical full-bundle size reduction for "metric-heavy" backtests +(many runs, long horizons): **30-80%**. For "snapshot-heavy" +backtests where `portfolio_snapshots` dominates, savings are smaller +(snapshots aren't extracted in v2.0); a future v2.x revision will +address this. + +For tiny / smoke-test backtests with <50 entries per series, v2 can +be **slightly larger** than v1 because Parquet's per-file overhead +(~100 bytes) exceeds the savings. This is expected and harmless. + +--- + +## Versioning policy + +- Bumping the bundle `format_version` integer is a **breaking change + for readers** of older framework versions. +- The framework will continue to read all historical versions + indefinitely. There is no plan to drop v1 read support. +- Writers default to the highest version the framework knows about. +- Additive changes within v2 (e.g. extracting more fields into + blobs) MUST be safe for v2 readers that don't know about the new + blobs — they should receive the inline value as a fallback. +- A bundle with `format_version=2` MAY contain blob keys the reader + doesn't recognise. Readers MUST ignore unknown blob keys. diff --git a/docs/design/ohlcv-dedup-protocol.md b/docs/design/ohlcv-dedup-protocol.md new file mode 100644 index 00000000..4188514e --- /dev/null +++ b/docs/design/ohlcv-dedup-protocol.md @@ -0,0 +1,208 @@ +# OHLCV Deduplication Protocol — Public Specification + +**Status:** Draft. Reference implementation pending. +**Companion to:** [bundle-format-v2.md](./bundle-format-v2.md) +**Audience:** Backtest archival / upload services (e.g. Finterion). + +This document specifies a content-addressed protocol for uploading +backtest bundles without re-uploading shared OHLCV (price) data. The +framework ships only the protocol; the server is out of scope and +proprietary to whoever runs it. + +--- + +## Problem + +A user has run 1,000 vector backtests over the same universe of 50 +crypto pairs at 1-hour resolution for 2024–2026. Each `.iafbt` +bundle, if saved with `include_ohlcv=True`, embeds the same ~50 OHLCV +Parquet blobs. The OHLCV is roughly 2 MB per pair, so 100 MB per +bundle, 100 GB across the 1,000 bundles — but only 100 MB of unique +OHLCV. + +The framework already content-addresses OHLCV at rest (each +`.parquet` is stored once in the local OHLCV side store). 
The
+protocol below extends that identity beyond the local filesystem.
+
+---
+
+## Identity
+
+Each OHLCV blob is identified by the lowercase hex SHA-256 of its
+**Parquet bytes** as written by `_df_to_parquet_bytes()`:
+
+```python
+sha256(parquet_bytes_with_zstd_level_5).hexdigest()
+```
+
+Crucially, the hash is computed **after** Parquet encoding, not on
+the raw DataFrame. Two clients writing the same logical DataFrame
+with different Parquet writer settings (compression level, column
+ordering, dictionary encoding) will therefore produce different
+identities. This is intentional: the protocol deduplicates
+byte-identical blobs only, so a hash always refers to exactly one
+byte sequence.
+
+When `float32_ohlcv=True` is used (see bundle v2 spec), the hash
+covers the float32 bytes — float32 and float64 representations of
+the "same" data are different OHLCV blobs.
+
+---
+
+## Wire format
+
+All requests and responses are JSON unless noted otherwise. Binary
+blobs use `application/octet-stream`. The protocol assumes HTTPS and
+a bearer-token auth scheme; both are out of scope here.
+
+### 1. Negotiate
+
+The client lists the OHLCV hashes it intends to upload. The server
+replies with the subset it does NOT yet have.
+
+```
+POST /api/v1/ohlcv/negotiate
+Content-Type: application/json
+
+{
+  "hashes": [
+    "a1b2c3d4...",
+    "e5f6a7b8...",
+    ...
+  ]
+}
+```
+
+```
+HTTP/1.1 200 OK
+Content-Type: application/json
+
+{
+  "missing": [
+    "e5f6a7b8..."
+  ]
+}
+```
+
+The server SHOULD respond in <200ms even for large inputs; treat the
+endpoint as a set-difference query against a hash index.
+
+The client MAY chunk the `hashes` array (recommended chunk size:
+1,000) for very large uploads. The server MUST accept at least 10,000
+hashes per request.
+
+### 2. Upload missing OHLCV
+
+For each hash in the `missing` list, the client uploads the raw
+Parquet bytes. Order doesn't matter; uploads MAY be parallel.
+
+```
+PUT /api/v1/ohlcv/<hash>
+Content-Type: application/octet-stream
+Content-Length: <n>
+
+<parquet bytes>
+```
+
+```
+HTTP/1.1 201 Created
+```
+
+The server MUST verify `sha256(body) == <hash>` before storing,
+and reject with `400 Bad Request` on mismatch. This prevents a buggy
+client from poisoning the shared store.
+
+A `409 Conflict` MAY be returned if the blob already exists (race
+between negotiate and upload). The client SHOULD treat 409 as
+success.
+
+### 3. Upload bundle envelope
+
+After all OHLCV blobs are stored on the server, the client uploads
+the bundle envelope. The envelope is a normal `.iafbt` file MINUS
+the OHLCV bytes — the manifest in the bundle still references the
+OHLCV by hash, but the side-store files are NOT shipped.
+
+```
+PUT /api/v1/backtests/<bundle-id>
+Content-Type: application/vnd.investing-algorithm-framework.bundle+zstd
+Content-Length: <n>
+
+<bundle envelope bytes>
+```
+
+```
+HTTP/1.1 201 Created
+Content-Type: application/json
+
+{
+  "id": "...",
+  "ohlcv_hashes": ["a1b2c3d4...", ...]
+}
+```
+
+The server MUST verify that every hash referenced by the bundle's
+OHLCV manifest exists in the OHLCV store before accepting the
+bundle, returning `424 Failed Dependency` with the missing hashes if
+not. The client should then re-run the negotiate and upload-missing
+steps and retry.
+
+The bundle's `ohlcv.store_dir` field is meaningless on the server
+side — the server resolves blobs by hash, not by path. The field is
+preserved on the wire for round-trip fidelity.
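+The client algorithm in the next section is written in terms of small
+helpers (`negotiate`, `upload_ohlcv`, `upload_bundle`). A minimal
+sketch of the first two against the endpoints above, assuming the
+`requests` library; the bearer header, timeouts and exact signatures
+are illustrative rather than part of this spec:
+
+```python
+import hashlib
+
+import requests
+
+
+def negotiate(endpoint, api_key, hashes):
+    """Step 1: ask the server which OHLCV hashes it is still missing."""
+    resp = requests.post(
+        f"{endpoint}/api/v1/ohlcv/negotiate",
+        json={"hashes": list(hashes)},
+        headers={"Authorization": f"Bearer {api_key}"},
+        timeout=30,
+    )
+    resp.raise_for_status()
+    return resp.json()["missing"]
+
+
+def upload_ohlcv(endpoint, api_key, blob_hash, parquet_bytes):
+    """Step 2: upload one missing blob; 409 (already stored) counts as success."""
+    # The server re-verifies the hash and answers 400 on mismatch, so
+    # catch an inconsistent local store before spending the bandwidth.
+    if hashlib.sha256(parquet_bytes).hexdigest() != blob_hash:
+        raise ValueError(f"local bytes do not hash to {blob_hash}")
+    resp = requests.put(
+        f"{endpoint}/api/v1/ohlcv/{blob_hash}",
+        data=parquet_bytes,
+        headers={
+            "Authorization": f"Bearer {api_key}",
+            "Content-Type": "application/octet-stream",
+        },
+        timeout=120,
+    )
+    if resp.status_code not in (201, 409):
+        resp.raise_for_status()
+```
+
+The pseudocode below additionally assumes
+`concurrent.futures.ThreadPoolExecutor` and an `itertools.batched`-style
+`batched()` helper for the chunked negotiate phase.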
+ +--- + +## Client algorithm + +```python +def upload_backtests(paths, endpoint, api_key, parallelism=4): + # Phase 1: discover hashes across all bundles + bundle_hashes = {path: read_ohlcv_hashes(path) for path in paths} + all_hashes = list({h for hs in bundle_hashes.values() for h in hs}) + + # Phase 2: negotiate (chunked) + missing = [] + for chunk in batched(all_hashes, 1000): + missing += negotiate(endpoint, api_key, chunk) + missing = set(missing) + + # Phase 3: upload missing OHLCV in parallel + with ThreadPoolExecutor(parallelism) as ex: + list(ex.map( + lambda h: upload_ohlcv(endpoint, api_key, h), + missing + )) + + # Phase 4: upload bundle envelopes (envelope only, not OHLCV) + with ThreadPoolExecutor(parallelism) as ex: + list(ex.map( + lambda p: upload_bundle(endpoint, api_key, p), + paths + )) +``` + +Concrete reference implementation lives outside the framework +(intentionally — see the rationale in +[bundle-format-v2.md](./bundle-format-v2.md)). + +--- + +## Security & abuse considerations + +- **Hash poisoning:** mitigated by server-side hash verification on + upload (step 2). +- **Cross-tenant leakage:** servers SHOULD scope the OHLCV store + per-tenant if their pricing model treats OHLCV as proprietary + data; a global shared store is appropriate only when all OHLCV + comes from public sources. +- **Replay attacks:** out of scope; rely on transport-layer auth. +- **Storage exhaustion:** the server SHOULD enforce per-tenant + quotas on the OHLCV store independently of bundle quotas. + +--- + +## Versioning + +This protocol uses URL versioning (`/api/v1/...`). Breaking changes +require a new major version. Additive changes (new optional fields +in JSON bodies) MUST be safe for clients that ignore unknown fields. diff --git a/docs/design/tiered-backtest-storage.md b/docs/design/tiered-backtest-storage.md new file mode 100644 index 00000000..198df086 --- /dev/null +++ b/docs/design/tiered-backtest-storage.md @@ -0,0 +1,261 @@ +# Tiered Backtest Storage — Design + +> Status: **Proposal** +> Targets: framework v8.10 (read-side) and v8.11 (store abstraction). +> Companion docs: [`bundle-format-v2.md`](./bundle-format-v2.md), [`ohlcv-dedup-protocol.md`](./ohlcv-dedup-protocol.md). + +## 1. Motivation + +Empirical measurements on a real production-shape archive (12,500 bundles, ~64 GB, 10 May 2026): + +| Configuration | Avg / bundle | Total | Notes | +|---|---:|---:|---| +| v1, zstd 7 (pre-v8.9) | 569 KB | 64.0 GB | baseline | +| v2 + zstd 19 (v8.9, shipped) | 489 KB | ~55 GB | per-file ceiling | +| v2 + zstd 19 + daily snapshots | 431 KB | ~48 GB | requires user behaviour change | +| Tiered store + content-addressed dedup | n/a (decomposed) | **< 20 GB projected** | this proposal | + +Two structural problems remain after v8.9: + +1. **Per-file compression has hit its ceiling.** zstd at level 22 saturates at the same size as level 19. Within a single `.iafbt`, there is no remaining headroom. +2. **The `.iafbt` is the wrong primitive for two of the three real workloads.** + - **Email / hand-off / archive** — single file is right. + - **Listing / ranking / dashboard** — single file is wrong. Decoding 12,500 zstd payloads to read 50 scalar metrics each is a 30-minute loop instead of a 50 ms SQL query. + - **Cross-run analytics** — single file is wrong. "Plot the equity curves for these 50 sweep variants" should be one DuckDB query, not 50 decode-and-merge round trips. 
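+To make the listing cost above concrete: without a Tier-1 index, ranking
+an archive by Sharpe means opening every bundle, even on the v8.10
+summary-only fast path. A sketch of that brute-force loop (the top-level
+`Backtest` import and the `sharpe_ratio` attribute on the summary object
+are assumptions here):
+
+```python
+from pathlib import Path
+
+from investing_algorithm_framework import Backtest
+
+
+def rank_by_sharpe(archive_dir: str, top: int = 20):
+    """Brute-force ranking over a folder of .iafbt bundles.
+
+    Every bundle still pays a zstd + msgpack decode even though
+    summary_only=True skips the Parquet metric blobs; the Tier-1
+    index replaces this whole loop with one SQL query (section 4.1).
+    """
+    ranked = []
+    for path in sorted(Path(archive_dir).rglob("*.iafbt")):
+        bt = Backtest.open(str(path), summary_only=True)
+        summary = bt.get_backtest_summary()
+        sharpe = getattr(summary, "sharpe_ratio", None) if summary else None
+        if sharpe is not None:
+            ranked.append((sharpe, str(path)))
+    return sorted(ranked, reverse=True)[:top]
+```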
+ +Cross-bundle redundancy is the dominant unexploited source of size: + +- Strategy params/schema: identical across all bundles in a sweep +- Symbol metadata: identical across every bundle on the same universe +- OHLCV: same slice referenced by hundreds of bundles +- Recurring trade/order patterns: significant overlap when entry rules repeat + +`zstd` cannot see across files. The fix lives one layer up. + +## 2. Principles + +1. A backtest is **not one thing** — it is a small header + heavy bulk + shared reference data. Treat them as separate citizens. +2. **Listing must not decode bulk.** Scalar metrics live in an indexable tier. +3. **Cross-bundle dedup is a storage concern**, not a per-file format trick. +4. **Columnar wins at scale, not per file.** Per-bundle Parquet was measured neutral-to-negative. Per-project Parquet over thousands of runs is a 10× analytics story. +5. **One canonical schema per artifact, additive forever.** No schema-on-read JSON. +6. **`.iafbt` becomes an export view, not the source of truth.** Assembled deterministically from the tiers on demand; still byte-portable. + +## 3. The three-tier model + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Tier 1 — Index (Postgres / SQLite) │ +│ One row per backtest run. All scalar metrics, params, │ +│ tags, dates, engine_type, plus refs into Tiers 2 & 3. │ +│ Indexed for sort, filter, rank, sweep comparison. │ +├─────────────────────────────────────────────────────────────────┤ +│ Tier 2 — Columnar bulk (Parquet on object storage) │ +│ Per-project Parquet datasets, partitioned by run_id: │ +│ portfolio_snapshots/ trades/ orders/ metric_series/ │ +│ Queryable directly by DuckDB / Polars / Arrow. │ +├─────────────────────────────────────────────────────────────────┤ +│ Tier 3 — Content-addressed chunks (S3-compatible) │ +│ SHA-256-keyed immutable blobs: │ +│ OHLCV slices, symbol metadata, strategy schemas, code. │ +│ One physical copy per unique chunk, per dedup scope. │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### 3.1 Tier 1 schema (sketch) + +| Column class | Examples | +|---|---| +| Identity | `run_id` (uuid7), `bundle_id`, `parent_sweep_id`, `tenant_id`, `project_id` | +| Provenance | `algorithm_id`, `code_hash`, `framework_version`, `created_at` | +| Config | `engine_type`, `params_hash`, `symbols_hash`, `date_range_name`, `start_date`, `end_date`, `tag` | +| Scalar metrics | `BacktestSummaryMetrics` fields (nested in `BacktestIndexRow.summary_metrics`) — Sharpe, Sortino, max_dd, CAGR, total_net_gain, win_rate, … | +| Refs | `snapshots_dataset_uri`, `trades_dataset_uri`, `metric_series_dataset_uri`, `ohlcv_chunk_hashes[]`, `code_chunk_hash`, `symbols_chunk_hash`, `params_chunk_hash` | + +Row size: ~1–2 KB. 12,500 rows ≈ 25 MB. Fits comfortably in SQLite for local users. + +> **Status (epic #540 phase 1, v8.10):** the typed contract for this row +> ships as `investing_algorithm_framework.BacktestIndexRow`, derived +> via `Backtest.index_row(bundle_path=...)`. The method works against +> bundles loaded with `Backtest.open(path, summary_only=True)` — no +> Parquet metric blobs are decoded on the fast index path. The +> existing `BacktestIndex` Parquet sidecar is now built on top of this +> typed row (`BacktestIndexRow.to_flat_dict()`), making the wire +> shape and the in-memory shape a single source of truth. 
+> +> **Status (epic #540 phase 2, v8.10):** the SQLite implementation +> ships as `investing_algorithm_framework.services.backtest_index +> .SqliteBacktestIndex`, with `iaf index ` as the CLI +> entry point. Every scalar field of `BacktestSummaryMetrics` is +> promoted to its own SQL column (`summary_`), so analysts can +> filter without opening any bundle, e.g. `SELECT bundle_path FROM +> backtest_index WHERE summary_sharpe_ratio > 1.0`. + +### 3.2 Tier 2 schemas (Parquet, long format) + +`portfolio_snapshots/`: +``` +run_id (string, dict-encoded) — partition key +ts (int64, epoch-ms UTC) +portfolio_id (string, dict) +trading_symbol (string, dict) +unallocated (float64) +total_value (float64) +total_net_gain (float64) +total_revenue (float64) +total_cost (float64) +cash_flow (float64) +net_size (float64) +pending_value (float64) +``` + +`trades/`: +``` +run_id (string, dict) — partition key +trade_id (string) +symbol (string, dict) +opened_at (int64, epoch-ms) +closed_at (int64, epoch-ms, nullable) +side (string) +amount (float64) +open_price (float64) +close_price (float64, nullable) +net_gain (float64, nullable) +… +``` + +`orders/`: analogous. + +`metric_series/`: +``` +run_id (string, dict) — partition key +metric_name (string, dict) — equity_curve, drawdown_series, … +ts (int64, epoch-ms) +value (float64) +``` +(Replaces v2's per-run metric blobs with one project-wide table — better dictionary compression.) + +### 3.3 Tier 3 chunks + +| Chunk type | Content | Lifecycle | +|---|---|---| +| `ohlcv/` | Parquet bytes for one symbol/timeframe/date-range slice | Long-lived; protocol already specced | +| `code/` | gzip'd source of strategy module(s) | Long-lived | +| `params/` | canonical-JSON of params dict | Long-lived | +| `symbols/` | symbol metadata bundle | Long-lived | +| `schema/` | strategy class schema (signals, recorded_values shape) | Long-lived | + +Dedup scope: **per-project by default, per-tenant opt-in for cross-project reuse.** No cross-tenant dedup (privacy + clarity). + +## 4. Read paths + +### 4.1 List / rank / filter +```sql +SELECT run_id, sharpe_ratio, sortino_ratio, max_drawdown, total_net_gain +FROM backtest_runs +WHERE project_id = ? AND created_at > ? +ORDER BY sharpe_ratio DESC +LIMIT 20; +``` +50 ms regardless of archive size. Today: 30 min (decompress 12,500 bundles). + +### 4.2 Single-run deep view +1. Fetch Tier 1 row (~1 KB) +2. Fetch its Parquet partitions for snapshots / trades / orders (partition-pruned) +3. Resolve OHLCV chunks lazily as plot pans/zooms + +### 4.3 Cross-run analytics +```python +duckdb.sql(""" + SELECT run_id, ts, value + FROM read_parquet('s3://.../metric_series/**') + WHERE metric_name = 'equity_curve' + AND run_id IN (SELECT run_id FROM sweep_xyz_runs) +""").df() +``` +One scan, dictionary-decoded, partition-pruned. Trivially feeds Polars/pandas/plotly. + +### 4.4 Download as `.iafbt` +1. Read Tier 1 row +2. Pull partitions from Tier 2 +3. Pull referenced chunks from Tier 3 +4. Assemble a v2 envelope, zstd-19, write `.iafbt` + +Deterministic: same `run_id` → byte-identical export (modulo writer timestamp). + +## 5. 
Write paths + +### 5.1 During a backtest run (framework, in-process) +- Snapshots / trades / orders accumulate in Arrow record batches +- On run completion: append batches to the Tier 2 datasets in one transaction (per-project Parquet writer with `run_id` partition column) +- Compute scalar summary → insert Tier 1 row +- Hash & upload reference chunks via the negotiate protocol + +### 5.2 Importing an existing `.iafbt` +- Decompose: scalars → Tier 1, snapshot/trade/order/metric series → Tier 2, OHLCV/code/params → Tier 3 +- Idempotent on `bundle_id` (re-import is a no-op) + +## 6. The `.iafbt` is now an export format + +```python +backtest = store.get(run_id) +backtest.export("run_xyz.iafbt") # deterministic packaging from tiers +imported = Backtest.import_("run_xyz.iafbt") # re-decomposes into tiers +``` + +- Still **one file**, still **self-contained**, still **portable**, still **versioned**. +- No longer the storage primitive. Used for: email, archive, offline analysis, OSS-only users, regulator hand-off. + +## 7. The OSS-only path stays clean + +For users who never touch a server, the same `BacktestStore` interface has a `LocalTieredStore` implementation: + +``` +~/.iaf/store/ +├── index.sqlite # Tier 1 +├── parquet/ +│ └── / +│ ├── portfolio_snapshots/ +│ ├── trades/ +│ ├── orders/ +│ └── metric_series/ +└── chunks/ # Tier 3 + ├── ohlcv//.parquet + ├── code//.gz + └── … +``` + +Zero behavioural difference vs today. Single-file `.iafbt` users get `export()` / `import_()`. + +## 8. Migration path + +| Phase | Change | Risk | +|---|---|---| +| **v8.9 (shipped)** | Bundle format v2; engine_type split; zstd 19; summary_only read | — | +| **v8.10** | `Backtest.index_row()` (no decode of bulk); `iaf index ` builds a SQLite index over a folder of bundles; `BacktestIndexRow` DTO with stable schema | Low — additive read paths | +| **v8.11** | `BacktestStore` interface with `LocalDirStore` (today) and `LocalTieredStore`. `.iafbt` becomes export format; service constructors accept a store | Medium — touches every backtest service constructor; deprecation flag for one minor cycle | +| **Finterion (closed)** | `RemoteTieredStore` over Postgres + S3 + chunk service | Closed-source, unblocked by v8.11 | + +No flag day. v1 and v2 bundles remain readable inputs forever. + +## 9. Non-goals + +- Per-bundle Parquet for everything (measured neutral-to-negative on real data) +- Custom binary column format (Parquet is solved; leverage it) +- Lossy snapshot/trade compression (user data, hands off) +- Cross-tenant dedup (privacy) +- Schema-on-read JSON anywhere (`(value, ISO-string)` lists are exactly the trap that led to v2) +- A unified "do everything" mega-file (the original `.iafbt` mistake) + +## 10. Open questions + +- **Chunk boundary strategy** for Tier 3 of opaque blobs: fixed-size vs FastCDC. Default to whole-object (one chunk per logical artifact) until profiling proves otherwise. +- **Tier 2 compaction**: per-run partitions are write-cheap but read-suboptimal at 10⁴+ runs. Periodic compaction job that merges small partitions by sweep — schedule TBD. +- **Tier 1 scalar metric set**: freeze the column set or allow extension? Lean toward a fixed core + a `JSONB extras` column for forward compat. +- **Local SQLite contention** under multi-process backtest sweeps. WAL mode + per-process write batching should be enough for the OSS user; revisit if profiling shows otherwise. + +## 11. Headline + +> Today's design treats a backtest as a file. 
The future design treats a backtest as **a row that points to chunks**, with the file as one of several possible views. +> +> That single shift turns the 64 GB problem into the 20 GB problem, makes "list 12,500 backtests sorted by Sharpe" a 50 ms query instead of a 30-minute decode loop, and unlocks DuckDB/Polars analytics over the entire archive without writing any new code. diff --git a/investing_algorithm_framework/__init__.py b/investing_algorithm_framework/__init__.py index 7920915c..bdb7ef45 100644 --- a/investing_algorithm_framework/__init__.py +++ b/investing_algorithm_framework/__init__.py @@ -19,6 +19,7 @@ Trade, APP_MODE, AppMode, DATETIME_FORMAT, load_backtests_from_directory, \ iter_backtests_from_directory, \ BacktestDateRange, convert_polars_to_pandas, BacktestRun, \ + BacktestIndexRow, \ DEFAULT_LOGGING_CONFIG, DataType, DataProvider, StopLossRule, \ ScalingRule, TradingCost, \ TradeStatus, generate_backtest_summary_metrics, generate_algorithm_id, \ @@ -222,6 +223,7 @@ "get_positive_trades", "get_number_of_trades", "BacktestRun", + "BacktestIndexRow", "load_backtests_from_directory", "iter_backtests_from_directory", "save_backtests_to_directory", diff --git a/investing_algorithm_framework/cli/cli.py b/investing_algorithm_framework/cli/cli.py index a25daf67..cc83f41e 100644 --- a/investing_algorithm_framework/cli/cli.py +++ b/investing_algorithm_framework/cli/cli.py @@ -320,3 +320,52 @@ def migrate_backtests_cmd( cli.add_command(migrate_backtests_cmd) + + +@click.command(name="index") +@click.argument( + "directory", + type=click.Path(exists=True, file_okay=False, dir_okay=True), +) +@click.option( + "--output", "-o", + type=click.Path(file_okay=True, dir_okay=False), + default=None, + help="Path to the SQLite index file (default: /index.sqlite).", +) +@click.option( + "--absolute-paths", is_flag=True, default=False, + help="Store absolute bundle paths in the index " + "(default: paths relative to , so the index stays " + "portable when the folder is moved).", +) +@click.option( + "--no-progress", is_flag=True, default=False, + help="Suppress the progress bar.", +) +def index_cmd(directory, output, absolute_paths, no_progress): + """Build a SQLite Tier-1 index over a folder of ``.iafbt`` bundles. + + The resulting ``index.sqlite`` file holds one row per bundle with + identity / provenance / config columns and every scalar + ``BacktestSummaryMetrics`` field promoted to its own column, so + analysts can run ad-hoc SQL queries (e.g. + ``SELECT bundle_path FROM backtest_index + WHERE summary_sharpe_ratio > 1.0``) without opening any bundle. + + Each bundle is opened with ``summary_only=True`` so no Parquet + metric blobs are decoded \u2014 indexing 12,500 bundles is bounded by + msgpack header parsing, not metric reconstruction. + """ + from .index_command import build_index + + out = build_index( + directory=directory, + output=output, + relative_paths=not absolute_paths, + show_progress=not no_progress, + ) + click.echo(f"Wrote SQLite index to {out}") + + +cli.add_command(index_cmd) diff --git a/investing_algorithm_framework/cli/index_command.py b/investing_algorithm_framework/cli/index_command.py new file mode 100644 index 00000000..224e079f --- /dev/null +++ b/investing_algorithm_framework/cli/index_command.py @@ -0,0 +1,98 @@ +"""``iaf index`` CLI \u2014 build a SQLite Tier-1 index over a folder of +``.iafbt`` bundles (epic #540 phase 2). 
+ +Walks the directory, opens each bundle with ``summary_only=True`` (no +Parquet metric-blob decode), derives a :class:`BacktestIndexRow` via +:meth:`Backtest.index_row`, and upserts into a +:class:`SqliteBacktestIndex`. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Iterable, List, Optional + +from investing_algorithm_framework.domain import ( + Backtest, + BUNDLE_EXT, +) +from investing_algorithm_framework.services.backtest_index import ( + SqliteBacktestIndex, +) + +logger = logging.getLogger(__name__) + + +DEFAULT_INDEX_NAME = "index.sqlite" + + +def _iter_bundle_paths(directory: Path) -> Iterable[Path]: + """Yield every ``*.iafbt`` file under *directory* (sorted).""" + return sorted(directory.rglob(f"*{BUNDLE_EXT}")) + + +def build_index( + directory: str, + output: Optional[str] = None, + relative_paths: bool = True, + show_progress: bool = False, +) -> str: + """Build (or refresh) a SQLite Tier-1 index over *directory*. + + Args: + directory: Folder to scan for ``.iafbt`` bundles. + output: Path to the SQLite file. Defaults to + ``/index.sqlite``. + relative_paths: if True, store ``bundle_path`` relative to + *directory* so the index file stays portable when the + folder is moved/renamed. + show_progress: emit a tqdm progress bar. + + Returns: + Absolute path of the SQLite file that was written. + """ + src = Path(directory).resolve() + if not src.is_dir(): + raise NotADirectoryError(f"Not a directory: {src}") + + out = Path(output).resolve() if output else src / DEFAULT_INDEX_NAME + paths: List[Path] = list(_iter_bundle_paths(src)) + + pbar = None + if show_progress: + try: + from tqdm import tqdm + pbar = tqdm(total=len(paths), desc="Indexing bundles") + except ImportError: # pragma: no cover - tqdm is a dep + pbar = None + + index = SqliteBacktestIndex.create(out) + n_ok = 0 + n_err = 0 + try: + for path in paths: + try: + bt = Backtest.open(str(path), summary_only=True) + bundle_path = ( + str(path.relative_to(src)) if relative_paths + else str(path) + ) + row = bt.index_row(bundle_path=bundle_path) + index.upsert(row) + n_ok += 1 + except Exception as exc: # noqa: BLE001 \u2014 best-effort scan + logger.warning("failed to index %s: %s", path, exc) + n_err += 1 + finally: + if pbar is not None: + pbar.update(1) + finally: + if pbar is not None: + pbar.close() + index.close() + + logger.info( + "Indexed %d bundle(s) into %s (%d failed)", n_ok, out, n_err, + ) + return str(out) diff --git a/investing_algorithm_framework/domain/__init__.py b/investing_algorithm_framework/domain/__init__.py index 89e28c01..5de65b1e 100644 --- a/investing_algorithm_framework/domain/__init__.py +++ b/investing_algorithm_framework/domain/__init__.py @@ -43,6 +43,7 @@ csv_to_list, StoppableThread, load_csv_into_dict, tqdm, \ is_timezone_aware, sync_timezones, get_timezone from .backtesting import BacktestRun, BacktestSummaryMetrics, \ + BacktestIndexRow, \ BacktestDateRange, Backtest, BacktestMetrics, combine_backtests, \ BacktestPermutationTest, BacktestEvaluationFocus, \ generate_backtest_summary_metrics, load_backtests_from_directory, \ @@ -108,6 +109,7 @@ "parse_decimal_to_string", "parse_string_to_decimal", "BacktestRun", + "BacktestIndexRow", "DATETIME_FORMAT_BACKTESTING", "BACKTESTING_FLAG", "PortfolioSnapshot", diff --git a/investing_algorithm_framework/domain/backtesting/__init__.py b/investing_algorithm_framework/domain/backtesting/__init__.py index 9e53f347..5e2c75a3 100644 --- 
a/investing_algorithm_framework/domain/backtesting/__init__.py +++ b/investing_algorithm_framework/domain/backtesting/__init__.py @@ -1,4 +1,5 @@ from .backtest_summary_metrics import BacktestSummaryMetrics +from .backtest_index_row import BacktestIndexRow from .backtest_date_range import BacktestDateRange from .backtest_metrics import BacktestMetrics from .backtest_run import BacktestRun @@ -25,6 +26,7 @@ __all__ = [ "Backtest", "BacktestSummaryMetrics", + "BacktestIndexRow", "BacktestDateRange", "BacktestMetrics", "BacktestRun", diff --git a/investing_algorithm_framework/domain/backtesting/backtest.py b/investing_algorithm_framework/domain/backtesting/backtest.py index 70eaec43..09fbb6c2 100644 --- a/investing_algorithm_framework/domain/backtesting/backtest.py +++ b/investing_algorithm_framework/domain/backtesting/backtest.py @@ -13,6 +13,7 @@ from .backtest_permutation_test import BacktestPermutationTest from .backtest_date_range import BacktestDateRange from .backtest_summary_metrics import BacktestSummaryMetrics +from .backtest_index_row import BacktestIndexRow from .combine_backtests import generate_backtest_summary_metrics @@ -57,12 +58,55 @@ class Backtest: strategy_ids: List[int] = field(default_factory=list) parameters: Dict = field(default_factory=dict) tag: str = None + # Backtest engine that produced this report. One of + # ``"vector"``, ``"event"``, or ``None`` (unknown / legacy bundle). + # Bundle format v2 uses this to route the runs into the + # ``vector_runs`` / ``event_runs`` slots in the on-disk envelope. + engine_type: str = None # OHLCV payload optionally attached for save_bundle(include_ohlcv=True). # Keys are conventionally "@" (e.g. # "BTC/EUR@1h"), values are pandas DataFrames or any object that # has ``to_pandas()``. See issue #487. ohlcv: Dict[str, object] = field(default_factory=dict, repr=False) + # ------------------------------------------------------------------ + # Engine-aware run / metrics views (bundle v2) + # ------------------------------------------------------------------ + @property + def vector_runs(self) -> List[BacktestRun]: + """Runs produced by the vectorized backtest engine. + + Returns the full ``backtest_runs`` list when this bundle's + ``engine_type == "vector"``, otherwise an empty list. Bundles + loaded from format v1 (no ``engine_type`` recorded) return an + empty list here; consumers that don't care about the engine + should keep using ``backtest_runs``. + """ + return list(self.backtest_runs) if self.engine_type == "vector" \ + else [] + + @property + def event_runs(self) -> List[BacktestRun]: + """Runs produced by the event-based backtest engine. + + See :py:attr:`vector_runs` for the empty-list semantics on + legacy / unknown-engine bundles. 
+ """ + return list(self.backtest_runs) if self.engine_type == "event" \ + else [] + + @property + def vector_metrics(self) -> Union[BacktestSummaryMetrics, None]: + """Summary metrics for the vector engine (or None).""" + return self.backtest_summary if self.engine_type == "vector" \ + else None + + @property + def event_metrics(self) -> Union[BacktestSummaryMetrics, None]: + """Summary metrics for the event engine (or None).""" + return self.backtest_summary if self.engine_type == "event" \ + else None + def get_all_backtest_runs( self, backtest_date_ranges=None ) -> List[BacktestRun]: @@ -172,6 +216,42 @@ def get_backtest_metrics( return run.backtest_metrics return None + def index_row( + self, bundle_path: Union[str, None] = None, + ) -> BacktestIndexRow: + """Return the typed Tier-1 row contract for this backtest. + + The row carries identity, provenance, config and the scalar + :class:`BacktestSummaryMetrics`, but **no heavy time-series + data**. It can therefore be built without decoding any v2 + Parquet metric blobs (``Backtest.open(path, + summary_only=True)`` is the canonical fast read path). + + Args: + bundle_path: Optional location the bundle was loaded from + (relative or absolute). Stored verbatim in + :pyattr:`BacktestIndexRow.bundle_path` for downstream + indexers that need to round-trip back to the file. + + Returns: + BacktestIndexRow: typed, flat-friendly row. + + See also: + ``docs/design/tiered-backtest-storage.md`` §3.1 — the + authoritative schema this row implements. + """ + return BacktestIndexRow( + algorithm_id=self.algorithm_id, + tag=self.tag, + bundle_path=bundle_path, + engine_type=self.engine_type, + risk_free_rate=self.risk_free_rate, + parameters=dict(self.parameters or {}), + strategy_ids=list(self.strategy_ids or []), + number_of_runs=len(self.backtest_runs or []), + summary_metrics=self.backtest_summary, + ) + def get_backtest_summary(self) -> Union[BacktestSummaryMetrics, None]: """ Retrieve the cross-window BacktestSummaryMetrics roll-up for @@ -208,6 +288,7 @@ def to_dict(self) -> dict: "algorithm_id": self.algorithm_id, "parameters": self.parameters, "tag": self.tag, + "engine_type": self.engine_type, } @classmethod @@ -249,12 +330,14 @@ def from_dict(cls, data: dict) -> 'Backtest': strategy_ids=data.get("strategy_ids") or [], parameters=data.get("parameters") or {}, tag=data.get("tag"), + engine_type=data.get("engine_type"), ) @staticmethod def open( directory_path: Union[str, Path], backtest_date_ranges: List[BacktestDateRange] = None, + summary_only: bool = False, ) -> 'Backtest': """ Open a backtest report from a directory **or** a ``.iafbt`` @@ -265,6 +348,14 @@ def open( backtest_date_ranges (List[BacktestDateRange], optional): A list of date ranges to filter the backtest runs. If provided, only backtest runs matching these date ranges will be loaded. + summary_only (bool): When True (bundle format v2 only), skip + eager decoding of heavy time-series blobs (equity curves, + drawdown series, monthly/yearly returns, etc.). The + blobs remain in the in-memory dict as + ``{"@blob": ""}`` references and consumers that + only need the scalar summary metrics can avoid the + Parquet decode cost. Ignored for legacy directory + loaders and v1 bundles, where these series live inline. 
Returns: Backtest: An instance of Backtest with the loaded metrics @@ -281,7 +372,7 @@ def open( if os.path.isfile(path_str): from .bundle import BUNDLE_EXT, is_bundle_file, open_bundle if path_str.endswith(BUNDLE_EXT) or is_bundle_file(path_str): - bt = open_bundle(path_str) + bt = open_bundle(path_str, summary_only=summary_only) if backtest_date_ranges is not None: bt.backtest_runs = bt.get_all_backtest_runs( backtest_date_ranges @@ -296,7 +387,7 @@ def open( from .bundle import BUNDLE_EXT, open_bundle candidate = path_str + BUNDLE_EXT if os.path.isfile(candidate): - bt = open_bundle(candidate) + bt = open_bundle(candidate, summary_only=summary_only) if backtest_date_ranges is not None: bt.backtest_runs = bt.get_all_backtest_runs( backtest_date_ranges diff --git a/investing_algorithm_framework/domain/backtesting/backtest_index_row.py b/investing_algorithm_framework/domain/backtesting/backtest_index_row.py new file mode 100644 index 00000000..d466d347 --- /dev/null +++ b/investing_algorithm_framework/domain/backtesting/backtest_index_row.py @@ -0,0 +1,174 @@ +"""Typed Tier-1 row contract for the tiered backtest store (epic #540). + +A :class:`BacktestIndexRow` is the authoritative *flat, scalar-only* +view of a backtest. It is what gets stored as a single row in: + +* the :class:`BacktestIndex` Parquet sidecar produced by + :func:`save_backtests_to_directory`; +* the SQLite index built by ``iaf index`` (epic #540 phase 2); +* the Tier-1 SQL table in any tiered store implementation + (``LocalTieredStore`` and the closed-source remote stores). + +The schema is **deliberately frozen** — adding a new column is an +explicit decision and a doc update. Callers can always stash +non-canonical fields in :pyattr:`extras` (a JSON-friendly dict) which +is round-tripped opaquely. + +This row is built without decoding any heavy time-series payloads; +it is safe to materialise from a bundle opened with +``Backtest.open(path, summary_only=True)``. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field, fields +from typing import Any, Dict, List, Optional + +from .backtest_summary_metrics import BacktestSummaryMetrics + + +# Prefix used when flattening the nested summary metrics into a +# single-level dict (e.g. for Parquet / SQL columns). Kept as a +# module-level constant so consumers can reuse it without hard-coding +# the string in two places. +SUMMARY_FIELD_PREFIX = "summary." + + +@dataclass +class BacktestIndexRow: + """One row of the backtest index — the Tier-1 contract. + + Field groups follow the design doc + (``docs/design/tiered-backtest-storage.md`` §3.1): + + * **Identity** — ``algorithm_id``, ``tag``, ``bundle_path`` + * **Provenance** — ``framework_version``, ``engine_type``, + ``risk_free_rate`` + * **Config** — ``parameters``, ``strategy_ids``, ``number_of_runs`` + * **Scalar metrics** — :pyattr:`summary_metrics`, the existing + :class:`BacktestSummaryMetrics` dataclass + * **Forward-compat** — :pyattr:`extras`, a free-form dict the + bundle reader populates for non-canonical scalar fields + + Notes: + The schema is intentionally flat for the wire shapes that need + flatness (Parquet, SQL). For ergonomic Python use, prefer + accessing :pyattr:`summary_metrics` directly. 
+ """ + + # -- Identity -------------------------------------------------------- + algorithm_id: Optional[str] = None + tag: Optional[str] = None + bundle_path: Optional[str] = None + + # -- Provenance ------------------------------------------------------ + framework_version: Optional[str] = None + engine_type: Optional[str] = None + risk_free_rate: Optional[float] = None + + # -- Config ---------------------------------------------------------- + parameters: Dict[str, Any] = field(default_factory=dict) + strategy_ids: List[Any] = field(default_factory=list) + number_of_runs: int = 0 + + # -- Scalar metrics -------------------------------------------------- + summary_metrics: Optional[BacktestSummaryMetrics] = None + + # -- Forward-compat -------------------------------------------------- + extras: Dict[str, Any] = field(default_factory=dict) + + # ------------------------------------------------------------------ + # Flat-dict round-trip (Parquet / SQL / JSON wire shape) + # ------------------------------------------------------------------ + def to_flat_dict(self) -> Dict[str, Any]: + """Flatten into a single-level dict. + + Summary-metric scalars are emitted under + :data:`SUMMARY_FIELD_PREFIX` keys (``summary.sharpe_ratio`` + etc.). Complex fields (``parameters``, ``strategy_ids``) are + JSON-encoded so the result fits any tabular sink. + """ + out: Dict[str, Any] = { + "algorithm_id": self.algorithm_id, + "tag": self.tag, + "bundle_path": self.bundle_path, + "framework_version": self.framework_version, + "engine_type": self.engine_type, + "risk_free_rate": self.risk_free_rate, + "number_of_runs": self.number_of_runs, + } + + # parameters / strategy_ids → JSON for tabular round-trip + out["parameters"] = ( + _safe_json(self.parameters) if self.parameters else None + ) + out["strategy_ids"] = ( + _safe_json(self.strategy_ids) if self.strategy_ids else None + ) + + # Scalar summary metrics, prefixed + if self.summary_metrics is not None: + for k, v in self.summary_metrics.to_dict().items(): + if isinstance(v, (int, float, str, bool)) or v is None: + out[f"{SUMMARY_FIELD_PREFIX}{k}"] = v + + # Forward-compat extras, prefixed to avoid colliding with the + # canonical column set. + for k, v in (self.extras or {}).items(): + if isinstance(v, (int, float, str, bool)) or v is None: + out[f"extras.{k}"] = v + + return out + + @classmethod + def from_flat_dict(cls, row: Dict[str, Any]) -> "BacktestIndexRow": + """Reconstruct a row from the flat dict shape produced by + :meth:`to_flat_dict`. Unknown keys land in :pyattr:`extras`.""" + canonical = {f.name for f in fields(cls)} - { + "summary_metrics", "extras" + } + + kwargs: Dict[str, Any] = {} + summary_dict: Dict[str, Any] = {} + extras: Dict[str, Any] = {} + + for k, v in row.items(): + if k in canonical: + if k in ("parameters", "strategy_ids"): + if v is None: + kwargs[k] = {} if k == "parameters" else [] + continue + if isinstance(v, str): + try: + kwargs[k] = json.loads(v) + continue + except (TypeError, ValueError): + pass + kwargs[k] = v + elif k.startswith(SUMMARY_FIELD_PREFIX): + summary_dict[k[len(SUMMARY_FIELD_PREFIX):]] = v + elif k.startswith("extras."): + extras[k[len("extras."):]] = v + else: + # Unknown key — preserve under extras (round-trip safety). 
+ extras[k] = v + + kwargs.setdefault("parameters", {}) + kwargs.setdefault("strategy_ids", []) + + return cls( + **kwargs, + summary_metrics=( + BacktestSummaryMetrics.from_dict(summary_dict) + if summary_dict else None + ), + extras=extras, + ) + + +def _safe_json(obj: Any) -> Optional[str]: + try: + return json.dumps(obj, default=str) + except (TypeError, ValueError): + return None diff --git a/investing_algorithm_framework/domain/backtesting/backtest_utils.py b/investing_algorithm_framework/domain/backtesting/backtest_utils.py index 2be662cf..ea9ec4fc 100644 --- a/investing_algorithm_framework/domain/backtesting/backtest_utils.py +++ b/investing_algorithm_framework/domain/backtesting/backtest_utils.py @@ -551,28 +551,14 @@ def iter_backtests_from_directory( def _backtest_to_index_row(bt: Backtest, bundle_path: Optional[str] = None): - """Flatten a backtest's summary + identity into a single row.""" - summary = ( - bt.backtest_summary.to_dict() if bt.backtest_summary else {} - ) - row = { - "algorithm_id": getattr(bt, "algorithm_id", None), - "tag": getattr(bt, "tag", None), - "risk_free_rate": getattr(bt, "risk_free_rate", None), - "bundle_path": bundle_path, - "number_of_runs": len(bt.backtest_runs or []), - } - # Include scalar summary metrics only (no nested structures). - for k, v in summary.items(): - if isinstance(v, (int, float, str, bool)) or v is None: - row[f"summary.{k}"] = v - # Parameters as JSON for round-trippability without exploding columns. - if getattr(bt, "parameters", None): - try: - row["parameters"] = json.dumps(bt.parameters, default=str) - except (TypeError, ValueError): - row["parameters"] = None - return row + """Flatten a backtest's summary + identity into a single row. + + Thin wrapper around :meth:`Backtest.index_row` for callers that + want the legacy flat dict shape (Parquet / SQL columns). The + typed :class:`BacktestIndexRow` is the authoritative contract \u2014 + see ``docs/design/tiered-backtest-storage.md`` \u00a73.1. + """ + return bt.index_row(bundle_path=bundle_path).to_flat_dict() def _write_index(directory_path: Union[str, Path], backtests: List[Backtest]): diff --git a/investing_algorithm_framework/domain/backtesting/bundle.py b/investing_algorithm_framework/domain/backtesting/bundle.py index 1d043c2c..3ccac964 100644 --- a/investing_algorithm_framework/domain/backtesting/bundle.py +++ b/investing_algorithm_framework/domain/backtesting/bundle.py @@ -1,15 +1,39 @@ """Single-file binary bundle persistence for :class:`Backtest`. -Implements the bundle format proposed in issue #487: - -- One ``.iafbt`` file per backtest (zstd-compressed MessagePack). -- Optional content-addressed Parquet store for OHLCV payloads, shared - across many bundles in the same parent directory. -- Round-trips perfectly through :py:meth:`Backtest.to_dict` / - :py:meth:`Backtest.from_dict`. - -The directory format produced by :py:meth:`Backtest.save` is preserved -as a fall-back; this module never touches it. +Implements the bundle format proposed in issue #487, plus the v2 +extensions described in ``docs/design/bundle-format-v2.md``. + +Format versions +--------------- + +- **v1** (legacy): a single zstd-compressed MessagePack envelope of + ``{"format_version": 1, "backtest": , + "ohlcv": }``. Heavy time-series fields + (equity / drawdown / monthly / yearly / cumulative_return / + rolling_sharpe / TWR variants) live inline as lists of + ``(value, ISO-string)`` tuples. 
+ +- **v2** (default since v8.9.0): + - Splits runs into ``vector_runs`` / ``event_runs`` based on + ``Backtest.engine_type``; the matching summary metrics live + under ``vector_metrics`` / ``event_metrics``. Bundles with no + engine_type fall back to ``backtest_runs``. + - Heavy time series are extracted from each ``backtest_metrics`` + dict and stored as embedded Parquet bytes under a top-level + ``blobs`` map keyed ``runs//metrics/.parquet``. + Each blob has two columns: ``ts`` (int64 epoch-ms in UTC) and + ``value`` (float64). Yearly returns store ``ts`` as a date + midnight epoch-ms. The original field is replaced with a + ``{"@blob": ""}`` reference. + - On read, blob references are resolved back to lists of + ``(value, datetime)`` tuples so consumers see the same shape + as v1. ``open_bundle(..., summary_only=True)`` skips the + Parquet decode entirely (the references are left in place + as opaque dicts), which keeps bulk-listing fast. + +OHLCV side store and the ``LazyOhlcvDict`` are unchanged across +versions. The OHLCV writer accepts ``float32_ohlcv=True`` to +quantize OHLCV columns to float32 before Parquet encoding. """ from __future__ import annotations @@ -17,8 +41,9 @@ import io import logging import os +from datetime import date, datetime, timezone from pathlib import Path -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional, Tuple, Union import msgpack import zstandard as zstd @@ -28,18 +53,46 @@ logger = logging.getLogger(__name__) -BUNDLE_FORMAT_VERSION = 1 +# Bumped to 2 when v2 format went live (issue #538). Writers always +# emit v2 unless ``save_bundle(..., format_version=1)`` is requested +# explicitly. Readers accept both v1 and v2. +BUNDLE_FORMAT_VERSION = 2 BUNDLE_EXT = ".iafbt" -# ``zstd`` compression level. Level 7 is a sweet spot: ~6-8x on the -# typical run.json/metrics.json payload, ~10ms encode for a 17MB doc. -_ZSTD_LEVEL = 7 +# ``zstd`` compression level. Level 19 is the highest level still in +# the "fast" tier (i.e. without ``--ultra``). Measured on real +# 7-run/2192-snapshot bundles it cuts ~14% off the on-disk size vs +# level 7 with no observable decode-speed impact, and is what we +# default to since the v8.9 size review. +_ZSTD_LEVEL = 19 # Header used to detect bundle files cheaply without decoding. # 4 bytes magic ("IAFB") + 4 bytes little-endian uint32 format version. _MAGIC = b"IAFB" +# Metric fields that are extracted into Parquet blobs in format v2. +# All of these have shape ``List[Tuple[float, datetime|date]]`` in +# ``BacktestMetrics.to_dict()``; any non-list value is left untouched. +_METRIC_BLOB_FIELDS: Tuple[str, ...] = ( + "equity_curve", + "drawdown_series", + "cumulative_return_series", + "rolling_sharpe_ratio", + "monthly_returns", + "yearly_returns", + "twr_equity_curve", + "twr_drawdown_series", +) + +# Marker key inserted in place of an extracted heavy series. The +# top-level ``blobs`` dict resolves the key to the actual Parquet +# bytes. Kept as a single-key dict so any consumer that walks the +# document without resolving blobs can still distinguish a reference +# from real data. 
+_BLOB_REF_KEY = "@blob" + + # --------------------------------------------------------------------------- # OHLCV side-store (content-addressed Parquet) # --------------------------------------------------------------------------- @@ -49,8 +102,17 @@ def _hash_bytes(payload: bytes) -> str: return hashlib.sha256(payload).hexdigest() -def _df_to_parquet_bytes(df: Any) -> bytes: - """Serialize a pandas/polars DataFrame to zstd-compressed Parquet.""" +def _df_to_parquet_bytes(df: Any, *, float32: bool = False) -> bytes: + """Serialize a pandas/polars DataFrame to zstd-compressed Parquet. + + Args: + df: Source DataFrame (pandas or polars). + float32: When True, downcast any float64 columns to float32 + before encoding. For OHLCV payloads this typically + halves on-disk size with no observable effect on backtest + metrics. Use only for OHLCV / market data; metric series + keep float64 to preserve precision. + """ import pyarrow as pa import pyarrow.parquet as pq @@ -58,6 +120,15 @@ def _df_to_parquet_bytes(df: Any) -> bytes: if hasattr(df, "to_pandas") and not hasattr(df, "to_records"): df = df.to_pandas() + if float32: + try: + import numpy as np + float_cols = df.select_dtypes(include=[np.float64]).columns + if len(float_cols) > 0: + df = df.astype({c: np.float32 for c in float_cols}) + except Exception: # pragma: no cover - numpy/pandas optional path + pass + table = pa.Table.from_pandas(df, preserve_index=False) buf = io.BytesIO() pq.write_table(table, buf, compression="zstd", compression_level=5) @@ -71,13 +142,130 @@ def _parquet_bytes_to_df(payload: bytes): return table.to_pandas() +# --------------------------------------------------------------------------- +# v2 metric-series Parquet helpers (small two-column blobs) +# --------------------------------------------------------------------------- + + +def _to_epoch_ms(value: Any) -> Optional[int]: + """Return *value* as an int64 UTC epoch-millisecond timestamp. + + Accepts ``datetime``, ``date``, ISO-8601 strings, and pre-converted + ints. Returns None for None / unparseable inputs so the encoder can + drop them without raising. + """ + if value is None: + return None + if isinstance(value, bool): + return None + if isinstance(value, int): + # Already epoch-ms (assume — we never write ints into the + # series ourselves; this branch only fires on weird inputs). + return value + if isinstance(value, float): + return int(value) + if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=timezone.utc) + else: + value = value.astimezone(timezone.utc) + return int(value.timestamp() * 1000) + if isinstance(value, date): + # midnight UTC for the calendar day + dt = datetime(value.year, value.month, value.day, tzinfo=timezone.utc) + return int(dt.timestamp() * 1000) + if isinstance(value, str): + try: + dt = datetime.fromisoformat(value) + except ValueError: + try: + dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S") + except ValueError: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) + return int(dt.timestamp() * 1000) + return None + + +def _from_epoch_ms(ts_ms: Optional[int]) -> Optional[datetime]: + """Inverse of :func:`_to_epoch_ms`. Returns timezone-aware UTC.""" + if ts_ms is None: + return None + return datetime.fromtimestamp(int(ts_ms) / 1000, tz=timezone.utc) + + +def _series_to_parquet_bytes(series: Any) -> Optional[bytes]: + """Encode a ``[(value, datetime), ...]`` series as 2-column Parquet. 
+ + Returns None if the series is empty or not a list of pairs (the + caller then leaves the field inline — i.e. blob extraction is a + no-op and the original list is preserved). + """ + if not isinstance(series, (list, tuple)) or not series: + return None + + timestamps = [] + values = [] + for entry in series: + if not isinstance(entry, (list, tuple)) or len(entry) != 2: + return None + value, ts = entry + ts_ms = _to_epoch_ms(ts) + if ts_ms is None: + return None + timestamps.append(ts_ms) + try: + values.append(float(value) if value is not None else None) + except (TypeError, ValueError): + return None + + import pyarrow as pa + import pyarrow.parquet as pq + + table = pa.table({ + "ts": pa.array(timestamps, type=pa.int64()), + "value": pa.array(values, type=pa.float64()), + }) + buf = io.BytesIO() + pq.write_table(table, buf, compression="zstd", compression_level=5) + return buf.getvalue() + + +def _parquet_bytes_to_series(payload: bytes) -> list: + """Inverse of :func:`_series_to_parquet_bytes`. Returns + ``[(value, iso_string), ...]`` matching the v1 inline shape so + downstream consumers (``BacktestMetrics.from_dict``) don't need + to know which format produced the bundle. + """ + import pyarrow.parquet as pq + + table = pq.read_table(io.BytesIO(payload)) + ts_col = table.column("ts").to_pylist() + value_col = table.column("value").to_pylist() + out = [] + for i in range(len(ts_col)): + dt = _from_epoch_ms(ts_col[i]) + iso = dt.isoformat() if dt is not None else None + out.append((value_col[i], iso)) + return out + + def _write_ohlcv_to_store( ohlcv: Dict[str, Any], store_dir: Union[str, Path], + *, + float32: bool = False, ) -> Dict[str, str]: """Write each (symbol, timeframe) DataFrame to *store_dir* keyed by content hash. Returns a manifest mapping the original key to the relative path of the stored Parquet blob. + + When ``float32=True``, OHLCV float64 columns are downcast to + float32 before encoding (~2x size reduction with no observable + impact on backtest metrics for crypto/equity time series). """ if not ohlcv: return {} @@ -89,7 +277,7 @@ def _write_ohlcv_to_store( for key, df in ohlcv.items(): if df is None: continue - payload = _df_to_parquet_bytes(df) + payload = _df_to_parquet_bytes(df, float32=float32) digest = _hash_bytes(payload) rel = f"{digest}.parquet" target = store_dir / rel @@ -201,7 +389,7 @@ def _msgpack_default(obj): ) -def _encode_payload(doc: dict) -> bytes: +def _encode_payload(doc: dict, *, format_version: int) -> bytes: raw = msgpack.packb( doc, use_bin_type=True, @@ -210,10 +398,16 @@ def _encode_payload(doc: dict) -> bytes: ) cctx = zstd.ZstdCompressor(level=_ZSTD_LEVEL) body = cctx.compress(raw) - return _MAGIC + BUNDLE_FORMAT_VERSION.to_bytes(4, "little") + body + return _MAGIC + format_version.to_bytes(4, "little") + body + +def _decode_payload(blob: bytes) -> Tuple[int, dict]: + """Decode a bundle byte string and return ``(format_version, doc)``. -def _decode_payload(blob: bytes) -> dict: + Both v1 and v2 envelopes share the same outer ``IAFB`` + uint32 + version header and the same zstd-compressed msgpack body — only + the *contents* of ``doc`` differ. + """ if not blob.startswith(_MAGIC): raise ValueError( "Not a valid Backtest bundle (missing IAFB magic bytes)." 
@@ -227,7 +421,170 @@ def _decode_payload(blob: bytes) -> dict: ) dctx = zstd.ZstdDecompressor() raw = dctx.decompress(blob[8:]) - return msgpack.unpackb(raw, raw=False) + return version, msgpack.unpackb(raw, raw=False) + + +# --------------------------------------------------------------------------- +# v2 envelope construction / disassembly +# --------------------------------------------------------------------------- + + +def _extract_metric_blobs( + backtest_dict: dict, + blobs: Dict[str, bytes], + *, + runs_key: str, +) -> None: + """Walk ``backtest_dict[runs_key]`` and replace heavy metric series + with ``{"@blob": ""}`` references; the actual Parquet bytes + are appended to *blobs* keyed by ``runs//metrics/.parquet``. + + Mutates *backtest_dict* in place. Fields that aren't recognised + list-of-tuples shapes are left inline (the encoder is conservative + by design — never lose data). + """ + runs = backtest_dict.get(runs_key) + if not runs: + return + for idx, run in enumerate(runs): + if not isinstance(run, dict): + continue + metrics = run.get("backtest_metrics") + if not isinstance(metrics, dict): + continue + for field in _METRIC_BLOB_FIELDS: + series = metrics.get(field) + if series is None: + continue + payload = _series_to_parquet_bytes(series) + if payload is None: + # Unrecognised shape — keep inline as v1 fallback. + continue + key = f"runs/{idx}/metrics/{field}.parquet" + blobs[key] = payload + metrics[field] = {_BLOB_REF_KEY: key} + + +def _resolve_metric_blobs( + backtest_dict: dict, + blobs: Dict[str, bytes], + *, + runs_key: str, + summary_only: bool = False, +) -> None: + """Inverse of :func:`_extract_metric_blobs`: walk + ``backtest_dict[runs_key]`` and replace any + ``{"@blob": ""}`` reference with the decoded series. + + When ``summary_only=True``, references are replaced with empty + lists instead of being decoded — this keeps the + ``BacktestMetrics.from_dict`` contract (it expects lists for + these fields, not refs) while skipping the Parquet decode cost. + The scalar summary fields (sharpe / sortino / max_dd / etc.) + on the same metrics object remain fully populated, which is the + whole point of this mode. + """ + runs = backtest_dict.get(runs_key) + if not runs: + return + for run in runs: + if not isinstance(run, dict): + continue + metrics = run.get("backtest_metrics") + if not isinstance(metrics, dict): + continue + for field in _METRIC_BLOB_FIELDS: + value = metrics.get(field) + if isinstance(value, dict) and _BLOB_REF_KEY in value: + if summary_only: + metrics[field] = [] + continue + key = value[_BLOB_REF_KEY] + payload = blobs.get(key) + if payload is None: + metrics[field] = [] + else: + metrics[field] = _parquet_bytes_to_series(payload) + + +def _build_v2_envelope(backtest: Backtest) -> dict: + """Build the ``doc`` dict for a v2 bundle from *backtest*. + + Routes runs and summary into engine-specific slots + (``vector_runs`` / ``vector_metrics`` / ``event_runs`` / + ``event_metrics``) when ``backtest.engine_type`` is set, else + falls back to the engine-agnostic ``backtest_runs`` / + ``backtest_summary`` keys (compatible with consumers that don't + care about the engine). 
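+
+    Illustrative shape for a vector-engine backtest with one extracted
+    series (keys abbreviated, values are placeholders, not real output)::
+
+        {
+            "format_version": 2,
+            "engine_type": "vector",
+            "algorithm_id": "...",
+            "vector_runs": [...],
+            "vector_metrics": {...},
+            "blobs": {"runs/0/metrics/equity_curve.parquet": b"..."},
+        }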
+ """ + backtest_dict = backtest.to_dict() + blobs: Dict[str, bytes] = {} + + engine = backtest.engine_type + envelope: Dict[str, Any] = { + "format_version": 2, + "engine_type": engine, + "algorithm_id": backtest_dict.get("algorithm_id"), + "metadata": backtest_dict.get("metadata") or {}, + "risk_free_rate": backtest_dict.get("risk_free_rate"), + "strategy_ids": backtest_dict.get("strategy_ids") or [], + "parameters": backtest_dict.get("parameters") or {}, + "tag": backtest_dict.get("tag"), + "backtest_permutation_tests": + backtest_dict.get("backtest_permutation_tests"), + } + + if engine == "vector": + runs_key = "vector_runs" + metrics_key = "vector_metrics" + elif engine == "event": + runs_key = "event_runs" + metrics_key = "event_metrics" + else: + runs_key = "backtest_runs" + metrics_key = "backtest_summary" + + envelope[runs_key] = backtest_dict.get("backtest_runs") + envelope[metrics_key] = backtest_dict.get("backtest_summary") + + _extract_metric_blobs(envelope, blobs, runs_key=runs_key) + + if blobs: + envelope["blobs"] = blobs + return envelope + + +def _envelope_to_backtest_dict(doc: dict) -> dict: + """Inverse of :func:`_build_v2_envelope`: collapse the v2 envelope + back into the engine-agnostic dict shape consumed by + :py:meth:`Backtest.from_dict`. Resolves embedded blob references + (unless the caller passed ``summary_only`` upstream — in that case + the references are preserved as opaque dicts). + """ + engine = doc.get("engine_type") + if engine == "vector": + runs_key = "vector_runs" + metrics_key = "vector_metrics" + elif engine == "event": + runs_key = "event_runs" + metrics_key = "event_metrics" + else: + runs_key = "backtest_runs" + metrics_key = "backtest_summary" + + return { + "algorithm_id": doc.get("algorithm_id"), + "backtest_runs": doc.get(runs_key), + "backtest_summary": doc.get(metrics_key), + "backtest_permutation_tests": + doc.get("backtest_permutation_tests"), + "metadata": doc.get("metadata") or {}, + "risk_free_rate": doc.get("risk_free_rate"), + "strategy_ids": doc.get("strategy_ids") or [], + "parameters": doc.get("parameters") or {}, + "tag": doc.get("tag"), + "engine_type": engine, + } def save_bundle( @@ -236,6 +593,8 @@ def save_bundle( *, include_ohlcv: bool = False, ohlcv_store: Optional[Union[str, Path]] = None, + format_version: Optional[int] = None, + float32_ohlcv: bool = False, ) -> Path: """Write *backtest* to a single ``.iafbt`` bundle. @@ -252,10 +611,28 @@ def save_bundle( the bundle. ohlcv_store: Override for the OHLCV store directory. Useful when persisting many bundles to share a single store. + format_version: Force a specific bundle format. Defaults to + :data:`BUNDLE_FORMAT_VERSION` (currently 2). Pass ``1`` to + emit a legacy envelope for compatibility with downstream + tools that haven't been upgraded yet. + float32_ohlcv: When True, OHLCV float columns are downcast to + float32 before Parquet encoding (~2x size reduction with + no observable impact on backtest metrics for typical + crypto / equity series). Off by default to preserve the + v1 round-trip contract; opt in for upload / archive + workflows. Returns: The final bundle file path. """ + if format_version is None: + format_version = BUNDLE_FORMAT_VERSION + if format_version not in (1, 2): + raise ValueError( + f"Unsupported bundle format_version {format_version}; " + f"valid values are 1 and 2." 
+ ) + path = Path(path) if path.is_dir(): name = backtest.algorithm_id or "backtest" @@ -266,10 +643,13 @@ ) target.parent.mkdir(parents=True, exist_ok=True) - doc = { - "format_version": BUNDLE_FORMAT_VERSION, - "backtest": backtest.to_dict(), - } + if format_version == 2: + doc = _build_v2_envelope(backtest) + else: + doc = { + "format_version": 1, + "backtest": backtest.to_dict(), + } if include_ohlcv and getattr(backtest, "ohlcv", None): store = ( @@ -277,7 +657,9 @@ if ohlcv_store is not None else target.parent / "ohlcv" ) - manifest = _write_ohlcv_to_store(backtest.ohlcv, store) + manifest = _write_ohlcv_to_store( + backtest.ohlcv, store, float32=float32_ohlcv + ) if manifest: try: rel_store = os.path.relpath(store, target.parent) @@ -288,7 +670,7 @@ "manifest": manifest, } - payload = _encode_payload(doc) + payload = _encode_payload(doc, format_version=format_version) tmp = target.with_suffix(target.suffix + ".tmp") tmp.write_bytes(payload) os.replace(tmp, target) @@ -299,6 +681,7 @@ def open_bundle( path: Union[str, Path], *, ohlcv_store: Optional[Union[str, Path]] = None, + summary_only: bool = False, ) -> Backtest: """Load a :class:`Backtest` from a ``.iafbt`` bundle file. @@ -307,13 +690,43 @@ ohlcv_store: Override for the OHLCV store directory. Defaults to the value persisted in the bundle, resolved relative to the bundle's parent directory. + summary_only: When True (v2 only), skip eager Parquet decode of + the per-run heavy time series (equity / drawdown / monthly + / yearly / cumulative_return / rolling_sharpe / TWR + variants). Each blob reference is replaced with an empty + list, so ``BacktestMetrics.from_dict`` still parses; the + scalar summary metrics (Sharpe, Sortino, max DD, CAGR, + etc.) remain fully populated. Useful for bulk listings / + ranking pipelines that don't draw charts. Ignored for v1 + bundles, where these series are inline. """ path = Path(path) if not path.is_file(): raise FileNotFoundError(f"Bundle file not found: {path}") - doc = _decode_payload(path.read_bytes()) - backtest = Backtest.from_dict(doc.get("backtest") or {}) + version, doc = _decode_payload(path.read_bytes()) + + if version >= 2: + blobs = doc.get("blobs") or {} + engine = doc.get("engine_type") + runs_key = ( + "vector_runs" if engine == "vector" + else "event_runs" if engine == "event" + else "backtest_runs" + ) + # Always walk the run dicts: when summary_only=True we still + # need to replace blob references with empty lists so the + # downstream ``BacktestMetrics.from_dict`` parser doesn't + # choke on dict values where it expects lists. The Parquet + # decode is what we actually skip.
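+        # Illustrative life cycle of one field (the key name is just an
+        # example):
+        #   on disk:      metrics["equity_curve"] = {"@blob": "runs/0/metrics/equity_curve.parquet"}
+        #   default:      resolved back to a [(value, iso_string), ...] list
+        #   summary_only: replaced with [] while scalar metrics stay intact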
+ if blobs: + _resolve_metric_blobs( + doc, blobs, runs_key=runs_key, summary_only=summary_only + ) + flat = _envelope_to_backtest_dict(doc) + backtest = Backtest.from_dict(flat) + else: + backtest = Backtest.from_dict(doc.get("backtest") or {}) ohlcv_meta = doc.get("ohlcv") if ohlcv_meta: diff --git a/investing_algorithm_framework/infrastructure/services/backtesting/backtest_service.py b/investing_algorithm_framework/infrastructure/services/backtesting/backtest_service.py index ac8f0933..0deb621a 100644 --- a/investing_algorithm_framework/infrastructure/services/backtesting/backtest_service.py +++ b/investing_algorithm_framework/infrastructure/services/backtesting/backtest_service.py @@ -608,6 +608,7 @@ def create_backtest( tag=algorithm.metadata.get('tag') if hasattr( algorithm, 'metadata') and algorithm.metadata else None, + engine_type="event", ) def backtest_exists( @@ -676,7 +677,11 @@ def load_backtest_by_strategy_and_backtest_date_range( backtest = Backtest.open(backtest_directory) run = backtest.get_backtest_run(backtest_date_range) metadata = backtest.get_metadata() - return Backtest(backtest_runs=[run], metadata=metadata) + return Backtest( + backtest_runs=[run], + metadata=metadata, + engine_type=backtest.engine_type, + ) else: raise OperationalException("Backtest does not exist.") @@ -2129,6 +2134,7 @@ def _run_batch_backtest_worker(args): tag=strategy.metadata.get('tag') if hasattr( strategy, 'metadata') and strategy.metadata else None, + engine_type="vector", ) batch_results.append(backtest) @@ -2307,7 +2313,8 @@ def run_vector_backtest( algorithm_id=strategy.algorithm_id, backtest_runs=[], risk_free_rate=risk_free_rate or 0.0, - metadata=metadata or {} + metadata=metadata or {}, + engine_type="vector", ) def _get_risk_free_rate(self) -> float: @@ -3074,7 +3081,8 @@ def run_backtest( algorithm_id=algorithm_id, backtest_runs=[], risk_free_rate=risk_free_rate or 0.0, - metadata=metadata or {} + metadata=metadata or {}, + engine_type="event", ), {} def create_ohlcv_permutation( diff --git a/investing_algorithm_framework/infrastructure/services/backtesting/event_backtest_service.py b/investing_algorithm_framework/infrastructure/services/backtesting/event_backtest_service.py index 7b10ca1f..cbe347e4 100644 --- a/investing_algorithm_framework/infrastructure/services/backtesting/event_backtest_service.py +++ b/investing_algorithm_framework/infrastructure/services/backtesting/event_backtest_service.py @@ -310,4 +310,5 @@ def create_backtest( [run.backtest_metrics] ), risk_free_rate=risk_free_rate, + engine_type="event", ) diff --git a/investing_algorithm_framework/services/backtest_index/__init__.py b/investing_algorithm_framework/services/backtest_index/__init__.py new file mode 100644 index 00000000..a492c132 --- /dev/null +++ b/investing_algorithm_framework/services/backtest_index/__init__.py @@ -0,0 +1,3 @@ +from .sqlite_index import SqliteBacktestIndex, SCHEMA_VERSION + +__all__ = ["SqliteBacktestIndex", "SCHEMA_VERSION"] diff --git a/investing_algorithm_framework/services/backtest_index/sqlite_index.py b/investing_algorithm_framework/services/backtest_index/sqlite_index.py new file mode 100644 index 00000000..c6a93709 --- /dev/null +++ b/investing_algorithm_framework/services/backtest_index/sqlite_index.py @@ -0,0 +1,385 @@ +"""SQLite-backed Tier-1 backtest index (epic #540 phase 2). + +A :class:`SqliteBacktestIndex` is a single-file SQLite database that +holds one row per backtest bundle, derived from +:class:`BacktestIndexRow`. 
It is the local-disk implementation of the +Tier-1 store described in +``docs/design/tiered-backtest-storage.md`` \u00a73.1. + +Schema +------ +The schema is generated from two sources of truth: + +* The canonical *identity / provenance / config* columns of + :class:`BacktestIndexRow`. +* All numeric / string fields of :class:`BacktestSummaryMetrics`, + promoted as ``summary_`` columns so analysts can filter on + e.g. ``WHERE summary_sharpe_ratio > 1.0``. + +Anything that doesn't fit those is round-tripped opaquely in the +``extras_json`` and ``summary_extras_json`` columns. ``parameters`` +and ``strategy_ids`` are stored as JSON text. + +The file carries ``PRAGMA user_version = SCHEMA_VERSION`` so future +migrations can detect and upgrade older index files additively. + +Concurrency +----------- +Writes go through a single connection in ``WAL`` mode; multiple +readers from other processes are safe. +""" + +from __future__ import annotations + +import json +import logging +import sqlite3 +from dataclasses import fields as dc_fields +from pathlib import Path +from typing import ( + Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union, +) + +from investing_algorithm_framework.domain.backtesting.backtest_index_row \ + import BacktestIndexRow +from investing_algorithm_framework.domain.backtesting \ + .backtest_summary_metrics import BacktestSummaryMetrics + + +logger = logging.getLogger(__name__) + + +# Bumped on any additive schema change. Old files are upgraded +# in-place by :meth:`SqliteBacktestIndex._migrate`. +SCHEMA_VERSION = 1 + +# Columns of BacktestIndexRow that map 1:1 to typed SQL columns. +# (parameters / strategy_ids are emitted as JSON text columns; the +# scalar metrics are promoted from BacktestSummaryMetrics below.) +_IDENTITY_COLUMNS: Tuple[Tuple[str, str], ...] = ( + ("bundle_path", "TEXT PRIMARY KEY"), + ("algorithm_id", "TEXT"), + ("tag", "TEXT"), + ("framework_version", "TEXT"), + ("engine_type", "TEXT"), + ("risk_free_rate", "REAL"), + ("number_of_runs", "INTEGER"), + ("parameters_json", "TEXT"), + ("strategy_ids_json", "TEXT"), + ("extras_json", "TEXT"), + ("summary_extras_json", "TEXT"), +) + + +def _summary_columns() -> List[Tuple[str, str]]: + """Promote BacktestSummaryMetrics fields to ``summary_`` cols. + + Numeric fields become ``REAL`` (or ``INTEGER`` if annotated ``int``); + everything else degrades to ``TEXT``. + """ + cols: List[Tuple[str, str]] = [] + for f in dc_fields(BacktestSummaryMetrics): + ann = f.type + if ann is int or ann == "int": + sql_type = "INTEGER" + elif ann is float or ann == "float": + sql_type = "REAL" + elif ann is bool or ann == "bool": + sql_type = "INTEGER" + else: + sql_type = "TEXT" + cols.append((f"summary_{f.name}", sql_type)) + return cols + + +_SUMMARY_COLUMNS: Tuple[Tuple[str, str], ...] = tuple(_summary_columns()) +_SUMMARY_FIELD_NAMES: frozenset = frozenset( + f.name for f in dc_fields(BacktestSummaryMetrics) +) + + +def _all_columns() -> List[Tuple[str, str]]: + return list(_IDENTITY_COLUMNS) + list(_SUMMARY_COLUMNS) + + +_TABLE = "backtest_index" + + +class SqliteBacktestIndex: + """Single-file SQLite index over a directory of ``.iafbt`` bundles. + + Use :meth:`create` to make a fresh file (overwrites if exists), + :meth:`open` to connect to an existing one (creating tables if + needed), :meth:`upsert` to add/replace a row, and + :meth:`iter_rows` / :meth:`query` for read access. 
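+
+    Minimal sketch (paths, algorithm ids and thresholds are
+    illustrative)::
+
+        with SqliteBacktestIndex.open("backtests/index.sqlite") as idx:
+            idx.upsert(backtest.index_row(bundle_path="algo_0.iafbt"))
+            good = idx.query("summary_sharpe_ratio > ?", (1.0,))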
+ """ + + def __init__(self, path: Union[str, Path], conn: sqlite3.Connection): + self.path = Path(path) + self._conn = conn + + # ------------------------------------------------------------------ + # Construction + # ------------------------------------------------------------------ + @classmethod + def create(cls, path: Union[str, Path]) -> "SqliteBacktestIndex": + """Create a fresh index file (overwrites any existing file).""" + p = Path(path) + if p.exists(): + p.unlink() + p.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(p)) + conn.row_factory = sqlite3.Row + cls._init_schema(conn) + return cls(p, conn) + + @classmethod + def open(cls, path: Union[str, Path]) -> "SqliteBacktestIndex": + """Open an existing index file, creating tables on first use.""" + p = Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(p)) + conn.row_factory = sqlite3.Row + cls._init_schema(conn) + cls._migrate(conn) + return cls(p, conn) + + # ------------------------------------------------------------------ + # Schema + # ------------------------------------------------------------------ + @staticmethod + def _init_schema(conn: sqlite3.Connection) -> None: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + cols = ", ".join( + f'"{name}" {sql_type}' for name, sql_type in _all_columns() + ) + conn.execute(f'CREATE TABLE IF NOT EXISTS "{_TABLE}" ({cols})') + conn.execute( + f'CREATE INDEX IF NOT EXISTS idx_{_TABLE}_algorithm_id ' + f'ON "{_TABLE}"(algorithm_id)' + ) + conn.execute( + f'CREATE INDEX IF NOT EXISTS idx_{_TABLE}_tag ' + f'ON "{_TABLE}"(tag)' + ) + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.commit() + + @staticmethod + def _migrate(conn: sqlite3.Connection) -> None: + """Additive forward-only migration based on PRAGMA user_version. + + Adds any columns that the current code knows about but the + on-disk file is missing. Never drops or rewrites existing + columns. + """ + existing = { + row["name"] + for row in conn.execute(f'PRAGMA table_info("{_TABLE}")') + } + for name, sql_type in _all_columns(): + if name not in existing: + conn.execute( + f'ALTER TABLE "{_TABLE}" ' + f'ADD COLUMN "{name}" {sql_type}' + ) + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.commit() + + # ------------------------------------------------------------------ + # Writes + # ------------------------------------------------------------------ + def upsert(self, row: BacktestIndexRow) -> None: + """Insert or replace a single row, keyed by ``bundle_path``. + + Raises: + ValueError: if ``row.bundle_path`` is None (it is the PK). + """ + if not row.bundle_path: + raise ValueError( + "BacktestIndexRow.bundle_path is required for SQLite " + "upsert (used as the primary key)." + ) + record = self._row_to_record(row) + cols = list(record.keys()) + placeholders = ", ".join("?" for _ in cols) + col_list = ", ".join(f'"{c}"' for c in cols) + self._conn.execute( + f'INSERT OR REPLACE INTO "{_TABLE}" ({col_list}) ' + f'VALUES ({placeholders})', + [record[c] for c in cols], + ) + self._conn.commit() + + def upsert_many(self, rows: Iterable[BacktestIndexRow]) -> int: + """Bulk insert/replace; returns the number of rows written.""" + rows = list(rows) + if not rows: + return 0 + # Use the first row to fix the column set; record-builder is + # deterministic so all rows produce the same keys. + first = self._row_to_record(rows[0]) + cols = list(first.keys()) + placeholders = ", ".join("?" 
for _ in cols) + col_list = ", ".join(f'"{c}"' for c in cols) + sql = ( + f'INSERT OR REPLACE INTO "{_TABLE}" ({col_list}) ' + f'VALUES ({placeholders})' + ) + payload = [first] + [self._row_to_record(r) for r in rows[1:]] + for r in payload: + if not r.get("bundle_path"): + raise ValueError( + "BacktestIndexRow.bundle_path is required for SQLite " + "upsert (used as the primary key)." + ) + self._conn.executemany(sql, [[r[c] for c in cols] for r in payload]) + self._conn.commit() + return len(rows) + + @staticmethod + def _row_to_record(row: BacktestIndexRow) -> Dict[str, Any]: + """Map a typed row onto a flat dict ready for SQL binding.""" + record: Dict[str, Any] = { + "bundle_path": row.bundle_path, + "algorithm_id": row.algorithm_id, + "tag": row.tag, + "framework_version": row.framework_version, + "engine_type": row.engine_type, + "risk_free_rate": row.risk_free_rate, + "number_of_runs": row.number_of_runs, + "parameters_json": ( + _safe_json(row.parameters) if row.parameters else None + ), + "strategy_ids_json": ( + _safe_json(row.strategy_ids) if row.strategy_ids else None + ), + "extras_json": ( + _safe_json(row.extras) if row.extras else None + ), + } + + summary_extras: Dict[str, Any] = {} + if row.summary_metrics is not None: + summary_dict = row.summary_metrics.to_dict() + for k, v in summary_dict.items(): + if k in _SUMMARY_FIELD_NAMES: + record[f"summary_{k}"] = _coerce_scalar(v) + else: + summary_extras[k] = v + + record["summary_extras_json"] = ( + _safe_json(summary_extras) if summary_extras else None + ) + return record + + # ------------------------------------------------------------------ + # Reads + # ------------------------------------------------------------------ + def __len__(self) -> int: + cur = self._conn.execute(f'SELECT COUNT(*) AS n FROM "{_TABLE}"') + return int(cur.fetchone()["n"]) + + def iter_rows(self) -> Iterator[BacktestIndexRow]: + """Yield every row as a :class:`BacktestIndexRow`.""" + for sql_row in self._conn.execute(f'SELECT * FROM "{_TABLE}"'): + yield self._record_to_row(sql_row) + + def query( + self, where: Optional[str] = None, + params: Optional[Tuple[Any, ...]] = None, + ) -> List[BacktestIndexRow]: + """Run a parameterised ``SELECT`` and return typed rows. + + Args: + where: optional SQL fragment (without the ``WHERE`` keyword). + params: positional bind values for ``where``. 
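+
+        Example (column names follow the ``summary_`` promotion;
+        values are illustrative)::
+
+            idx.query(
+                "engine_type = ? AND summary_sharpe_ratio > ?",
+                ("vector", 1.0),
+            )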
+ """ + sql = f'SELECT * FROM "{_TABLE}"' + if where: + sql += f" WHERE {where}" + cur = self._conn.execute(sql, params or ()) + return [self._record_to_row(r) for r in cur] + + @staticmethod + def _record_to_row(sql_row: sqlite3.Row) -> BacktestIndexRow: + d = dict(sql_row) + + params_json = d.pop("parameters_json", None) + strat_json = d.pop("strategy_ids_json", None) + extras_json = d.pop("extras_json", None) + summary_extras_json = d.pop("summary_extras_json", None) + + summary_dict: Dict[str, Any] = {} + for name in list(d.keys()): + if name.startswith("summary_"): + value = d.pop(name) + if value is not None: + summary_dict[name[len("summary_"):]] = value + if summary_extras_json: + try: + summary_dict.update(json.loads(summary_extras_json)) + except (TypeError, ValueError): + pass + + kwargs: Dict[str, Any] = { + "algorithm_id": d.get("algorithm_id"), + "tag": d.get("tag"), + "bundle_path": d.get("bundle_path"), + "framework_version": d.get("framework_version"), + "engine_type": d.get("engine_type"), + "risk_free_rate": d.get("risk_free_rate"), + "number_of_runs": d.get("number_of_runs") or 0, + "parameters": _safe_loads(params_json) or {}, + "strategy_ids": _safe_loads(strat_json) or [], + "extras": _safe_loads(extras_json) or {}, + "summary_metrics": ( + BacktestSummaryMetrics.from_dict(summary_dict) + if summary_dict else None + ), + } + return BacktestIndexRow(**kwargs) + + # ------------------------------------------------------------------ + # House-keeping + # ------------------------------------------------------------------ + def close(self) -> None: + try: + self._conn.close() + except Exception: # pragma: no cover - best-effort + pass + + def __enter__(self) -> "SqliteBacktestIndex": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- +def _safe_json(obj: Any) -> Optional[str]: + try: + return json.dumps(obj, default=str) + except (TypeError, ValueError): + return None + + +def _safe_loads(text: Optional[str]) -> Any: + if not text: + return None + try: + return json.loads(text) + except (TypeError, ValueError): + return None + + +def _coerce_scalar(v: Any) -> Any: + """Bind helper: SQLite accepts None / int / float / str / bytes only.""" + if v is None or isinstance(v, (int, float, str, bytes)): + return v + if isinstance(v, bool): + return int(v) + return str(v) diff --git a/pyproject.toml b/pyproject.toml index 999cdae6..a24be6c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "investing-algorithm-framework" -version = "v8.8.0" +version = "v8.9.0" description = "A framework for creating trading bots" authors = ["MDUYN"] readme = "README.md" diff --git a/tests/cli/test_index_command.py b/tests/cli/test_index_command.py new file mode 100644 index 00000000..f2bdc493 --- /dev/null +++ b/tests/cli/test_index_command.py @@ -0,0 +1,93 @@ +"""Integration tests for the ``iaf index`` CLI (epic #540 phase 2).""" +import os +import shutil +import tempfile +from unittest import TestCase + +from click.testing import CliRunner + +from investing_algorithm_framework.domain import Backtest, BUNDLE_EXT +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) +from investing_algorithm_framework.cli.cli import index_cmd +from investing_algorithm_framework.cli.index_command import build_index +from 
investing_algorithm_framework.services.backtest_index import ( + SqliteBacktestIndex, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +class TestIndexCommand(TestCase): + + @classmethod + def setUpClass(cls): + cls.fixture = Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + # Drop a few bundles into the temp dir. + for i in range(3): + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.algorithm_id = f"algo_{i}" + bt.tag = "demo" + save_bundle( + bt, os.path.join(self.tmp, f"algo_{i}{BUNDLE_EXT}"), + ) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + # ------------------------------------------------------------------ + # Builder API + # ------------------------------------------------------------------ + def test_build_index_writes_one_row_per_bundle(self): + out = build_index(self.tmp, show_progress=False) + self.assertTrue(os.path.isfile(out)) + + with SqliteBacktestIndex.open(out) as idx: + self.assertEqual(len(idx), 3) + algos = sorted(r.algorithm_id for r in idx.iter_rows()) + self.assertEqual(algos, ["algo_0", "algo_1", "algo_2"]) + + def test_build_index_uses_relative_paths_by_default(self): + out = build_index(self.tmp, show_progress=False) + with SqliteBacktestIndex.open(out) as idx: + for row in idx.iter_rows(): + self.assertFalse( + os.path.isabs(row.bundle_path), + f"expected relative path, got {row.bundle_path}", + ) + + def test_build_index_absolute_paths_when_requested(self): + out = build_index( + self.tmp, show_progress=False, relative_paths=False, + ) + with SqliteBacktestIndex.open(out) as idx: + for row in idx.iter_rows(): + self.assertTrue(os.path.isabs(row.bundle_path)) + + # ------------------------------------------------------------------ + # Click CLI surface + # ------------------------------------------------------------------ + def test_cli_invocation(self): + runner = CliRunner() + out = os.path.join(self.tmp, "custom.sqlite") + result = runner.invoke( + index_cmd, + [self.tmp, "--output", out, "--no-progress"], + ) + self.assertEqual( + result.exit_code, 0, + msg=f"stdout={result.output!r} exc={result.exception!r}", + ) + self.assertTrue(os.path.isfile(out)) + with SqliteBacktestIndex.open(out) as idx: + self.assertEqual(len(idx), 3) diff --git a/tests/domain/backtests/test_bundle.py b/tests/domain/backtests/test_bundle.py index b2afa1ee..617950a6 100644 --- a/tests/domain/backtests/test_bundle.py +++ b/tests/domain/backtests/test_bundle.py @@ -191,3 +191,132 @@ def test_migrate_backtests_converts_legacy_to_bundles(self): ) loaded = load_backtests_from_directory(bundle_dir, workers=2) self.assertEqual(len(loaded), 4) + + +class TestBundleFormatV2(TestCase): + """Verify bundle format v2 specifics: engine_type split, + embedded Parquet metric blobs, summary_only mode, and v1 + backwards-compatible reads.""" + + @classmethod + def setUpClass(cls): + cls.fixture = Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_default_save_emits_v2_header(self): + path = os.path.join(self.tmp, "v2" + BUNDLE_EXT) + save_bundle(self.fixture, path) + with open(path, "rb") as fh: + head = fh.read(8) + self.assertEqual(head[:4], _MAGIC) + self.assertEqual(int.from_bytes(head[4:8], "little"), 2) + + def test_v1_writer_still_supported(self): + path = os.path.join(self.tmp, "v1" + BUNDLE_EXT) + 
save_bundle(self.fixture, path, format_version=1) + with open(path, "rb") as fh: + head = fh.read(8) + self.assertEqual(int.from_bytes(head[4:8], "little"), 1) + # And v1 still round-trips through the (upgraded) reader. + loaded = open_bundle(path) + self.assertEqual( + loaded.algorithm_id, self.fixture.algorithm_id + ) + + def test_v2_round_trip_preserves_metric_series(self): + path = os.path.join(self.tmp, "v2" + BUNDLE_EXT) + save_bundle(self.fixture, path) + loaded = open_bundle(path) + + # Find a run on the fixture that has a non-empty equity_curve + # to compare against, and assert structural equivalence. + if ( + self.fixture.backtest_runs + and self.fixture.backtest_runs[0].backtest_metrics + and self.fixture.backtest_runs[0] + .backtest_metrics.equity_curve + ): + self.assertEqual( + len(loaded.backtest_runs[0].backtest_metrics.equity_curve), + len(self.fixture.backtest_runs[0] + .backtest_metrics.equity_curve), + ) + + def test_engine_type_round_trips_into_vector_runs_slot(self): + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.engine_type = "vector" + path = os.path.join(self.tmp, "vector" + BUNDLE_EXT) + save_bundle(bt, path) + + loaded = open_bundle(path) + self.assertEqual(loaded.engine_type, "vector") + # Properties route to the right slot. + self.assertEqual(len(loaded.vector_runs), len(bt.backtest_runs)) + self.assertEqual(loaded.event_runs, []) + + def test_engine_type_round_trips_into_event_runs_slot(self): + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.engine_type = "event" + path = os.path.join(self.tmp, "event" + BUNDLE_EXT) + save_bundle(bt, path) + + loaded = open_bundle(path) + self.assertEqual(loaded.engine_type, "event") + self.assertEqual(len(loaded.event_runs), len(bt.backtest_runs)) + self.assertEqual(loaded.vector_runs, []) + + def test_summary_only_skips_blob_decode(self): + bt = Backtest.from_dict(self.fixture.to_dict()) + bt.engine_type = "vector" + path = os.path.join(self.tmp, "summary" + BUNDLE_EXT) + save_bundle(bt, path) + + loaded = open_bundle(path, summary_only=True) + # Scalar metrics still populated; blob fields remain as + # opaque references {"@blob": "..."} rather than lists. + if loaded.backtest_runs and loaded.backtest_runs[0].backtest_metrics: + metrics = loaded.backtest_runs[0].backtest_metrics + # equity_curve in summary_only mode is left as an + # unresolved reference dict (or empty list if the source + # had no series). Either is acceptable; what we assert + # is that scalar fields are populated. + self.assertIsNotNone(metrics.sharpe_ratio) + + def test_v2_bundle_round_trips_through_writer(self): + """Smoke test: v2 bundle written via save_bundle reads back + with the same structural content. Per-bundle size comparison + against v1 isn't meaningful for tiny test fixtures (Parquet + headers add a fixed ~100-byte overhead per blob, which + dominates when series have <50 entries). Real-world backtests + with hundreds of equity-curve points see 30-80% reduction; + see ``docs/design/bundle-format-v2.md``. + """ + v2_path = os.path.join(self.tmp, "v2" + BUNDLE_EXT) + save_bundle(self.fixture, v2_path, format_version=2) + loaded = open_bundle(v2_path) + self.assertEqual( + loaded.algorithm_id, self.fixture.algorithm_id + ) + self.assertEqual( + len(loaded.backtest_runs), len(self.fixture.backtest_runs) + ) + + def test_engine_type_default_is_none_for_legacy_bundles(self): + # Legacy v1 bundles never recorded engine_type. 
Reading one + # should leave engine_type at None and place the runs under + # backtest_runs (the engine-agnostic slot). + path = os.path.join(self.tmp, "legacy" + BUNDLE_EXT) + save_bundle(self.fixture, path, format_version=1) + loaded = open_bundle(path) + self.assertIsNone(loaded.engine_type) + self.assertEqual(loaded.vector_runs, []) + self.assertEqual(loaded.event_runs, []) + # But the runs are still accessible via the union view. + self.assertEqual( + len(loaded.backtest_runs), len(self.fixture.backtest_runs) + ) diff --git a/tests/domain/backtests/test_index_row.py b/tests/domain/backtests/test_index_row.py new file mode 100644 index 00000000..2b3d3149 --- /dev/null +++ b/tests/domain/backtests/test_index_row.py @@ -0,0 +1,142 @@ +"""Tests for the Tier-1 :class:`BacktestIndexRow` contract (epic #540). + +These tests verify that: + +* ``Backtest.index_row()`` produces a typed row with the right fields. +* The flat-dict round-trip is lossless for canonical scalar fields. +* The row can be built from a bundle opened with ``summary_only=True`` + \u2014 i.e. without decoding any v2 Parquet metric blobs. This is the + fast read path the tiered-storage indexer relies on. +""" +import os +import shutil +import tempfile +from unittest import TestCase + +from investing_algorithm_framework.domain import ( + Backtest, + BacktestIndexRow, + BUNDLE_EXT, +) +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +class TestBacktestIndexRow(TestCase): + + @classmethod + def setUpClass(cls): + cls.fixture = Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + # ------------------------------------------------------------------ + # In-memory derivation + # ------------------------------------------------------------------ + def test_index_row_has_expected_identity(self): + row = self.fixture.index_row(bundle_path="/tmp/x.iafbt") + + self.assertIsInstance(row, BacktestIndexRow) + self.assertEqual(row.algorithm_id, self.fixture.algorithm_id) + self.assertEqual(row.tag, self.fixture.tag) + self.assertEqual(row.bundle_path, "/tmp/x.iafbt") + self.assertEqual( + row.number_of_runs, len(self.fixture.backtest_runs or []) + ) + # summary_metrics should be the live BacktestSummaryMetrics dataclass + self.assertIs(row.summary_metrics, self.fixture.backtest_summary) + + def test_to_flat_dict_carries_summary_prefix(self): + row = self.fixture.index_row(bundle_path="bundle.iafbt") + flat = row.to_flat_dict() + + # Identity columns are present and unprefixed. + self.assertEqual(flat["algorithm_id"], self.fixture.algorithm_id) + self.assertEqual(flat["bundle_path"], "bundle.iafbt") + + # Summary-metric scalars are emitted with the canonical prefix. 
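+        # e.g. flat["summary.sharpe_ratio"] -> 1.23 (key and value are
+        # illustrative; the exact set depends on BacktestSummaryMetrics).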
+ if self.fixture.backtest_summary is not None: + summary_keys = [ + k for k in flat if k.startswith("summary.") + ] + self.assertGreater( + len(summary_keys), 0, + "expected at least one summary.* scalar column", + ) + + def test_flat_dict_round_trip(self): + import math + + row = self.fixture.index_row(bundle_path="bundle.iafbt") + flat = row.to_flat_dict() + restored = BacktestIndexRow.from_flat_dict(flat) + + self.assertEqual(restored.algorithm_id, row.algorithm_id) + self.assertEqual(restored.tag, row.tag) + self.assertEqual(restored.bundle_path, row.bundle_path) + self.assertEqual(restored.number_of_runs, row.number_of_runs) + self.assertEqual(restored.parameters, row.parameters) + + # Scalar metrics that survived the flat hop should match. + if row.summary_metrics is not None: + self.assertIsNotNone(restored.summary_metrics) + original = row.summary_metrics.to_dict() + restored_dict = restored.summary_metrics.to_dict() + for k, v in original.items(): + if isinstance(v, bool) or not isinstance(v, (int, float)): + if isinstance(v, str) or v is None: + self.assertEqual(restored_dict.get(k), v) + continue + rv = restored_dict.get(k) + if isinstance(v, float) and math.isnan(v): + self.assertTrue( + isinstance(rv, float) and math.isnan(rv) + ) + else: + self.assertEqual(rv, v) + + def test_unknown_columns_land_in_extras(self): + row = BacktestIndexRow.from_flat_dict({ + "algorithm_id": "a1", + "summary.sharpe_ratio": 1.5, + "extras.custom": "hello", + "totally_unknown": 7, + }) + self.assertEqual(row.algorithm_id, "a1") + self.assertEqual(row.extras.get("custom"), "hello") + self.assertEqual(row.extras.get("totally_unknown"), 7) + + # ------------------------------------------------------------------ + # Fast-path: build row from a summary-only bundle load + # ------------------------------------------------------------------ + def test_index_row_from_summary_only_bundle(self): + path = os.path.join(self.tmp, "report" + BUNDLE_EXT) + save_bundle(self.fixture, path) + + # summary_only=True skips Parquet metric-blob decoding \u2014 + # the index row must still build without error. + loaded = Backtest.open(path, summary_only=True) + row = loaded.index_row(bundle_path=path) + + self.assertEqual(row.algorithm_id, self.fixture.algorithm_id) + self.assertEqual(row.bundle_path, path) + self.assertEqual( + row.number_of_runs, len(self.fixture.backtest_runs or []) + ) + + # Flat dict shape must also work in the summary-only path. 
+ flat = row.to_flat_dict() + self.assertEqual(flat["algorithm_id"], self.fixture.algorithm_id) diff --git a/tests/services/backtest_index/__init__.py b/tests/services/backtest_index/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/services/backtest_index/test_sqlite_index.py b/tests/services/backtest_index/test_sqlite_index.py new file mode 100644 index 00000000..7f238bc1 --- /dev/null +++ b/tests/services/backtest_index/test_sqlite_index.py @@ -0,0 +1,155 @@ +"""Tests for :class:`SqliteBacktestIndex` (epic #540 phase 2).""" +import os +import shutil +import tempfile +from unittest import TestCase + +from investing_algorithm_framework.domain import ( + Backtest, + BacktestIndexRow, + BUNDLE_EXT, +) +from investing_algorithm_framework.domain.backtesting.bundle import ( + save_bundle, +) +from investing_algorithm_framework.services.backtest_index import ( + SqliteBacktestIndex, +) + + +_FIXTURE = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "resources", + "backtest_reports_for_testing", + "test_algorithm_backtest", +) + + +class TestSqliteBacktestIndex(TestCase): + + @classmethod + def setUpClass(cls): + cls.fixture = Backtest.open(_FIXTURE) + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.index_path = os.path.join(self.tmp, "index.sqlite") + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + # ------------------------------------------------------------------ + # Schema / lifecycle + # ------------------------------------------------------------------ + def test_create_initialises_schema(self): + idx = SqliteBacktestIndex.create(self.index_path) + try: + self.assertTrue(os.path.isfile(self.index_path)) + self.assertEqual(len(idx), 0) + finally: + idx.close() + + def test_open_creates_file_when_missing(self): + idx = SqliteBacktestIndex.open(self.index_path) + try: + self.assertTrue(os.path.isfile(self.index_path)) + finally: + idx.close() + + def test_upsert_requires_bundle_path(self): + with SqliteBacktestIndex.create(self.index_path) as idx: + row = self.fixture.index_row(bundle_path=None) + with self.assertRaises(ValueError): + idx.upsert(row) + + # ------------------------------------------------------------------ + # Round-trip + # ------------------------------------------------------------------ + def test_round_trip_preserves_identity_and_metrics(self): + row = self.fixture.index_row(bundle_path="bundle.iafbt") + + with SqliteBacktestIndex.create(self.index_path) as idx: + idx.upsert(row) + self.assertEqual(len(idx), 1) + (loaded,) = list(idx.iter_rows()) + + self.assertIsInstance(loaded, BacktestIndexRow) + self.assertEqual(loaded.algorithm_id, row.algorithm_id) + self.assertEqual(loaded.tag, row.tag) + self.assertEqual(loaded.bundle_path, row.bundle_path) + self.assertEqual(loaded.number_of_runs, row.number_of_runs) + self.assertEqual(loaded.parameters, row.parameters) + + # If the fixture has scalar metrics, key ones must round-trip. + # SQLite stores NaN as NULL, so treat NaN/None as equivalent. 
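+        # (a NaN sharpe_ratio written to the index therefore reads back
+        # as None, which is what the branch below accepts)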
+ if row.summary_metrics is not None: + import math + + self.assertIsNotNone(loaded.summary_metrics) + for name in ("sharpe_ratio", "total_net_gain"): + got = getattr(loaded.summary_metrics, name, None) + exp = getattr(row.summary_metrics, name, None) + if isinstance(exp, float) and math.isnan(exp): + self.assertIsNone(got) + else: + self.assertEqual(got, exp) + + def test_upsert_replaces_on_duplicate_bundle_path(self): + row = self.fixture.index_row(bundle_path="dup.iafbt") + with SqliteBacktestIndex.create(self.index_path) as idx: + idx.upsert(row) + + # Mutate algorithm_id and re-upsert \u2014 should not duplicate. + row.algorithm_id = "new_algo" + idx.upsert(row) + + self.assertEqual(len(idx), 1) + (loaded,) = list(idx.iter_rows()) + self.assertEqual(loaded.algorithm_id, "new_algo") + + def test_upsert_many_writes_all(self): + rows = [] + for i in range(3): + r = self.fixture.index_row(bundle_path=f"b{i}.iafbt") + r.algorithm_id = f"algo_{i}" + rows.append(r) + + with SqliteBacktestIndex.create(self.index_path) as idx: + n = idx.upsert_many(rows) + self.assertEqual(n, 3) + self.assertEqual(len(idx), 3) + + # ------------------------------------------------------------------ + # Query + # ------------------------------------------------------------------ + def test_query_with_where_clause(self): + rows = [] + for i in range(3): + r = self.fixture.index_row(bundle_path=f"q{i}.iafbt") + r.algorithm_id = "alpha" if i == 0 else "beta" + rows.append(r) + + with SqliteBacktestIndex.create(self.index_path) as idx: + idx.upsert_many(rows) + hits = idx.query("algorithm_id = ?", ("alpha",)) + self.assertEqual(len(hits), 1) + self.assertEqual(hits[0].bundle_path, "q0.iafbt") + + # ------------------------------------------------------------------ + # Build from real bundle on disk + # ------------------------------------------------------------------ + def test_index_built_from_bundle_uses_summary_only_path(self): + bundle_path = os.path.join(self.tmp, "report" + BUNDLE_EXT) + save_bundle(self.fixture, bundle_path) + + # Open the bundle in summary_only mode — mirrors what the CLI + # does. + bt = Backtest.open(bundle_path, summary_only=True) + row = bt.index_row(bundle_path=bundle_path) + + with SqliteBacktestIndex.create(self.index_path) as idx: + idx.upsert(row) + (loaded,) = list(idx.iter_rows()) + + self.assertEqual(loaded.bundle_path, bundle_path) + self.assertEqual(loaded.algorithm_id, self.fixture.algorithm_id)