Skip to content

Commit c199eff

Browse files
Fix: allow --select-model to plan a model deletion (#5759)
1 parent bf84bce commit c199eff

File tree

4 files changed

+219
-34
lines changed

4 files changed

+219
-34
lines changed

sqlmesh/core/context.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,9 +1605,11 @@ def plan_builder(
16051605
backfill_models = None
16061606

16071607
models_override: t.Optional[UniqueKeyDict[str, Model]] = None
1608+
selected_fqns: t.Set[str] = set()
1609+
selected_deletion_fqns: t.Set[str] = set()
16081610
if select_models:
16091611
try:
1610-
models_override = model_selector.select_models(
1612+
models_override, selected_fqns = model_selector.select_models(
16111613
select_models,
16121614
environment,
16131615
fallback_env_name=create_from or c.PROD,
@@ -1622,12 +1624,17 @@ def plan_builder(
16221624
# Only backfill selected models unless explicitly specified.
16231625
backfill_models = model_selector.expand_model_selections(select_models)
16241626

1627+
if not backfill_models:
1628+
# The selection matched nothing locally. Check whether it matched models
1629+
# in the deployed environment that were deleted locally.
1630+
selected_deletion_fqns = selected_fqns - set(self._models)
1631+
16251632
expanded_restate_models = None
16261633
if restate_models is not None:
16271634
expanded_restate_models = model_selector.expand_model_selections(restate_models)
16281635

16291636
if (restate_models is not None and not expanded_restate_models) or (
1630-
backfill_models is not None and not backfill_models
1637+
backfill_models is not None and not backfill_models and not selected_deletion_fqns
16311638
):
16321639
raise PlanError(
16331640
"Selector did not return any models. Please check your model selection and try again."
@@ -1636,7 +1643,7 @@ def plan_builder(
16361643
if always_include_local_changes is None:
16371644
# default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes
16381645
force_no_diff = restate_models is not None or (
1639-
backfill_models is not None and not backfill_models
1646+
backfill_models is not None and not backfill_models and not selected_deletion_fqns
16401647
)
16411648
else:
16421649
force_no_diff = not always_include_local_changes

sqlmesh/core/selector.py

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def select_models(
6262
target_env_name: str,
6363
fallback_env_name: t.Optional[str] = None,
6464
ensure_finalized_snapshots: bool = False,
65-
) -> UniqueKeyDict[str, Model]:
65+
) -> t.Tuple[UniqueKeyDict[str, Model], t.Set[str]]:
6666
"""Given a set of selections returns models from the current state with names matching the
6767
selection while sourcing the remaining models from the target environment.
6868
@@ -76,29 +76,11 @@ def select_models(
7676
the environment is not finalized.
7777
7878
Returns:
79-
A dictionary of models.
79+
A tuple of (models dict, set of all matched FQNs including env models).
8080
"""
81-
target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
82-
if target_env and target_env.expired:
83-
target_env = None
84-
85-
if not target_env and fallback_env_name:
86-
target_env = self._state_reader.get_environment(
87-
Environment.sanitize_name(fallback_env_name)
88-
)
89-
90-
env_models: t.Dict[str, Model] = {}
91-
if target_env:
92-
environment_snapshot_infos = (
93-
target_env.snapshots
94-
if not ensure_finalized_snapshots
95-
else target_env.finalized_or_current_snapshots
96-
)
97-
env_models = {
98-
s.name: s.model
99-
for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
100-
if s.is_model
101-
}
81+
env_models = self._load_env_models(
82+
target_env_name, fallback_env_name, ensure_finalized_snapshots
83+
)
10284

10385
all_selected_models = self.expand_model_selections(
10486
model_selections, models={**env_models, **self._models}
@@ -166,7 +148,37 @@ def get_model(fqn: str) -> t.Optional[Model]:
166148
if needs_update:
167149
update_model_schemas(dag, models=models, cache_dir=self._cache_dir)
168150

169-
return models
151+
return models, all_selected_models
152+
153+
def _load_env_models(
    self,
    target_env_name: str,
    fallback_env_name: t.Optional[str] = None,
    ensure_finalized_snapshots: bool = False,
) -> t.Dict[str, "Model"]:
    """Collects the models recorded in the target environment's state.

    Args:
        target_env_name: The name of the environment to read models from.
        fallback_env_name: The environment to read from instead when the target
            environment is missing or expired.
        ensure_finalized_snapshots: Whether to read the environment's
            finalized-or-current snapshots rather than its plain snapshots.

    Returns:
        A mapping of snapshot name to model for every model snapshot found, or an
        empty dictionary when no usable environment exists.
    """
    state_reader = self._state_reader

    environment = state_reader.get_environment(Environment.sanitize_name(target_env_name))
    if not environment or environment.expired:
        # A missing or expired target is treated the same way: use the fallback, if any.
        # Note that the fallback environment's own expiry is not checked here.
        environment = (
            state_reader.get_environment(Environment.sanitize_name(fallback_env_name))
            if fallback_env_name
            else None
        )

    if not environment:
        return {}

    if ensure_finalized_snapshots:
        snapshot_infos = environment.finalized_or_current_snapshots
    else:
        snapshot_infos = environment.snapshots

    models_by_name: t.Dict[str, "Model"] = {}
    for snapshot in state_reader.get_snapshots(snapshot_infos).values():
        # Only model snapshots are kept; other snapshot kinds are skipped.
        if snapshot.is_model:
            models_by_name[snapshot.name] = snapshot.model
    return models_by_name
170182

171183
def expand_model_selections(
172184
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None

tests/core/test_context.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None:
22732273
sushi_context.plan("prod", restate_models=["*missing*"])
22742274

22752275

2276+
def test_plan_select_model_deleted_model(sushi_context: Context) -> None:
    """Selecting a model that has been deleted locally but still exists in the deployed
    environment should produce a valid plan with the deletion, not raise PlanError."""
    # Pick a leaf model that can be safely deleted without breaking other models' rendering.
    model_name = "sushi.top_waiters"
    snapshot = sushi_context.get_snapshot(model_name)
    assert snapshot is not None

    # Delete the model file from disk.
    model = sushi_context.get_model(model_name)
    assert model._path is not None and model._path.exists()
    model._path.unlink()

    # Reload the context so it no longer knows about the deleted model.
    sushi_context.load()
    # The loaded models appear to be keyed by fully qualified name (the plan code
    # subtracts the model keys from a set of FQNs), so checking only the short
    # display name could pass even if the model were still loaded. Check the
    # snapshot's name as well so the assertion actually guards the deletion.
    assert model_name not in sushi_context.models
    assert snapshot.name not in sushi_context.models

    # Planning with select_models for the deleted model should succeed (not raise PlanError).
    plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True)
    assert plan is not None

    # The deleted model should appear in removed_snapshots.
    removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()}
    assert snapshot.name in removed_names
2300+
2301+
2302+
def test_plan_select_model_deleted_model_still_rejects_nonexistent(
    sushi_context: Context,
) -> None:
    """Planning must keep failing with PlanError when the selection matches no model
    at all, i.e. nothing locally and nothing in the deployed environment."""
    # Passed to pytest.raises as a regular expression, exactly as the plan emits it.
    expected_message = (
        "Selector did not return any models. Please check your model selection and try again."
    )
    unknown_selection = ["sushi.completely_nonexistent"]

    with pytest.raises(PlanError, match=expected_message):
        sushi_context.plan("prod", select_models=unknown_selection)
2312+
2313+
22762314
def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path):
22772315
models_dir = pathlib.Path("models")
22782316
macros_dir = pathlib.Path("macros")

tests/core/test_selector_native.py

Lines changed: 135 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot):
309309

310310
selector = NativeSelector(state_reader_mock, local_models)
311311

312-
selected = selector.select_models(["db.parent"], env_name)
312+
selected, _ = selector.select_models(["db.parent"], env_name)
313313
assert selected[local_child.fqn].render_query() != child.render_query()
314314

315315
_assert_models_equal(
@@ -320,7 +320,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot):
320320
},
321321
)
322322

323-
selected = selector.select_models(["db.child"], env_name)
323+
selected, _ = selector.select_models(["db.child"], env_name)
324324
assert selected[local_child.fqn].data_hash == child.data_hash
325325

326326
_assert_models_equal(
@@ -343,12 +343,12 @@ def test_select_models_missing_env(mocker: MockerFixture, make_snapshot):
343343

344344
selector = NativeSelector(state_reader_mock, local_models)
345345

346-
assert selector.select_models([model.name], "missing_env").keys() == {model.fqn}
347-
assert not selector.select_models(["missing"], "missing_env")
346+
assert selector.select_models([model.name], "missing_env")[0].keys() == {model.fqn}
347+
assert not selector.select_models(["missing"], "missing_env")[0]
348348

349349
assert selector.select_models(
350350
[model.name], "missing_env", fallback_env_name="another_missing_env"
351-
).keys() == {model.fqn}
351+
)[0].keys() == {model.fqn}
352352

353353
state_reader_mock.get_environment.assert_has_calls(
354354
[
@@ -789,7 +789,7 @@ def test_select_models_local_tags_take_precedence_over_remote(
789789

790790
selector = NativeSelector(state_reader_mock, local_models)
791791

792-
selected = selector.select_models(["tag:a"], env_name)
792+
selected, _ = selector.select_models(["tag:a"], env_name)
793793

794794
# both should get selected because they both now have the 'a' tag locally, even though one exists in remote state without the 'a' tag
795795
_assert_models_equal(
@@ -801,7 +801,135 @@ def test_select_models_local_tags_take_precedence_over_remote(
801801
)
802802

803803

804-
def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None:
804+
def test_select_models_returns_selected_fqns(mocker: MockerFixture, make_snapshot):
    """The second element returned by select_models is the set of every matched FQN,
    covering models that only exist in the environment as well as local ones."""
    env_name = "test_env"

    # One model that exists locally and one that only exists in the environment state.
    local_model = SqlModel(
        name="db.local_model",
        query=d.parse_one("SELECT 1 AS a"),
    )
    deleted_model = SqlModel(
        name="db.deleted_model",
        query=d.parse_one("SELECT 2 AS b"),
    )

    deleted_model_snapshot = make_snapshot(deleted_model)
    deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # The state reader only knows about the deleted model's snapshot.
    state_reader_mock = mocker.Mock()
    state_reader_mock.get_environment.return_value = Environment(
        name=env_name,
        snapshots=[deleted_model_snapshot.table_info],
        start_at="2023-01-01",
        end_at="2023-02-01",
        plan_id="test_plan_id",
    )
    state_reader_mock.get_snapshots.return_value = {
        deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
    }

    local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    local_models[local_model.fqn] = local_model

    selector = NativeSelector(state_reader_mock, local_models)
    both_fqns = {deleted_model.fqn, local_model.fqn}

    # A deleted model is reported in the matched FQNs even though it is not local.
    _, matched = selector.select_models(["db.deleted_model"], env_name)
    assert deleted_model.fqn in matched

    # A local model is reported in the matched FQNs.
    _, matched = selector.select_models(["db.local_model"], env_name)
    assert local_model.fqn in matched

    # Selecting an active and a deleted model together reports exactly both.
    _, matched = selector.select_models(["db.deleted_model", "db.local_model"], env_name)
    assert matched == both_fqns

    # A wildcard matches local and environment models alike.
    _, matched = selector.select_models(["*_model"], env_name)
    assert matched == both_fqns

    # A selection that matches nothing yields an empty set.
    _, matched = selector.select_models(["db.nonexistent"], env_name)
    assert matched == set()
857+
858+
859+
def test_select_models_selected_fqns_fallback(mocker: MockerFixture, make_snapshot):
    """Matched FQNs include environment models that were resolved through the
    fallback environment when the target environment does not exist."""
    deleted_model = SqlModel(
        name="db.deleted_model",
        query=d.parse_one("SELECT 1 AS a"),
    )

    deleted_model_snapshot = make_snapshot(deleted_model)
    deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    fallback_env = Environment(
        name="prod",
        snapshots=[deleted_model_snapshot.table_info],
        start_at="2023-01-01",
        end_at="2023-02-01",
        plan_id="test_plan_id",
    )

    def _get_environment(name):
        # Only "prod" exists in state; every other environment lookup misses.
        if name == "prod":
            return fallback_env
        return None

    state_reader_mock = mocker.Mock()
    state_reader_mock.get_environment.side_effect = _get_environment
    state_reader_mock.get_snapshots.return_value = {
        deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
    }

    # No local models at all: the only possible match comes from the fallback environment.
    local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    selector = NativeSelector(state_reader_mock, local_models)

    _, matched = selector.select_models(
        ["db.deleted_model"], "missing_env", fallback_env_name="prod"
    )
    assert deleted_model.fqn in matched
892+
893+
894+
def test_select_models_selected_fqns_expired(mocker: MockerFixture, make_snapshot):
    """Models that only exist in an expired environment must not be matched."""
    deleted_model = SqlModel(
        name="db.deleted_model",
        query=d.parse_one("SELECT 1 AS a"),
    )

    deleted_model_snapshot = make_snapshot(deleted_model)
    deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # An expiration timestamp just before "now" marks the environment as expired.
    expired_env = Environment(
        name="test_env",
        snapshots=[deleted_model_snapshot.table_info],
        start_at="2023-01-01",
        end_at="2023-02-01",
        plan_id="test_plan_id",
        expiration_ts=now_timestamp() - 1,
    )

    state_reader_mock = mocker.Mock()
    state_reader_mock.get_environment.return_value = expired_env
    state_reader_mock.get_snapshots.return_value = {
        deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
    }

    # No local models and no fallback: nothing should be selectable.
    local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    selector = NativeSelector(state_reader_mock, local_models)

    _, matched = selector.select_models(["db.deleted_model"], "test_env")
    assert matched == set()
924+
925+
926+
def _assert_models_equal(
927+
actual: t.Union[t.Dict[str, Model], t.Tuple[t.Dict[str, Model], t.Set[str]]],
928+
expected: t.Dict[str, Model],
929+
) -> None:
930+
# select_models returns a tuple; unwrap if needed.
931+
if isinstance(actual, tuple):
932+
actual = actual[0]
805933
assert set(actual) == set(expected)
806934
for name, model in actual.items():
807935
# Use dict() to make Pydantic V2 happy.

0 commit comments

Comments
 (0)