Skip to content

Commit e88df0f

Browse files
authored
breaking: validate that batch_size is only set on model kinds that support it (#665)
fix: full refresh should respect all intervals to get upstream backfills
1 parent 3bbe955 commit e88df0f

9 files changed

Lines changed: 56 additions & 79 deletions

File tree

examples/wursthall/models/db/item_d.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ MODEL (
55
cron '@daily',
66
owner jen,
77
start '2022-06-01 00:00:00+00:00',
8-
batch_size 200
98
);
109

1110
SELECT
1211
id AS item_id,
1312
item_name AS item_name,
1413
item_group AS item_group,
1514
item_price AS item_price
16-
FROM src.menu_item_details
15+
FROM src.menu_item_details

sqlmesh/core/model/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@
1919
TimeColumn,
2020
)
2121
from sqlmesh.core.model.meta import ModelMeta
22+
from sqlmesh.core.model.seed import Seed

sqlmesh/core/model/kind.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ def is_seed(self) -> bool:
6161
def is_materialized(self) -> bool:
6262
return self.name not in (ModelKindName.VIEW, ModelKindName.EMBEDDED)
6363

64+
@property
65+
def supports_batch_size(self) -> bool:
66+
"""Whether or not this model supports the batch_size property."""
67+
return self.name in (
68+
ModelKindName.INCREMENTAL_BY_TIME_RANGE,
69+
ModelKindName.INCREMENTAL_BY_UNIQUE_KEY,
70+
)
71+
6472
@property
6573
def only_latest(self) -> bool:
6674
"""Whether or not this model only cares about latest date to render."""

sqlmesh/core/model/meta.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,12 @@ def _int_validator(cls, v: t.Any) -> t.Optional[int]:
234234
@root_validator
235235
def _kind_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
236236
kind = values.get("kind")
237-
if kind and not kind.is_materialized:
238-
if values.get("partitioned_by_"):
239-
raise ValueError(f"partitioned_by field cannot be set for {kind} models")
237+
if kind:
238+
if not kind.is_materialized:
239+
if values.get("partitioned_by_"):
240+
raise ValueError(f"partitioned_by field cannot be set for {kind} models")
241+
if values.get("batch_size") and not kind.supports_batch_size:
242+
raise ValueError(f"batch_size field cannot be set for {kind} models")
240243
return values
241244

242245
@property

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -484,19 +484,6 @@ def missing_intervals(
484484
if self.is_embedded_kind:
485485
return []
486486

487-
if self.is_full_kind or self.is_view_kind or self.is_seed_kind:
488-
latest = latest or now()
489-
490-
latest_start, latest_end = self._inclusive_exclusive(
491-
latest if is_date(latest) else self.model.cron_prev(self.model.cron_floor(latest)),
492-
latest,
493-
)
494-
# if the latest ts is stored in the last interval, nothing is missing
495-
# else returns the latest ts with the exclusive end ts.
496-
if self.intervals and self.intervals[-1][1] >= latest_end:
497-
return []
498-
return [(latest_start, latest_end)]
499-
500487
missing = []
501488
start_dt, end_dt = (to_datetime(ts) for ts in self._inclusive_exclusive(start, end))
502489
dates = tuple(croniter_range(start_dt, end_dt, self.model.normalized_cron()))

tests/core/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ def change_model_kind(context: Context, kind: ModelKindName):
224224
if kind in (ModelKindName.VIEW, ModelKindName.EMBEDDED, ModelKindName.FULL):
225225
context.upsert_model(
226226
"sushi.items",
227+
batch_size=None,
227228
partitioned_by=[],
228229
audits=[],
229230
)

tests/core/test_model.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,3 +966,25 @@ def test_star_expansion(assert_exp_eq) -> None:
966966
) AS model2
967967
""",
968968
)
969+
970+
971+
def test_batch_size_validation():
972+
expressions = parse(
973+
"""
974+
MODEL (
975+
name db.seed,
976+
kind SEED (
977+
path '../seeds/waiter_names.csv',
978+
batch_size 100,
979+
),
980+
columns (
981+
id double,
982+
alias varchar
983+
),
984+
batch_size 100,
985+
);
986+
"""
987+
)
988+
989+
with pytest.raises(ConfigError) as ex:
990+
load_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))

tests/core/test_snapshot.py

Lines changed: 17 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from sqlmesh.core.model import (
1111
IncrementalByTimeRangeKind,
1212
Model,
13-
ModelKind,
14-
ModelKindName,
13+
Seed,
1514
SeedKind,
15+
SeedModel,
1616
SqlModel,
1717
create_seed_model,
1818
load_model,
@@ -53,20 +53,6 @@ def model():
5353
)
5454

5555

56-
@pytest.fixture
57-
def full_refresh_model():
58-
return SqlModel(
59-
name="fr_model",
60-
kind=ModelKind(name=ModelKindName.FULL),
61-
owner="owner",
62-
dialect="spark",
63-
cron="1 0 * * *",
64-
batch_size=30,
65-
start="2020-01-01",
66-
query=parse_one("SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl"),
67-
)
68-
69-
7056
@pytest.fixture
7157
def snapshot(
7258
model: Model,
@@ -86,25 +72,6 @@ def snapshot(
8672
return snapshot
8773

8874

89-
@pytest.fixture
90-
def full_refresh_snapshot(
91-
full_refresh_model: Model,
92-
parent_model: Model,
93-
monkeypatch: MonkeyPatch,
94-
mocker: MockerFixture,
95-
make_snapshot,
96-
):
97-
mock = mocker.Mock()
98-
mock.return_value = to_datetime("2022-09-23T00:12:53+00:00")
99-
monkeypatch.setattr("sqlmesh.utils.date.now", mock)
100-
full_refresh_snapshot = make_snapshot(
101-
full_refresh_model,
102-
models={parent_model.name: parent_model, full_refresh_model.name: full_refresh_model},
103-
)
104-
full_refresh_snapshot.version = full_refresh_snapshot.fingerprint
105-
return full_refresh_snapshot
106-
107-
10875
def test_json(snapshot: Snapshot):
10976
assert json.loads(snapshot.json()) == {
11077
"created_ts": 1663891973000,
@@ -242,31 +209,22 @@ def test_missing_intervals(snapshot: Snapshot):
242209
]
243210

244211

245-
def test_missing_interval_latest(
246-
full_refresh_snapshot: Snapshot, monkeypatch: MonkeyPatch, mocker: MockerFixture
247-
):
248-
mock = mocker.Mock()
249-
mock.return_value = to_datetime("2020-01-05T00:12:53+00:00")
250-
monkeypatch.setattr("sqlmesh.core.snapshot.definition.now", mock)
251-
full_refresh_snapshot.add_interval("2020-01-01", "2020-01-01")
252-
assert full_refresh_snapshot.missing_intervals("2020-01-01", "2020-01-01", "2020-01-01") == []
253-
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-02", "2020-01-02") == [
254-
(to_timestamp("2020-01-02"), to_timestamp("2020-01-03"))
255-
]
256-
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-03", "2020-01-03") == [
257-
(to_timestamp("2020-01-03"), to_timestamp("2020-01-04"))
258-
]
259-
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-03", "2020-01-04") == [
260-
(to_timestamp("2020-01-04"), to_timestamp("2020-01-05"))
212+
def test_seed_intervals(make_snapshot):
213+
snapshot_a = make_snapshot(
214+
SeedModel(
215+
name="a",
216+
kind=SeedKind(path="./path/to/seed"),
217+
seed=Seed(content="content"),
218+
depends_on=set(),
219+
)
220+
)
221+
222+
assert snapshot_a.missing_intervals("2020-01-01", "2020-01-01") == [
223+
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
261224
]
262-
assert full_refresh_snapshot.missing_intervals(
263-
"2020-01-02", "2020-01-03", "2020-01-03 01:00:00"
264-
) == [(to_timestamp("2020-01-02"), to_timestamp("2020-01-03"))]
265-
assert full_refresh_snapshot.missing_intervals(
266-
"2020-01-02", "2020-01-03", "2020-01-04 23:59:59"
267-
) == [(to_timestamp("2020-01-03"), to_timestamp("2020-01-04"))]
268-
assert full_refresh_snapshot.missing_intervals("2020-01-02", "2020-01-03") == [
269-
(to_timestamp("2020-01-04"), to_timestamp("2020-01-05"))
225+
snapshot_a.add_interval("2020-01-01", "2020-01-01")
226+
assert snapshot_a.missing_intervals("2020-01-02", "2020-01-02") == [
227+
(to_timestamp("2020-01-02"), to_timestamp("2020-01-03"))
270228
]
271229

272230

tests/dbt/test_config.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ def test_to_sqlmesh_fields(sushi_test_project: Project):
112112
cron="@hourly",
113113
meta={"stamp": "bar", "dialect": "duckdb"},
114114
owner="Sally",
115-
batch_size=2,
116115
)
117116
context = DbtContext(project_name="Foo")
118117
context.target = DuckDbConfig(schema="foo")
@@ -128,7 +127,6 @@ def test_to_sqlmesh_fields(sushi_test_project: Project):
128127
assert model.stamp == "bar"
129128
assert model.dialect == "duckdb"
130129
assert model.owner == "Sally"
131-
assert model.batch_size == 2
132130

133131

134132
def test_model_config_sql_no_config():

0 commit comments

Comments
 (0)