Skip to content

Commit fb559d2

Browse files
authored
Merge branch 'main' into indirect_change_to_materialized_view_is_breaking
2 parents b18945f + e46caec commit fb559d2

File tree

20 files changed

+440
-97
lines changed

20 files changed

+440
-97
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ module = [
225225
"pydantic_core.*",
226226
"dlt.*",
227227
"bigframes.*",
228-
"json_stream.*"
228+
"json_stream.*",
229+
"duckdb.*"
229230
]
230231
ignore_missing_imports = true
231232

sqlmesh/core/context.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,6 +1429,7 @@ def plan_builder(
14291429
explain: t.Optional[bool] = None,
14301430
ignore_cron: t.Optional[bool] = None,
14311431
min_intervals: t.Optional[int] = None,
1432+
always_include_local_changes: t.Optional[bool] = None,
14321433
) -> PlanBuilder:
14331434
"""Creates a plan builder.
14341435
@@ -1467,6 +1468,8 @@ def plan_builder(
14671468
diff_rendered: Whether the diff should compare raw vs rendered models
14681469
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
14691470
on every model when checking for missing intervals
1471+
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
1472+
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
14701473
14711474
Returns:
14721475
The plan builder.
@@ -1583,13 +1586,20 @@ def plan_builder(
15831586
"Selector did not return any models. Please check your model selection and try again."
15841587
)
15851588

1589+
if always_include_local_changes is None:
1590+
# default behaviour - if restatements are detected, we operate entirely out of state and ignore local changes
1591+
force_no_diff = restate_models is not None or (
1592+
backfill_models is not None and not backfill_models
1593+
)
1594+
else:
1595+
force_no_diff = not always_include_local_changes
1596+
15861597
snapshots = self._snapshots(models_override)
15871598
context_diff = self._context_diff(
15881599
environment or c.PROD,
15891600
snapshots=snapshots,
15901601
create_from=create_from,
1591-
force_no_diff=restate_models is not None
1592-
or (backfill_models is not None and not backfill_models),
1602+
force_no_diff=force_no_diff,
15931603
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
15941604
diff_rendered=diff_rendered,
15951605
always_recreate_environment=self.config.plan.always_recreate_environment,
@@ -1644,13 +1654,22 @@ def plan_builder(
16441654
elif forward_only is None:
16451655
forward_only = self.config.plan.forward_only
16461656

1657+
# When handling prod restatements, only clear intervals from other model versions if we are using full virtual environments
1658+
# If we are not, then there is no point, because none of the data in dev environments can be promoted by definition
1659+
restate_all_snapshots = (
1660+
expanded_restate_models is not None
1661+
and not is_dev
1662+
and self.config.virtual_environment_mode.is_full
1663+
)
1664+
16471665
return self.PLAN_BUILDER_TYPE(
16481666
context_diff=context_diff,
16491667
start=start,
16501668
end=end,
16511669
execution_time=execution_time,
16521670
apply=self.apply,
16531671
restate_models=expanded_restate_models,
1672+
restate_all_snapshots=restate_all_snapshots,
16541673
backfill_models=backfill_models,
16551674
no_gaps=no_gaps,
16561675
skip_backfill=skip_backfill,

sqlmesh/core/plan/builder.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ class PlanBuilder:
6565
restate_models: A list of models for which the data should be restated for the time range
6666
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
6767
of the restatement.
68+
restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals
69+
being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments).
70+
Defaults to False, in which case nothing is cleared. Setting it is only valid when the target environment is prod.
6871
backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
6972
no_gaps: Whether to ensure that new snapshots for nodes that are already a
7073
part of the target environment have no data gaps when compared against previous
@@ -103,6 +106,7 @@ def __init__(
103106
execution_time: t.Optional[TimeLike] = None,
104107
apply: t.Optional[t.Callable[[Plan], None]] = None,
105108
restate_models: t.Optional[t.Iterable[str]] = None,
109+
restate_all_snapshots: bool = False,
106110
backfill_models: t.Optional[t.Iterable[str]] = None,
107111
no_gaps: bool = False,
108112
skip_backfill: bool = False,
@@ -154,6 +158,7 @@ def __init__(
154158
self._auto_categorization_enabled = auto_categorization_enabled
155159
self._include_unmodified = include_unmodified
156160
self._restate_models = set(restate_models) if restate_models is not None else None
161+
self._restate_all_snapshots = restate_all_snapshots
157162
self._effective_from = effective_from
158163

159164
# note: this deliberately doesn't default to now() here.
@@ -277,7 +282,6 @@ def build(self) -> Plan:
277282
if self._latest_plan:
278283
return self._latest_plan
279284

280-
self._ensure_no_new_snapshots_with_restatements()
281285
self._ensure_new_env_with_changes()
282286
self._ensure_valid_date_range()
283287
self._ensure_no_broken_references()
@@ -340,6 +344,7 @@ def build(self) -> Plan:
340344
deployability_index=deployability_index,
341345
selected_models_to_restate=self._restate_models,
342346
restatements=restatements,
347+
restate_all_snapshots=self._restate_all_snapshots,
343348
start_override_per_model=self._start_override_per_model,
344349
end_override_per_model=end_override_per_model,
345350
selected_models_to_backfill=self._backfill_models,
@@ -867,15 +872,6 @@ def _ensure_no_broken_references(self) -> None:
867872
f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding."""
868873
)
869874

870-
def _ensure_no_new_snapshots_with_restatements(self) -> None:
871-
if self._restate_models is not None and (
872-
self._context_diff.new_snapshots or self._context_diff.modified_snapshots
873-
):
874-
raise PlanError(
875-
"Model changes and restatements can't be a part of the same plan. "
876-
"Revert or apply changes before proceeding with restatements."
877-
)
878-
879875
def _ensure_new_env_with_changes(self) -> None:
880876
if (
881877
self._is_dev

sqlmesh/core/plan/definition.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class Plan(PydanticModel, frozen=True):
6767
Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
6868
while :restatements is still populated with dev previews
6969
"""
70+
restate_all_snapshots: bool
71+
"""Whether or not to clear intervals from state for other versions of the models listed in :restatements"""
7072

7173
start_override_per_model: t.Optional[t.Dict[str, datetime]]
7274
end_override_per_model: t.Optional[t.Dict[str, datetime]]
@@ -268,6 +270,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
268270
skip_backfill=self.skip_backfill,
269271
empty_backfill=self.empty_backfill,
270272
restatements={s.name: i for s, i in self.restatements.items()},
273+
restate_all_snapshots=self.restate_all_snapshots,
271274
is_dev=self.is_dev,
272275
allow_destructive_models=self.allow_destructive_models,
273276
allow_additive_models=self.allow_additive_models,
@@ -312,6 +315,7 @@ class EvaluatablePlan(PydanticModel):
312315
skip_backfill: bool
313316
empty_backfill: bool
314317
restatements: t.Dict[str, Interval]
318+
restate_all_snapshots: bool
315319
is_dev: bool
316320
allow_destructive_models: t.Set[str]
317321
allow_additive_models: t.Set[str]

sqlmesh/core/plan/stages.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
Snapshot,
1313
SnapshotTableInfo,
1414
SnapshotId,
15+
snapshots_to_dag,
1516
)
17+
from sqlmesh.utils.errors import PlanError
1618

1719

1820
@dataclass
@@ -248,6 +250,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
248250
stored_snapshots = self.state_reader.get_snapshots(plan.environment.snapshots)
249251
snapshots = {**new_snapshots, **stored_snapshots}
250252
snapshots_by_name = {s.name: s for s in snapshots.values()}
253+
dag = snapshots_to_dag(snapshots.values())
251254

252255
all_selected_for_backfill_snapshots = {
253256
s.snapshot_id for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
@@ -271,8 +274,15 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
271274
after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots
272275
deployability_index = DeployabilityIndex.all_deployable()
273276

277+
snapshot_ids_with_schema_migration = [
278+
s.snapshot_id for s in snapshots.values() if s.requires_schema_migration_in_prod
279+
]
280+
# Include all upstream dependencies of snapshots that require schema migration to make sure
281+
# the upstream tables are created before the schema updates are applied
274282
snapshots_with_schema_migration = [
275-
s for s in snapshots.values() if s.requires_schema_migration_in_prod
283+
snapshots[s_id]
284+
for s_id in dag.subdag(*snapshot_ids_with_schema_migration)
285+
if snapshots[s_id].supports_schema_migration_in_prod
276286
]
277287

278288
snapshots_to_intervals = self._missing_intervals(
@@ -452,13 +462,18 @@ def _get_after_all_stage(
452462
def _get_restatement_stage(
453463
self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]
454464
) -> t.Optional[RestatementStage]:
455-
if not plan.restatements or plan.is_dev:
456-
# The RestatementStage to clear intervals from state across all environments is not needed for plans against dev, only prod
457-
return None
465+
if plan.restate_all_snapshots:
466+
if plan.is_dev:
467+
raise PlanError(
468+
"Clearing intervals from state across dev model versions is only valid for prod plans"
469+
)
458470

459-
return RestatementStage(
460-
all_snapshots=snapshots_by_name,
461-
)
471+
if plan.restatements:
472+
return RestatementStage(
473+
all_snapshots=snapshots_by_name,
474+
)
475+
476+
return None
462477

463478
def _get_physical_layer_update_stage(
464479
self,

sqlmesh/core/snapshot/definition.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,19 +1477,19 @@ def expiration_ts(self) -> int:
14771477
check_categorical_relative_expression=False,
14781478
)
14791479

1480+
@property
1481+
def supports_schema_migration_in_prod(self) -> bool:
1482+
"""Returns whether or not this snapshot supports schema migration when deployed to production."""
1483+
return self.is_paused and self.is_model and not self.is_symbolic
1484+
14801485
@property
14811486
def requires_schema_migration_in_prod(self) -> bool:
14821487
"""Returns whether or not this snapshot requires a schema migration when deployed to production."""
1483-
return (
1484-
self.is_paused
1485-
and self.is_model
1486-
and self.is_materialized
1487-
and (
1488-
(self.previous_version and self.previous_version.version == self.version)
1489-
or self.model.forward_only
1490-
or bool(self.model.physical_version)
1491-
or not self.virtual_environment_mode.is_full
1492-
)
1488+
return self.supports_schema_migration_in_prod and (
1489+
(self.previous_version and self.previous_version.version == self.version)
1490+
or self.model.forward_only
1491+
or bool(self.model.physical_version)
1492+
or not self.virtual_environment_mode.is_full
14931493
)
14941494

14951495
@property

sqlmesh/core/snapshot/evaluator.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -489,15 +489,14 @@ def migrate(
489489
allow_destructive_snapshots = allow_destructive_snapshots or set()
490490
allow_additive_snapshots = allow_additive_snapshots or set()
491491
snapshots_by_name = {s.name: s for s in snapshots.values()}
492-
snapshots_with_data_objects = [snapshots[s_id] for s_id in target_data_objects]
493492
with self.concurrent_context():
494493
# Migrate all snapshots; a snapshot without an existing data object is passed None as its target data object
495494
concurrent_apply_to_snapshots(
496-
snapshots_with_data_objects,
495+
snapshots_by_name.values(),
497496
lambda s: self._migrate_snapshot(
498497
s,
499498
snapshots_by_name,
500-
target_data_objects[s.snapshot_id],
499+
target_data_objects.get(s.snapshot_id),
501500
allow_destructive_snapshots,
502501
allow_additive_snapshots,
503502
self.get_adapter(s.model_gateway),
@@ -747,11 +746,15 @@ def _evaluate_snapshot(
747746
adapter.execute(model.render_pre_statements(**render_statements_kwargs))
748747

749748
if not target_table_exists or (model.is_seed and not snapshot.intervals):
750-
columns_to_types_provided = (
749+
# Only create the empty table if the columns were provided explicitly by the user
750+
should_create_empty_table = (
751751
model.kind.is_materialized
752752
and model.columns_to_types_
753753
and columns_to_types_all_known(model.columns_to_types_)
754754
)
755+
if not should_create_empty_table:
756+
# Or if the model is self-referential and its query is fully annotated with types
757+
should_create_empty_table = model.depends_on_self and model.annotated
755758
if self._can_clone(snapshot, deployability_index):
756759
self._clone_snapshot_in_dev(
757760
snapshot=snapshot,
@@ -764,7 +767,7 @@ def _evaluate_snapshot(
764767
)
765768
runtime_stage = RuntimeStage.EVALUATING
766769
target_table_exists = True
767-
elif columns_to_types_provided or model.is_seed or model.kind.is_scd_type_2:
770+
elif should_create_empty_table or model.is_seed or model.kind.is_scd_type_2:
768771
self._execute_create(
769772
snapshot=snapshot,
770773
table_name=target_table_name,
@@ -1059,7 +1062,7 @@ def _migrate_snapshot(
10591062
adapter: EngineAdapter,
10601063
deployability_index: DeployabilityIndex,
10611064
) -> None:
1062-
if not snapshot.requires_schema_migration_in_prod:
1065+
if not snapshot.is_model or snapshot.is_symbolic:
10631066
return
10641067

10651068
deployability_index = DeployabilityIndex.all_deployable()
@@ -1081,20 +1084,32 @@ def _migrate_snapshot(
10811084
):
10821085
table_exists = False
10831086

1087+
rendered_physical_properties = snapshot.model.render_physical_properties(
1088+
**render_kwargs
1089+
)
1090+
10841091
if table_exists:
10851092
self._migrate_target_table(
10861093
target_table_name=target_table_name,
10871094
snapshot=snapshot,
10881095
snapshots=snapshots,
10891096
deployability_index=deployability_index,
10901097
render_kwargs=render_kwargs,
1091-
rendered_physical_properties=snapshot.model.render_physical_properties(
1092-
**render_kwargs
1093-
),
1098+
rendered_physical_properties=rendered_physical_properties,
10941099
allow_destructive_snapshots=allow_destructive_snapshots,
10951100
allow_additive_snapshots=allow_additive_snapshots,
10961101
run_pre_post_statements=True,
10971102
)
1103+
else:
1104+
self._execute_create(
1105+
snapshot=snapshot,
1106+
table_name=snapshot.table_name(is_deployable=True),
1107+
is_table_deployable=True,
1108+
deployability_index=deployability_index,
1109+
create_render_kwargs=render_kwargs,
1110+
rendered_physical_properties=rendered_physical_properties,
1111+
dry_run=True,
1112+
)
10981113

10991114
def _migrate_target_table(
11001115
self,

sqlmesh_dbt/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ def dbt(
9090
@click.option(
9191
"-f",
9292
"--full-refresh",
93-
help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.",
93+
is_flag=True,
94+
default=False,
95+
help="If specified, sqlmesh will drop incremental models and fully-recalculate the incremental table from the model definition.",
9496
)
9597
@click.option(
9698
"--env",

0 commit comments

Comments
 (0)