Skip to content

Commit feb53db

Browse files
tobymaovchangeorgesittas
authored
Feat: Add external models and loader to improve optimizer. (#871)
* Feat!: Add external models and loader to improve optimizer. * Update sqlmesh/core/model/definition.py Co-authored-by: Vincent Chan <vchan@users.noreply.github.com> * Update sqlmesh/core/schema_loader.py Co-authored-by: Vincent Chan <vchan@users.noreply.github.com> * Chore: bump sqlglot * Chore: cleanup * pr feedback * fix remaining issues * Update setup.py Co-authored-by: Jo <46752250+GeorgeSittas@users.noreply.github.com> --------- Co-authored-by: Vincent Chan <vchan@users.noreply.github.com> Co-authored-by: Jo <46752250+GeorgeSittas@users.noreply.github.com>
1 parent dde91d2 commit feb53db

26 files changed

Lines changed: 397 additions & 110 deletions

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
"requests",
4545
"rich",
4646
"ruamel.yaml",
47-
"sqlglot~=13.0.1",
47+
"sqlglot~=13.2.1",
4848
"fsspec",
4949
],
5050
extras_require={

sqlmesh/cli/main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,14 @@ def rollback(obj: Context) -> None:
379379
obj.rollback()
380380

381381

382+
@cli.command("create_external_models")
383+
@click.pass_obj
384+
@error_handler
385+
def create_external_models(obj: Context) -> None:
386+
"""Create a schema file containing external model schemas."""
387+
obj.create_external_models()
388+
389+
382390
@cli.command("prompt")
383391
@click.argument("prompt")
384392
@click.option(

sqlmesh/core/console.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,12 +470,12 @@ def _snapshot_change_choices(
470470
indirect = "indirectly modified children"
471471
if use_rich_formatting:
472472
indirect = f"[indirect]{indirect}[/indirect]"
473-
if snapshot.is_view_kind:
473+
if snapshot.is_view:
474474
choices = {
475475
SnapshotChangeCategory.BREAKING: f"Update {direct} and backfill {indirect}",
476476
SnapshotChangeCategory.NON_BREAKING: f"Update {direct} but don't backfill {indirect}",
477477
}
478-
elif snapshot.is_embedded_kind:
478+
elif snapshot.is_symbolic:
479479
choices = {
480480
SnapshotChangeCategory.BREAKING: f"Backfill {indirect}",
481481
SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}",

sqlmesh/core/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,4 @@
3737
MODELS = "models"
3838
TESTS = "tests"
3939
CACHE = ".cache"
40+
SCHEMA_YAML = "schema.yaml"

sqlmesh/core/context.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
from sqlmesh.core.model.definition import _Model
6060
from sqlmesh.core.plan import Plan
6161
from sqlmesh.core.scheduler import Scheduler
62+
from sqlmesh.core.schema_loader import create_schema_file
6263
from sqlmesh.core.snapshot import (
6364
Snapshot,
6465
SnapshotEvaluator,
@@ -841,6 +842,25 @@ def rollback(self) -> None:
841842
"""
842843
self._new_state_sync().rollback()
843844

845+
def create_external_models(self) -> None:
    """Create one schema file per configured path covering its external models.

    For every (path, config) pair in ``self.configs``, the models whose config is
    that exact config object are handed to ``create_schema_file`` together with
    the engine adapter, the config's default dialect and the concurrency limit.
    The target file is ``path / c.SCHEMA_YAML``.

    The schema file contains all columns and types of external models, allowing
    for more robust lineage, validation, and optimizations.
    """
    for project_path, project_config in self.configs.items():
        schema_path = project_path / c.SCHEMA_YAML

        # Keep only the models owned by this config; ownership is decided by
        # object identity, not equality.
        owned_models = {}
        for model_name, candidate in self._models.items():
            if self.config_for_model(candidate) is project_config:
                owned_models[model_name] = candidate

        create_schema_file(
            path=schema_path,
            models=owned_models,
            adapter=self._engine_adapter,
            dialect=project_config.model_defaults.dialect,
            max_workers=self.concurrent_tasks,
        )
863+
844864
def print_info(self) -> None:
845865
"""Prints information about connections, models, macros, etc. to the console."""
846866
self.console.log_status_update(f"Models: {len(self.models)}")

sqlmesh/core/engine_adapter/base_spark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def _create_table_properties(
8282
format_property = None
8383
partition_columns_property = None
8484
if storage_format:
85-
format_property = exp.TableFormatProperty(this=exp.Var(this=storage_format))
85+
format_property = exp.FileFormatProperty(this=exp.Var(this=storage_format))
8686
if partitioned_by:
8787
partition_columns_property = exp.PartitionedByProperty(
8888
this=exp.Schema(

sqlmesh/core/loader.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,23 @@
1212
from dataclasses import dataclass
1313
from pathlib import Path
1414

15+
from ruamel.yaml import YAML
1516
from sqlglot.errors import SqlglotError
1617
from sqlglot.optimizer.qualify_columns import validate_qualify_columns
17-
from sqlglot.schema import MappingSchema, _nested_set
18+
from sqlglot.schema import MappingSchema, nested_set
1819

1920
from sqlmesh.core import constants as c
2021
from sqlmesh.core.audit import Audit
2122
from sqlmesh.core.dialect import parse
2223
from sqlmesh.core.hooks import HookRegistry, hook
2324
from sqlmesh.core.macros import MacroRegistry, macro
24-
from sqlmesh.core.model import Model, ModelCache, SeedModel, load_model
25+
from sqlmesh.core.model import (
26+
Model,
27+
ModelCache,
28+
SeedModel,
29+
create_external_model,
30+
load_model,
31+
)
2532
from sqlmesh.core.model import model as model_registry
2633
from sqlmesh.utils import UniqueKeyDict
2734
from sqlmesh.utils.dag import DAG
@@ -51,7 +58,7 @@ def update_model_schemas(dag: DAG[str], models: UniqueKeyDict[str, Model]) -> No
5158
mapping_schema = schema.find(table)
5259

5360
if mapping_schema:
54-
_nested_set(
61+
nested_set(
5562
model.mapping_schema,
5663
tuple(str(part) for part in table.parts),
5764
{k: str(v) for k, v in mapping_schema.items()},
@@ -228,10 +235,29 @@ def _load_models(
228235
audits into a Dict and creates the dag
229236
"""
230237
models = self._load_sql_models(macros, hooks, jinja_macros)
238+
models.update(self._load_external_models())
231239
models.update(self._load_python_models())
232240

233241
return models
234242

243+
def _load_external_models(self) -> UniqueKeyDict[str, Model]:
    """Load external models from the schema file of every configured path.

    For each context path, the file ``context_path / c.SCHEMA_YAML`` is read when
    it exists. Every top-level entry of that YAML document is expanded as keyword
    arguments to ``create_external_model``, along with the config's default
    dialect and the schema file path.

    Returns:
        The external models keyed by model name. Empty when no schema file
        exists or when the existing files contain no entries.
    """
    models: UniqueKeyDict = UniqueKeyDict("models")

    for context_path, config in self._context.configs.items():
        path = Path(context_path / c.SCHEMA_YAML)

        if not path.exists():
            continue

        self._track_file(path)

        with open(path, "r", encoding="utf-8") as file:
            # An empty or comment-only YAML document loads as None. Treat that as
            # "no external models" rather than failing when iterating over None.
            rows = YAML().load(file.read()) or []

        # The file content is fully parsed at this point, so the models are built
        # after the handle has been released.
        for row in rows:
            model = create_external_model(
                **row,
                dialect=config.model_defaults.dialect,
                path=path,
            )
            models[model.name] = model

    return models
260+
235261
def _load_sql_models(
236262
self, macros: MacroRegistry, hooks: HookRegistry, jinja_macros: JinjaMacroRegistry
237263
) -> UniqueKeyDict[str, Model]:

sqlmesh/core/model/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
PythonModel,
77
SeedModel,
88
SqlModel,
9+
create_external_model,
910
create_python_model,
1011
create_seed_model,
1112
create_sql_model,
@@ -15,6 +16,7 @@
1516
IncrementalByTimeRangeKind,
1617
IncrementalByUniqueKeyKind,
1718
ModelKind,
19+
ModelKindMixin,
1820
ModelKindName,
1921
SeedKind,
2022
TimeColumn,

sqlmesh/core/model/definition.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,15 @@ def __repr__(self) -> str:
881881
return f"Model<name: {self.name}, entrypoint: {self.entrypoint}>"
882882

883883

884-
Model = Annotated[t.Union[SqlModel, SeedModel, PythonModel], Field(discriminator="source_type")]
884+
class ExternalModel(_Model):
885+
"""The model definition which represents an external source/table."""
886+
887+
source_type: Literal["external"] = "external"
888+
889+
890+
Model = Annotated[
891+
t.Union[SqlModel, SeedModel, PythonModel, ExternalModel], Field(discriminator="source_type")
892+
]
885893

886894

887895
def load_model(
@@ -1129,6 +1137,30 @@ def create_python_model(
11291137
)
11301138

11311139

1140+
def create_external_model(
    name: str,
    *,
    dialect: t.Optional[str] = None,
    path: Path = Path(),
    **kwargs: t.Any,
) -> Model:
    """Build an external model through the shared ``_create_model`` factory.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        dialect: The dialect to serialize; forwarded unchanged to ``_create_model``.
        path: An optional path to the model definition file.
        **kwargs: Any additional fields, forwarded unchanged to ``_create_model``.

    Returns:
        The result of ``_create_model`` invoked with the ``ExternalModel`` class.
    """
    # `dialect` and `path` are named parameters, so they can never also appear in
    # `kwargs`; merging them into one mapping cannot overwrite either value.
    model_options: t.Dict[str, t.Any] = {"dialect": dialect, "path": path, **kwargs}
    return _create_model(ExternalModel, name, **model_options)
1162+
1163+
11321164
def _create_model(
11331165
klass: t.Type[_Model],
11341166
name: str,

sqlmesh/core/model/kind.py

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,68 +13,81 @@
1313
from sqlmesh.utils.pydantic import PydanticModel
1414

1515

16-
# TODO: switch to autoname when sqlglot is typed
17-
class ModelKindName(str, Enum):
18-
"""The kind of model, determining how this data is computed and stored in the warehouse."""
19-
20-
INCREMENTAL_BY_TIME_RANGE = "INCREMENTAL_BY_TIME_RANGE"
21-
INCREMENTAL_BY_UNIQUE_KEY = "INCREMENTAL_BY_UNIQUE_KEY"
22-
FULL = "FULL"
23-
VIEW = "VIEW"
24-
EMBEDDED = "EMBEDDED"
25-
SEED = "SEED"
26-
# TODO: Add support for snapshots
27-
# SNAPSHOT = "SNAPSHOT"
28-
29-
30-
class ModelKind(PydanticModel):
31-
name: ModelKindName
16+
class ModelKindMixin:
17+
@property
18+
def model_kind_name(self) -> ModelKindName:
19+
"""Returns the model kind name."""
20+
raise NotImplementedError
3221

3322
@property
3423
def is_incremental_by_time_range(self) -> bool:
35-
return self.name == ModelKindName.INCREMENTAL_BY_TIME_RANGE
24+
return self.model_kind_name == ModelKindName.INCREMENTAL_BY_TIME_RANGE
3625

3726
@property
3827
def is_incremental_by_unique_key(self) -> bool:
39-
return self.name == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY
40-
41-
@property
42-
def is_incremental(self) -> bool:
43-
"""Whether or not this model is incremental."""
44-
return isinstance(self, _Incremental)
28+
return self.model_kind_name == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY
4529

4630
@property
4731
def is_full(self) -> bool:
48-
return self.name == ModelKindName.FULL
49-
50-
# @property
51-
# def is_snapshot(self) -> bool:
52-
# return self.name == ModelKindName.SNAPSHOT
32+
return self.model_kind_name == ModelKindName.FULL
5333

5434
@property
5535
def is_view(self) -> bool:
56-
return self.name == ModelKindName.VIEW
36+
return self.model_kind_name == ModelKindName.VIEW
5737

5838
@property
5939
def is_embedded(self) -> bool:
60-
return self.name == ModelKindName.EMBEDDED
40+
return self.model_kind_name == ModelKindName.EMBEDDED
6141

6242
@property
6343
def is_seed(self) -> bool:
64-
return self.name == ModelKindName.SEED
44+
return self.model_kind_name == ModelKindName.SEED
45+
46+
@property
47+
def is_external(self) -> bool:
48+
return self.model_kind_name == ModelKindName.EXTERNAL
49+
50+
@property
51+
def is_symbolic(self) -> bool:
52+
"""A symbolic model is one that doesn't execute at all."""
53+
return self.model_kind_name in (ModelKindName.EMBEDDED, ModelKindName.EXTERNAL)
6554

6655
@property
6756
def is_materialized(self) -> bool:
68-
return self.name not in (ModelKindName.VIEW, ModelKindName.EMBEDDED)
57+
return not (self.is_symbolic or self.is_view)
6958

7059
@property
7160
def only_latest(self) -> bool:
7261
"""Whether or not this model only cares about latest date to render."""
73-
return self.name in (ModelKindName.VIEW, ModelKindName.FULL)
62+
return self.model_kind_name in (ModelKindName.VIEW, ModelKindName.FULL)
63+
64+
65+
class ModelKindName(str, ModelKindMixin, Enum):
66+
"""The kind of model, determining how this data is computed and stored in the warehouse."""
67+
68+
INCREMENTAL_BY_TIME_RANGE = "INCREMENTAL_BY_TIME_RANGE"
69+
INCREMENTAL_BY_UNIQUE_KEY = "INCREMENTAL_BY_UNIQUE_KEY"
70+
FULL = "FULL"
71+
VIEW = "VIEW"
72+
EMBEDDED = "EMBEDDED"
73+
SEED = "SEED"
74+
EXTERNAL = "EXTERNAL"
75+
76+
@property
77+
def model_kind_name(self) -> ModelKindName:
78+
return self
79+
80+
81+
class ModelKind(PydanticModel, ModelKindMixin):
82+
name: ModelKindName
7483

7584
def to_expression(self, **kwargs: t.Any) -> d.ModelKind:
7685
return d.ModelKind(this=self.name.value.upper(), **kwargs)
7786

87+
@property
88+
def model_kind_name(self) -> ModelKindName:
89+
return self.name
90+
7891

7992
class TimeColumn(PydanticModel):
8093
column: str

0 commit comments

Comments
 (0)