Skip to content

Commit 017da33

Browse files
authored
Fix: Defer creation of the deploable table for a new forward-only model (#3657)
1 parent 90cd883 commit 017da33

5 files changed

Lines changed: 232 additions & 14 deletions

File tree

sqlmesh/core/snapshot/evaluator.py

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,8 @@ def _create_snapshot(
762762
and adapter.SUPPORTS_CLONING
763763
# managed models cannot have their schema mutated because theyre based on queries, so clone + alter wont work
764764
and not snapshot.is_managed
765+
# If the deployable table is missing we can't clone it
766+
and True not in deployability_flags
765767
):
766768
target_table_name = snapshot.table_name(is_deployable=False)
767769
tmp_table_name = f"{target_table_name}__schema_migration_source"
@@ -797,6 +799,19 @@ def _create_snapshot(
797799
else:
798800
dry_run = len(deployability_flags) == 1
799801
for is_table_deployable in deployability_flags:
802+
if (
803+
is_table_deployable
804+
and snapshot.model.forward_only
805+
and not is_snapshot_representative
806+
):
807+
logger.info(
808+
"Skipping creation of the deployable table '%s' for the forward-only model %s. "
809+
"The table will be created when the snapshot is deployed to production",
810+
snapshot.table_name(is_deployable=is_table_deployable),
811+
snapshot.snapshot_id,
812+
)
813+
continue
814+
800815
evaluation_strategy.create(
801816
table_name=snapshot.table_name(is_deployable=is_table_deployable),
802817
model=snapshot.model,
@@ -829,15 +844,47 @@ def _migrate_snapshot(
829844
if not needs_migration:
830845
return
831846

832-
tmp_table_name = snapshot.table_name(is_deployable=False)
847+
evaluation_strategy = _evaluation_strategy(snapshot, adapter)
848+
833849
target_table_name = snapshot.table_name()
834-
_evaluation_strategy(snapshot, adapter).migrate(
835-
target_table_name=target_table_name,
836-
source_table_name=tmp_table_name,
837-
snapshot=snapshot,
838-
snapshots=parent_snapshots_by_name(snapshot, snapshots),
839-
allow_destructive_snapshots=allow_destructive_snapshots,
840-
)
850+
if adapter.table_exists(target_table_name):
851+
tmp_table_name = snapshot.table_name(is_deployable=False)
852+
logger.info(
853+
"Migrating table schema from '%s' to '%s'",
854+
tmp_table_name,
855+
target_table_name,
856+
)
857+
evaluation_strategy.migrate(
858+
target_table_name=target_table_name,
859+
source_table_name=tmp_table_name,
860+
snapshot=snapshot,
861+
snapshots=parent_snapshots_by_name(snapshot, snapshots),
862+
allow_destructive_snapshots=allow_destructive_snapshots,
863+
)
864+
else:
865+
logger.info(
866+
"Creating table '%s' for the snapshot of the forward-only model %s",
867+
target_table_name,
868+
snapshot.snapshot_id,
869+
)
870+
render_kwargs: t.Dict[str, t.Any] = dict(
871+
engine_adapter=adapter,
872+
snapshots=parent_snapshots_by_name(snapshot, snapshots),
873+
runtime_stage=RuntimeStage.CREATING,
874+
deployability_index=DeployabilityIndex.all_deployable(),
875+
)
876+
with adapter.transaction(), adapter.session(snapshot.model.session_properties):
877+
adapter.execute(snapshot.model.render_pre_statements(**render_kwargs))
878+
evaluation_strategy.create(
879+
table_name=target_table_name,
880+
model=snapshot.model,
881+
is_table_deployable=True,
882+
render_kwargs=render_kwargs,
883+
is_snapshot_deployable=True,
884+
is_snapshot_representative=True,
885+
dry_run=False,
886+
)
887+
adapter.execute(snapshot.model.render_post_statements(**render_kwargs))
841888

842889
def _promote_snapshot(
843890
self,

tests/conftest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
SnapshotChangeCategory,
3636
SnapshotDataVersion,
3737
SnapshotFingerprint,
38+
DeployabilityIndex,
3839
)
3940
from sqlmesh.utils import random_id
4041
from sqlmesh.utils.date import TimeLike, to_date
@@ -246,9 +247,12 @@ def push_plan(context: Context, plan: Plan) -> None:
246247
context.create_scheduler,
247248
context.default_catalog,
248249
)
249-
plan_evaluator._push(plan.to_evaluatable(), plan.snapshots)
250+
deployability_index = DeployabilityIndex.create(context.snapshots.values())
251+
plan_evaluator._push(plan.to_evaluatable(), plan.snapshots, deployability_index)
250252
promotion_result = plan_evaluator._promote(plan.to_evaluatable(), plan.snapshots)
251-
plan_evaluator._update_views(plan.to_evaluatable(), plan.snapshots, promotion_result)
253+
plan_evaluator._update_views(
254+
plan.to_evaluatable(), plan.snapshots, promotion_result, deployability_index
255+
)
252256

253257

254258
@pytest.fixture()

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,10 +1476,6 @@ def test_sushi(ctx: TestContext, tmp_path_factory: pytest.TempPathFactory):
14761476
"table": "Sushi customer data",
14771477
"column": {"customer_id": "customer_id uniquely identifies customers"},
14781478
},
1479-
"marketing": {
1480-
"table": "Sushi marketing data",
1481-
"column": {"customer_id": "customer_id uniquely identifies customers \\"},
1482-
},
14831479
"orders": {
14841480
"table": "Table of sushi orders.",
14851481
},

tests/core/test_integration.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,24 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod(
788788
context.apply(plan)
789789

790790

791+
@time_machine.travel("2023-01-08 00:00:00 UTC")
792+
def test_new_forward_only_model(init_and_plan_context: t.Callable):
793+
context, _ = init_and_plan_context("examples/sushi")
794+
795+
context.plan("dev", skip_tests=True, no_prompts=True, auto_apply=True)
796+
797+
snapshot = context.get_snapshot("sushi.marketing")
798+
799+
# The deployable table should not exist yet
800+
assert not context.engine_adapter.table_exists(snapshot.table_name())
801+
assert context.engine_adapter.table_exists(snapshot.table_name(is_deployable=False))
802+
803+
context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True)
804+
805+
assert context.engine_adapter.table_exists(snapshot.table_name())
806+
assert context.engine_adapter.table_exists(snapshot.table_name(is_deployable=False))
807+
808+
791809
@time_machine.travel("2023-01-08 15:00:00 UTC")
792810
def test_plan_set_choice_is_reflected_in_missing_intervals(init_and_plan_context: t.Callable):
793811
context, plan = init_and_plan_context("examples/sushi")

tests/core/test_snapshot_evaluator.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,54 @@ def test_create_only_dev_table_exists(mocker: MockerFixture, adapter_mock, make_
765765
)
766766

767767

768+
def test_create_new_forward_only_model(mocker: MockerFixture, adapter_mock, make_snapshot):
769+
model = load_sql_based_model(
770+
parse( # type: ignore
771+
"""
772+
MODEL (
773+
name test_schema.test_model,
774+
kind INCREMENTAL_BY_TIME_RANGE (
775+
time_column ds,
776+
forward_only true,
777+
)
778+
);
779+
780+
SELECT a::int, '2024-01-01' as ds FROM tbl;
781+
"""
782+
),
783+
)
784+
785+
snapshot = make_snapshot(model)
786+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
787+
788+
adapter_mock.get_data_objects.return_value = []
789+
adapter_mock.table_exists.return_value = False
790+
evaluator = SnapshotEvaluator(adapter_mock)
791+
792+
evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
793+
adapter_mock.create_schema.assert_called_once_with(to_schema("sqlmesh__test_schema"))
794+
# Only non-deployable table should be created
795+
adapter_mock.create_table.assert_called_once_with(
796+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.temp_version_get_or_generate()}__temp",
797+
columns_to_types={"a": exp.DataType.build("int"), "ds": exp.DataType.build("varchar")},
798+
table_format=None,
799+
storage_format=None,
800+
partitioned_by=model.partitioned_by,
801+
partition_interval_unit=model.partition_interval_unit,
802+
clustered_by=[],
803+
table_properties={},
804+
table_description=None,
805+
column_descriptions=None,
806+
)
807+
adapter_mock.get_data_objects.assert_called_once_with(
808+
schema_("sqlmesh__test_schema"),
809+
{
810+
f"test_schema__test_model__{snapshot.version}",
811+
f"test_schema__test_model__{snapshot.temp_version_get_or_generate()}__temp",
812+
},
813+
)
814+
815+
768816
@pytest.mark.parametrize(
769817
"deployability_index, snapshot_category, deployability_flags",
770818
[
@@ -1122,6 +1170,7 @@ def columns(table_name):
11221170
}
11231171

11241172
adapter.columns = columns # type: ignore
1173+
adapter.table_exists = lambda _: True # type: ignore
11251174

11261175
evaluator = SnapshotEvaluator(adapter)
11271176

@@ -1148,6 +1197,42 @@ def columns(table_name):
11481197
)
11491198

11501199

1200+
def test_migrate_missing_table(mocker: MockerFixture, make_snapshot):
1201+
connection_mock = mocker.NonCallableMock()
1202+
cursor_mock = mocker.Mock()
1203+
connection_mock.cursor.return_value = cursor_mock
1204+
adapter = EngineAdapter(lambda: connection_mock, "")
1205+
1206+
adapter.table_exists = lambda _: False # type: ignore
1207+
1208+
evaluator = SnapshotEvaluator(adapter)
1209+
1210+
model = SqlModel(
1211+
name="test_schema.test_model",
1212+
kind=IncrementalByTimeRangeKind(
1213+
time_column="a", on_destructive_change=OnDestructiveChange.ALLOW
1214+
),
1215+
storage_format="parquet",
1216+
query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"),
1217+
pre_statements=[parse_one("CREATE TABLE pre (a INT)")],
1218+
post_statements=[parse_one("DROP TABLE pre")],
1219+
)
1220+
snapshot = make_snapshot(model, version="1")
1221+
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
1222+
1223+
evaluator.migrate([snapshot], {})
1224+
1225+
cursor_mock.execute.assert_has_calls(
1226+
[
1227+
call('CREATE TABLE "pre" ("a" INT)'),
1228+
call(
1229+
'CREATE TABLE IF NOT EXISTS "sqlmesh__test_schema"."test_schema__test_model__1" AS SELECT "c" AS "c", "a" AS "a" FROM "tbl" AS "tbl" WHERE "ds" BETWEEN \'1970-01-01\' AND \'1970-01-01\' AND FALSE LIMIT 0'
1230+
),
1231+
call('DROP TABLE "pre"'),
1232+
]
1233+
)
1234+
1235+
11511236
@pytest.mark.parametrize(
11521237
"change_category",
11531238
[SnapshotChangeCategory.FORWARD_ONLY, SnapshotChangeCategory.INDIRECT_NON_BREAKING],
@@ -1386,6 +1471,14 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot)
13861471
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
13871472
snapshot.previous_versions = snapshot.all_versions
13881473

1474+
adapter_mock.get_data_objects.return_value = [
1475+
DataObject(
1476+
name=f"test_schema__test_model__{snapshot.version}",
1477+
schema="sqlmesh__test_schema",
1478+
type=DataObjectType.TABLE,
1479+
),
1480+
]
1481+
13891482
evaluator.create([snapshot], {})
13901483

13911484
adapter_mock.create_table.assert_called_once_with(
@@ -1419,6 +1512,50 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot)
14191512
)
14201513

14211514

1515+
def test_create_clone_in_dev_missing_table(mocker: MockerFixture, adapter_mock, make_snapshot):
1516+
adapter_mock.SUPPORTS_CLONING = True
1517+
adapter_mock.get_alter_expressions.return_value = []
1518+
evaluator = SnapshotEvaluator(adapter_mock)
1519+
1520+
model = load_sql_based_model(
1521+
parse( # type: ignore
1522+
"""
1523+
MODEL (
1524+
name test_schema.test_model,
1525+
kind INCREMENTAL_BY_TIME_RANGE (
1526+
time_column ds,
1527+
forward_only true,
1528+
)
1529+
);
1530+
1531+
SELECT 1::INT as a, ds::DATE FROM a;
1532+
"""
1533+
),
1534+
)
1535+
1536+
snapshot = make_snapshot(model)
1537+
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
1538+
snapshot.previous_versions = snapshot.all_versions
1539+
1540+
evaluator.create([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
1541+
1542+
adapter_mock.create_table.assert_called_once_with(
1543+
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.temp_version_get_or_generate()}__temp",
1544+
columns_to_types={"a": exp.DataType.build("int"), "ds": exp.DataType.build("date")},
1545+
table_format=None,
1546+
storage_format=None,
1547+
partitioned_by=[exp.to_column("ds", quoted=True)],
1548+
partition_interval_unit=IntervalUnit.DAY,
1549+
clustered_by=[],
1550+
table_properties={},
1551+
table_description=None,
1552+
column_descriptions=None,
1553+
)
1554+
1555+
adapter_mock.clone_table.assert_not_called()
1556+
adapter_mock.alter_table.assert_not_called()
1557+
1558+
14221559
def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_mock, make_snapshot):
14231560
adapter_mock.SUPPORTS_CLONING = True
14241561
adapter_mock.get_alter_expressions.return_value = []
@@ -1445,6 +1582,14 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m
14451582
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
14461583
snapshot.previous_versions = snapshot.all_versions
14471584

1585+
adapter_mock.get_data_objects.return_value = [
1586+
DataObject(
1587+
name=f"test_schema__test_model__{snapshot.version}",
1588+
schema="sqlmesh__test_schema",
1589+
type=DataObjectType.TABLE,
1590+
),
1591+
]
1592+
14481593
evaluator.create([snapshot], {})
14491594

14501595
adapter_mock.clone_table.assert_called_once_with(
@@ -1494,6 +1639,14 @@ def test_create_clone_in_dev_self_referencing(mocker: MockerFixture, adapter_moc
14941639
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
14951640
snapshot.previous_versions = snapshot.all_versions
14961641

1642+
adapter_mock.get_data_objects.return_value = [
1643+
DataObject(
1644+
name=f"test_schema__test_model__{snapshot.version}",
1645+
schema="sqlmesh__test_schema",
1646+
type=DataObjectType.TABLE,
1647+
),
1648+
]
1649+
14971650
evaluator.create([snapshot], {})
14981651

14991652
adapter_mock.create_table.assert_called_once_with(

0 commit comments

Comments
 (0)