Skip to content

Commit 1f15af8

Browse files
authored
Fix!: In Airflow only generate cadence DAGs for snapshots promoted to prod (#1514)
1 parent 46b3e7e commit 1f15af8

2 files changed

Lines changed: 16 additions & 8 deletions

File tree

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo
1414
from sqlmesh.core.notification_target import NotificationTarget
1515
from sqlmesh.core.plan import PlanStatus
16-
from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotTableInfo
16+
from sqlmesh.core.snapshot import (
17+
Snapshot,
18+
SnapshotId,
19+
SnapshotIdLike,
20+
SnapshotTableInfo,
21+
)
1722
from sqlmesh.schedulers.airflow import common, util
1823
from sqlmesh.schedulers.airflow.operators import targets
1924
from sqlmesh.schedulers.airflow.operators.hwm_sensor import HighWaterMarkSensor
@@ -61,12 +66,13 @@ def __init__(
6166
self._ddl_engine_operator_args = ddl_engine_operator_args or {}
6267
self._snapshots = snapshots
6368

64-
def generate_cadence_dags(self) -> t.List[DAG]:
65-
return [
66-
self._create_cadence_dag_for_snapshot(s)
67-
for s in self._snapshots.values()
68-
if s.unpaused_ts and not s.is_symbolic and not s.is_seed
69-
]
69+
def generate_cadence_dags(self, snapshots: t.Iterable[SnapshotIdLike]) -> t.List[DAG]:
70+
dags = []
71+
for s in snapshots:
72+
snapshot = self._snapshots[s.snapshot_id]
73+
if snapshot.unpaused_ts and not snapshot.is_symbolic and not snapshot.is_seed:
74+
dags.append(self._create_cadence_dag_for_snapshot(snapshot))
75+
return dags
7076

7177
def generate_plan_application_dag(self, spec: common.PlanDagSpec) -> DAG:
7278
return self._create_plan_application_dag(spec)

sqlmesh/schedulers/airflow/integration.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from airflow.utils.session import provide_session
1111
from sqlalchemy.orm import Session
1212

13+
from sqlmesh.core import constants as c
1314
from sqlmesh.engines import commands
1415
from sqlmesh.schedulers.airflow import common, util
1516
from sqlmesh.schedulers.airflow.dag_generator import SnapshotDagGenerator
@@ -107,7 +108,8 @@ def dags(self) -> t.List[DAG]:
107108
stored_snapshots,
108109
)
109110

110-
cadence_dags = dag_generator.generate_cadence_dags()
111+
prod_env = state_sync.get_environment(c.PROD)
112+
cadence_dags = dag_generator.generate_cadence_dags(prod_env.snapshots) if prod_env else []
111113

112114
plan_application_dags = [
113115
dag_generator.generate_plan_application_dag(s) for s in _get_plan_dag_specs()

0 commit comments

Comments
 (0)