Skip to content

Commit 9882e69

Browse files
authored
Fix: Evaluate snapshots after pomoting the environment if the plan contains paused forward-only snapshots (#912)
1 parent d3fbf3a commit 9882e69

File tree

6 files changed

+42
-4
lines changed

6 files changed

+42
-4
lines changed

sqlmesh/core/plan/definition.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ def _ensure_no_forward_only_revert(self) -> None:
541541
candidate.snapshot_id not in self.context_diff.new_snapshots
542542
and promoted.is_forward_only
543543
and not candidate.is_forward_only
544+
and not candidate.is_indirect_non_breaking
544545
and (
545546
promoted.version == candidate.version
546547
or candidate.data_version in promoted.previous_versions

sqlmesh/core/plan/evaluator.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
from sqlmesh.core.console import Console, get_console
2121
from sqlmesh.core.plan.definition import Plan
2222
from sqlmesh.core.scheduler import Scheduler
23-
from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotInfoLike
23+
from sqlmesh.core.snapshot import (
24+
SnapshotEvaluator,
25+
SnapshotInfoLike,
26+
has_paused_forward_only,
27+
)
2428
from sqlmesh.core.state_sync import StateSync
2529
from sqlmesh.core.user import User
2630
from sqlmesh.schedulers.airflow import common as airflow_common
@@ -61,7 +65,7 @@ def __init__(
6165
def evaluate(self, plan: Plan) -> None:
6266
tasks = (
6367
[self._push, self._restate, self._backfill, self._promote]
64-
if not plan.forward_only or plan.is_dev
68+
if not has_paused_forward_only(plan.snapshots, plan.snapshots) or plan.is_dev
6569
else [self._push, self._restate, self._promote, self._backfill]
6670
)
6771

sqlmesh/core/snapshot/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
SnapshotNameVersionLike,
1515
SnapshotTableInfo,
1616
fingerprint_from_model,
17+
has_paused_forward_only,
1718
merge_intervals,
1819
table_name,
1920
to_table_mapping,

sqlmesh/core/snapshot/definition.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,3 +1070,16 @@ def to_table_mapping(snapshots: t.Iterable[Snapshot], is_dev: bool) -> t.Dict[st
10701070
for snapshot in snapshots
10711071
if snapshot.version and not snapshot.is_symbolic
10721072
}
1073+
1074+
1075+
def has_paused_forward_only(
1076+
targets: t.Iterable[SnapshotIdLike],
1077+
snapshots: t.Union[t.List[Snapshot], t.Dict[SnapshotId, Snapshot]],
1078+
) -> bool:
1079+
if not isinstance(snapshots, dict):
1080+
snapshots = {s.snapshot_id: s for s in snapshots}
1081+
for target in targets:
1082+
target_snapshot = snapshots[target.snapshot_id]
1083+
if target_snapshot.is_paused and target_snapshot.is_forward_only:
1084+
return True
1085+
return False

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
from sqlmesh.core._typing import NotificationTarget
1313
from sqlmesh.core.environment import Environment
1414
from sqlmesh.core.plan import PlanStatus
15-
from sqlmesh.core.snapshot import Snapshot, SnapshotId, SnapshotTableInfo
15+
from sqlmesh.core.snapshot import (
16+
Snapshot,
17+
SnapshotId,
18+
SnapshotTableInfo,
19+
has_paused_forward_only,
20+
)
1621
from sqlmesh.schedulers.airflow import common, util
1722
from sqlmesh.schedulers.airflow.operators import targets
1823
from sqlmesh.schedulers.airflow.operators.hwm_sensor import HighWaterMarkSensor
@@ -160,7 +165,10 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
160165
) = self._create_promotion_demotion_tasks(plan_dag_spec)
161166

162167
start_task >> create_start_task
163-
if not plan_dag_spec.forward_only or plan_dag_spec.is_dev:
168+
if (
169+
not has_paused_forward_only(plan_dag_spec.promoted_snapshots, all_snapshots)
170+
or plan_dag_spec.is_dev
171+
):
164172
create_end_task >> backfill_start_task
165173
backfill_end_task >> promote_start_task
166174
latest_end_task = promote_end_task

tests/core/test_snapshot.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
SnapshotFingerprint,
2525
categorize_change,
2626
fingerprint_from_model,
27+
has_paused_forward_only,
2728
)
2829
from sqlmesh.utils.date import to_datetime, to_timestamp
2930
from sqlmesh.utils.errors import SQLMeshError
@@ -915,3 +916,13 @@ def test_physical_schema(snapshot: Snapshot):
915916
assert new_snapshot.physical_schema == "custom_schema"
916917
assert new_snapshot.data_version.physical_schema == "custom_schema"
917918
assert new_snapshot.table_info.physical_schema == "custom_schema"
919+
920+
921+
def test_has_paused_forward_only(snapshot: Snapshot):
922+
assert not has_paused_forward_only([snapshot], [snapshot])
923+
924+
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
925+
assert has_paused_forward_only([snapshot], [snapshot])
926+
927+
snapshot.set_unpaused_ts("2023-01-01")
928+
assert not has_paused_forward_only([snapshot], [snapshot])

0 commit comments

Comments
 (0)