@@ -107,12 +107,17 @@ def __init__(
107107 self .categorizer_config = categorizer_config or CategorizerConfig ()
108108 self .auto_categorization_enabled = auto_categorization_enabled
109109 self .include_unmodified = include_unmodified
110+
111+ self .__snapshot_mapping : t .Optional [t .Dict [str , Snapshot ]] = None
112+ self .__dag : t .Optional [DAG [str ]] = None
113+
114+ self ._start = start
115+ if not self ._start and is_dev and (forward_only or self ._has_paused_forward_only ()):
116+ self ._start = default_start or yesterday_ds ()
117+
118+ self ._end = end if end or not is_dev else (default_end or now ())
110119 self ._restate_models = set (restate_models or [])
111120 self ._effective_from : t .Optional [TimeLike ] = None
112- self ._start = (
113- start if start or not (is_dev and forward_only ) else (default_start or yesterday_ds ())
114- )
115- self ._end = end if end or not is_dev else (default_end or now ())
116121 self ._execution_time = execution_time or now ()
117122 self ._apply = apply
118123 self .__missing_intervals : t .Optional [t .Dict [t .Tuple [str , str ], Intervals ]] = None
@@ -215,7 +220,15 @@ def snapshots(self) -> t.List[Snapshot]:
215220 @property
216221 def _snapshot_mapping (self ) -> t .Dict [str , Snapshot ]:
217222 """Gets a mapping of snapshot name to snapshot."""
218- return self .__snapshot_mapping
223+ return self .__snapshot_mapping or self .context_diff .snapshots
224+
225+ @property
226+ def _dag (self ) -> DAG [str ]:
227+ if self .__dag is None :
228+ self .__dag = DAG ()
229+ for name , snapshot in self ._snapshot_mapping .items ():
230+ self .__dag .add (name , snapshot .node .depends_on )
231+ return self .__dag
219232
220233 @property
221234 def new_snapshots (self ) -> t .List [Snapshot ]:
@@ -649,7 +662,7 @@ def _ensure_new_env_with_changes(self) -> None:
649662 def _refresh_dag_and_ignored_snapshots (self ) -> None :
650663 self ._restatements = {}
651664 (
652- self ._dag ,
665+ self .__dag ,
653666 self ._snapshots ,
654667 self ._new_snapshots ,
655668 self .__snapshot_mapping ,
@@ -715,6 +728,12 @@ def _build_snapshots_and_dag(
715728 ignored_snapshot_names ,
716729 )
717730
731+ def _has_paused_forward_only (self ) -> bool :
732+ for name , snapshot in self ._snapshot_mapping .items ():
733+ if snapshot .is_paused_forward_only or self ._is_forward_only_model (name ):
734+ return True
735+ return False
736+
718737 def _is_forward_only_model (self , model_name : str ) -> bool :
719738 def _is_forward_only_expected (snapshot : Snapshot ) -> bool :
720739 # Returns True if the snapshot is not categorized yet but is expected
0 commit comments