Skip to content

Commit a91ddd5

Browse files
authored
fix: respect disable_restatement remove intervals across env (#3838)
1 parent ebcb67d commit a91ddd5

6 files changed

Lines changed: 136 additions & 4 deletions

File tree

sqlmesh/core/plan/definition.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,11 @@ def to_evaluatable(self) -> EvaluatablePlan:
255255
models_to_backfill=self.models_to_backfill,
256256
interval_end_per_model=self.interval_end_per_model,
257257
execution_time=self.execution_time,
258+
disabled_restatement_models={
259+
s.name
260+
for s in self.snapshots.values()
261+
if s.is_model and s.model.disable_restatement
262+
},
258263
)
259264

260265
@cached_property
@@ -285,6 +290,7 @@ class EvaluatablePlan(PydanticModel):
285290
models_to_backfill: t.Optional[t.Set[str]] = None
286291
interval_end_per_model: t.Optional[t.Dict[str, int]] = None
287292
execution_time: t.Optional[TimeLike] = None
293+
disabled_restatement_models: t.Set[str]
288294

289295
def is_selected_for_backfill(self, model_fqn: str) -> bool:
290296
return self.models_to_backfill is None or model_fqn in self.models_to_backfill

sqlmesh/core/plan/evaluator.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,9 @@ def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapsho
409409
#
410410
# Without this rule, its possible that promoting a dev table to prod will introduce old data to prod
411411
snapshot_intervals_to_restate.update(
412-
self._restatement_intervals_across_all_environments(plan.restatements)
412+
self._restatement_intervals_across_all_environments(
413+
plan.restatements, plan.disabled_restatement_models
414+
)
413415
)
414416

415417
self.state_sync.remove_intervals(
@@ -418,12 +420,12 @@ def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapsho
418420
)
419421

420422
def _restatement_intervals_across_all_environments(
421-
self, prod_restatements: t.Dict[str, Interval]
423+
self, prod_restatements: t.Dict[str, Interval], disable_restatement_models: t.Set[str]
422424
) -> t.Set[t.Tuple[SnapshotTableInfo, Interval]]:
423425
"""
424426
Given a map of snapshot names + intervals to restate in prod:
425427
- Look up matching snapshots across all environments (match based on name - regardless of version)
426-
- For each match, also match downstream snapshots
428+
- For each match, also match downstream snapshots while filtering out models that have restatement disabled
427429
- Return all matches mapped to the intervals of the prod snapshot being restated
428430
429431
The goal here is to produce a list of intervals to invalidate across all environments so that a cadence
@@ -444,7 +446,11 @@ def _restatement_intervals_across_all_environments(
444446
for restatement, intervals in prod_restatements.items():
445447
if restatement not in keyed_snapshots:
446448
continue
447-
affected_snapshot_names = [restatement] + env_dag.downstream(restatement)
449+
affected_snapshot_names = [
450+
x
451+
for x in ([restatement] + env_dag.downstream(restatement))
452+
if x not in disable_restatement_models
453+
]
448454
snapshots_to_restate.update(
449455
{(keyed_snapshots[a], intervals) for a in affected_snapshot_names}
450456
)

tests/core/test_integration.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2172,6 +2172,118 @@ def _dates_in_table(table_name: str) -> t.List[str]:
21722172
], f"Table {tbl} wasnt cleared"
21732173

21742174

2175+
def test_restatement_plan_respects_disable_restatements(tmp_path: Path):
2176+
model_a = """
2177+
MODEL (
2178+
name test.a,
2179+
kind INCREMENTAL_BY_TIME_RANGE (
2180+
time_column "ts"
2181+
),
2182+
start '2024-01-01',
2183+
cron '@daily'
2184+
);
2185+
2186+
select account_id, ts from test.external_table;
2187+
"""
2188+
2189+
model_b = """
2190+
MODEL (
2191+
name test.b,
2192+
kind INCREMENTAL_BY_TIME_RANGE (
2193+
time_column "ts",
2194+
disable_restatement true,
2195+
),
2196+
start '2024-01-01',
2197+
cron '@daily'
2198+
);
2199+
2200+
select account_id, ts from test.a;
2201+
"""
2202+
2203+
models_dir = tmp_path / "models"
2204+
models_dir.mkdir()
2205+
2206+
for path, defn in {"a.sql": model_a, "b.sql": model_b}.items():
2207+
with open(models_dir / path, "w") as f:
2208+
f.write(defn)
2209+
2210+
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
2211+
ctx = Context(paths=[tmp_path], config=config)
2212+
2213+
engine_adapter = ctx.engine_adapter
2214+
engine_adapter.create_schema("test")
2215+
2216+
# source data
2217+
df = pd.DataFrame(
2218+
{
2219+
"account_id": [1001, 1002, 1003, 1004],
2220+
"ts": [
2221+
"2024-01-01 00:30:00",
2222+
"2024-01-01 01:30:00",
2223+
"2024-01-01 02:30:00",
2224+
"2024-01-02 00:30:00",
2225+
],
2226+
}
2227+
)
2228+
columns_to_types = {
2229+
"account_id": exp.DataType.build("int"),
2230+
"ts": exp.DataType.build("timestamp"),
2231+
}
2232+
external_table = exp.table_(table="external_table", db="test", quoted=True)
2233+
engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
2234+
engine_adapter.insert_append(
2235+
table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
2236+
)
2237+
2238+
# plan + apply
2239+
ctx.plan(auto_apply=True, no_prompts=True)
2240+
2241+
def _dates_in_table(table_name: str) -> t.List[str]:
2242+
return [
2243+
str(r[0]) for r in engine_adapter.fetchall(f"select ts from {table_name} order by ts")
2244+
]
2245+
2246+
def get_snapshot_intervals(snapshot_id):
2247+
return list(ctx.state_sync.get_snapshots([snapshot_id]).values())[0].intervals
2248+
2249+
# verify initial state
2250+
for tbl in ["test.a", "test.b"]:
2251+
assert _dates_in_table(tbl) == [
2252+
"2024-01-01 00:30:00",
2253+
"2024-01-01 01:30:00",
2254+
"2024-01-01 02:30:00",
2255+
"2024-01-02 00:30:00",
2256+
]
2257+
2258+
# restate A and expect b to be ignored
2259+
starting_b_intervals = get_snapshot_intervals(ctx.snapshots['"memory"."test"."b"'].snapshot_id)
2260+
engine_adapter.execute("delete from test.external_table where ts = '2024-01-01 01:30:00'")
2261+
ctx.plan(
2262+
restate_models=["test.a"],
2263+
start="2024-01-01",
2264+
end="2024-01-02",
2265+
auto_apply=True,
2266+
no_prompts=True,
2267+
)
2268+
2269+
# verify A was changed and not b
2270+
assert _dates_in_table("test.a") == [
2271+
"2024-01-01 00:30:00",
2272+
"2024-01-01 02:30:00",
2273+
"2024-01-02 00:30:00",
2274+
]
2275+
assert _dates_in_table("test.b") == [
2276+
"2024-01-01 00:30:00",
2277+
"2024-01-01 01:30:00",
2278+
"2024-01-01 02:30:00",
2279+
"2024-01-02 00:30:00",
2280+
]
2281+
2282+
# Verify B intervals were not touched
2283+
b_intervals = get_snapshot_intervals(ctx.snapshots['"memory"."test"."b"'].snapshot_id)
2284+
assert starting_b_intervals == b_intervals
2285+
2286+
21752287
def test_restatement_plan_clears_correct_intervals_across_environments(tmp_path: Path):
21762288
model1 = """
21772289
MODEL (

tests/schedulers/airflow/test_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
7777
removed_snapshots=[],
7878
requires_backfill=True,
7979
models_to_backfill={'"test_model"'},
80+
disabled_restatement_models=set(),
8081
)
8182

8283
client = AirflowClient(airflow_url=common.AIRFLOW_LOCAL_URL, session=requests.Session())
@@ -196,6 +197,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
196197
'"test_model"': [to_timestamp("2024-01-01"), to_timestamp("2024-01-02")]
197198
},
198199
"requires_backfill": True,
200+
"disabled_restatement_models": [],
199201
},
200202
"notification_targets": [],
201203
"backfill_concurrent_tasks": 1,

tests/schedulers/airflow/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ def _create_evaluatable_plan(
115115
indirectly_modified_snapshots={},
116116
removed_snapshots=[],
117117
requires_backfill=True,
118+
disabled_restatement_models=set(),
118119
)
119120

120121

tests/schedulers/airflow/test_plan.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def test_create_plan_dag_spec(
125125
interval_end_per_model=None,
126126
allow_destructive_models=set(),
127127
requires_backfill=True,
128+
disabled_restatement_models=set(),
128129
)
129130

130131
plan_request = common.PlanApplicationRequest(
@@ -269,6 +270,7 @@ def test_restatement(
269270
interval_end_per_model=None,
270271
allow_destructive_models=set(),
271272
requires_backfill=True,
273+
disabled_restatement_models=set(),
272274
)
273275

274276
plan_request = common.PlanApplicationRequest(
@@ -390,6 +392,7 @@ def test_select_models_for_backfill(mocker: MockerFixture, random_name, make_sna
390392
interval_end_per_model=None,
391393
allow_destructive_models=set(),
392394
requires_backfill=True,
395+
disabled_restatement_models=set(),
393396
)
394397

395398
plan_request = common.PlanApplicationRequest(
@@ -475,6 +478,7 @@ def test_create_plan_dag_spec_duplicated_snapshot(
475478
interval_end_per_model=None,
476479
allow_destructive_models=set(),
477480
requires_backfill=True,
481+
disabled_restatement_models=set(),
478482
)
479483

480484
plan_request = common.PlanApplicationRequest(
@@ -537,6 +541,7 @@ def test_create_plan_dag_spec_unbounded_end(
537541
interval_end_per_model=None,
538542
allow_destructive_models=set(),
539543
requires_backfill=True,
544+
disabled_restatement_models=set(),
540545
)
541546

542547
plan_request = common.PlanApplicationRequest(

0 commit comments

Comments
 (0)