Commit aabf9ae

feat(pipeline): factor arithmetic, cross-sectional transforms, lazy executor (#502 phase 2c+2d)
- Factor arithmetic: __neg__, __add__/__radd__, __sub__/__rsub__, __mul__/__rmul__, __truediv__/__rtruediv__, with scalar auto-coercion via _Constant
- Cross-sectional transforms: Factor.zscore(mask=), Factor.demean(mask=), Factor.winsorize(lower, upper, mask=), all per-bar via over('datetime')
- VectorPipelineEngine(lazy=True) routes the post-factor universe filter + sort through Polars' streaming engine for memory-bound runs
- Document stateless / serverless guarantees for AWS Lambda + Azure Functions deployments (per-evaluation contextvar cache, no cross-invocation state)
- 9 new tests covering arithmetic, transforms, lazy/eager equivalence, and the stateless cache lifecycle
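The scalar auto-coercion in the first bullet can be illustrated with a toy expression tree. This is a simplified stand-in for the framework's `Factor`/`_Constant` classes, not the real implementation; all class and function names here are illustrative:

```python
# Toy expression tree: __add__/__radd__ wrap scalars in a Constant
# node so both ``f + 1`` and ``1 + f`` build the same kind of tree.
class Node:
    def evaluate(self, row):
        raise NotImplementedError

    def __add__(self, other):
        return Add(self, _coerce(other))

    def __radd__(self, other):
        # Reflected form: Python calls this for ``1 + node``.
        return Add(_coerce(other), self)

    def __neg__(self):
        return Neg(self)


class Constant(Node):
    def __init__(self, value):
        self.value = float(value)

    def evaluate(self, row):
        return self.value


class Column(Node):
    def __init__(self, name):
        self.name = name

    def evaluate(self, row):
        return row[self.name]


class Add(Node):
    def __init__(self, left, right):
        self.left, self.right = left, right

    def evaluate(self, row):
        return self.left.evaluate(row) + self.right.evaluate(row)


class Neg(Node):
    def __init__(self, base):
        self.base = base

    def evaluate(self, row):
        return -self.base.evaluate(row)


def _coerce(operand):
    # Mirrors the idea of scalar auto-coercion: scalars become Constant.
    if isinstance(operand, Node):
        return operand
    if isinstance(operand, (int, float)):
        return Constant(operand)
    raise TypeError(type(operand).__name__)


returns = Column("returns")
expr = 1 + (-returns)            # __radd__ and __neg__ build the tree
value = expr.evaluate({"returns": 0.25})  # 1 + (-0.25)
```

Building the tree never evaluates anything; evaluation happens only when `evaluate` is called with a row, which is the same deferred-composition idea the commit applies to factors.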
1 parent 48bf3e6 commit aabf9ae

4 files changed

Lines changed: 657 additions & 0 deletions

File tree

docusaurus/docs/Advanced Concepts/pipelines-live.md

Lines changed: 24 additions & 0 deletions

@@ -42,6 +42,30 @@ If you want to experiment, the same example covers both:

The same `Pipeline` subclasses you use in backtests run live.

## Stateless / serverless deployment (AWS Lambda, Azure Functions)

Live trading is frequently deployed on **AWS Lambda** or **Azure Functions** via the framework's stateless mode (see `investing_algorithm_framework/cli/deploy_to_aws_lambda.py` and `deploy_to_azure_function.py`). The pipeline runtime is designed to be safe in those environments:

- **No cross-invocation state.** Pipelines hold no module-level mutable state. Each call to `PipelineEngine.evaluate(...)` (event mode) and `VectorPipelineEngine.evaluate_window(...)` (vector mode) builds a fresh panel and a fresh result frame.
- **Per-evaluation cache, scoped via `contextvars`.** The shared sub-expression cache used by composite factors (e.g. `r + r.zscore()` reusing `r`'s computation) lives in a `ContextVar` that is installed at the start of each `evaluate` call and reset in a `finally` block. A warm Lambda / Functions container that reuses the process between invocations sees a clean cache every time.
- **Pure factor composition.** `Factor.zscore()`, `demean()`, `winsorize()`, and arithmetic (`+ - * /`, unary `-`) all return new factor objects without mutating their inputs. Building a pipeline is a pure operation, so it is safe to construct pipelines at module load time on a Lambda/Functions cold start.
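The cache lifecycle in the second bullet can be sketched in plain Python. This is a standalone illustration of the pattern, not the framework's actual implementation; `compute_factor` and `evaluate` are hypothetical stand-ins:

```python
import contextvars

# Per-evaluation cache: a ContextVar holding a dict, installed at the
# start of evaluate() and reset in a finally block, so a warm container
# never sees entries from a previous invocation.
_EVAL_CACHE = contextvars.ContextVar("eval_cache")

calls = []  # records how often the "expensive" factor really computes


def compute_factor(name):
    """Memoised within a single evaluate() call only."""
    cache = _EVAL_CACHE.get()
    if name not in cache:
        calls.append(name)       # the real work would happen here
        cache[name] = 42.0
    return cache[name]


def evaluate():
    token = _EVAL_CACHE.set({})  # fresh cache per invocation
    try:
        # A composite like ``r + r.zscore()`` touches ``r`` twice but
        # computes it once, thanks to the shared cache.
        return compute_factor("r") + compute_factor("r")
    finally:
        _EVAL_CACHE.reset(token)  # warm container => clean slate


evaluate()
evaluate()
print(calls)  # prints ['r', 'r'] -- once per invocation, not per use
```

The `finally` reset is what makes the warm-container guarantee hold even when an evaluation raises.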
## Want to help?

Track or comment on the implementation issue:

investing_algorithm_framework/domain/pipeline/factor.py

Lines changed: 285 additions & 0 deletions
@@ -124,6 +124,76 @@ def bottom(self, n: int) -> "Filter":
        from .filter import _BottomN
        return _BottomN(self, n)

    # ------------------------------------------------------------------ #
    # Cross-sectional transforms (Phase 2 / #502)
    # ------------------------------------------------------------------ #
    def zscore(self, mask: Optional["Filter"] = None) -> "Factor":
        """Cross-sectional z-score within each timestamp.

        Returns ``(x - mean) / std`` computed over the symbols at each
        bar. With ``mask``, symbols outside the mask are excluded from
        the mean/std and receive ``null`` in the output.
        """
        return _Zscore(self, mask=mask)

    def demean(self, mask: Optional["Filter"] = None) -> "Factor":
        """Cross-sectional mean removal within each timestamp.

        Returns ``x - mean(x)`` computed over the symbols at each bar.
        With ``mask``, symbols outside the mask are excluded from the
        mean and receive ``null`` in the output.
        """
        return _Demean(self, mask=mask)

    def winsorize(
        self,
        lower: float = 0.01,
        upper: float = 0.99,
        mask: Optional["Filter"] = None,
    ) -> "Factor":
        """Cross-sectional winsorisation within each timestamp.

        Clips values below the ``lower`` quantile and above the
        ``upper`` quantile (computed per bar). Both bounds are in
        ``[0, 1]`` with ``lower < upper``.
        """
        if not (0.0 <= lower < upper <= 1.0):
            raise ValueError(
                f"winsorize requires 0 <= lower < upper <= 1, "
                f"got lower={lower}, upper={upper}"
            )
        return _Winsorize(self, lower=lower, upper=upper, mask=mask)

    # ------------------------------------------------------------------ #
    # Arithmetic (Phase 2 / #502) — composes Factors into expression trees
    # ------------------------------------------------------------------ #
    def __neg__(self) -> "Factor":
        return _UnaryOp(self, op="neg")

    def __add__(self, other) -> "Factor":
        return _BinaryOp(self, other, op="add")

    def __radd__(self, other) -> "Factor":
        return _BinaryOp(other, self, op="add")

    def __sub__(self, other) -> "Factor":
        return _BinaryOp(self, other, op="sub")

    def __rsub__(self, other) -> "Factor":
        return _BinaryOp(other, self, op="sub")

    def __mul__(self, other) -> "Factor":
        return _BinaryOp(self, other, op="mul")

    def __rmul__(self, other) -> "Factor":
        return _BinaryOp(other, self, op="mul")

    def __truediv__(self, other) -> "Factor":
        return _BinaryOp(self, other, op="div")

    def __rtruediv__(self, other) -> "Factor":
        return _BinaryOp(other, self, op="div")

    # ------------------------------------------------------------------ #
    # Repr
    # ------------------------------------------------------------------ #
@@ -180,3 +250,218 @@ def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
            .alias("__rank__")
        )
        return ranked["__rank__"]


# --------------------------------------------------------------------- #
# Phase 2 expression-tree wrappers (#502): arithmetic + cross-sectional
# transforms. Each wrapper composes existing factors into a new factor
# without losing the per-evaluation cache (they call ``evaluate`` on
# their children, not ``compute_panel``).
# --------------------------------------------------------------------- #
def _coerce_operand(operand) -> "Factor":
    """Wrap a scalar operand in a :class:`_Constant` so binary ops
    can treat ``factor + 1`` and ``factor + other_factor`` uniformly.
    """
    if isinstance(operand, Factor):
        return operand
    if isinstance(operand, (int, float)):
        return _Constant(float(operand))
    raise TypeError(
        f"Unsupported operand type for Factor arithmetic: "
        f"{type(operand).__name__}"
    )


class _Constant(Factor):
    """A panel-aligned constant series. Window is 1 (no warmup needed)."""

    inputs: List[str] = []

    def __init__(self, value: float) -> None:
        super().__init__(window=1)
        self._value = float(value)

    def required_columns(self) -> List[str]:
        return []

    def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
        return pl.Series(
            "__const__", [self._value] * panel.height, dtype=pl.Float64
        )


class _UnaryOp(Factor):
    """Element-wise unary op (currently only ``neg``)."""

    def __init__(self, base: Factor, op: str) -> None:
        super().__init__(window=base.required_window())
        self._base = base
        self._op = op
        self.inputs = list(base.required_columns())

    def required_columns(self) -> List[str]:
        return list(self.inputs)

    def required_window(self) -> int:
        return int(self.window)

    def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
        values = self._base.evaluate(panel)
        if self._op == "neg":
            return (-values).rename("__unary__")
        raise ValueError(f"Unknown unary op: {self._op}")  # pragma: no cover


class _BinaryOp(Factor):
    """Element-wise binary arithmetic between two ``Factor``s.

    Either operand may be a scalar; it is auto-wrapped in
    :class:`_Constant`.
    """

    def __init__(self, left, right, op: str) -> None:
        left_f = _coerce_operand(left)
        right_f = _coerce_operand(right)
        super().__init__(
            window=max(
                left_f.required_window(), right_f.required_window()
            )
        )
        self._left = left_f
        self._right = right_f
        self._op = op
        cols: List[str] = list(left_f.required_columns())
        for c in right_f.required_columns():
            if c not in cols:
                cols.append(c)
        self.inputs = cols

    def required_columns(self) -> List[str]:
        return list(self.inputs)

    def required_window(self) -> int:
        return int(self.window)

    def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
        left = self._left.evaluate(panel)
        right = self._right.evaluate(panel)
        if self._op == "add":
            out = left + right
        elif self._op == "sub":
            out = left - right
        elif self._op == "mul":
            out = left * right
        elif self._op == "div":
            # Polars naturally yields nulls when the divisor is null;
            # division by zero produces inf which we leave as-is so
            # callers can decide what to do (e.g. ``zscore`` will
            # propagate inf and downstream filters can drop it).
            out = left / right
        else:
            raise ValueError(  # pragma: no cover
                f"Unknown binary op: {self._op}"
            )
        return out.rename("__binop__")


class _CrossSectionalTransform(Factor):
    """Common base for per-bar transforms (zscore / demean / winsorize).

    Subclasses implement :meth:`_transform_expr`, which returns a
    Polars expression over the (possibly mask-nulled) factor values.
    The base class handles mask application and per-``datetime``
    grouping.
    """

    def __init__(
        self,
        base: Factor,
        mask: Optional["Filter"] = None,
    ) -> None:
        super().__init__(window=base.required_window())
        self._base = base
        self._mask = mask
        cols = list(base.required_columns())
        if mask is not None:
            for c in mask.required_columns():
                if c not in cols:
                    cols.append(c)
            self.window = max(self.window, mask.required_window())
        self.inputs = cols

    def required_columns(self) -> List[str]:
        return list(self.inputs)

    def required_window(self) -> int:
        return int(self.window)

    def _transform_expr(self) -> pl.Expr:
        raise NotImplementedError  # pragma: no cover

    def compute_panel(self, panel: pl.DataFrame) -> pl.Series:
        values = self._base.evaluate(panel)
        df = panel.select(["datetime", "symbol"]).with_columns(
            values.alias("__x__")
        )
        if self._mask is not None:
            mask_values = self._mask.evaluate(panel)
            df = df.with_columns(
                pl.when(mask_values)
                .then(pl.col("__x__"))
                .otherwise(None)
                .alias("__x__")
            )
        df = df.with_columns(self._transform_expr().alias("__out__"))
        return df["__out__"]


class _Zscore(_CrossSectionalTransform):
    """Cross-sectional z-score per bar."""

    def _transform_expr(self) -> pl.Expr:
        x = pl.col("__x__")
        mean = x.mean().over("datetime")
        std = x.std().over("datetime")
        # If std is 0 or null, returning null is the safe choice (it
        # signals "no dispersion" rather than producing inf/NaN that
        # poisons downstream rolling stats).
        return (
            pl.when((std == 0) | std.is_null())
            .then(None)
            .otherwise((x - mean) / std)
        )


class _Demean(_CrossSectionalTransform):
    """Cross-sectional mean removal per bar."""

    def _transform_expr(self) -> pl.Expr:
        x = pl.col("__x__")
        return x - x.mean().over("datetime")


class _Winsorize(_CrossSectionalTransform):
    """Cross-sectional clip-to-quantiles per bar."""

    def __init__(
        self,
        base: Factor,
        lower: float,
        upper: float,
        mask: Optional["Filter"] = None,
    ) -> None:
        super().__init__(base=base, mask=mask)
        self._lower = float(lower)
        self._upper = float(upper)

    def _transform_expr(self) -> pl.Expr:
        x = pl.col("__x__")
        lo = x.quantile(self._lower).over("datetime")
        hi = x.quantile(self._upper).over("datetime")
        return (
            pl.when(x < lo)
            .then(lo)
            .when(x > hi)
            .then(hi)
            .otherwise(x)
        )
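The per-bar z-score semantics, including the zero-dispersion-yields-null rule in `_Zscore`, can be shown in a minimal pure-Python sketch. This stands in for the Polars logic; the row layout and function name are illustrative, not the framework's API:

```python
import math
from collections import defaultdict

def zscore_per_bar(rows):
    """rows: list of (datetime, symbol, value), value possibly None.

    Returns {(datetime, symbol): z_or_None}. Mirrors the rule above:
    zero or undefined dispersion yields None (null), never inf/NaN.
    """
    # Collect the non-null cross-section for each bar.
    by_bar = defaultdict(list)
    for dt, _, x in rows:
        if x is not None:
            by_bar[dt].append(x)

    out = {}
    for dt, sym, x in rows:
        xs = by_bar[dt]
        if x is None or len(xs) < 2:
            out[(dt, sym)] = None  # masked value or undefined std
            continue
        mean = sum(xs) / len(xs)
        # Sample std (ddof=1), matching Polars' Series.std default.
        std = math.sqrt(sum((v - mean) ** 2 for v in xs) / (len(xs) - 1))
        out[(dt, sym)] = None if std == 0 else (x - mean) / std
    return out

rows = [
    ("t0", "BTC", 1.0), ("t0", "ETH", 3.0),  # dispersion: z = -/+ 1/sqrt(2)
    ("t1", "BTC", 5.0), ("t1", "ETH", 5.0),  # std == 0: both null
]
result = zscore_per_bar(rows)
```

Note the ddof=1 choice: Polars' `std` defaults to the sample standard deviation, so a sketch using population std would produce different z-scores on small cross-sections.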

investing_algorithm_framework/services/pipeline/vector_pipeline_engine.py

Lines changed: 41 additions & 0 deletions
@@ -42,8 +42,21 @@ class VectorPipelineEngine:

    Universe filtering is applied as in event mode — symbols failing
    the universe mask at a given bar are dropped from that bar's
    output, and the universe column itself is not exposed.

    Args:
        lazy: If ``True``, the result frame is assembled via
            :class:`polars.LazyFrame` and collected with the streaming
            engine at the end. Useful for memory-bound runs over large
            universes. Built-in factors are still computed eagerly per
            symbol (each ``Factor.compute_panel`` returns a ``Series``);
            only the ``with_columns`` / ``filter`` / ``sort`` pipeline
            on the wide result frame is deferred. Default ``False``
            preserves Phase 2a behaviour exactly.
    """

    def __init__(self, lazy: bool = False) -> None:
        self._lazy = bool(lazy)

    # ------------------------------------------------------------------ #
    # Panel construction (delegates to event engine for parity)
    # ------------------------------------------------------------------ #
@@ -130,13 +143,41 @@ def evaluate_panel(
            if universe is not None:
                mask = universe.evaluate(panel)
                result = result.with_columns(mask.alias("__universe__"))
                if self._lazy:
                    # Stream the filter + drop + sort through Polars'
                    # streaming engine so memory usage stays bounded
                    # on large universes.
                    return self._collect_lazy(
                        result.lazy()
                        .filter(pl.col("__universe__"))
                        .drop("__universe__")
                        .sort(["datetime", "symbol"])
                    )
                result = result.filter(pl.col("__universe__"))
                result = result.drop("__universe__")
        finally:
            _EVAL_CACHE.reset(token)

        if self._lazy:
            return self._collect_lazy(
                result.lazy().sort(["datetime", "symbol"])
            )
        return result.sort(["datetime", "symbol"])

    @staticmethod
    def _collect_lazy(lazy: pl.LazyFrame) -> pl.DataFrame:
        """Collect a :class:`polars.LazyFrame` with the streaming
        engine when available; fall back to a default collect on
        older Polars versions that don't accept
        ``engine="streaming"``.
        """
        try:
            return lazy.collect(engine="streaming")
        except TypeError:
            return lazy.collect()
        except Exception:  # pragma: no cover - polars version drift
            return lazy.collect()

    # ------------------------------------------------------------------ #
    # Slicing helpers
    # ------------------------------------------------------------------ #
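The version-compatibility fallback in `_collect_lazy` is an instance of a general pattern: try the newer keyword argument, and fall back to a plain call when an older version rejects it with `TypeError`. A minimal stdlib sketch of the pattern (the `collect_fn` callables below are hypothetical stand-ins for `LazyFrame.collect`):

```python
def collect_compat(collect_fn, *, prefer_engine="streaming"):
    """Call ``collect_fn(engine=...)`` when the installed version
    accepts the keyword; otherwise fall back to a plain call.
    """
    try:
        return collect_fn(engine=prefer_engine)
    except TypeError:
        # Older versions raise TypeError for an unknown keyword.
        # Caveat: this also masks a TypeError raised *inside* the
        # call, which is why the real code keeps a broad fallback too.
        return collect_fn()

# A fake "new" API that accepts the keyword...
def new_collect(engine=None):
    return f"collected via {engine or 'in-memory'}"

# ...and a fake "old" API that does not.
def old_collect():
    return "collected via in-memory"

a = collect_compat(new_collect)  # uses the streaming keyword
b = collect_compat(old_collect)  # falls back cleanly
```

Probing with `try`/`except TypeError` keeps a single code path working across library versions without pinning or inspecting version strings.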
