Skip to content

Commit 48bf3e6

Browse files
committed
feat(pipeline): wire VectorPipelineEngine into VectorBacktestService (#502 phase 2b)
- VectorBacktestService now injects pipeline output into the data dict passed to the strategy's vectorised signal generators, mirroring event-mode behaviour. Each declared pipeline is evaluated once over the full backtest window and exposed as a long-form polars.DataFrame, keyed by the pipeline class name, with columns (datetime, symbol, *factors).
- end_date bounds the panel so there is no look-ahead; warmup bars before start_date stay in the panel so rolling factors compute correctly.
- Strategies without 'pipelines' pay zero cost (early return).
- Normalise tz-aware vs naive datetime comparisons in the engines so BacktestDateRange (tz-aware) works against panels read from naive parquet/csv sources.
- Tests cover: no-op when pipelines are empty, long-frame shape and key, end_date no-look-ahead, missing-OHLCV warning + skip, and error propagation.
1 parent 47776c1 commit 48bf3e6

5 files changed

Lines changed: 298 additions & 3 deletions

File tree

investing_algorithm_framework/infrastructure/services/backtesting/vector_backtest_service.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from datetime import datetime, timezone
22
from uuid import uuid4
33

4+
import logging
5+
46
import pandas as pd
57

68
from investing_algorithm_framework.domain import BacktestDateRange, \
@@ -9,6 +11,11 @@
911
OrderSide, Trade, TradeStatus, DataType, TradingCost
1012
from investing_algorithm_framework.services import DataProviderService, \
1113
create_backtest_metrics
14+
from investing_algorithm_framework.services.pipeline import \
15+
VectorPipelineEngine
16+
17+
18+
logger = logging.getLogger(__name__)
1219

1320

1421
class VectorBacktestService:
@@ -67,6 +74,19 @@ def run(
6774
end_date=backtest_date_range.end_date
6875
)
6976

77+
# Phase 2 (#502): inject pipeline outputs into ``data`` before
78+
# the strategy's vectorised signal generators are called. Each
79+
# pipeline is evaluated once over the entire backtest window
80+
# and exposed as a long-form ``polars.DataFrame`` keyed by the
81+
# pipeline class name (mirroring event-mode behaviour, except
82+
# the frame is long instead of single-bar wide because vector
83+
# signals consume the full window).
84+
self._inject_pipelines(
85+
strategy=strategy,
86+
data=data,
87+
backtest_date_range=backtest_date_range,
88+
)
89+
7090
# Compute signals from strategy
7191
buy_signals = strategy.generate_buy_signals(data)
7292
sell_signals = strategy.generate_sell_signals(data)
@@ -838,6 +858,66 @@ def _convert_recorded_values(raw_recorded):
838858
recorded_values[key] = entries
839859
return recorded_values
840860

861+
@staticmethod
def _inject_pipelines(
    strategy,
    data,
    backtest_date_range: BacktestDateRange,
):
    """Evaluate the strategy's pipelines and store the results in ``data``.

    Part of Phase 2 of the Pipeline API (#502); see
    ``docs/design/pipeline-api.md``.

    A strategy whose ``pipelines`` attribute is missing or empty is
    left untouched. Otherwise each pipeline class is evaluated once by
    a ``VectorPipelineEngine`` and the result is written to
    ``data[pipeline_cls.__name__]``. The result is intended to be a
    long-form ``polars.DataFrame`` with columns
    ``(datetime, symbol, *factor_columns)``. No ``start`` bound is
    passed to the engine, so warmup rows are meant to stay in the
    frame, while ``backtest_date_range.end_date`` is passed as the
    upper bound to prevent look-ahead. Signal generators can slice per
    bar using the ``datetime`` column.

    Args:
        strategy: Object exposing ``data_sources`` and, optionally,
            ``pipelines`` (an iterable of pipeline classes).
        data: Mapping handed to the signal generators; mutated in
            place.
        backtest_date_range: Only its ``end_date`` is read.

    Raises:
        Exception: Any error raised while evaluating a pipeline is
            logged with its traceback and then re-raised unchanged.
    """
    declared = getattr(strategy, "pipelines", None)
    if not declared:
        return

    # The first OHLCV data source seen for a symbol wins; non-OHLCV
    # sources and sources without a symbol are ignored. This is meant
    # to follow the same mapping rules as the eventloop.
    identifiers_by_symbol = {}
    for source in strategy.data_sources or []:
        if (
            DataType.OHLCV.equals(source.data_type)
            and source.symbol is not None
            and source.symbol not in identifiers_by_symbol
        ):
            identifiers_by_symbol[source.symbol] = \
                source.get_identifier()

    if not identifiers_by_symbol:
        logger.warning(
            "Strategy declares pipelines but has no OHLCV data "
            "sources to feed them; pipelines will be skipped."
        )
        return

    engine = VectorPipelineEngine()
    for pipeline_cls in declared:
        try:
            frame = engine.evaluate_window(
                pipeline_cls=pipeline_cls,
                data_object=data,
                symbol_to_identifier=identifiers_by_symbol,
                end=backtest_date_range.end_date,
            )
        except Exception:
            # Log which pipeline broke, then let the caller see the
            # original error instead of silently skipping it.
            logger.exception(
                "Pipeline %s failed during vector evaluation",
                pipeline_cls.__name__,
            )
            raise
        else:
            data[pipeline_cls.__name__] = frame
920+
841921
@staticmethod
842922
def get_most_granular_ohlcv_data_source(data_sources):
843923
"""

investing_algorithm_framework/services/pipeline/pipeline_engine.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,14 @@ def build_panel(
7070
df = df.with_columns(pl.lit(symbol).alias("symbol"))
7171
df = df.select(list(PANEL_COLUMNS))
7272
if as_of is not None:
73-
df = df.filter(pl.col("datetime") <= pl.lit(as_of))
73+
# Normalise tz so naive panels and tz-aware ``as_of``
74+
# values (e.g. from BacktestDateRange) compare cleanly.
75+
col_tz = df.schema["datetime"].time_zone
76+
if col_tz is None and as_of.tzinfo is not None:
77+
as_of_cmp = as_of.replace(tzinfo=None)
78+
else:
79+
as_of_cmp = as_of
80+
df = df.filter(pl.col("datetime") <= pl.lit(as_of_cmp))
7481
frames.append(df)
7582

7683
if not frames:

investing_algorithm_framework/services/pipeline/vector_pipeline_engine.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,13 @@ def evaluate_window(
9595
)
9696
result = self.evaluate_panel(pipeline_cls, panel)
9797
if start is not None and not result.is_empty():
98-
result = result.filter(pl.col("datetime") >= pl.lit(start))
98+
col_tz = result.schema["datetime"].time_zone
99+
start_cmp = (
100+
start.replace(tzinfo=None)
101+
if col_tz is None and start.tzinfo is not None
102+
else start
103+
)
104+
result = result.filter(pl.col("datetime") >= pl.lit(start_cmp))
99105
return result
100106

101107
def evaluate_panel(
@@ -144,7 +150,13 @@ def slice_at(long_result: pl.DataFrame, as_of: datetime) -> pl.DataFrame:
144150
if long_result.is_empty():
145151
return long_result.drop("datetime") \
146152
if "datetime" in long_result.columns else long_result
147-
sliced = long_result.filter(pl.col("datetime") == pl.lit(as_of))
153+
col_tz = long_result.schema["datetime"].time_zone
154+
as_of_cmp = (
155+
as_of.replace(tzinfo=None)
156+
if col_tz is None and as_of.tzinfo is not None
157+
else as_of
158+
)
159+
sliced = long_result.filter(pl.col("datetime") == pl.lit(as_of_cmp))
148160
if "datetime" in sliced.columns:
149161
sliced = sliced.drop("datetime")
150162
return sliced

tests/infrastructure/services/backtesting/__init__.py

Whitespace-only changes.
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
"""Tests for VectorBacktestService pipeline injection (#502 phase 2b).
2+
3+
These verify that strategies declaring ``pipelines = [...]`` receive
4+
the pipeline output in their ``data`` dict before the vectorised
5+
signal generators are invoked. Targeted at ``_inject_pipelines`` —
6+
the helper is a pure static method on the service so we exercise it
7+
directly without spinning up a full backtest.
8+
"""
9+
from __future__ import annotations
10+
11+
from datetime import datetime, timedelta
12+
from types import SimpleNamespace
13+
14+
import polars as pl
15+
import pytest
16+
17+
from investing_algorithm_framework import (
18+
AverageDollarVolume,
19+
BacktestDateRange,
20+
DataSource,
21+
DataType,
22+
Pipeline,
23+
Returns,
24+
)
25+
from investing_algorithm_framework.infrastructure.services.backtesting \
26+
.vector_backtest_service import VectorBacktestService
27+
28+
29+
def _ohlcv_polars(closes_volumes):
    """Build an OHLCV ``polars.DataFrame`` from ``(close, volume)`` pairs.

    Bars are dated one day apart starting at 2024-01-01 (naive
    datetimes). Open, High and Low are all set to the bar's close.
    """
    first_bar = datetime(2024, 1, 1)
    return pl.DataFrame(
        [
            {
                "Datetime": first_bar + timedelta(days=offset),
                "Open": close,
                "High": close,
                "Low": close,
                "Close": close,
                "Volume": volume,
            }
            for offset, (close, volume) in enumerate(closes_volumes)
        ]
    )
43+
44+
45+
class _Screener(Pipeline):
    """Pipeline fixture: two window-2 factors plus a top-2 ADV universe."""

    # Factor columns; the attribute names are what the tests expect to
    # find as column names in the injected frame.
    adv = AverageDollarVolume(window=2)
    momentum = Returns(window=2)

    # Universe selection built from the ``adv`` factor.
    universe = adv.top(2)
49+
50+
51+
class _StubStrategy:
52+
"""Bare-minimum strategy stand-in (no app wiring)."""
53+
54+
def __init__(self, data_sources, pipelines=None):
55+
self.data_sources = data_sources
56+
self.pipelines = pipelines or []
57+
58+
59+
def _make_data_sources_and_data():
    """Return ``(sources, data)`` for three daily OHLCV symbols.

    ``sources`` holds one ``DataSource`` per symbol and ``data`` maps
    each source's identifier to a four-bar frame built by
    ``_ohlcv_polars`` from ``(close, volume)`` pairs.
    """
    bars_by_symbol = {
        "AAA/EUR": [(10, 100), (12, 100), (14, 100), (15, 100)],
        "BBB/EUR": [(10, 1), (11, 1), (12, 1), (13, 1)],
        "CCC/EUR": [(10, 200), (11, 200), (13, 200), (14, 200)],
    }
    sources = [
        DataSource(
            data_type="ohlcv",
            market="bitvavo",
            symbol=symbol,
            time_frame="1d",
            identifier=f"{symbol}-ohlcv",
        )
        for symbol in bars_by_symbol
    ]
    # ``sources`` was built in dict order, so zipping with the values
    # pairs every source with its own bars.
    data = {
        source.get_identifier(): _ohlcv_polars(bars)
        for source, bars in zip(sources, bars_by_symbol.values())
    }
    return sources, data
78+
79+
80+
def test_inject_pipelines_no_pipelines_is_noop():
    """A strategy without pipelines leaves ``data`` unchanged."""
    sources, data = _make_data_sources_and_data()
    snapshot = dict(data)
    window = BacktestDateRange(
        start_date=datetime(2024, 1, 2),
        end_date=datetime(2024, 1, 4),
    )

    VectorBacktestService._inject_pipelines(
        strategy=_StubStrategy(data_sources=sources, pipelines=None),
        data=data,
        backtest_date_range=window,
    )

    assert data == snapshot
94+
95+
96+
def test_inject_pipelines_adds_long_frame_keyed_by_class_name():
    """Pipeline output lands in ``data`` under the pipeline class name."""
    sources, data = _make_data_sources_and_data()
    window = BacktestDateRange(
        start_date=datetime(2024, 1, 2),
        end_date=datetime(2024, 1, 4),
    )

    VectorBacktestService._inject_pipelines(
        strategy=_StubStrategy(data_sources=sources, pipelines=[_Screener]),
        data=data,
        backtest_date_range=window,
    )

    assert "_Screener" in data
    frame = data["_Screener"]
    assert isinstance(frame, pl.DataFrame)
    # Long layout: datetime and symbol plus one column per factor; the
    # ``universe`` attribute is not emitted as a column.
    expected_columns = {"datetime", "symbol", "adv", "momentum"}
    assert set(frame.columns) == expected_columns
    # The universe keeps the top two symbols by ADV on every bar. BBB
    # trades a volume of 1 against 100 and 200, so it must never
    # appear in the output.
    emitted_symbols = frame["symbol"].to_list()
    assert "BBB/EUR" not in emitted_symbols
117+
118+
119+
def test_inject_pipelines_respects_end_date_no_lookahead():
    """No row dated after ``end_date`` may reach the injected frame."""
    sources, data = _make_data_sources_and_data()
    cutoff = datetime(2024, 1, 3)

    VectorBacktestService._inject_pipelines(
        strategy=_StubStrategy(data_sources=sources, pipelines=[_Screener]),
        data=data,
        backtest_date_range=BacktestDateRange(
            start_date=datetime(2024, 1, 2),
            end_date=cutoff,
        ),
    )

    # The latest bar in the output must not lie beyond the cutoff.
    latest = data["_Screener"]["datetime"].max()
    assert latest <= cutoff
135+
136+
137+
def test_inject_pipelines_skips_when_no_ohlcv_sources(caplog):
    """A strategy with pipelines but no OHLCV data sources should log
    a warning and skip the pipelines without raising."""
    data = {}
    strategy = _StubStrategy(data_sources=[], pipelines=[_Screener])
    service_logger = (
        "investing_algorithm_framework.infrastructure.services."
        "backtesting.vector_backtest_service"
    )

    with caplog.at_level("WARNING", logger=service_logger):
        VectorBacktestService._inject_pipelines(
            strategy=strategy,
            data=data,
            backtest_date_range=BacktestDateRange(
                start_date=datetime(2024, 1, 2),
                end_date=datetime(2024, 1, 3),
            ),
        )

    assert "_Screener" not in data
    messages = [record.message for record in caplog.records]
    assert any("no OHLCV data sources" in message for message in messages)
164+
165+
166+
def test_inject_pipelines_re_raises_engine_errors():
    """Errors raised while evaluating a pipeline must reach the caller
    so backtest authors get a clear failure, not a silent skip."""

    class _Broken(Pipeline):
        adv = AverageDollarVolume(window=2)
        momentum = Returns(window=2)

    sources, data = _make_data_sources_and_data()
    # Swap the first source's frame for one that only has Datetime and
    # Close, which is expected to make evaluation fail.
    truncated = pl.DataFrame(
        {"Datetime": [datetime(2024, 1, 1)], "Close": [10.0]}
    )
    data[sources[0].get_identifier()] = truncated

    window = BacktestDateRange(
        start_date=datetime(2024, 1, 2),
        end_date=datetime(2024, 1, 4),
    )

    with pytest.raises(Exception):
        VectorBacktestService._inject_pipelines(
            strategy=_StubStrategy(data_sources=sources, pipelines=[_Broken]),
            data=data,
            backtest_date_range=window,
        )
192+
193+
194+
# Reference the otherwise-unused imports so linters do not flag them;
# both are kept for symmetry with other vector-backtest tests in the
# suite.
_ = (SimpleNamespace, DataType)

0 commit comments

Comments
 (0)