Skip to content

Commit 2411649

Browse files
authored
Fix: restatements for incremental by partition (#3656)
1 parent 017da33 commit 2411649

3 files changed

Lines changed: 53 additions & 7 deletions

File tree

sqlmesh/core/snapshot/evaluator.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,12 +1414,15 @@ def insert(
14141414
is_first_insert: bool,
14151415
**kwargs: t.Any,
14161416
) -> None:
1417-
self.adapter.insert_overwrite_by_partition(
1418-
table_name,
1419-
query_or_df,
1420-
partitioned_by=model.partitioned_by,
1421-
columns_to_types=model.columns_to_types,
1422-
)
1417+
if is_first_insert:
1418+
self._replace_query_for_model(model, table_name, query_or_df)
1419+
else:
1420+
self.adapter.insert_overwrite_by_partition(
1421+
table_name,
1422+
query_or_df,
1423+
partitioned_by=model.partitioned_by,
1424+
columns_to_types=model.columns_to_types,
1425+
)
14231426

14241427

14251428
class IncrementalByTimeRangeStrategy(MaterializableStrategy):

tests/core/test_integration.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1622,9 +1622,10 @@ def test_incremental_by_partition(init_and_plan_context: t.Callable):
16221622
f"""
16231623
MODEL (
16241624
name {model_name},
1625-
kind INCREMENTAL_BY_PARTITION,
1625+
kind INCREMENTAL_BY_PARTITION (disable_restatement false),
16261626
partitioned_by [key],
16271627
allow_partials true,
1628+
start '2023-01-07',
16281629
);
16291630
16301631
SELECT key, value FROM {source_name};
@@ -1665,6 +1666,16 @@ def test_incremental_by_partition(init_and_plan_context: t.Callable):
16651666
("key_a", 2),
16661667
]
16671668

1669+
# model should fully refresh on restatement
1670+
context.engine_adapter.replace_query(
1671+
source_name,
1672+
d.parse_one("SELECT 'key_c' AS key, 3 AS value"),
1673+
)
1674+
context.plan(auto_apply=True, no_prompts=True, restate_models=[model_name])
1675+
assert context.engine_adapter.fetchall(f"SELECT * FROM {model_name}") == [
1676+
("key_c", 3),
1677+
]
1678+
16681679

16691680
@time_machine.travel("2023-01-08 15:00:00 UTC")
16701681
def test_custom_materialization(init_and_plan_context: t.Callable):

tests/core/test_snapshot_evaluator.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3012,6 +3012,8 @@ def test_evaluate_incremental_by_partition(mocker: MockerFixture, make_snapshot,
30123012
snapshot = make_snapshot(model)
30133013
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
30143014

3015+
adapter_mock.columns.return_value = model.columns_to_types
3016+
30153017
evaluator = SnapshotEvaluator(adapter_mock)
30163018
evaluator.evaluate(
30173019
snapshot,
@@ -3021,6 +3023,36 @@ def test_evaluate_incremental_by_partition(mocker: MockerFixture, make_snapshot,
30213023
snapshots={},
30223024
)
30233025

3026+
# uses `replace_query` on first model execution
3027+
adapter_mock.replace_query.assert_called_once_with(
3028+
snapshot.table_name(),
3029+
model.render_query(),
3030+
partitioned_by=[
3031+
exp.to_column("ds", quoted=True),
3032+
exp.to_column("b", quoted=True),
3033+
],
3034+
columns_to_types=model.columns_to_types,
3035+
clustered_by=[],
3036+
table_properties={},
3037+
column_descriptions={},
3038+
partition_interval_unit=None,
3039+
storage_format=None,
3040+
table_description=None,
3041+
table_format=None,
3042+
)
3043+
3044+
adapter_mock.reset_mock()
3045+
snapshot.intervals = [(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))]
3046+
3047+
evaluator.evaluate(
3048+
snapshot,
3049+
start="2020-01-02",
3050+
end="2020-01-03",
3051+
execution_time="2020-01-03",
3052+
snapshots={},
3053+
)
3054+
3055+
# uses `insert_overwrite_by_partition` on all subsequent model executions
30243056
adapter_mock.insert_overwrite_by_partition.assert_called_once_with(
30253057
snapshot.table_name(),
30263058
model.render_query(),

0 commit comments

Comments
 (0)