Skip to content

Commit fe01ac2

Browse files
authored
Feat!: Add depends_on_past through self-reference check support (Built-in only) (#917)
* add full with history * add full with history model with test * fmt model * cleanup * adjust fingerprint * fix integration tests * remove full with history kind * feedback * fix after rebase * update self reference name * fix name * remove properties for getting references * add back extra line * cleanup * add migration script * rename to depends_on_past
1 parent 43ab1d5 commit fe01ac2

13 files changed

Lines changed: 313 additions & 82 deletions

File tree

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Table of lifetime customer revenue.
3+
Date is available to get lifetime value up to a certain date.
4+
Use latest date to get current lifetime value.
5+
*/
6+
MODEL (
7+
name sushi.customer_revenue_lifetime,
8+
kind incremental_by_time_range (
9+
time_column (ds, 'YYYY-MM-dd'),
10+
batch_size 1
11+
),
12+
owner jen,
13+
cron '@daily',
14+
dialect hive,
15+
columns (
16+
customer_id INT,
17+
revenue DOUBLE,
18+
ds STRING
19+
)
20+
);
21+
22+
WITH order_total AS (
23+
SELECT
24+
oi.order_id AS order_id,
25+
SUM(oi.quantity * i.price) AS total
26+
FROM sushi.order_items AS oi
27+
LEFT JOIN sushi.items AS i
28+
ON oi.item_id = i.id AND oi.ds = i.ds
29+
WHERE
30+
oi.ds = @end_ds
31+
GROUP BY
32+
oi.order_id
33+
), incremental_total AS (
34+
SELECT
35+
o.customer_id::INT AS customer_id,
36+
SUM(ot.total)::DOUBLE AS revenue,
37+
FROM sushi.orders AS o
38+
LEFT JOIN order_total AS ot
39+
ON o.id = ot.order_id
40+
WHERE
41+
o.ds = @end_ds
42+
GROUP BY
43+
o.customer_id
44+
), prev_total AS (
45+
SELECT
46+
crl.customer_id,
47+
crl.revenue
48+
FROM sushi.customer_revenue_lifetime AS crl
49+
WHERE
50+
crl.ds = DATE_FORMAT(@end_date - INTERVAL 1 DAY, 'YYYY-MM-dd')
51+
)
52+
SELECT
53+
COALESCE(it.customer_id, prev_total.customer_id) AS customer_id, /* Customer id */
54+
COALESCE(it.revenue, 0) + COALESCE(prev_total.revenue, 0) AS revenue, /* Lifetime revenue from this customer */
55+
@end_ds AS ds /* End date of the lifetime calculation */
56+
FROM incremental_total AS it
57+
FULL OUTER JOIN prev_total AS prev_total
58+
ON it.customer_id = prev_total.customer_id

sqlmesh/core/config/model.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import typing as t
44

55
from sqlmesh.core.config.base import BaseConfig
6-
from sqlmesh.core.model.kind import ModelKind, model_kind_validator
6+
from sqlmesh.core.model.kind import ModelKind
77
from sqlmesh.utils.date import TimeLike
88

99

@@ -34,4 +34,4 @@ class ModelDefaultsConfig(BaseConfig):
3434
batch_size: t.Optional[int]
3535
storage_format: t.Optional[str]
3636

37-
_model_kind_validator = model_kind_validator
37+
_model_kind_validator = ModelKind.field_validator()

sqlmesh/core/model/definition.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ class _Model(ModelMeta, frozen=True):
104104

105105
_path: Path = Path()
106106
_depends_on: t.Optional[t.Set[str]] = None
107+
_depends_on_past: t.Optional[bool] = None
107108
_column_descriptions: t.Optional[t.Dict[str, str]] = None
108109

109110
_expressions_validator = expression_validator
@@ -449,6 +450,15 @@ def is_python(self) -> bool:
449450
def is_seed(self) -> bool:
450451
return False
451452

453+
@property
454+
def depends_on_past(self) -> bool:
455+
if self._depends_on_past is None:
456+
self._depends_on_past = (
457+
self.kind.is_incremental_by_unique_key
458+
or self.name in _find_tables([self.render_query()])
459+
)
460+
return self._depends_on_past
461+
452462
def validate_definition(self) -> None:
453463
"""Validates the model's definition.
454464
@@ -817,6 +827,10 @@ def seed_path(self) -> Path:
817827
return self._path.parent / seed_path
818828
return seed_path
819829

830+
@property
831+
def depends_on_past(self) -> bool:
832+
return False
833+
820834
def to_dehydrated(self) -> SeedModel:
821835
"""Creates a dehydrated copy of this model.
822836

sqlmesh/core/model/kind.py

Lines changed: 70 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def is_materialized(self) -> bool:
5959
@property
6060
def only_latest(self) -> bool:
6161
"""Whether or not this model only cares about latest date to render."""
62-
return self.model_kind_name in (ModelKindName.VIEW, ModelKindName.FULL)
62+
return self.is_view or self.is_full
6363

6464

6565
class ModelKindName(str, ModelKindMixin, Enum):
@@ -81,18 +81,80 @@ def model_kind_name(self) -> ModelKindName:
8181
class ModelKind(PydanticModel, ModelKindMixin):
8282
name: ModelKindName
8383

84-
def to_expression(self, **kwargs: t.Any) -> d.ModelKind:
85-
return d.ModelKind(this=self.name.value.upper(), **kwargs)
84+
@classmethod
85+
def field_validator(cls) -> classmethod:
86+
def _model_kind_validator(v: t.Any) -> ModelKind:
87+
if isinstance(v, ModelKind):
88+
return v
89+
90+
if isinstance(v, d.ModelKind):
91+
name = v.this
92+
props = {prop.name: prop.args.get("value") for prop in v.expressions}
93+
klass: t.Type[ModelKind] = ModelKind
94+
if name == ModelKindName.INCREMENTAL_BY_TIME_RANGE:
95+
klass = IncrementalByTimeRangeKind
96+
elif name == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY:
97+
klass = IncrementalByUniqueKeyKind
98+
elif name == ModelKindName.SEED:
99+
klass = SeedKind
100+
else:
101+
props["name"] = ModelKindName(name)
102+
return klass(**props)
103+
104+
if isinstance(v, dict):
105+
if v.get("name") == ModelKindName.INCREMENTAL_BY_TIME_RANGE:
106+
klass = IncrementalByTimeRangeKind
107+
elif v.get("name") == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY:
108+
klass = IncrementalByUniqueKeyKind
109+
elif v.get("name") == ModelKindName.SEED:
110+
klass = SeedKind
111+
else:
112+
klass = ModelKind
113+
return klass(**v)
114+
115+
name = (v.name if isinstance(v, exp.Expression) else str(v)).upper()
116+
117+
try:
118+
return ModelKind(name=ModelKindName(name))
119+
except ValueError:
120+
raise ConfigError(f"Invalid model kind '{name}'")
121+
122+
return validator("kind", pre=True, allow_reuse=True)(_model_kind_validator)
86123

87124
@property
88125
def model_kind_name(self) -> ModelKindName:
89126
return self.name
90127

128+
def to_expression(self, **kwargs: t.Any) -> d.ModelKind:
129+
return d.ModelKind(this=self.name.value.upper(), **kwargs)
130+
91131

92132
class TimeColumn(PydanticModel):
93133
column: str
94134
format: t.Optional[str] = None
95135

136+
@classmethod
137+
def field_validator(cls) -> classmethod:
138+
def _time_column_validator(v: t.Any) -> TimeColumn:
139+
if isinstance(v, exp.Tuple):
140+
kwargs = {
141+
key: v.expressions[i].name
142+
for i, key in enumerate(("column", "format")[: len(v.expressions)])
143+
}
144+
return TimeColumn(**kwargs)
145+
146+
if isinstance(v, exp.Identifier):
147+
return TimeColumn(column=v.name)
148+
149+
if isinstance(v, exp.Expression):
150+
return TimeColumn(column=v.name)
151+
152+
if isinstance(v, str):
153+
return TimeColumn(column=v)
154+
return v
155+
156+
return validator("time_column", pre=True, allow_reuse=True)(_time_column_validator)
157+
96158
@validator("column", pre=True)
97159
def _column_validator(cls, v: str) -> str:
98160
if not v:
@@ -126,6 +188,9 @@ def to_expression(self, dialect: str) -> exp.Column | exp.Tuple:
126188
]
127189
)
128190

191+
def to_property(self, dialect: str = "") -> exp.Property:
192+
return exp.Property(this="time_column", value=self.to_expression(dialect))
193+
129194

130195
class _Incremental(ModelKind):
131196
batch_size: t.Optional[int]
@@ -156,31 +221,10 @@ class IncrementalByTimeRangeKind(_Incremental):
156221
name: ModelKindName = Field(ModelKindName.INCREMENTAL_BY_TIME_RANGE, const=True)
157222
time_column: TimeColumn
158223

159-
@validator("time_column", pre=True)
160-
def _parse_time_column(cls, v: t.Any) -> TimeColumn:
161-
if isinstance(v, exp.Tuple):
162-
kwargs = {
163-
key: v.expressions[i].name
164-
for i, key in enumerate(("column", "format")[: len(v.expressions)])
165-
}
166-
return TimeColumn(**kwargs)
167-
168-
if isinstance(v, exp.Identifier):
169-
return TimeColumn(column=v.name)
170-
171-
if isinstance(v, exp.Expression):
172-
return TimeColumn(column=v.name)
173-
174-
if isinstance(v, str):
175-
return TimeColumn(column=v)
176-
return v
224+
_time_column_validator = TimeColumn.field_validator()
177225

178226
def to_expression(self, dialect: str = "", **kwargs: t.Any) -> d.ModelKind:
179-
return super().to_expression(
180-
expressions=[
181-
exp.Property(this="time_column", value=self.time_column.to_expression(dialect))
182-
],
183-
)
227+
return super().to_expression(expressions=[self.time_column.to_property(dialect)])
184228

185229

186230
class IncrementalByUniqueKeyKind(_Incremental):
@@ -228,43 +272,3 @@ def to_expression(self, **kwargs: t.Any) -> d.ModelKind:
228272
),
229273
],
230274
)
231-
232-
233-
def _model_kind_validator(v: t.Any) -> ModelKind:
234-
if isinstance(v, ModelKind):
235-
return v
236-
237-
if isinstance(v, d.ModelKind):
238-
name = v.this
239-
props = {prop.name: prop.args.get("value") for prop in v.expressions}
240-
klass: t.Type[ModelKind] = ModelKind
241-
if name == ModelKindName.INCREMENTAL_BY_TIME_RANGE:
242-
klass = IncrementalByTimeRangeKind
243-
elif name == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY:
244-
klass = IncrementalByUniqueKeyKind
245-
elif name == ModelKindName.SEED:
246-
klass = SeedKind
247-
else:
248-
props["name"] = ModelKindName(name)
249-
return klass(**props)
250-
251-
if isinstance(v, dict):
252-
if v.get("name") == ModelKindName.INCREMENTAL_BY_TIME_RANGE:
253-
klass = IncrementalByTimeRangeKind
254-
elif v.get("name") == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY:
255-
klass = IncrementalByUniqueKeyKind
256-
elif v.get("name") == ModelKindName.SEED:
257-
klass = SeedKind
258-
else:
259-
klass = ModelKind
260-
return klass(**v)
261-
262-
name = (v.name if isinstance(v, exp.Expression) else str(v)).upper()
263-
264-
try:
265-
return ModelKind(name=ModelKindName(name))
266-
except ValueError:
267-
raise ConfigError(f"Invalid model kind '{name}'")
268-
269-
270-
model_kind_validator = validator("kind", pre=True, allow_reuse=True)(_model_kind_validator)

sqlmesh/core/model/meta.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@
1010

1111
from sqlmesh.core import dialect as d
1212
from sqlmesh.core.model.kind import (
13-
IncrementalByTimeRangeKind,
1413
IncrementalByUniqueKeyKind,
1514
ModelKind,
1615
ModelKindName,
1716
TimeColumn,
1817
_Incremental,
19-
model_kind_validator,
2018
)
2119
from sqlmesh.utils import unique
2220
from sqlmesh.utils.date import TimeLike, preserve_time_like_kind, to_datetime
@@ -64,7 +62,7 @@ class ModelMeta(PydanticModel):
6462
_croniter: t.Optional[croniter] = None
6563
_interval_unit: t.Optional[IntervalUnit] = None
6664

67-
_model_kind_validator = model_kind_validator
65+
_model_kind_validator = ModelKind.field_validator()
6866

6967
@validator("audits", pre=True)
7068
def _audits_validator(cls, v: t.Any) -> t.Any:
@@ -174,9 +172,8 @@ def _kind_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
174172

175173
@property
176174
def time_column(self) -> t.Optional[TimeColumn]:
177-
if isinstance(self.kind, IncrementalByTimeRangeKind):
178-
return self.kind.time_column
179-
return None
175+
"""The time column for incremental models."""
176+
return getattr(self.kind, "time_column", None)
180177

181178
@property
182179
def unique_key(self) -> t.List[str]:
@@ -215,7 +212,7 @@ def lookback_delta(self) -> timedelta:
215212
@property
216213
def batch_size(self) -> t.Optional[int]:
217214
"""The maximal number of units in a single task for a backfill."""
218-
return self.kind.batch_size if isinstance(self.kind, _Incremental) else None
215+
return getattr(self.kind, "batch_size", None)
219216

220217
def interval_unit(self, sample_size: int = 10) -> IntervalUnit:
221218
"""Returns the IntervalUnit of the model

sqlmesh/core/scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,11 @@ def _dag(self, batches: SnapshotToBatches) -> DAG[SchedulingUnit]:
272272
]
273273
for i, interval in enumerate(intervals):
274274
dag.add((snapshot, interval), upstream_dependencies)
275-
if snapshot.is_incremental_by_unique_key:
275+
if snapshot.depends_on_past:
276276
dag.add(
277277
(snapshot, interval),
278278
[(snapshot, _interval) for _interval in intervals[:i]],
279279
)
280-
281280
return dag
282281

283282

sqlmesh/core/snapshot/definition.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,11 @@ def start(self) -> t.Optional[datetime]:
803803
def model_kind_name(self) -> ModelKindName:
804804
return self.model.kind.name
805805

806+
@property
807+
def depends_on_past(self) -> bool:
808+
"""Whether or not this models depends on past intervals to be accurate before loading following intervals."""
809+
return self.model.depends_on_past
810+
806811
def _ensure_categorized(self) -> None:
807812
if not self.change_category:
808813
raise SQLMeshError(f"Snapshot {self.snapshot_id} has not been categorized yet.")

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
117117
logger.info("Inserting batch (%s, %s) into %s'", start, end, table_name)
118118
if snapshot.is_incremental_by_time_range:
119119
# A model's time_column could be None but
120-
# it shouldn't be for an incremental by time range model
120+
# it shouldn't be for time based loads
121121
assert model.time_column
122122
self.adapter.insert_overwrite_by_time_partition(
123123
table_name,
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Seed metadata hashes now correctly include the batch_size."""
2+
3+
4+
def migrate(state_sync): # type: ignore
5+
pass

0 commit comments

Comments
 (0)