Skip to content

Commit da9a69c

Browse files
Fix: allow --select-model to plan a model deletion
When a model file is deleted locally but still exists in the deployed environment, --select-model now correctly plans the removal instead of raising PlanError. Fixes #5741 Signed-off-by: vinicius <viniciusnunes.tavares@gmail.com>
1 parent 0f192c9 commit da9a69c

File tree

4 files changed

+245
-23
lines changed

4 files changed

+245
-23
lines changed

sqlmesh/core/context.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,10 @@ def plan_builder(
16051605
backfill_models = None
16061606

16071607
models_override: t.Optional[UniqueKeyDict[str, Model]] = None
1608+
# FQNs of models that are selected for deletion (present in the deployed environment but
1609+
# absent from local files). These are models whose files have been deleted; they will
1610+
# appear in context_diff.removed_snapshots rather than as backfill candidates.
1611+
selected_deletion_fqns: t.Set[str] = set()
16081612
if select_models:
16091613
try:
16101614
models_override = model_selector.select_models(
@@ -1622,12 +1626,24 @@ def plan_builder(
16221626
# Only backfill selected models unless explicitly specified.
16231627
backfill_models = model_selector.expand_model_selections(select_models)
16241628

1629+
if not backfill_models:
1630+
# The selection matched nothing locally. Check whether it matched models that exist
1631+
# in the deployed environment but have been deleted locally. If so, the selection is
1632+
# valid — the deletions will surface in context_diff.removed_snapshots.
1633+
env_selected = model_selector.expand_model_selections_with_env(
1634+
select_models,
1635+
environment,
1636+
fallback_env_name=create_from or c.PROD,
1637+
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1638+
)
1639+
selected_deletion_fqns = env_selected - set(self._models)
1640+
16251641
expanded_restate_models = None
16261642
if restate_models is not None:
16271643
expanded_restate_models = model_selector.expand_model_selections(restate_models)
16281644

16291645
if (restate_models is not None and not expanded_restate_models) or (
1630-
backfill_models is not None and not backfill_models
1646+
backfill_models is not None and not backfill_models and not selected_deletion_fqns
16311647
):
16321648
raise PlanError(
16331649
"Selector did not return any models. Please check your model selection and try again."
@@ -1636,7 +1652,7 @@ def plan_builder(
16361652
if always_include_local_changes is None:
16371653
# default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes
16381654
force_no_diff = restate_models is not None or (
1639-
backfill_models is not None and not backfill_models
1655+
backfill_models is not None and not backfill_models and not selected_deletion_fqns
16401656
)
16411657
else:
16421658
force_no_diff = not always_include_local_changes

sqlmesh/core/selector.py

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -78,27 +78,9 @@ def select_models(
7878
Returns:
7979
A dictionary of models.
8080
"""
81-
target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
82-
if target_env and target_env.expired:
83-
target_env = None
84-
85-
if not target_env and fallback_env_name:
86-
target_env = self._state_reader.get_environment(
87-
Environment.sanitize_name(fallback_env_name)
88-
)
89-
90-
env_models: t.Dict[str, Model] = {}
91-
if target_env:
92-
environment_snapshot_infos = (
93-
target_env.snapshots
94-
if not ensure_finalized_snapshots
95-
else target_env.finalized_or_current_snapshots
96-
)
97-
env_models = {
98-
s.name: s.model
99-
for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
100-
if s.is_model
101-
}
81+
env_models = self._load_env_models(
82+
target_env_name, fallback_env_name, ensure_finalized_snapshots
83+
)
10284

10385
all_selected_models = self.expand_model_selections(
10486
model_selections, models={**env_models, **self._models}
@@ -168,6 +150,65 @@ def get_model(fqn: str) -> t.Optional[Model]:
168150

169151
return models
170152

153+
def expand_model_selections_with_env(
154+
self,
155+
model_selections: t.Iterable[str],
156+
target_env_name: str,
157+
fallback_env_name: t.Optional[str] = None,
158+
ensure_finalized_snapshots: bool = False,
159+
) -> t.Set[str]:
160+
"""Expands model selections against both local models and the target environment.
161+
162+
This allows selections to match models that have been deleted locally but still
163+
exist in the deployed environment.
164+
165+
Args:
166+
model_selections: A set of selections.
167+
target_env_name: The name of the target environment.
168+
fallback_env_name: The name of the fallback environment that will be used if the target
169+
environment doesn't exist.
170+
ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized
171+
environment state, or to use whatever snapshots are in the current environment state even if
172+
the environment is not finalized.
173+
174+
Returns:
175+
A set of matched model FQNs.
176+
"""
177+
env_models = self._load_env_models(
178+
target_env_name, fallback_env_name, ensure_finalized_snapshots
179+
)
180+
return self.expand_model_selections(model_selections, models={**env_models, **self._models})
181+
182+
def _load_env_models(
183+
self,
184+
target_env_name: str,
185+
fallback_env_name: t.Optional[str] = None,
186+
ensure_finalized_snapshots: bool = False,
187+
) -> t.Dict[str, "Model"]:
188+
"""Loads models from the target environment, falling back to the fallback environment if needed."""
189+
target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
190+
if target_env and target_env.expired:
191+
target_env = None
192+
193+
if not target_env and fallback_env_name:
194+
target_env = self._state_reader.get_environment(
195+
Environment.sanitize_name(fallback_env_name)
196+
)
197+
198+
if not target_env:
199+
return {}
200+
201+
environment_snapshot_infos = (
202+
target_env.snapshots
203+
if not ensure_finalized_snapshots
204+
else target_env.finalized_or_current_snapshots
205+
)
206+
return {
207+
s.name: s.model
208+
for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
209+
if s.is_model
210+
}
211+
171212
def expand_model_selections(
172213
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
173214
) -> t.Set[str]:

tests/core/test_context.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None:
22732273
sushi_context.plan("prod", restate_models=["*missing*"])
22742274

22752275

2276+
def test_plan_select_model_deleted_model(sushi_context: Context) -> None:
2277+
"""Selecting a model that has been deleted locally but still exists in the deployed
2278+
environment should produce a valid plan with the deletion, not raise PlanError."""
2279+
# Pick a leaf model that can be safely deleted without breaking other models' rendering.
2280+
model_name = "sushi.top_waiters"
2281+
snapshot = sushi_context.get_snapshot(model_name)
2282+
assert snapshot is not None
2283+
2284+
# Delete the model file from disk.
2285+
model = sushi_context.get_model(model_name)
2286+
assert model._path.exists()
2287+
model._path.unlink()
2288+
2289+
# Reload the context so it no longer knows about the deleted model.
2290+
sushi_context.load()
2291+
assert model_name not in [m for m in sushi_context.models]
2292+
2293+
# Planning with select_models for the deleted model should succeed (not raise PlanError).
2294+
plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True)
2295+
assert plan is not None
2296+
2297+
# The deleted model should appear in removed_snapshots.
2298+
removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()}
2299+
assert snapshot.name in removed_names
2300+
2301+
2302+
def test_plan_select_model_deleted_model_still_rejects_nonexistent(
2303+
sushi_context: Context,
2304+
) -> None:
2305+
"""A model that neither exists locally nor in the deployed environment should still
2306+
raise PlanError."""
2307+
with pytest.raises(
2308+
PlanError,
2309+
match="Selector did not return any models. Please check your model selection and try again.",
2310+
):
2311+
sushi_context.plan("prod", select_models=["sushi.completely_nonexistent"])
2312+
2313+
22762314
def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path):
22772315
models_dir = pathlib.Path("models")
22782316
macros_dir = pathlib.Path("macros")

tests/core/test_selector_native.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,133 @@ def test_select_models_local_tags_take_precedence_over_remote(
801801
)
802802

803803

804+
def test_expand_model_selections_with_env(mocker: MockerFixture, make_snapshot):
805+
"""expand_model_selections_with_env should include models from the deployed environment,
806+
even if they have been deleted locally."""
807+
local_model = SqlModel(
808+
name="db.local_model",
809+
query=d.parse_one("SELECT 1 AS a"),
810+
)
811+
deleted_model = SqlModel(
812+
name="db.deleted_model",
813+
query=d.parse_one("SELECT 2 AS b"),
814+
)
815+
816+
deleted_model_snapshot = make_snapshot(deleted_model)
817+
deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
818+
819+
env_name = "test_env"
820+
821+
state_reader_mock = mocker.Mock()
822+
state_reader_mock.get_environment.return_value = Environment(
823+
name=env_name,
824+
snapshots=[deleted_model_snapshot.table_info],
825+
start_at="2023-01-01",
826+
end_at="2023-02-01",
827+
plan_id="test_plan_id",
828+
)
829+
state_reader_mock.get_snapshots.return_value = {
830+
deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
831+
}
832+
833+
local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
834+
local_models[local_model.fqn] = local_model
835+
836+
selector = NativeSelector(state_reader_mock, local_models)
837+
838+
# Expanding against local models only should NOT find the deleted model.
839+
assert selector.expand_model_selections(["db.deleted_model"]) == set()
840+
841+
# Expanding with env should find the deleted model.
842+
result = selector.expand_model_selections_with_env(["db.deleted_model"], env_name)
843+
assert deleted_model.fqn in result
844+
845+
# Local model should also be reachable.
846+
result = selector.expand_model_selections_with_env(["db.local_model"], env_name)
847+
assert local_model.fqn in result
848+
849+
# Selecting both should return both.
850+
result = selector.expand_model_selections_with_env(
851+
["db.deleted_model", "db.local_model"], env_name
852+
)
853+
assert result == {deleted_model.fqn, local_model.fqn}
854+
855+
# Wildcard should match env models too.
856+
result = selector.expand_model_selections_with_env(["*_model"], env_name)
857+
assert result == {deleted_model.fqn, local_model.fqn}
858+
859+
# Non-existent model should return empty.
860+
result = selector.expand_model_selections_with_env(["db.nonexistent"], env_name)
861+
assert result == set()
862+
863+
864+
def test_expand_model_selections_with_env_fallback(mocker: MockerFixture, make_snapshot):
865+
"""expand_model_selections_with_env should fall back to the fallback environment."""
866+
deleted_model = SqlModel(
867+
name="db.deleted_model",
868+
query=d.parse_one("SELECT 1 AS a"),
869+
)
870+
871+
deleted_model_snapshot = make_snapshot(deleted_model)
872+
deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
873+
874+
fallback_env = Environment(
875+
name="prod",
876+
snapshots=[deleted_model_snapshot.table_info],
877+
start_at="2023-01-01",
878+
end_at="2023-02-01",
879+
plan_id="test_plan_id",
880+
)
881+
882+
state_reader_mock = mocker.Mock()
883+
state_reader_mock.get_environment.side_effect = (
884+
lambda name: fallback_env if name == "prod" else None
885+
)
886+
state_reader_mock.get_snapshots.return_value = {
887+
deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
888+
}
889+
890+
local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
891+
selector = NativeSelector(state_reader_mock, local_models)
892+
893+
result = selector.expand_model_selections_with_env(
894+
["db.deleted_model"], "missing_env", fallback_env_name="prod"
895+
)
896+
assert deleted_model.fqn in result
897+
898+
899+
def test_expand_model_selections_with_env_expired(mocker: MockerFixture, make_snapshot):
900+
"""expand_model_selections_with_env should ignore expired environments."""
901+
deleted_model = SqlModel(
902+
name="db.deleted_model",
903+
query=d.parse_one("SELECT 1 AS a"),
904+
)
905+
906+
deleted_model_snapshot = make_snapshot(deleted_model)
907+
deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
908+
909+
expired_env = Environment(
910+
name="test_env",
911+
snapshots=[deleted_model_snapshot.table_info],
912+
start_at="2023-01-01",
913+
end_at="2023-02-01",
914+
plan_id="test_plan_id",
915+
expiration_ts=now_timestamp() - 1,
916+
)
917+
918+
state_reader_mock = mocker.Mock()
919+
state_reader_mock.get_environment.return_value = expired_env
920+
state_reader_mock.get_snapshots.return_value = {
921+
deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
922+
}
923+
924+
local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
925+
selector = NativeSelector(state_reader_mock, local_models)
926+
927+
result = selector.expand_model_selections_with_env(["db.deleted_model"], "test_env")
928+
assert result == set()
929+
930+
804931
def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None:
805932
assert set(actual) == set(expected)
806933
for name, model in actual.items():

0 commit comments

Comments
 (0)