Skip to content

Commit d0ca721

Browse files
authored
Fix!: Derive the name of a snapshot's physical schema from its model's schema (#885)
1 parent 4b16d6f commit d0ca721

13 files changed

Lines changed: 131 additions & 82 deletions

File tree

docs/reference/configuration.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ See [Airflow Integration Guide](../integrations/airflow.md) for information on h
146146
## SQLMesh-specific configurations
147147
| Option | Description | Type | Required |
148148
|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------:|:--------:|
149-
| `physical_schema` | The default schema used to store physical tables for models (Default: `sqlmesh`) | string | N |
150149
| `snapshot_ttl` | The period of time that a model snapshot not a part of any environment should exist before being deleted. This is defined as a string with the default `in 1 week`. Other [relative dates](https://dateparser.readthedocs.io/en/latest/) can be used, such as `in 30 days`. (Default: `in 1 week`) | string | N |
151150
| `environment_ttl` | The period of time that a development environment should exist before being deleted. This is defined as a string with the default `in 1 week`. Other [relative dates](https://dateparser.readthedocs.io/en/latest/) can be used, such as `in 30 days`. (Default: `in 1 week`) | string | N |
152151
| `ignore_patterns` | Files that match glob patterns specified in this list are ignored when scanning the project folder (Default: `[]`) | list[string] | N |

sqlmesh/core/config/root.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ class Config(BaseConfig):
2828
default_gateway: The default gateway.
2929
notification_targets: The notification targets to use.
3030
dialect: The default sql dialect of model queries. Default: same as engine dialect.
31-
physical_schema: The default schema used to store materialized tables.
3231
project: The project name of this config. Used for multi-repo setups.
3332
snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
3433
environment_ttl: The period of time that a development environment should exist before being deleted.
@@ -47,7 +46,6 @@ class Config(BaseConfig):
4746
default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
4847
default_gateway: str = ""
4948
notification_targets: t.List[NotificationTarget] = []
50-
physical_schema: str = c.SQLMESH
5149
project: str = ""
5250
snapshot_ttl: str = c.DEFAULT_SNAPSHOT_TTL
5351
environment_ttl: t.Optional[str] = c.DEFAULT_ENVIRONMENT_TTL

sqlmesh/core/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,6 @@
3838
TESTS = "tests"
3939
CACHE = ".cache"
4040
SCHEMA_YAML = "schema.yaml"
41+
42+
43+
DEFAULT_SCHEMA = "default"

sqlmesh/core/context.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,12 +432,10 @@ def snapshots(self) -> t.Dict[str, Snapshot]:
432432
for model in models.values():
433433
if model.name in remote_snapshots:
434434
snapshot = remote_snapshots[model.name]
435-
physical_schema = snapshot.physical_schema
436435
ttl = snapshot.ttl
437436
project = snapshot.project
438437
else:
439438
config = self.config_for_model(model)
440-
physical_schema = config.physical_schema
441439
ttl = config.snapshot_ttl
442440
project = config.project
443441

@@ -446,7 +444,6 @@ def snapshots(self) -> t.Dict[str, Snapshot]:
446444
models=models,
447445
audits=audits,
448446
cache=fingerprint_cache,
449-
physical_schema=physical_schema,
450447
ttl=ttl,
451448
project=project,
452449
)

sqlmesh/core/snapshot/definition.py

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from datetime import datetime
88
from enum import IntEnum
99

10-
from pydantic import validator
10+
from pydantic import Field, validator
1111
from sqlglot import exp
1212
from sqlglot.helper import seq_get
1313

@@ -132,10 +132,17 @@ class SnapshotDataVersion(PydanticModel, frozen=True):
132132
version: str
133133
temp_version: t.Optional[str]
134134
change_category: t.Optional[SnapshotChangeCategory]
135+
physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
135136

136137
def snapshot_id(self, name: str) -> SnapshotId:
137138
return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())
138139

140+
@property
141+
def physical_schema(self) -> str:
142+
# The physical schema here is optional to maintain backwards compatibility with
143+
# records stored by previous versions of SQLMesh.
144+
return self.physical_schema_ or c.SQLMESH
145+
139146
@property
140147
def data_version(self) -> SnapshotDataVersion:
141148
return self
@@ -163,7 +170,7 @@ def for_environment(self, environment: str) -> str:
163170
)
164171

165172
def schema_for_environment(self, environment: str) -> str:
166-
schema = self.schema_name or "default"
173+
schema = self.schema_name or c.DEFAULT_SCHEMA
167174
if environment.lower() != c.PROD:
168175
schema = f"{schema}__{environment}"
169176
return schema
@@ -174,7 +181,6 @@ class SnapshotInfoMixin(ModelKindMixin):
174181
temp_version: t.Optional[str]
175182
change_category: t.Optional[SnapshotChangeCategory]
176183
fingerprint: SnapshotFingerprint
177-
physical_schema: str
178184
previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
179185

180186
def is_temporary_table(self, is_dev: bool) -> bool:
@@ -193,7 +199,7 @@ def snapshot_id(self) -> SnapshotId:
193199

194200
@property
195201
def qualified_view_name(self) -> QualifiedViewName:
196-
(catalog, schema, table) = parse_model_name(self.name)
202+
catalog, schema, table = parse_model_name(self.name)
197203
return QualifiedViewName(catalog=catalog, schema_name=schema, table=table)
198204

199205
@property
@@ -203,6 +209,10 @@ def previous_version(self) -> t.Optional[SnapshotDataVersion]:
203209
return self.previous_versions[-1]
204210
return None
205211

212+
@property
213+
def physical_schema(self) -> str:
214+
raise NotImplementedError
215+
206216
@property
207217
def data_version(self) -> SnapshotDataVersion:
208218
raise NotImplementedError
@@ -262,7 +272,7 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
262272
fingerprint: SnapshotFingerprint
263273
version: str
264274
temp_version: t.Optional[str]
265-
physical_schema: str
275+
physical_schema_: str = Field(alias="physical_schema")
266276
parents: t.Tuple[SnapshotId, ...]
267277
previous_versions: t.Tuple[SnapshotDataVersion, ...] = ()
268278
change_category: t.Optional[SnapshotChangeCategory]
@@ -277,6 +287,10 @@ def table_name(self, is_dev: bool = False, for_read: bool = False) -> str:
277287
"""
278288
return self._table_name(self.version, is_dev, for_read)
279289

290+
@property
291+
def physical_schema(self) -> str:
292+
return self.physical_schema_
293+
280294
@property
281295
def table_info(self) -> SnapshotTableInfo:
282296
"""Helper method to return self."""
@@ -289,6 +303,7 @@ def data_version(self) -> SnapshotDataVersion:
289303
version=self.version,
290304
temp_version=self.temp_version,
291305
change_category=self.change_category,
306+
physical_schema=self.physical_schema,
292307
)
293308

294309
@property
@@ -342,7 +357,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
342357

343358
name: str
344359
fingerprint: SnapshotFingerprint
345-
physical_schema: str
360+
physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
346361
model: Model
347362
parents: t.Tuple[SnapshotId, ...]
348363
audits: t.Tuple[Audit, ...]
@@ -442,7 +457,6 @@ def from_model(
442457
model: Model,
443458
*,
444459
models: t.Dict[str, Model],
445-
physical_schema: str = c.SQLMESH,
446460
ttl: str = c.DEFAULT_SNAPSHOT_TTL,
447461
project: str = "",
448462
version: t.Optional[str] = None,
@@ -472,19 +486,16 @@ def from_model(
472486
name=model.name,
473487
fingerprint=fingerprint_from_model(
474488
model,
475-
physical_schema=physical_schema,
476489
models=models,
477490
audits=audits,
478491
cache=cache,
479492
),
480-
physical_schema=physical_schema,
481493
model=model,
482494
parents=tuple(
483495
SnapshotId(
484496
name=name,
485497
identifier=fingerprint_from_model(
486498
models[name],
487-
physical_schema=physical_schema,
488499
models=models,
489500
audits=audits,
490501
cache=cache,
@@ -672,6 +683,7 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None:
672683
)
673684
if is_forward_only and self.previous_version:
674685
self.version = self.previous_version.data_version.version
686+
self.physical_schema_ = self.previous_version.physical_schema
675687
else:
676688
self.version = self.fingerprint.to_version()
677689

@@ -716,6 +728,15 @@ def version_get_or_generate(self) -> str:
716728
"""Helper method to get the version or generate it from the fingerprint."""
717729
return self.version or self.fingerprint.to_version()
718730

731+
@property
732+
def physical_schema(self) -> str:
733+
if self.physical_schema_ is not None:
734+
return self.physical_schema_
735+
_, schema, _ = parse_model_name(self.name)
736+
if schema is None:
737+
schema = c.DEFAULT_SCHEMA
738+
return f"{c.SQLMESH}__{schema}"
739+
719740
@property
720741
def table_info(self) -> SnapshotTableInfo:
721742
"""Helper method to get the SnapshotTableInfo from the Snapshot."""
@@ -740,6 +761,7 @@ def data_version(self) -> SnapshotDataVersion:
740761
version=self.version,
741762
temp_version=self.temp_version,
742763
change_category=self.change_category,
764+
physical_schema=self.physical_schema,
743765
)
744766

745767
@property
@@ -803,7 +825,6 @@ def fingerprint_from_model(
803825
model: Model,
804826
*,
805827
models: t.Dict[str, Model],
806-
physical_schema: str = "",
807828
audits: t.Optional[t.Dict[str, Audit]] = None,
808829
cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
809830
) -> SnapshotFingerprint:
@@ -815,7 +836,6 @@ def fingerprint_from_model(
815836
816837
Args:
817838
model: Model to fingerprint.
818-
physical_schema: The physical_schema of the snapshot which represents where it is stored.
819839
models: Dictionary of all models in the graph to make the fingerprint dependent on parent changes.
820840
If no dictionary is passed in the fingerprint will not be dependent on a model's parents.
821841
audits: Available audits by name.
@@ -831,7 +851,6 @@ def fingerprint_from_model(
831851
fingerprint_from_model(
832852
models[table],
833853
models=models,
834-
physical_schema=physical_schema,
835854
audits=audits,
836855
cache=cache,
837856
)
@@ -846,7 +865,7 @@ def fingerprint_from_model(
846865
)
847866

848867
cache[model.name] = SnapshotFingerprint(
849-
data_hash=_model_data_hash(model, physical_schema),
868+
data_hash=_model_data_hash(model),
850869
metadata_hash=_model_metadata_hash(model, audits or {}),
851870
parent_data_hash=parent_data_hash,
852871
parent_metadata_hash=parent_metadata_hash,
@@ -855,7 +874,7 @@ def fingerprint_from_model(
855874
return cache[model.name]
856875

857876

858-
def _model_data_hash(model: Model, physical_schema: str) -> str:
877+
def _model_data_hash(model: Model) -> str:
859878
def serialize_hooks(hooks: t.List[HookCall]) -> t.Iterable[str]:
860879
serialized = []
861880
for hook in hooks:
@@ -875,7 +894,6 @@ def serialize_hooks(hooks: t.List[HookCall]) -> t.Iterable[str]:
875894
model.cron,
876895
model.storage_format,
877896
str(model.lookback),
878-
physical_schema,
879897
*(model.partitioned_by or []),
880898
*(expression.sql(comments=False) for expression in model.expressions or []),
881899
*serialize_hooks(model.pre),

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,6 @@ def _migrate_rows(self) -> None:
652652

653653
new_snapshot.fingerprint = fingerprint_from_model(
654654
model,
655-
physical_schema=snapshot.physical_schema,
656655
models=models,
657656
audits=audits,
658657
)
@@ -661,7 +660,6 @@ def _migrate_rows(self) -> None:
661660
name=name,
662661
identifier=fingerprint_from_model(
663662
models[name],
664-
physical_schema=snapshot.physical_schema,
665663
models=models,
666664
audits=audits,
667665
cache=fingerprint_cache,

tests/conftest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ def _make_function(model: Model, version: t.Optional[str] = None, **kwargs) -> S
132132
return Snapshot.from_model(
133133
model,
134134
**{ # type: ignore
135-
"physical_schema": "sqlmesh",
136135
"models": {},
137136
"ttl": "in 1 week",
138137
**kwargs,

tests/core/test_config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,11 @@ def test_load_config_from_paths(yaml_config_path: Path, python_config_path: Path
152152
def test_load_config_multiple_config_files_in_folder(tmp_path):
153153
config_a_path = tmp_path / "config.yaml"
154154
with open(config_a_path, "w") as fd:
155-
fd.write("physical_schema: schema_a")
155+
fd.write("project: project_a")
156156

157157
config_b_path = tmp_path / "config.yml"
158158
with open(config_b_path, "w") as fd:
159-
fd.write("physical_schema: schema_b")
159+
fd.write("project: project_b")
160160

161161
with pytest.raises(ConfigError, match=r"^Multiple configuration files found in folder.*"):
162162
load_config_from_paths(config_a_path, config_b_path)

tests/core/test_context.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
def test_global_config():
2020
context = Context(paths="examples/sushi")
2121
assert context.config.dialect is None
22-
assert context.config.physical_schema == "sqlmesh"
22+
assert context.config.time_column_format == "%Y-%m-%d"
2323

2424

2525
def test_named_config():
@@ -41,10 +41,10 @@ def test_missing_named_config():
4141

4242

4343
def test_config_parameter():
44-
config = Config(model_defaults=ModelDefaultsConfig(dialect="presto"), physical_schema="dev")
44+
config = Config(model_defaults=ModelDefaultsConfig(dialect="presto"), project="test_project")
4545
context = Context(paths="examples/sushi", config=config)
4646
assert context.config.dialect == "presto"
47-
assert context.config.physical_schema == "dev"
47+
assert context.config.project == "test_project"
4848

4949

5050
def test_config_not_found():
@@ -139,10 +139,10 @@ def test_render(sushi_context, assert_exp_eq):
139139
CAST(o.waiter_id AS INT) AS waiter_id, /* Waiter id */
140140
CAST(SUM(oi.quantity * i.price) AS DOUBLE) AS revenue, /* Revenue from orders taken by this waiter */
141141
CAST(o.ds AS TEXT) AS ds /* Date */
142-
FROM sqlmesh.sushi__orders__2048186253 AS o
143-
LEFT JOIN sqlmesh.sushi__order_items__3982722653 AS oi
142+
FROM sqlmesh__sushi.sushi__orders__619968963 AS o
143+
LEFT JOIN sqlmesh__sushi.sushi__order_items__3090566586 AS oi
144144
ON o.ds = oi.ds AND o.id = oi.order_id
145-
LEFT JOIN sqlmesh.sushi__items__3758356954 AS i
145+
LEFT JOIN sqlmesh__sushi.sushi__items__1837306384 AS i
146146
ON oi.ds = i.ds AND oi.item_id = i.id
147147
WHERE
148148
o.ds <= '2021-01-01' AND o.ds >= '2021-01-01'

tests/core/test_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def test_run(sushi_context_fixed_date: Context, scheduler: Scheduler):
124124
assert (
125125
adapter.fetchone(
126126
f"""
127-
SELECT id, name, price FROM sqlmesh.sushi__items__{snapshot.version} ORDER BY ds LIMIT 1
127+
SELECT id, name, price FROM sqlmesh__sushi.sushi__items__{snapshot.version} ORDER BY ds LIMIT 1
128128
"""
129129
)
130130
== (0, "Hotate", 5.99)

0 commit comments

Comments
 (0)