Skip to content

Commit 2850d5d

Browse files
committed
feat(pipeline): validate pipeline warmup_window at strategy construction (#503 phase 3a)
When a strategy declares pipelines=[...], TradingStrategy.__init__ now checks that every OHLCV data source has warmup_window >= pipeline.required_window() and raises OperationalException with an actionable message otherwise. Without this guard a live bot silently trades on NaN columns until enough bars accrue. - 6 new tests covering: sufficient/insufficient/unset warmup, no OHLCV source, no pipelines (zero-cost), and multi-pipeline max-window aggregation. - pipelines-live.md: mark warmup validation shipped.
1 parent 4af8b2d commit 2850d5d

3 files changed

Lines changed: 207 additions & 1 deletion

File tree

docusaurus/docs/Advanced Concepts/pipelines-live.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,44 @@ If you want to experiment, the same example covers both:
3232

3333
- A streaming panel that appends new bars instead of rebuilding from
3434
scratch.
35-
- Validation of `warmup_window` against `pipeline.required_window()`
35+
- Validation of `warmup_window` against `pipeline.required_window()`
3636
so you get a clear error at startup if any data source is too short.
3737
- First-class handling of partial bars (so you don't accidentally trade
3838
on an unclosed candle).
3939
- Live observability hooks for pipeline output (print/log/snapshot).
4040

41+
## Warmup validation (shipped)
42+
43+
When a strategy declares `pipelines = [...]`, the framework validates
44+
at construction time that every OHLCV data source has a
45+
`warmup_window` ≥ the pipeline's longest factor window. If any source
46+
is short — or `warmup_window` is left unset — strategy instantiation
47+
raises `OperationalException` with an actionable message listing the
48+
offending sources and the required window size.
49+
50+
```python
51+
class MomentumScreener(Pipeline):
52+
sma = SMA(window=200)
53+
54+
55+
class MyStrategy(TradingStrategy):
56+
time_unit = TimeUnit.HOUR
57+
interval = 1
58+
pipelines = [MomentumScreener]
59+
data_sources = [
60+
DataSource(
61+
symbol="BTC/EUR",
62+
data_type=DataType.OHLCV,
63+
time_frame=TimeFrame.ONE_HOUR,
64+
market="BITVAVO",
65+
warmup_window=200, # must be >= 200 to satisfy SMA(200)
66+
),
67+
]
68+
```
69+
70+
This eliminates a common failure mode in live deployments: a bot
71+
silently trading on NaN factor columns until enough bars accrue.
72+
4173
## What stays the same
4274

4375
The same `Pipeline` subclasses you use in backtests run live.

investing_algorithm_framework/app/strategy.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,57 @@ def __init__(
194194
f"interval or use a smaller data timeframe."
195195
)
196196

197+
# Validate that every declared pipeline has enough warmup on
198+
# the strategy's OHLCV data sources to compute its longest
199+
# factor window. Without this check a live strategy starts
200+
# silently emitting NaN columns until enough bars accrue.
201+
# Tracked under #503 (Phase 3 — live hardening).
202+
if self.pipelines:
203+
ohlcv_sources = [
204+
ds for ds in self.data_sources
205+
if DataType.OHLCV.equals(ds.data_type)
206+
]
207+
if not ohlcv_sources:
208+
raise OperationalException(
209+
f"Strategy '{self.strategy_id}' declares "
210+
f"{len(self.pipelines)} pipeline(s) but has no "
211+
f"OHLCV data sources. Pipelines build their panel "
212+
f"from the strategy's OHLCV data sources — add at "
213+
f"least one DataSource(data_type=DataType.OHLCV, "
214+
f"...) to data_sources."
215+
)
216+
for pipeline_cls in self.pipelines:
217+
required = pipeline_cls.required_window()
218+
# We compare against each OHLCV source's warmup_window
219+
# (canonical) / window_size (legacy alias) — the
220+
# framework keeps them in sync, but a source may
221+
# legitimately leave both unset, which we treat as
222+
# "insufficient" because the pipeline cannot guarantee
223+
# a full window of data.
224+
short_sources = [
225+
ds for ds in ohlcv_sources
226+
if ds.warmup_window is None
227+
or ds.warmup_window < required
228+
]
229+
if short_sources:
230+
short_desc = ", ".join(
231+
f"{ds.symbol}@{ds.time_frame.value if ds.time_frame else '?'}" # noqa: E501
232+
f" (warmup_window="
233+
f"{ds.warmup_window if ds.warmup_window is not None else 'unset'})" # noqa: E501
234+
for ds in short_sources
235+
)
236+
raise OperationalException(
237+
f"Strategy '{self.strategy_id}' pipeline "
238+
f"'{pipeline_cls.__name__}' requires a "
239+
f"warmup window of {required} bars but the "
240+
f"following OHLCV data source(s) have an "
241+
f"insufficient warmup_window: {short_desc}. "
242+
f"Set warmup_window>={required} on each OHLCV "
243+
f"DataSource the pipeline relies on, otherwise "
244+
f"the pipeline will emit NaN columns until "
245+
f"enough bars accrue."
246+
)
247+
197248
# Initialize stop_losses as a new list per instance
198249
if stop_losses is not None:
199250
self.stop_losses = list(stop_losses)
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
"""Phase 3 (#503): warmup_window validation for pipelines.
2+
3+
When a strategy declares ``pipelines = [...]`` the framework now
4+
verifies at strategy construction time that every OHLCV data source
5+
has a ``warmup_window`` >= the pipeline's longest factor window.
6+
Without this check a live strategy starts silently emitting NaN
7+
columns until enough bars accrue.
8+
"""
9+
from unittest import TestCase
10+
11+
from investing_algorithm_framework import (
12+
DataSource,
13+
DataType,
14+
Pipeline,
15+
Returns,
16+
SMA,
17+
TimeFrame,
18+
TimeUnit,
19+
TradingStrategy,
20+
)
21+
from investing_algorithm_framework.domain import OperationalException
22+
23+
24+
class _ShortPipeline(Pipeline):
25+
momentum = Returns(window=5)
26+
27+
28+
class _LongPipeline(Pipeline):
29+
sma = SMA(window=200)
30+
31+
32+
class _PipelineStrategy(TradingStrategy):
33+
time_unit = TimeUnit.HOUR
34+
interval = 1
35+
symbols = ["BTC"]
36+
37+
def generate_buy_signals(self, data): # pragma: no cover - not exercised
38+
return {}
39+
40+
def generate_sell_signals(self, data): # pragma: no cover - not exercised
41+
return {}
42+
43+
44+
def _ds(warmup_window=None):
45+
kwargs = dict(
46+
symbol="BTC/EUR",
47+
data_type=DataType.OHLCV,
48+
time_frame=TimeFrame.ONE_HOUR,
49+
market="BITVAVO",
50+
)
51+
if warmup_window is not None:
52+
kwargs["warmup_window"] = warmup_window
53+
return DataSource(**kwargs)
54+
55+
56+
class TestPipelineWarmupValidation(TestCase):
57+
def test_sufficient_warmup_window_passes(self):
58+
strategy = _PipelineStrategy(
59+
data_sources=[_ds(warmup_window=10)],
60+
decorated=None,
61+
)
62+
strategy.pipelines = [_ShortPipeline]
63+
# Re-init to trigger validation with pipelines set.
64+
strategy = _PipelineStrategy(
65+
data_sources=[_ds(warmup_window=10)],
66+
)
67+
# Class-level pipelines = [] by default; explicitly set then
68+
# reconstruct via subclass.
69+
70+
class _Strat(_PipelineStrategy):
71+
pipelines = [_ShortPipeline]
72+
73+
instance = _Strat(data_sources=[_ds(warmup_window=10)])
74+
self.assertEqual(instance.pipelines, [_ShortPipeline])
75+
76+
def test_insufficient_warmup_window_raises(self):
77+
class _Strat(_PipelineStrategy):
78+
pipelines = [_LongPipeline]
79+
80+
with self.assertRaises(OperationalException) as cm:
81+
_Strat(data_sources=[_ds(warmup_window=50)])
82+
83+
msg = str(cm.exception)
84+
self.assertIn("_LongPipeline", msg)
85+
self.assertIn("200", msg)
86+
self.assertIn("warmup_window", msg)
87+
88+
def test_unset_warmup_window_raises(self):
89+
class _Strat(_PipelineStrategy):
90+
pipelines = [_ShortPipeline]
91+
92+
with self.assertRaises(OperationalException) as cm:
93+
_Strat(data_sources=[_ds()])
94+
95+
msg = str(cm.exception)
96+
self.assertIn("unset", msg)
97+
98+
def test_no_ohlcv_data_sources_raises(self):
99+
class _Strat(_PipelineStrategy):
100+
pipelines = [_ShortPipeline]
101+
102+
with self.assertRaises(OperationalException) as cm:
103+
_Strat(data_sources=[])
104+
105+
msg = str(cm.exception)
106+
self.assertIn("no OHLCV data sources", msg)
107+
108+
def test_no_pipelines_skips_validation(self):
109+
"""Strategies that don't declare pipelines pay zero cost — even
110+
a missing warmup_window must not raise."""
111+
instance = _PipelineStrategy(data_sources=[_ds()])
112+
self.assertEqual(instance.pipelines, [])
113+
114+
def test_multiple_pipelines_uses_max_window(self):
115+
class _Strat(_PipelineStrategy):
116+
pipelines = [_ShortPipeline, _LongPipeline]
117+
118+
# warmup of 199 satisfies _ShortPipeline (5) but not
119+
# _LongPipeline (200). The error must mention _LongPipeline.
120+
with self.assertRaises(OperationalException) as cm:
121+
_Strat(data_sources=[_ds(warmup_window=199)])
122+
msg = str(cm.exception)
123+
self.assertIn("_LongPipeline", msg)

0 commit comments

Comments
 (0)