Skip to content

Commit 24c7739

Browse files
authored
Merge branch 'main' into vaggelisd/external_model_freshness
2 parents d8ecf37 + 0bf202c commit 24c7739

File tree

11 files changed

+228
-20
lines changed

11 files changed

+228
-20
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.18.0",
27+
"sqlglot[rs]~=27.19.0",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"

sqlmesh/core/console.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3641,7 +3641,10 @@ def show_linter_violations(
36413641
msg = f"\nLinter {severity} for `{model._path}`:\n{violations_msg}\n"
36423642

36433643
self._print(msg)
3644-
self._errors.append(msg)
3644+
if is_error:
3645+
self._errors.append(msg)
3646+
else:
3647+
self._warnings.append(msg)
36453648

36463649
@property
36473650
def captured_warnings(self) -> str:

sqlmesh/core/plan/builder.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,14 @@ def _categorize_snapshot(
680680
if mode == AutoCategorizationMode.FULL:
681681
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only)
682682
elif self._context_diff.indirectly_modified(snapshot.name):
683+
if snapshot.is_materialized_view and not forward_only:
684+
# We categorize indirect changes to materialized views as indirect breaking to allow for instantaneous switches in the virtual layer.
685+
# Otherwise, there might be a potentially long downtime while the MVs are being recreated.
686+
# In the case of forward-only changes this optimization is not applicable because we want to continue
687+
# using the same (existing) table version.
688+
snapshot.categorize_as(SnapshotChangeCategory.INDIRECT_BREAKING, forward_only)
689+
return
690+
683691
all_upstream_forward_only = set()
684692
all_upstream_categories = set()
685693
direct_parent_categories = set()

sqlmesh/core/state_sync/db/migrator.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ def _migrate_snapshot_rows(
229229
"updated_ts": updated_ts,
230230
"unpaused_ts": unpaused_ts,
231231
"unrestorable": unrestorable,
232+
"forward_only": forward_only,
232233
}
233234
for where in (
234235
snapshot_id_filter(
@@ -237,10 +238,16 @@ def _migrate_snapshot_rows(
237238
if snapshots is not None
238239
else [None]
239240
)
240-
for name, identifier, raw_snapshot, updated_ts, unpaused_ts, unrestorable in fetchall(
241+
for name, identifier, raw_snapshot, updated_ts, unpaused_ts, unrestorable, forward_only in fetchall(
241242
self.engine_adapter,
242243
exp.select(
243-
"name", "identifier", "snapshot", "updated_ts", "unpaused_ts", "unrestorable"
244+
"name",
245+
"identifier",
246+
"snapshot",
247+
"updated_ts",
248+
"unpaused_ts",
249+
"unrestorable",
250+
"forward_only",
244251
)
245252
.from_(self.snapshot_state.snapshots_table)
246253
.where(where)

sqlmesh/dbt/basemodel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ def sqlmesh_model_kwargs(
305305
jinja_macros.add_globals(self._model_jinja_context(model_context, dependencies))
306306

307307
model_kwargs = {
308-
"audits": [(test.name, {}) for test in self.tests],
308+
"audits": [(test.canonical_name, {}) for test in self.tests],
309309
"column_descriptions": column_descriptions_to_sqlmesh(self.columns) or None,
310310
"depends_on": {
311311
model.canonical_name(context) for model in model_context.refs.values()

sqlmesh/dbt/loader.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ def _load_audits(
172172
for test in package.tests.values():
173173
logger.debug("Converting '%s' to sqlmesh format", test.name)
174174
try:
175-
audits[test.name] = test.to_sqlmesh(package_context)
175+
audits[test.canonical_name] = test.to_sqlmesh(package_context)
176+
176177
except BaseMissingReferenceError as e:
177178
ref_type = "model" if isinstance(e, MissingModelError) else "source"
178179
logger.warning(

sqlmesh/dbt/test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ def _validate_severity(cls, v: t.Union[Severity, str]) -> Severity:
109109
def _lowercase_name(cls, v: str) -> str:
110110
return v.lower()
111111

112+
@property
113+
def canonical_name(self) -> str:
114+
return f"{self.package_name}.{self.name}" if self.package_name else self.name
115+
112116
@property
113117
def is_standalone(self) -> bool:
114118
# A test is standalone if:

tests/core/integration/test_dbt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def test_dbt_is_incremental_table_is_missing(sushi_test_dbt_context: Context):
4848
model = context.get_model("sushi.waiter_revenue_by_day_v2")
4949
model = model.copy(update={"kind": IncrementalUnmanagedKind(), "start": "2023-01-01"})
5050
context.upsert_model(model)
51-
context._standalone_audits["test_top_waiters"].start = "2023-01-01"
51+
context._standalone_audits["sushi.test_top_waiters"].start = "2023-01-01"
5252

5353
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
5454

tests/core/test_plan.py

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
SqlModel,
2727
ModelKindName,
2828
)
29-
from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange
29+
from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange, ViewKind
3030
from sqlmesh.core.model.seed import Seed
3131
from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals
3232
from sqlmesh.core.snapshot import (
@@ -4162,3 +4162,143 @@ def test_plan_ignore_cron_flag(make_snapshot):
41624162
],
41634163
)
41644164
]
4165+
4166+
4167+
def test_indirect_change_to_materialized_view_is_breaking(make_snapshot):
4168+
snapshot_a_old = make_snapshot(
4169+
SqlModel(
4170+
name="a",
4171+
query=parse_one("select 1 as col_a"),
4172+
kind=ViewKind(materialized=True),
4173+
)
4174+
)
4175+
snapshot_a_old.categorize_as(SnapshotChangeCategory.BREAKING)
4176+
4177+
snapshot_b_old = make_snapshot(
4178+
SqlModel(
4179+
name="b",
4180+
query=parse_one("select col_a from a"),
4181+
kind=ViewKind(materialized=True),
4182+
),
4183+
nodes={'"a"': snapshot_a_old.model},
4184+
)
4185+
snapshot_b_old.categorize_as(SnapshotChangeCategory.BREAKING)
4186+
4187+
snapshot_a_new = make_snapshot(
4188+
SqlModel(
4189+
name="a",
4190+
query=parse_one("select 1 as col_a, 2 as col_b"),
4191+
kind=ViewKind(materialized=True),
4192+
)
4193+
)
4194+
4195+
snapshot_a_new.previous_versions = snapshot_a_old.all_versions
4196+
4197+
snapshot_b_new = make_snapshot(
4198+
snapshot_b_old.model,
4199+
nodes={'"a"': snapshot_a_new.model},
4200+
)
4201+
snapshot_b_new.previous_versions = snapshot_b_old.all_versions
4202+
4203+
context_diff = ContextDiff(
4204+
environment="test_environment",
4205+
is_new_environment=True,
4206+
is_unfinalized_environment=False,
4207+
normalize_environment_name=True,
4208+
create_from="prod",
4209+
create_from_env_exists=True,
4210+
added=set(),
4211+
removed_snapshots={},
4212+
modified_snapshots={
4213+
snapshot_a_new.name: (snapshot_a_new, snapshot_a_old),
4214+
snapshot_b_new.name: (snapshot_b_new, snapshot_b_old),
4215+
},
4216+
snapshots={
4217+
snapshot_a_new.snapshot_id: snapshot_a_new,
4218+
snapshot_b_new.snapshot_id: snapshot_b_new,
4219+
},
4220+
new_snapshots={
4221+
snapshot_a_new.snapshot_id: snapshot_a_new,
4222+
snapshot_b_new.snapshot_id: snapshot_b_new,
4223+
},
4224+
previous_plan_id=None,
4225+
previously_promoted_snapshot_ids=set(),
4226+
previous_finalized_snapshots=None,
4227+
previous_gateway_managed_virtual_layer=False,
4228+
gateway_managed_virtual_layer=False,
4229+
environment_statements=[],
4230+
)
4231+
4232+
PlanBuilder(context_diff, forward_only=False).build()
4233+
4234+
assert snapshot_b_new.change_category == SnapshotChangeCategory.INDIRECT_BREAKING
4235+
4236+
4237+
def test_forward_only_indirect_change_to_materialized_view(make_snapshot):
4238+
snapshot_a_old = make_snapshot(
4239+
SqlModel(
4240+
name="a",
4241+
query=parse_one("select 1 as col_a"),
4242+
)
4243+
)
4244+
snapshot_a_old.categorize_as(SnapshotChangeCategory.BREAKING)
4245+
4246+
snapshot_b_old = make_snapshot(
4247+
SqlModel(
4248+
name="b",
4249+
query=parse_one("select col_a from a"),
4250+
kind=ViewKind(materialized=True),
4251+
),
4252+
nodes={'"a"': snapshot_a_old.model},
4253+
)
4254+
snapshot_b_old.categorize_as(SnapshotChangeCategory.BREAKING)
4255+
4256+
snapshot_a_new = make_snapshot(
4257+
SqlModel(
4258+
name="a",
4259+
query=parse_one("select 1 as col_a, 2 as col_b"),
4260+
)
4261+
)
4262+
4263+
snapshot_a_new.previous_versions = snapshot_a_old.all_versions
4264+
4265+
snapshot_b_new = make_snapshot(
4266+
snapshot_b_old.model,
4267+
nodes={'"a"': snapshot_a_new.model},
4268+
)
4269+
snapshot_b_new.previous_versions = snapshot_b_old.all_versions
4270+
4271+
context_diff = ContextDiff(
4272+
environment="test_environment",
4273+
is_new_environment=True,
4274+
is_unfinalized_environment=False,
4275+
normalize_environment_name=True,
4276+
create_from="prod",
4277+
create_from_env_exists=True,
4278+
added=set(),
4279+
removed_snapshots={},
4280+
modified_snapshots={
4281+
snapshot_a_new.name: (snapshot_a_new, snapshot_a_old),
4282+
snapshot_b_new.name: (snapshot_b_new, snapshot_b_old),
4283+
},
4284+
snapshots={
4285+
snapshot_a_new.snapshot_id: snapshot_a_new,
4286+
snapshot_b_new.snapshot_id: snapshot_b_new,
4287+
},
4288+
new_snapshots={
4289+
snapshot_a_new.snapshot_id: snapshot_a_new,
4290+
snapshot_b_new.snapshot_id: snapshot_b_new,
4291+
},
4292+
previous_plan_id=None,
4293+
previously_promoted_snapshot_ids=set(),
4294+
previous_finalized_snapshots=None,
4295+
previous_gateway_managed_virtual_layer=False,
4296+
gateway_managed_virtual_layer=False,
4297+
environment_statements=[],
4298+
)
4299+
4300+
PlanBuilder(context_diff, forward_only=True).build()
4301+
4302+
# Forward-only indirect changes to MVs should not always be classified as indirect breaking.
4303+
# Instead, we want to preserve the standard categorization.
4304+
assert snapshot_b_new.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING

tests/dbt/test_model.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,23 +190,23 @@ def test_manifest_filters_standalone_tests_from_models(
190190
# Should only have "not_null" test, not the "relationships" test
191191
model1_audit_names = [audit[0] for audit in model1_snapshot.model.audits]
192192
assert len(model1_audit_names) == 1
193-
assert model1_audit_names[0] == "not_null_model1_id"
193+
assert model1_audit_names[0] == "local.not_null_model1_id"
194194

195195
# Verify model2 has its non-standalone test
196196
model2_audit_names = [audit[0] for audit in model2_snapshot.model.audits]
197197
assert len(model2_audit_names) == 1
198-
assert model2_audit_names[0] == "not_null_model2_id"
198+
assert model2_audit_names[0] == "local.not_null_model2_id"
199199

200200
# Verify the non-standalone audits are registered, and that the standalone test (relationships) exists as a StandaloneAudit
201201
all_non_standalone_audits = [name for name in context._audits]
202202
assert sorted(all_non_standalone_audits) == [
203-
"not_null_model1_id",
204-
"not_null_model2_id",
203+
"local.not_null_model1_id",
204+
"local.not_null_model2_id",
205205
]
206206

207207
standalone_audits = [name for name in context._standalone_audits]
208208
assert len(standalone_audits) == 1
209-
assert standalone_audits[0] == "relationships_model1_id__id__ref_model2_"
209+
assert standalone_audits[0] == "local.relationships_model1_id__id__ref_model2_"
210210

211211
plan_builder = context.plan_builder()
212212
dag = plan_builder._build_dag()

0 commit comments

Comments
 (0)