Skip to content

Commit 47776c1

Browse files
committed
feat(pipeline): add VectorPipelineEngine (#502 phase 2a)
- Add VectorPipelineEngine: evaluates the full long-form panel and every declared factor once over the entire backtest window. - Add per-evaluation factor result cache via a contextvar consulted by Factor.evaluate(). Routes _Rank/_TopN/_BottomN through it so a factor reused as both a column and a universe filter is computed only once per panel. - Wire the same cache into PipelineEngine for parity. - Equivalence tests: per-bar slice of VectorPipelineEngine matches PipelineEngine.evaluate(...) for the same dataset and as_of. Public API (Pipeline/Factor/Filter) unchanged; strategies that work in event mode work unchanged in vector mode.
1 parent 5b8aad4 commit 47776c1

6 files changed

Lines changed: 492 additions & 17 deletions

File tree

investing_algorithm_framework/domain/pipeline/factor.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,25 @@
1414
"""
1515
from __future__ import annotations
1616

17-
from typing import List, Optional, TYPE_CHECKING
17+
from contextvars import ContextVar
18+
from typing import Dict, List, Optional, TYPE_CHECKING
1819

1920
import polars as pl
2021

2122
if TYPE_CHECKING: # pragma: no cover - avoid runtime cycle
2223
from .filter import Filter
2324

2425

26+
# Per-evaluation memoisation cache (Phase 2 / #502). The pipeline
27+
# engines push a fresh dict here while evaluating a panel; nested
28+
# factors (``_Rank._base``, ``_TopN._base``, …) consult it via
29+
# :meth:`Factor.evaluate` so that a factor instance shared between a
30+
# pipeline column and a universe filter is computed only once.
31+
_EVAL_CACHE: ContextVar[Optional[Dict[tuple, pl.Series]]] = ContextVar(
32+
"_pipeline_factor_eval_cache", default=None
33+
)
34+
35+
2536
class Factor:
2637
"""Base class for all factor expressions.
2738
@@ -64,6 +75,30 @@ def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
6475
"""
6576
raise NotImplementedError
6677

78+
# ------------------------------------------------------------------ #
79+
# Cached evaluation (Phase 2 / #502)
80+
# ------------------------------------------------------------------ #
81+
def evaluate(self, panel: pl.DataFrame) -> pl.Series:
82+
"""Compute and cache this factor's values on ``panel``.
83+
84+
Identical to :meth:`compute_panel` when called outside an
85+
engine context. When a pipeline engine has installed an
86+
evaluation cache (via the ``_EVAL_CACHE`` context var), the
87+
result is memoised by ``(id(panel), id(self))`` so that the
88+
same factor instance reused as both a column and a filter is
89+
only computed once per panel.
90+
"""
91+
cache = _EVAL_CACHE.get()
92+
if cache is None:
93+
return self.compute_panel(panel)
94+
key = (id(panel), id(self))
95+
cached = cache.get(key)
96+
if cached is not None:
97+
return cached
98+
values = self.compute_panel(panel)
99+
cache[key] = values
100+
return values
101+
67102
# ------------------------------------------------------------------ #
68103
# Cross-sectional ops (Phase 1 surface)
69104
# ------------------------------------------------------------------ #
@@ -118,12 +153,12 @@ def required_window(self) -> int:
118153
return int(self.window)
119154

120155
def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
121-
values = self._base.compute_panel(panel)
156+
values = self._base.evaluate(panel)
122157
df = panel.select(["datetime", "symbol"]).with_columns(
123158
values.alias("__rank_input__")
124159
)
125160
if self._mask is not None:
126-
mask_values = self._mask.compute_panel(panel)
161+
mask_values = self._mask.evaluate(panel)
127162
df = df.with_columns(
128163
pl.when(mask_values)
129164
.then(pl.col("__rank_input__"))

investing_algorithm_framework/domain/pipeline/filter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def required_window(self) -> int:
4646
return int(self.window)
4747

4848
def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
49-
values = self._base.compute_panel(panel)
49+
values = self._base.evaluate(panel)
5050
df = panel.select(["datetime", "symbol"]).with_columns(
5151
values.alias("__topn_input__")
5252
)
@@ -82,7 +82,7 @@ def required_window(self) -> int:
8282
return int(self.window)
8383

8484
def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
85-
values = self._base.compute_panel(panel)
85+
values = self._base.evaluate(panel)
8686
df = panel.select(["datetime", "symbol"]).with_columns(
8787
values.alias("__bottomn_input__")
8888
)
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Pipeline service package."""
22
from .pipeline_engine import PipelineEngine
3+
from .vector_pipeline_engine import VectorPipelineEngine
34

4-
__all__ = ["PipelineEngine"]
5+
__all__ = ["PipelineEngine", "VectorPipelineEngine"]

investing_algorithm_framework/services/pipeline/pipeline_engine.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import pandas as pd
1717
import polars as pl
1818

19+
from investing_algorithm_framework.domain.pipeline.factor import _EVAL_CACHE
1920
from investing_algorithm_framework.domain.pipeline.pipeline import Pipeline
2021

2122

@@ -108,17 +109,22 @@ def evaluate_at(
108109
if panel.is_empty():
109110
return self._empty_output(pipeline_cls)
110111

111-
result = panel.select(["datetime", "symbol"])
112-
for name, factor in pipeline_cls.get_columns().items():
113-
values = factor.compute_panel(panel)
114-
result = result.with_columns(values.alias(name))
115-
116-
universe = pipeline_cls.get_universe()
117-
if universe is not None:
118-
mask = universe.compute_panel(panel)
119-
result = result.with_columns(mask.alias("__universe__"))
120-
result = result.filter(pl.col("__universe__"))
121-
result = result.drop("__universe__")
112+
cache: Dict[tuple, pl.Series] = {}
113+
token = _EVAL_CACHE.set(cache)
114+
try:
115+
result = panel.select(["datetime", "symbol"])
116+
for name, factor in pipeline_cls.get_columns().items():
117+
values = factor.evaluate(panel)
118+
result = result.with_columns(values.alias(name))
119+
120+
universe = pipeline_cls.get_universe()
121+
if universe is not None:
122+
mask = universe.evaluate(panel)
123+
result = result.with_columns(mask.alias("__universe__"))
124+
result = result.filter(pl.col("__universe__"))
125+
result = result.drop("__universe__")
126+
finally:
127+
_EVAL_CACHE.reset(token)
122128

123129
# Slice to as_of bar
124130
result = result.filter(pl.col("datetime") == pl.lit(as_of))
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""VectorPipelineEngine — Phase 2 (#502).
2+
3+
Materialises the full long-form OHLCV panel and every declared factor
4+
**once** over the entire backtest window, instead of rebuilding the
5+
panel on each event-loop iteration. The resulting long frame can then
6+
be sliced per bar by callers (e.g. the vector backtest service).
7+
8+
Compared to :class:`PipelineEngine`:
9+
10+
- ``evaluate_window`` returns the full ``(datetime, symbol, *factors)``
11+
long frame for every bar in the panel — not just ``as_of``.
12+
- A per-engine factor-result cache keyed by ``id(factor)`` ensures that
13+
shared sub-expressions (e.g. a ``Returns`` reused inside
14+
``Returns(...).rank(mask=universe)``) are computed only once per
15+
evaluation.
16+
17+
The public Pipeline / Factor / Filter surface from Phase 1 is reused
18+
unchanged. Strategies that work in event mode also work here.
19+
"""
20+
from __future__ import annotations
21+
22+
from datetime import datetime
23+
from typing import Any, Dict, Mapping, Optional, Type
24+
25+
import polars as pl
26+
27+
from investing_algorithm_framework.domain.pipeline.factor import _EVAL_CACHE
28+
from investing_algorithm_framework.domain.pipeline.pipeline import Pipeline
29+
30+
from .pipeline_engine import PANEL_COLUMNS, PipelineEngine
31+
32+
33+
class VectorPipelineEngine:
34+
"""Vector-mode pipeline executor (#502).
35+
36+
Build the panel and evaluate every declared factor over the full
37+
window in one shot. The output is a long-form
38+
``pl.DataFrame`` with columns
39+
``(datetime, symbol, <factor_1>, ..., <factor_n>)``, sorted by
40+
``(datetime, symbol)``.
41+
42+
Universe filtering is applied as in event mode — symbols failing
43+
the universe mask at a given bar are dropped from that bar's
44+
output, and the universe column itself is not exposed.
45+
"""
46+
47+
# ------------------------------------------------------------------ #
48+
# Panel construction (delegates to event engine for parity)
49+
# ------------------------------------------------------------------ #
50+
@staticmethod
51+
def build_panel(
52+
data_object: Mapping[str, Any],
53+
symbol_to_identifier: Mapping[str, str],
54+
end: Optional[datetime] = None,
55+
) -> pl.DataFrame:
56+
"""Stack per-symbol OHLCV frames into a long-form panel.
57+
58+
``end`` is an optional inclusive upper bound (no look-ahead).
59+
There is intentionally no ``start`` parameter here — the panel
60+
keeps all earlier bars so factors get full warmup. Callers that
61+
want to restrict the *output* range should pass ``start`` to
62+
:meth:`evaluate_window` instead.
63+
"""
64+
return PipelineEngine.build_panel(
65+
data_object=data_object,
66+
symbol_to_identifier=symbol_to_identifier,
67+
as_of=end,
68+
)
69+
70+
# ------------------------------------------------------------------ #
71+
# Evaluation
72+
# ------------------------------------------------------------------ #
73+
def evaluate_window(
74+
self,
75+
pipeline_cls: Type[Pipeline],
76+
data_object: Mapping[str, Any],
77+
symbol_to_identifier: Mapping[str, str],
78+
start: Optional[datetime] = None,
79+
end: Optional[datetime] = None,
80+
) -> pl.DataFrame:
81+
"""Evaluate ``pipeline_cls`` over the full window.
82+
83+
Returns a long-form frame with one row per ``(datetime, symbol)``
84+
pair that survives the universe mask, plus one column per
85+
declared pipeline output factor.
86+
87+
``end`` truncates the input panel (no look-ahead). ``start``
88+
only restricts the *output* — earlier bars remain in the panel
89+
as warmup so rolling factors compute correctly at ``start``.
90+
"""
91+
panel = self.build_panel(
92+
data_object=data_object,
93+
symbol_to_identifier=symbol_to_identifier,
94+
end=end,
95+
)
96+
result = self.evaluate_panel(pipeline_cls, panel)
97+
if start is not None and not result.is_empty():
98+
result = result.filter(pl.col("datetime") >= pl.lit(start))
99+
return result
100+
101+
def evaluate_panel(
102+
self,
103+
pipeline_cls: Type[Pipeline],
104+
panel: pl.DataFrame,
105+
) -> pl.DataFrame:
106+
"""Evaluate ``pipeline_cls`` over an already-built ``panel``.
107+
108+
Exposed separately so callers (e.g. the vector backtest service)
109+
that already maintain a panel can reuse it without paying the
110+
rebuild cost.
111+
"""
112+
if panel.is_empty():
113+
return self._empty_long_output(pipeline_cls)
114+
115+
cache: Dict[tuple, pl.Series] = {}
116+
token = _EVAL_CACHE.set(cache)
117+
try:
118+
result = panel.select(["datetime", "symbol"])
119+
for name, factor in pipeline_cls.get_columns().items():
120+
values = factor.evaluate(panel)
121+
result = result.with_columns(values.alias(name))
122+
123+
universe = pipeline_cls.get_universe()
124+
if universe is not None:
125+
mask = universe.evaluate(panel)
126+
result = result.with_columns(mask.alias("__universe__"))
127+
result = result.filter(pl.col("__universe__"))
128+
result = result.drop("__universe__")
129+
finally:
130+
_EVAL_CACHE.reset(token)
131+
132+
return result.sort(["datetime", "symbol"])
133+
134+
# ------------------------------------------------------------------ #
135+
# Slicing helpers
136+
# ------------------------------------------------------------------ #
137+
@staticmethod
138+
def slice_at(long_result: pl.DataFrame, as_of: datetime) -> pl.DataFrame:
139+
"""Return the wide ``(symbol, *factors)`` frame for one bar.
140+
141+
Equivalent to what :meth:`PipelineEngine.evaluate` returns. The
142+
``datetime`` column is dropped so the shape matches event mode.
143+
"""
144+
if long_result.is_empty():
145+
return long_result.drop("datetime") \
146+
if "datetime" in long_result.columns else long_result
147+
sliced = long_result.filter(pl.col("datetime") == pl.lit(as_of))
148+
if "datetime" in sliced.columns:
149+
sliced = sliced.drop("datetime")
150+
return sliced
151+
152+
# ------------------------------------------------------------------ #
153+
# Helpers
154+
# ------------------------------------------------------------------ #
155+
@staticmethod
156+
def _empty_long_output(pipeline_cls: Type[Pipeline]) -> pl.DataFrame:
157+
schema: Dict[str, Any] = {
158+
"datetime": pl.Datetime,
159+
"symbol": pl.Utf8,
160+
}
161+
for name in pipeline_cls.get_columns():
162+
schema[name] = pl.Float64
163+
return pl.DataFrame(schema=schema)
164+
165+
166+
__all__ = ["VectorPipelineEngine", "PANEL_COLUMNS"]

0 commit comments

Comments
 (0)