11from datetime import datetime , timedelta , timezone
22from time import sleep
3- from typing import List , Set , Dict
3+ from typing import List , Set , Dict , FrozenSet , Tuple , Optional , Type
44from logging import getLogger
55
66import polars as pl
@@ -85,6 +85,18 @@ def __init__(
8585 self .history = {}
8686 self ._pipeline_engine = PipelineEngine ()
8787
88+ # Per (strategy_id, pipeline_cls) cache of the most recent
89+ # universe-refresh: (refresh_at, frozen surviving-symbol set).
90+ # Populated when a pipeline declares
91+ # ``refresh_universe_every`` so the engine can skip re-running
92+ # the universe filter every bar.
93+ self ._pipeline_universe_cache : Dict [
94+ Tuple [str , Type ], Tuple [datetime , FrozenSet [str ]]
95+ ] = {}
96+ # One-shot flag: live-mode envelope validation runs once per
97+ # process. Reset by ``cleanup`` so a new run re-validates.
98+ self ._pipelines_live_validated : bool = False
99+
88100 @staticmethod
89101 def _get_data_sources_for_iteration (
90102 strategy_data_sources
@@ -332,6 +344,11 @@ def cleanup(self):
332344 None
333345 """
334346 self ._portfolio_snapshot_service .save_all (self ._snapshots )
347+ # Reset per-run pipeline state so a subsequent run re-runs
348+ # live envelope validation and starts with a fresh universe
349+ # cache.
350+ self ._pipeline_universe_cache .clear ()
351+ self ._pipelines_live_validated = False
335352
336353 def start (
337354 self ,
@@ -445,6 +462,121 @@ def start(
445462
446463 self .cleanup ()
447464
465+ # ------------------------------------------------------------------ #
466+ # Pipeline live-mode helpers (#503 phase 3b/3c/3d)
467+ # ------------------------------------------------------------------ #
468+ # Envelope: v1 of live pipelines supports daily timeframes only and
469+ # caps universes at 50 symbols per pipeline. Any sub-daily timeframe
470+ # on a strategy that declares pipelines, or a strategy whose total
471+ # OHLCV symbol set exceeds the cap, raises at first iteration when
472+ # running outside backtest mode. See #503.
473+ _LIVE_MAX_PIPELINE_SYMBOLS : int = 50
474+ _LIVE_MIN_TIMEFRAME_MINUTES : int = 24 * 60 # daily
475+
476+ def _validate_live_envelope (
477+ self , strategies : List [TradingStrategy ]
478+ ) -> None :
479+ """Validate the v1 live-pipeline envelope (max 50 symbols /
480+ daily-or-coarser timeframes). Called once per run when env is
481+ not BACKTEST. Raises :class:`OperationalException` on
482+ violation."""
483+ for strategy in strategies or []:
484+ pipelines = getattr (strategy , "pipelines" , None )
485+ if not pipelines :
486+ continue
487+
488+ ohlcv_sources = [
489+ ds for ds in (strategy .data_sources or [])
490+ if DataType .OHLCV .equals (ds .data_type )
491+ ]
492+
493+ sub_daily = [
494+ ds for ds in ohlcv_sources
495+ if ds .time_frame is not None
496+ and ds .time_frame .amount_of_minutes
497+ < self ._LIVE_MIN_TIMEFRAME_MINUTES
498+ ]
499+ if sub_daily :
500+ desc = ", " .join (
501+ f"{ ds .symbol } @{ ds .time_frame .value } "
502+ for ds in sub_daily
503+ )
504+ raise OperationalException (
505+ f"Strategy '{ strategy .strategy_id } ' declares "
506+ f"pipelines but uses sub-daily OHLCV timeframes "
507+ f"in live mode: { desc } . v1 of the live pipeline "
508+ f"engine supports daily timeframes only — see "
509+ f"#503. Use a daily timeframe or run the strategy "
510+ f"in backtest mode."
511+ )
512+
513+ unique_symbols = {ds .symbol for ds in ohlcv_sources }
514+ unique_symbols .discard (None )
515+ if len (unique_symbols ) > self ._LIVE_MAX_PIPELINE_SYMBOLS :
516+ raise OperationalException (
517+ f"Strategy '{ strategy .strategy_id } ' declares "
518+ f"pipelines over { len (unique_symbols )} symbols, "
519+ f"which exceeds the v1 live cap of "
520+ f"{ self ._LIVE_MAX_PIPELINE_SYMBOLS } . Reduce the "
521+ f"universe or run the strategy in backtest mode "
522+ f"(see #503)."
523+ )
524+
525+ def _maybe_validate_live_envelope (
526+ self , strategies : List [TradingStrategy ], environment : str
527+ ) -> None :
528+ """One-shot envelope validation; no-op for BACKTEST mode."""
529+ if self ._pipelines_live_validated :
530+ return
531+ if Environment .BACKTEST .equals (environment ):
532+ self ._pipelines_live_validated = True
533+ return
534+ self ._validate_live_envelope (strategies )
535+ self ._pipelines_live_validated = True
536+
537+ def _filter_symbols_for_universe_cache (
538+ self ,
539+ strategy_id : str ,
540+ pipeline_cls : Type ,
541+ symbol_to_identifier : Dict [str , str ],
542+ as_of : datetime ,
543+ ) -> Optional [Dict [str , str ]]:
544+ """If ``pipeline_cls`` declares ``refresh_universe_every`` and
545+ we have a cached surviving-symbol set still inside the cadence,
546+ return a restricted ``symbol_to_identifier`` mapping. Returning
547+ ``None`` means "no cache hit — run a full universe evaluation".
548+ """
549+ cadence : Optional [timedelta ] = getattr (
550+ pipeline_cls , "refresh_universe_every" , None
551+ )
552+ if cadence is None or cadence <= timedelta (0 ):
553+ return None
554+
555+ cache_key = (strategy_id , pipeline_cls )
556+ cached = self ._pipeline_universe_cache .get (cache_key )
557+ if cached is None :
558+ return None
559+
560+ last_refresh , symbols = cached
561+ # Normalise tz so naive backtest datetimes and aware live
562+ # datetimes compare cleanly.
563+ if last_refresh .tzinfo is None and as_of .tzinfo is not None :
564+ cmp_as_of = as_of .replace (tzinfo = None )
565+ elif last_refresh .tzinfo is not None and as_of .tzinfo is None :
566+ cmp_as_of = as_of .replace (tzinfo = last_refresh .tzinfo )
567+ else :
568+ cmp_as_of = as_of
569+ if cmp_as_of - last_refresh >= cadence :
570+ return None # cadence elapsed → refresh
571+
572+ # Cache hit: restrict the symbol set, skipping the universe
573+ # filter entirely on this iteration.
574+ return {
575+ sym : ident
576+ for sym , ident in symbol_to_identifier .items ()
577+ if sym in symbols
578+ }
579+
448580 def _run_pipelines (
449581 self ,
450582 strategy : TradingStrategy ,
@@ -457,13 +589,27 @@ def _run_pipelines(
457589 class name.
458590
459591 Strategies without ``pipelines`` skip this entirely (zero cost).
460- Per Phase 1 of the Pipeline API (#501); see
461- ``docs/design/pipeline-api.md``.
592+
593+ Live-mode hardening (#503):
594+
595+ * The v1 envelope (max 50 symbols, daily-or-coarser timeframes)
596+ is validated once per run.
597+ * Pipelines that declare ``refresh_universe_every`` reuse the
598+ last surviving symbol set within the cadence — saving the
599+ cost of evaluating the universe filter every bar.
600+ * In non-backtest environments, a single failing pipeline is
601+ logged and skipped (the iteration continues with an empty
602+ output) instead of killing the whole event loop. Backtests
603+ keep raising so failures stay deterministic.
462604 """
463605 pipelines = getattr (strategy , "pipelines" , None )
464606 if not pipelines :
465607 return
466608
609+ config = self ._configuration_service .get_config ()
610+ environment = config [ENVIRONMENT ]
611+ is_backtest = Environment .BACKTEST .equals (environment )
612+
467613 # Map symbol -> data-source identifier from the strategy's
468614 # OHLCV data sources. If a symbol appears on multiple data
469615 # sources (e.g. multiple timeframes) the first OHLCV match wins.
@@ -484,20 +630,62 @@ class name.
484630 return
485631
486632 for pipeline_cls in pipelines :
633+ # 3c: universe-refresh cache. If the pipeline declares a
634+ # refresh cadence and we're inside it, restrict the panel
635+ # input to the cached symbols.
636+ cached_mapping = self ._filter_symbols_for_universe_cache (
637+ strategy_id = strategy .strategy_id ,
638+ pipeline_cls = pipeline_cls ,
639+ symbol_to_identifier = symbol_to_identifier ,
640+ as_of = as_of ,
641+ )
642+ mapping = (
643+ cached_mapping
644+ if cached_mapping is not None
645+ else symbol_to_identifier
646+ )
647+
487648 try :
488649 output = self ._pipeline_engine .evaluate (
489650 pipeline_cls = pipeline_cls ,
490651 data_object = data_object ,
491- symbol_to_identifier = symbol_to_identifier ,
652+ symbol_to_identifier = mapping ,
492653 as_of = as_of ,
493654 )
494- except Exception : # pragma: no cover - logged + re-raised
655+ except Exception :
656+ # 3d: live-mode resilience. In live trading a single
657+ # pipeline failure must not kill the iteration —
658+ # surface an empty frame and log so the rest of the
659+ # strategies can still run. Backtests re-raise so
660+ # failures stay deterministic.
495661 logger .exception (
496662 "Pipeline %s failed during evaluation at %s" ,
497663 pipeline_cls .__name__ ,
498664 as_of ,
499665 )
500- raise
666+ if is_backtest :
667+ raise
668+ output = self ._pipeline_engine ._empty_output (
669+ pipeline_cls
670+ )
671+
672+ # 3c: refresh the universe cache when we just ran a full
673+ # evaluation (cached_mapping was None) and the pipeline
674+ # declares a cadence.
675+ cadence = getattr (
676+ pipeline_cls , "refresh_universe_every" , None
677+ )
678+ if (
679+ cadence is not None
680+ and cadence > timedelta (0 )
681+ and cached_mapping is None
682+ and "symbol" in output .columns
683+ ):
684+ surviving = frozenset (output ["symbol" ].to_list ())
685+ self ._pipeline_universe_cache [
686+ (strategy .strategy_id , pipeline_cls )
687+ ] = (as_of , surviving )
688+
501689 data [pipeline_cls .__name__ ] = output
502690
503691 def _run_iteration (
@@ -527,6 +715,11 @@ def _run_iteration(
527715 environment = config [ENVIRONMENT ]
528716 current_datetime = config [INDEX_DATETIME ]
529717
718+ # Validate the live-pipeline envelope (max 50 symbols /
719+ # daily-or-coarser timeframes) once per run. No-op for
720+ # backtests. See #503 phase 3b.
721+ self ._maybe_validate_live_envelope (strategies , environment )
722+
530723 # Step 1: Collect all data for the strategies and for the
531724 # pending orders
532725 open_orders = self ._order_service .get_all (
0 commit comments