Skip to content

Commit 0d0e99c

Browse files
authored
Merge pull request #515 from coding-kitties/feat/503-pipeline-api-phase3-live
feat(pipeline): land Phase 2 + 3 + 4 on dev (vector engine, live envelope, risk-neutrality)
2 parents 9ece0fb + 0f05c47 commit 0d0e99c

24 files changed

Lines changed: 3150 additions & 68 deletions

docusaurus/docs/Advanced Concepts/pipelines-live.md

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,132 @@ If you want to experiment, the same example covers both:
3232

3333
- A streaming panel that appends new bars instead of rebuilding from
3434
scratch.
35-
- Validation of `warmup_window` against `pipeline.required_window()`
35+
- Validation of `warmup_window` against `pipeline.required_window()`
3636
so you get a clear error at startup if any data source is too short.
37+
- ✅ Live envelope validation (≤ 50 symbols, daily-or-coarser
38+
timeframes) at first iteration in non-backtest environments.
39+
- ✅ Universe-refresh cadence (`Pipeline.refresh_universe_every`) so
40+
the universe filter doesn't have to run every bar.
41+
- ✅ Per-pipeline error resilience — a single failing pipeline is
42+
logged and skipped in live mode instead of killing the iteration.
43+
- 🚧 Batched / async OHLCV fetch in `CCXTOHLCVDataProvider` (tracked
44+
for a follow-up PR — needs live integration testing).
3745
- First-class handling of partial bars (so you don't accidentally trade
3846
on an unclosed candle).
3947
- Live observability hooks for pipeline output (print/log/snapshot).
4048

49+
## Live envelope (shipped)
50+
51+
v1 of the live pipeline engine is intentionally conservative:
52+
53+
| Constraint | Value |
54+
| --- | --- |
55+
| Maximum unique OHLCV symbols per strategy | `50` |
56+
| Minimum supported timeframe | daily (`24h`) |
57+
58+
These limits are checked once per run at the first iteration when the
59+
environment is not `BACKTEST`. Violations raise
60+
`OperationalException` with an actionable message naming the
61+
strategy and the offending symbols / timeframe. Backtests are
62+
unaffected — sub-daily timeframes and large universes keep working in
63+
backtest mode exactly as before.
64+
65+
## Universe refresh cadence (shipped)
66+
67+
A pipeline can declare how often the universe filter should be
68+
re-evaluated by setting the class attribute
69+
`refresh_universe_every: timedelta`. Inside that cadence the engine
70+
keeps the surviving symbol set fixed and skips evaluating the
71+
universe filter, which is typically the most expensive part of the
72+
pipeline.
73+
74+
```python
75+
from datetime import timedelta
76+
77+
class DailyMomentum(Pipeline):
78+
sma200 = SMA(window=200)
79+
dollar_volume = AverageDollarVolume(window=30)
80+
81+
universe = dollar_volume.top(50)
82+
signal = sma200.zscore(mask=universe)
83+
84+
# Re-pick the top-50 universe once per day; reuse the
85+
# selection on every intra-day bar.
86+
refresh_universe_every = timedelta(days=1)
87+
```
88+
89+
When `refresh_universe_every` is `None` (the default) the universe is
90+
re-evaluated every bar, preserving Phase 1 / Phase 2 behaviour
91+
exactly.
92+
93+
## Per-pipeline resilience (shipped)
94+
95+
In non-backtest environments a single pipeline raising during
96+
`evaluate` is logged and the iteration continues with an empty output
97+
frame for that pipeline — so an unrelated strategy on the same event
98+
loop is not knocked out by one bad data source. Backtests still
99+
re-raise so failures stay deterministic.
100+
101+
## Warmup validation (shipped)
102+
103+
When a strategy declares `pipelines = [...]`, the framework validates
104+
at construction time that every OHLCV data source has a
105+
`warmup_window` ≥ the pipeline's longest factor window. If any source
106+
is short — or `warmup_window` is left unset — strategy instantiation
107+
raises `OperationalException` with an actionable message listing the
108+
offending sources and the required window size.
109+
110+
```python
111+
class MomentumScreener(Pipeline):
112+
sma = SMA(window=200)
113+
114+
115+
class MyStrategy(TradingStrategy):
116+
time_unit = TimeUnit.HOUR
117+
interval = 1
118+
pipelines = [MomentumScreener]
119+
data_sources = [
120+
DataSource(
121+
symbol="BTC/EUR",
122+
data_type=DataType.OHLCV,
123+
time_frame=TimeFrame.ONE_HOUR,
124+
market="BITVAVO",
125+
warmup_window=200, # must be >= 200 to satisfy SMA(200)
126+
),
127+
]
128+
```
129+
130+
This eliminates a common failure mode in live deployments: a bot
131+
silently trading on NaN factor columns until enough bars accrue.
132+
41133
## What stays the same
42134

43135
The same `Pipeline` subclasses you use in backtests run live.
44136

137+
## Stateless / serverless deployment (AWS Lambda, Azure Functions)
138+
139+
Live trading is frequently deployed on **AWS Lambda** or **Azure
140+
Functions** via the framework's stateless mode (see
141+
`investing_algorithm_framework/cli/deploy_to_aws_lambda.py` and
142+
`deploy_to_azure_function.py`). The pipeline runtime is designed to
143+
be safe in those environments:
144+
145+
- **No cross-invocation state.** Pipelines hold no module-level
146+
mutable state. Each call to `PipelineEngine.evaluate(...)` (event
147+
mode) and `VectorPipelineEngine.evaluate_window(...)` (vector mode)
148+
builds a fresh panel and a fresh result frame.
149+
- **Per-evaluation cache, scoped via `contextvars`.** The shared
150+
sub-expression cache used by composite factors (e.g. `r + r.zscore()`
151+
reusing `r`'s computation) lives in a `ContextVar` that is
152+
installed at the start of each `evaluate` call and reset in a
153+
`finally` block. A warm Lambda / Functions container reusing the
154+
process between invocations sees a clean cache every time.
155+
- **Pure factor composition.** `Factor.zscore()`, `demean()`,
156+
`winsorize()`, and arithmetic (`+ - * /`, unary `-`) all return new
157+
factor objects without mutating their inputs. Building a pipeline
158+
is a pure operation, so it's safe to construct pipelines at module
159+
load time on Lambda/Functions cold start.
160+
45161
## Want to help?
46162

47163
Track or comment on the implementation issue:
Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,84 @@
11
---
22
sidebar_position: 11
3-
title: Pipelines — Vector backtest (roadmap)
4-
description: Vector-mode pipeline execution. Tracked under #502.
3+
title: Pipelines — Vector backtest
4+
description: Vector-mode pipeline execution. Phase 2 (#502) — shipped.
55
---
66

77
# Pipelines: Vector backtest
88

9-
:::info Status: not yet shipped (Phase 2)
10-
11-
Vector-mode pipelines are tracked under
12-
[#502](https://github.com/coding-kitties/investing-algorithm-framework/issues/502).
13-
The public API (`Pipeline`, `Factor`, `Filter`) defined in
14-
[Pipelines: Event-driven backtest](pipelines-event-backtest.md) is
15-
intentionally engine-agnostic, so strategies you write against Phase 1
16-
will keep working when Phase 2 lands.
9+
:::tip Status: shipped (Phase 2)
10+
Vector-mode pipelines run by default whenever you backtest with
11+
`BacktestService` — no opt-in needed. The vector engine evaluates each
12+
declared factor across the **entire** backtest window once per
13+
strategy iteration, with shared sub-expression caching.
1714
:::
1815

19-
## What's planned
16+
## How it works
17+
18+
When `BacktestService` runs, it inspects each strategy's `pipelines`
19+
list and routes them through `VectorPipelineEngine`. For every
20+
iteration:
21+
22+
1. The engine builds a long-form Polars panel
23+
`(datetime, symbol, open, high, low, close, volume)` truncated at
24+
the current bar (no look-ahead).
25+
2. Each declared `Factor` is evaluated **once** in vectorised Polars,
26+
per symbol, over the full window.
27+
3. A per-evaluation cache (a `ContextVar`) memoises shared
28+
sub-expressions — for example `r.zscore() - r.demean()` only
29+
computes `r` once.
30+
4. The optional `universe` mask filters the result; the universe
31+
column itself is dropped from the output.
32+
5. The strategy receives the wide frame via
33+
`data["YourPipelineClassName"]`.
34+
35+
The strategy author surface is unchanged from
36+
[Pipelines: Event-driven backtest](pipelines-event-backtest.md): you
37+
write the same `Pipeline` subclasses and read the same
38+
`data["..."]` frames.
39+
40+
## Lazy / streaming execution
41+
42+
For memory-bound runs over very large universes you can opt the
43+
post-factor pipeline (universe filter + drop + sort) onto Polars'
44+
streaming engine:
45+
46+
```python
47+
from investing_algorithm_framework.services.pipeline import (
48+
VectorPipelineEngine,
49+
)
2050

21-
- A vector executor that materialises every factor in the pipeline
22-
once over the **entire** backtest window, instead of rebuilding the
23-
panel on each event.
24-
- Integration with the existing vector backtester (see
25-
[Vector backtesting](vector-backtesting.md)).
26-
- Cached intermediate frames so a `rank` of a `Returns` doesn't
27-
recompute returns.
28-
- Optional Polars **lazy** execution path for memory-bound runs.
51+
engine = VectorPipelineEngine(lazy=True)
52+
result = engine.evaluate_window(
53+
pipeline_cls=MomentumScreener,
54+
data_object=panel_data,
55+
symbol_to_identifier=sym_id,
56+
)
57+
```
2958

30-
## What stays the same
59+
`lazy=True` is **bit-for-bit equivalent** to the default eager mode
60+
(this is verified by an equivalence test in the suite). It only
61+
changes how the result frame is collected — factors themselves still
62+
return eager `pl.Series` values per symbol. On older Polars versions
63+
that don't accept `engine="streaming"` on `collect`, the engine falls
64+
back to a default collect transparently.
3165

32-
The strategy author surface — declaring a `Pipeline` subclass, listing
33-
it on `strategy.pipelines`, and reading
34-
`data["YourPipelineClassName"]` inside `run_strategy` — does not
35-
change. Switching from event mode to vector mode is meant to be a
36-
runner choice, not a strategy rewrite.
66+
You typically don't need to instantiate `VectorPipelineEngine`
67+
yourself; `BacktestService` handles it. The `lazy` flag is exposed for
68+
direct users of the engine and for performance experiments.
3769

38-
## Want to help?
70+
## Equivalence with event mode
3971

40-
Track or comment on the implementation issue:
41-
[#502 — Pipeline API: Phase 2 (vector executor)](https://github.com/coding-kitties/investing-algorithm-framework/issues/502).
72+
Vector and event mode are required to produce **identical** factor
73+
values for the same panel and same `as_of`. The test suite enforces
74+
this with cross-mode equivalence tests in
75+
`tests/services/pipeline/test_vector_pipeline_engine.py`. If you find
76+
a discrepancy, that's a bug — please file an issue.
4277

4378
## See also
4479

45-
- [Pipelines](pipelines.md) — concept page.
46-
- [Pipelines: Event-driven backtest](pipelines-event-backtest.md) — what works today.
80+
- [Pipelines](pipelines.md) — concept page (factor algebra, transforms).
81+
- [Pipelines: Event-driven backtest](pipelines-event-backtest.md)
82+
same surface, event executor.
83+
- [Pipelines: Live trading](pipelines-live.md) — stateless / serverless
84+
notes.

docusaurus/docs/Advanced Concepts/pipelines.md

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ A per-symbol time-series computation. Phase 1 ships these built-ins:
7171
| `SMA(window)` | close | simple moving average |
7272
| `RSI(window)` | close | Wilder's RSI |
7373
| `Volatility(window, periods_per_year=252)` | close | annualised stdev of log returns |
74+
| `StaticPerSymbol(mapping, default=None)` || broadcasts a `dict[symbol, value]` (e.g. sector / market-cap) into the cross-section |
75+
| `CrossSectionalMean(base, mask=None)` | base factor | per-bar equal-weight mean across surviving symbols |
76+
| `RollingBeta(target, market, window>=2)` | two factors | rolling-window OLS beta `cov(t,m)/var(m)`; null when `var(m) == 0` |
77+
| `Neutralize(target, exposures=[...], mask=None, add_intercept=True)` | factors | per-bar OLS residualisation of `target` against `exposures`; null on rank-deficient bars |
7478

7579
You can also subclass `CustomFactor` to compute your own.
7680

@@ -90,6 +94,112 @@ factor.top(n) # boolean mask: top-n by descending value
9094
factor.bottom(n) # boolean mask: bottom-n by ascending value
9195
```
9296

97+
### Cross-sectional transforms
98+
99+
Per-bar normalisation operators (Phase 2). Each takes an optional
100+
`mask` so the statistic is computed only over the universe that
101+
passes the mask:
102+
103+
```python
104+
factor.zscore(mask=universe) # (x - mean) / std per bar
105+
factor.demean(mask=universe) # x - mean per bar
106+
factor.winsorize(0.01, 0.99, # clip to per-bar quantiles
107+
mask=universe)
108+
109+
# Group-relative variants — stats computed within each group
110+
# (typically sector). `groups` accepts a dict[symbol, key] or any
111+
# Factor that emits a per-symbol category.
112+
factor.zscore(groups=SECTORS) # z-score within sector
113+
factor.demean(groups=SECTORS, mask=universe)
114+
```
115+
116+
Where the cross-sectional `std` is `0` or undefined (e.g. only one
117+
symbol survives the mask), `zscore` returns `null` rather than
118+
`inf`/`NaN`. Masked-out symbols are excluded from the bar's
119+
statistic *and* from the bar's output.
120+
121+
### Risk neutrality
122+
123+
When you want a factor's signal to be independent of structural
124+
exposures (sector, beta to the market, multi-factor risk model),
125+
use the built-in risk-neutrality primitives. They cover three
126+
common cases:
127+
128+
**Sector neutrality** — z-score or demean *within* each sector
129+
instead of across the whole universe by passing `groups=`. The
130+
mapping can be a `dict[symbol, sector]` or any `Factor` that
131+
emits a per-symbol category:
132+
133+
```python
134+
SECTORS = {"AAPL": "Tech", "MSFT": "Tech", "JPM": "Fin", ...}
135+
136+
class SectorNeutralMomentum(Pipeline):
137+
momentum = Returns(window=60)
138+
signal = momentum.zscore(groups=SECTORS) # z-score within sector
139+
```
140+
141+
**Beta neutralisation** — strip a factor's exposure to the market
142+
(or any other reference series) using `RollingBeta` and
143+
`Neutralize`:
144+
145+
```python
146+
from investing_algorithm_framework import (
147+
Returns, RollingBeta, CrossSectionalMean, Neutralize,
148+
)
149+
150+
class BetaNeutralAlpha(Pipeline):
151+
r = Returns(window=1)
152+
market = CrossSectionalMean(r) # equal-weight market
153+
beta = RollingBeta(r, market, window=60)
154+
alpha = Neutralize(r, exposures=[beta]) # market-neutral residual
155+
```
156+
157+
**Multi-factor risk model** — pass several exposures to
158+
`Neutralize` and the residual is orthogonal to all of them at
159+
each bar (per-bar OLS):
160+
161+
```python
162+
class FactorNeutralAlpha(Pipeline):
163+
r = Returns(window=1)
164+
size = StaticPerSymbol(MARKET_CAPS) # cross-sectional size
165+
val = BookToPrice()
166+
mom = Returns(window=252)
167+
residual = Neutralize(r, exposures=[size, val, mom])
168+
```
169+
170+
Bars where the system is rank-deficient (more exposures than
171+
surviving symbols) yield `null` residuals so they're skipped
172+
downstream rather than producing `NaN`.
173+
174+
### Factor algebra
175+
176+
Factors compose via the standard arithmetic operators. The framework
177+
auto-coerces scalar operands and shares sub-expression results via a
178+
per-evaluation cache, so the same input factor is computed once even
179+
when it appears multiple times:
180+
181+
```python
182+
class MyScreener(Pipeline):
183+
momentum = Returns(window=30)
184+
vol = Volatility(window=30)
185+
186+
universe = AverageDollarVolume(window=30).top(100)
187+
188+
# Composite alphas — `momentum` is computed once even though it
189+
# appears in two terms.
190+
risk_adjusted = momentum / vol
191+
score = (
192+
momentum.zscore(mask=universe)
193+
- 0.5 * vol.zscore(mask=universe)
194+
)
195+
```
196+
197+
Supported operators: `+`, `-`, `*`, `/`, unary `-`. Both operands may
198+
be `Factor` instances; either may be a Python `int` or `float`.
199+
Division by zero leaves `inf` in place (downstream filters can drop
200+
it) — for safe normalisation prefer `zscore`, which guards against
201+
zero dispersion.
202+
93203
## Phased rollout
94204

95205
Pipelines run today in the **event-driven backtest** path and in
@@ -99,7 +209,7 @@ and cached/lazy execution are tracked separately.
99209
| Mode | Status | Page |
100210
| --- | --- | --- |
101211
| Event-driven backtest | ✅ Phase 1 | [Pipelines: Event-driven backtest](pipelines-event-backtest.md) |
102-
| Vector backtest | 🚧 Phase 2 ([#502](https://github.com/coding-kitties/investing-algorithm-framework/issues/502)) | [Pipelines: Vector backtest](pipelines-vector-backtest.md) |
212+
| Vector backtest | Phase 2 ([#502](https://github.com/coding-kitties/investing-algorithm-framework/issues/502)) | [Pipelines: Vector backtest](pipelines-vector-backtest.md) |
103213
| Live trading | 🚧 Phase 3 ([#503](https://github.com/coding-kitties/investing-algorithm-framework/issues/503)) | [Pipelines: Live trading](pipelines-live.md) |
104214

105215
Start with the event-driven backtest page — it covers the full Phase 1

0 commit comments

Comments
 (0)