118 changes: 117 additions & 1 deletion docusaurus/docs/Advanced Concepts/pipelines-live.md

If you want to experiment, the same example covers both:

- A streaming panel that appends new bars instead of rebuilding from
scratch.
- Validation of `warmup_window` against `pipeline.required_window()`
so you get a clear error at startup if any data source is too short.
- ✅ Live envelope validation (≤ 50 symbols, daily-or-coarser
timeframes) at first iteration in non-backtest environments.
- ✅ Universe-refresh cadence (`Pipeline.refresh_universe_every`) so
the universe filter doesn't have to run every bar.
- ✅ Per-pipeline error resilience — a single failing pipeline is
logged and skipped in live mode instead of killing the iteration.
- 🚧 Batched / async OHLCV fetch in `CCXTOHLCVDataProvider` (tracked
for a follow-up PR — needs live integration testing).
- First-class handling of partial bars (so you don't accidentally trade
on an unclosed candle).
- Live observability hooks for pipeline output (print/log/snapshot).

## Live envelope (shipped)

v1 of the live pipeline engine is intentionally conservative:

| Constraint | Value |
| --- | --- |
| Maximum unique OHLCV symbols per strategy | `50` |
| Minimum supported timeframe | daily (`24h`) |

These limits are checked once per run at the first iteration when the
environment is not `BACKTEST`. Violations raise
`OperationalException` with an actionable message naming the
strategy and the offending symbols / timeframe. Backtests are
unaffected — sub-daily timeframes and large universes keep working in
backtest mode exactly as before.
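A minimal sketch of what this startup check might look like. All names here (`check_live_envelope`, the constants, the exception class) are illustrative, not the framework's actual internals:

```python
from datetime import timedelta

MAX_LIVE_SYMBOLS = 50
MIN_LIVE_TIMEFRAME = timedelta(hours=24)


class OperationalException(Exception):
    """Stand-in for the framework's exception of the same name."""


def check_live_envelope(strategy_id, symbols, timeframe):
    """Reject configurations outside the v1 live envelope.

    Raises OperationalException with an actionable message naming
    the strategy; called once, at the first non-backtest iteration.
    """
    unique = set(symbols)
    if len(unique) > MAX_LIVE_SYMBOLS:
        raise OperationalException(
            f"{strategy_id}: {len(unique)} unique OHLCV symbols "
            f"exceed the live limit of {MAX_LIVE_SYMBOLS}"
        )
    if timeframe < MIN_LIVE_TIMEFRAME:
        raise OperationalException(
            f"{strategy_id}: timeframe {timeframe} is finer than the "
            f"daily minimum supported in live mode"
        )
```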

## Universe refresh cadence (shipped)

A pipeline can declare how often the universe filter should be
re-evaluated by setting the class attribute
`refresh_universe_every: timedelta`. Inside that cadence the engine
keeps the surviving symbol set fixed and skips evaluating the
universe filter, which is typically the most expensive part of the
pipeline.

```python
from datetime import timedelta

class DailyMomentum(Pipeline):
    sma200 = SMA(window=200)
    dollar_volume = AverageDollarVolume(window=30)

    universe = dollar_volume.top(50)
    signal = sma200.zscore(mask=universe)

    # Re-pick the top-50 universe once per day; reuse the
    # selection on every intra-day bar.
    refresh_universe_every = timedelta(days=1)
```

When `refresh_universe_every` is `None` (the default) the universe is
re-evaluated every bar, preserving Phase 1 / Phase 2 behaviour
exactly.
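The cadence decision can be sketched as follows. The `UniverseRefreshState` helper is hypothetical, shown only to illustrate when the filter re-runs:

```python
from datetime import datetime, timedelta


class UniverseRefreshState:
    """Tracks when the universe filter was last evaluated."""

    def __init__(self, cadence):
        self.cadence = cadence        # timedelta, or None for every bar
        self.last_refresh = None
        self.symbols = None           # surviving symbol set, reused in-cadence

    def should_refresh(self, now):
        if self.cadence is None:      # default: re-evaluate every bar
            return True
        if self.last_refresh is None: # first bar: must evaluate once
            return True
        return now - self.last_refresh >= self.cadence

    def record(self, now, symbols):
        self.last_refresh = now
        self.symbols = symbols
```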

## Per-pipeline resilience (shipped)

In non-backtest environments a single pipeline raising during
`evaluate` is logged and the iteration continues with an empty output
frame for that pipeline — so an unrelated strategy on the same event
loop is not knocked out by one bad data source. Backtests still
re-raise so failures stay deterministic.
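A sketch of the isolation loop, assuming a per-class `evaluate` callable. The helper name and the empty-dict stand-in for the output frame are illustrative:

```python
import logging

logger = logging.getLogger(__name__)


def evaluate_pipelines(pipelines, evaluate, is_backtest):
    """Evaluate each pipeline; isolate failures in live mode."""
    results = {}
    for cls in pipelines:
        try:
            results[cls.__name__] = evaluate(cls)
        except Exception:
            if is_backtest:
                raise                      # keep backtests deterministic
            logger.exception("pipeline %s failed; skipping", cls.__name__)
            results[cls.__name__] = {}     # empty output for this pipeline
    return results
```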

## Warmup validation (shipped)

When a strategy declares `pipelines = [...]`, the framework validates
at construction time that every OHLCV data source has a
`warmup_window` ≥ the pipeline's longest factor window. If any source
is short — or `warmup_window` is left unset — strategy instantiation
raises `OperationalException` with an actionable message listing the
offending sources and the required window size.

```python
class MomentumScreener(Pipeline):
    sma = SMA(window=200)


class MyStrategy(TradingStrategy):
    time_unit = TimeUnit.HOUR
    interval = 1
    pipelines = [MomentumScreener]
    data_sources = [
        DataSource(
            symbol="BTC/EUR",
            data_type=DataType.OHLCV,
            time_frame=TimeFrame.ONE_HOUR,
            market="BITVAVO",
            warmup_window=200,  # must be >= 200 to satisfy SMA(200)
        ),
    ]
```

This eliminates a common failure mode in live deployments: a bot
silently trading on NaN factor columns until enough bars accrue.

## What stays the same

The same `Pipeline` subclasses you use in backtests run live.

## Stateless / serverless deployment (AWS Lambda, Azure Functions)

Live trading is frequently deployed on **AWS Lambda** or **Azure
Functions** via the framework's stateless mode (see
`investing_algorithm_framework/cli/deploy_to_aws_lambda.py` and
`deploy_to_azure_function.py`). The pipeline runtime is designed to
be safe in those environments:

- **No cross-invocation state.** Pipelines hold no module-level
mutable state. Each call to `PipelineEngine.evaluate(...)` (event
mode) and `VectorPipelineEngine.evaluate_window(...)` (vector mode)
builds a fresh panel and a fresh result frame.
- **Per-evaluation cache, scoped via `contextvars`.** The shared
sub-expression cache used by composite factors (e.g. `r + r.zscore()`
reusing `r`'s computation) lives in a `ContextVar` that is
installed at the start of each `evaluate` call and reset in a
`finally` block. A warm Lambda / Functions container reusing the
process between invocations sees a clean cache every time.
- **Pure factor composition.** `Factor.zscore()`, `demean()`,
`winsorize()`, and arithmetic (`+ - * /`, unary `-`) all return new
factor objects without mutating their inputs. Building a pipeline
is a pure operation, so it's safe to construct pipelines at module
load time on Lambda/Functions cold start.
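The install-then-reset pattern described above can be sketched with `contextvars` directly. The helper names are illustrative, not the framework's actual API:

```python
from contextvars import ContextVar

_expr_cache: ContextVar[dict] = ContextVar("pipeline_expr_cache")


def evaluate_with_fresh_cache(fn):
    """Run one pipeline evaluation with a per-call memo cache."""
    token = _expr_cache.set({})       # install a clean cache
    try:
        return fn()
    finally:
        _expr_cache.reset(token)      # warm containers see no leftovers


def memoized(key, compute):
    """Compute a shared sub-expression at most once per evaluation."""
    cache = _expr_cache.get()
    if key not in cache:
        cache[key] = compute()
    return cache[key]
```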

## Want to help?

Track or comment on the implementation issue:
98 changes: 68 additions & 30 deletions docusaurus/docs/Advanced Concepts/pipelines-vector-backtest.md
---
sidebar_position: 11
title: Pipelines — Vector backtest
description: Vector-mode pipeline execution. Phase 2 (#502) — shipped.
---

# Pipelines: Vector backtest

:::tip Status: shipped (Phase 2)
Vector-mode pipelines run by default whenever you backtest with
`BacktestService` — no opt-in needed. The vector engine evaluates each
declared factor across the **entire** backtest window once per
strategy iteration, with shared sub-expression caching.
:::

## How it works

When `BacktestService` runs, it inspects each strategy's `pipelines`
list and routes them through `VectorPipelineEngine`. For every
iteration:

1. The engine builds a long-form Polars panel
`(datetime, symbol, open, high, low, close, volume)` truncated at
the current bar (no look-ahead).
2. Each declared `Factor` is evaluated **once** in vectorised Polars,
per symbol, over the full window.
3. A per-evaluation cache (a `ContextVar`) memoises shared
sub-expressions — for example `r.zscore() - r.demean()` only
computes `r` once.
4. The optional `universe` mask filters the result; the universe
column itself is dropped from the output.
5. The strategy receives the wide frame via
`data["YourPipelineClassName"]`.

The strategy author surface is unchanged from
[Pipelines: Event-driven backtest](pipelines-event-backtest.md): you
write the same `Pipeline` subclasses and read the same
`data["..."]` frames.

## Lazy / streaming execution

For memory-bound runs over very large universes you can opt the
post-factor pipeline (universe filter + drop + sort) onto Polars'
streaming engine:

```python
from investing_algorithm_framework.services.pipeline import (
    VectorPipelineEngine,
)

engine = VectorPipelineEngine(lazy=True)
result = engine.evaluate_window(
    pipeline_cls=MomentumScreener,
    data_object=panel_data,
    symbol_to_identifier=sym_id,
)
```

`lazy=True` is **bit-for-bit equivalent** to the default eager mode
(this is verified by an equivalence test in the suite). It only
changes how the result frame is collected — factors themselves still
return eager `pl.Series` values per symbol. On older Polars versions
that don't accept `engine="streaming"` on `collect`, the engine falls
back to a default collect transparently.

You typically don't need to instantiate `VectorPipelineEngine`
yourself; `BacktestService` handles it. The `lazy` flag is exposed for
direct users of the engine and for performance experiments.

## Equivalence with event mode

Vector and event mode are required to produce **identical** factor
values for the same panel and same `as_of`. The test suite enforces
this with cross-mode equivalence tests in
`tests/services/pipeline/test_vector_pipeline_engine.py`. If you find
a discrepancy, that's a bug — please file an issue.

## See also

- [Pipelines](pipelines.md) — concept page (factor algebra, transforms).
- [Pipelines: Event-driven backtest](pipelines-event-backtest.md) —
same surface, event executor.
- [Pipelines: Live trading](pipelines-live.md) — stateless / serverless
notes.
112 changes: 111 additions & 1 deletion docusaurus/docs/Advanced Concepts/pipelines.md

A per-symbol time-series computation. Phase 1 ships these built-ins:

| Factor | Input | Description |
| --- | --- | --- |
| `SMA(window)` | close | simple moving average |
| `RSI(window)` | close | Wilder's RSI |
| `Volatility(window, periods_per_year=252)` | close | annualised stdev of log returns |
| `StaticPerSymbol(mapping, default=None)` | — | broadcasts a `dict[symbol, value]` (e.g. sector / market-cap) into the cross-section |
| `CrossSectionalMean(base, mask=None)` | base factor | per-bar equal-weight mean across surviving symbols |
| `RollingBeta(target, market, window>=2)` | two factors | rolling-window OLS beta `cov(t,m)/var(m)`; null when `var(m) == 0` |
| `Neutralize(target, exposures=[...], mask=None, add_intercept=True)` | factors | per-bar OLS residualisation of `target` against `exposures`; null on rank-deficient bars |
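The `RollingBeta` definition from the table, `cov(t, m) / var(m)` with a null when `var(m) == 0`, can be checked with a plain-Python sketch (illustrative only, not the framework's vectorised implementation):

```python
def rolling_beta(target, market, window):
    """Rolling OLS beta cov(t, m) / var(m); None where var(m) == 0."""
    out = [None] * len(target)       # None until the window fills
    for i in range(window - 1, len(target)):
        t = target[i - window + 1 : i + 1]
        m = market[i - window + 1 : i + 1]
        mt, mm = sum(t) / window, sum(m) / window
        cov = sum((a - mt) * (b - mm) for a, b in zip(t, m)) / window
        var = sum((b - mm) ** 2 for b in m) / window
        out[i] = None if var == 0 else cov / var
    return out
```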

You can also subclass `CustomFactor` to compute your own.

```python
factor.top(n)     # boolean mask: top-n by descending value
factor.bottom(n) # boolean mask: bottom-n by ascending value
```

### Cross-sectional transforms

Per-bar normalisation operators (Phase 2). Each takes an optional
`mask` so the statistic is computed only over the universe that
passes the mask:

```python
factor.zscore(mask=universe) # (x - mean) / std per bar
factor.demean(mask=universe) # x - mean per bar
factor.winsorize(0.01, 0.99,     # clip to per-bar quantiles
                 mask=universe)

# Group-relative variants — stats computed within each group
# (typically sector). `groups` accepts a dict[symbol, key] or any
# Factor that emits a per-symbol category.
factor.zscore(groups=SECTORS) # z-score within sector
factor.demean(groups=SECTORS, mask=universe)
```

Where the cross-sectional `std` is `0` or undefined (e.g. only one
symbol survives the mask), `zscore` returns `null` rather than
`inf`/`NaN`. Masked-out symbols are excluded from the bar's
statistic *and* from the bar's output.
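The null-guard semantics can be illustrated with a plain-Python sketch of one bar's z-score (illustrative only; the real implementation is vectorised Polars):

```python
def cross_sectional_zscore(values, mask=None):
    """Per-bar z-score over surviving symbols; None when std is 0/undefined."""
    mask = mask or [True] * len(values)
    survivors = [v for v, keep in zip(values, mask) if keep]
    n = len(survivors)
    if n < 2:                         # one or zero survivors: undefined
        return [None] * len(values)
    mean = sum(survivors) / n
    std = (sum((v - mean) ** 2 for v in survivors) / n) ** 0.5
    out = []
    for v, keep in zip(values, mask):
        if not keep or std == 0:      # masked out, or zero dispersion
            out.append(None)
        else:
            out.append((v - mean) / std)
    return out
```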

### Risk neutrality

When you want a factor's signal to be independent of structural
exposures (sector, beta to the market, multi-factor risk model),
use the built-in risk-neutrality primitives. They cover three
common cases:

**Sector neutrality** — z-score or demean *within* each sector
instead of across the whole universe by passing `groups=`. The
mapping can be a `dict[symbol, sector]` or any `Factor` that
emits a per-symbol category:

```python
SECTORS = {"AAPL": "Tech", "MSFT": "Tech", "JPM": "Fin", ...}

class SectorNeutralMomentum(Pipeline):
    momentum = Returns(window=60)
    signal = momentum.zscore(groups=SECTORS)  # z-score within sector
```

**Beta neutralisation** — strip a factor's exposure to the market
(or any other reference series) using `RollingBeta` and
`Neutralize`:

```python
from investing_algorithm_framework import (
    Returns, RollingBeta, CrossSectionalMean, Neutralize,
)

class BetaNeutralAlpha(Pipeline):
    r = Returns(window=1)
    market = CrossSectionalMean(r)            # equal-weight market
    beta = RollingBeta(r, market, window=60)
    alpha = Neutralize(r, exposures=[beta])   # market-neutral residual
```

**Multi-factor risk model** — pass several exposures to
`Neutralize` and the residual is orthogonal to all of them at
each bar (per-bar OLS):

```python
class FactorNeutralAlpha(Pipeline):
    r = Returns(window=1)
    size = StaticPerSymbol(MARKET_CAPS)       # cross-sectional size
    val = BookToPrice()
    mom = Returns(window=252)
    residual = Neutralize(r, exposures=[size, val, mom])
```

Bars where the system is rank-deficient (more exposures than
surviving symbols) yield `null` residuals so they're skipped
downstream rather than producing `NaN`.
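The per-bar residualisation with its rank check can be sketched in NumPy. The `neutralize_bar` helper is hypothetical, computing one bar's residuals; the framework's implementation differs:

```python
import numpy as np


def neutralize_bar(target, exposures, add_intercept=True):
    """Per-bar OLS residual of `target` against `exposures`.

    Returns all-None residuals when the design matrix is
    rank-deficient (more columns than surviving symbols, or
    collinear exposures), so the bar is skipped downstream.
    """
    y = np.asarray(target, dtype=float)
    X = np.column_stack(exposures).astype(float)
    if add_intercept:
        X = np.column_stack([np.ones(len(y)), X])
    if np.linalg.matrix_rank(X) < X.shape[1]:
        return [None] * len(y)
    coef, *_ = np.linalg.lstsq(X, y, rcond=None)
    return list(y - X @ coef)
```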

### Factor algebra

Factors compose via the standard arithmetic operators. The framework
auto-coerces scalar operands and shares sub-expression results via a
per-evaluation cache, so the same input factor is computed once even
when it appears multiple times:

```python
class MyScreener(Pipeline):
    momentum = Returns(window=30)
    vol = Volatility(window=30)

    universe = AverageDollarVolume(window=30).top(100)

    # Composite alphas — `momentum` is computed once even though it
    # appears in two terms.
    risk_adjusted = momentum / vol
    score = (
        momentum.zscore(mask=universe)
        - 0.5 * vol.zscore(mask=universe)
    )
```

Supported operators: `+`, `-`, `*`, `/`, unary `-`. Both operands may
be `Factor` instances; either may be a Python `int` or `float`.
Division by zero leaves `inf` in place (downstream filters can drop
it) — for safe normalisation prefer `zscore`, which guards against
zero dispersion.

## Phased rollout

Pipelines run today in the **event-driven backtest** and **vector
backtest** paths; live trading and cached/lazy execution are tracked
separately.
| Mode | Status | Page |
| --- | --- | --- |
| Event-driven backtest | ✅ Phase 1 | [Pipelines: Event-driven backtest](pipelines-event-backtest.md) |
| Vector backtest | ✅ Phase 2 ([#502](https://github.com/coding-kitties/investing-algorithm-framework/issues/502)) | [Pipelines: Vector backtest](pipelines-vector-backtest.md) |
| Live trading | 🚧 Phase 3 ([#503](https://github.com/coding-kitties/investing-algorithm-framework/issues/503)) | [Pipelines: Live trading](pipelines-live.md) |

Start with the event-driven backtest page — it covers the full Phase 1