Skip to content

Commit d01736e

Browse files
authored
Fix: Invalidate the in-memory cache for the columns-to-types mapping when updating the model's schema (#971)
1 parent 19fcc0d commit d01736e

File tree

2 files changed

+21
-16
lines changed

2 files changed

+21
-16
lines changed

sqlmesh/core/loader.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
from pathlib import Path
1313

1414
from ruamel.yaml import YAML
15-
from sqlglot import exp
1615
from sqlglot.errors import SqlglotError
1716
from sqlglot.optimizer.qualify_columns import validate_qualify_columns
18-
from sqlglot.schema import MappingSchema, nested_set
17+
from sqlglot.schema import MappingSchema
1918

2019
from sqlmesh.core import constants as c
2120
from sqlmesh.core.audit import Audit
@@ -49,20 +48,8 @@ def update_model_schemas(dag: DAG[str], models: UniqueKeyDict[str, Model]) -> No
4948
if not model:
5049
continue
5150

52-
external = False
53-
54-
for dep in model.depends_on:
55-
external = external or dep not in models
56-
table = exp.to_table(dep, dialect=model.dialect)
57-
mapping_schema = schema.find(table)
58-
59-
if mapping_schema:
60-
nested_set(
61-
model.mapping_schema,
62-
tuple(str(part) for part in table.parts),
63-
{k: str(v) for k, v in mapping_schema.items()},
64-
)
65-
51+
external = any(dep not in models for dep in model.depends_on)
52+
model.update_schema(schema)
6653
schema.add_table(name, model.columns_to_types, dialect=model.dialect)
6754

6855
if external:

sqlmesh/core/model/definition.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from sqlglot.diff import Insert, Keep
1717
from sqlglot.helper import ensure_list
1818
from sqlglot.optimizer.scope import traverse_scope
19+
from sqlglot.schema import MappingSchema, nested_set
1920
from sqlglot.time import format_time
2021

2122
from sqlmesh.core import constants as c
@@ -405,6 +406,19 @@ def convert_to_time_column(self, time: TimeLike) -> exp.Expression:
405406
return exp.cast(exp.Literal.string(time), time_column_type)
406407
return exp.convert(time)
407408

409+
def update_schema(self, schema: MappingSchema) -> None:
410+
"""Updates the schema for this model's dependencies based on the given mapping schema."""
411+
for dep in self.depends_on:
412+
table = exp.to_table(dep, dialect=self.dialect)
413+
mapping_schema = schema.find(table)
414+
415+
if mapping_schema:
416+
nested_set(
417+
self.mapping_schema,
418+
tuple(str(part) for part in table.parts),
419+
{k: str(v) for k, v in mapping_schema.items()},
420+
)
421+
408422
@property
409423
def depends_on(self) -> t.Set[str]:
410424
"""All of the upstream dependencies referenced in the model's query, excluding self references.
@@ -701,6 +715,10 @@ def column_descriptions(self) -> t.Dict[str, str]:
701715
}
702716
return self._column_descriptions
703717

718+
def update_schema(self, schema: MappingSchema) -> None:
719+
super().update_schema(schema)
720+
self._columns_to_types = None
721+
704722
def validate_definition(self) -> None:
705723
query = self._query_renderer.render()
706724

0 commit comments

Comments
 (0)