Skip to content

Commit 7d4bd27

Browse files
authored
fix: Missing Intervals less than Interval Unit (#1584)
* fix: missing intervals less than interval unit * fix: missing intervals less than interval unit
1 parent 9d74e7a commit 7d4bd27

3 files changed

Lines changed: 87 additions & 1 deletion

File tree

sqlmesh/core/node.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ def cron_floor(self, value: TimeLike) -> TimeLike:
135135
def seconds(self) -> int:
136136
return INTERVAL_SECONDS[self]
137137

138+
@property
139+
def milliseconds(self) -> int:
140+
return self.seconds * 1000
141+
138142

139143
INTERVAL_SECONDS = {
140144
IntervalUnit.YEAR: 60 * 60 * 24 * 365,

sqlmesh/core/snapshot/definition.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
to_datetime,
2828
to_ds,
2929
to_timestamp,
30+
validate_date_range,
3031
yesterday,
3132
)
3233
from sqlmesh.utils.errors import SQLMeshError
@@ -632,7 +633,7 @@ def inclusive_exclusive(
632633
"""
633634
interval_unit = self.node.interval_unit
634635
start_ts = to_timestamp(interval_unit.cron_floor(start))
635-
if start_ts < to_timestamp(start):
636+
if start_ts < to_timestamp(start) and not self.model.allow_partials:
636637
start_ts = to_timestamp(interval_unit.cron_next(start_ts))
637638

638639
if is_date(end):
@@ -691,6 +692,15 @@ def missing_intervals(
691692
Returns:
692693
A list of all the missing intervals as epoch timestamps.
693694
"""
695+
# If the amount of time being checked is less than the size of a single interval then we
696+
# know that there can't being missing intervals within that range and return
697+
validate_date_range(start, end)
698+
if (
699+
not is_date(end)
700+
and not (self.is_model and self.model.allow_partials)
701+
and to_timestamp(end) - to_timestamp(start) < self.node.interval_unit.milliseconds
702+
):
703+
return []
694704
intervals = self.dev_intervals if is_dev and self.is_paused_forward_only else self.intervals
695705

696706
if self.is_symbolic or (self.is_seed and intervals):

tests/core/test_snapshot.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,78 @@ def test_seed_intervals(make_snapshot):
384384
assert snapshot_a.missing_intervals("2020-01-02", "2020-01-02") == []
385385

386386

387+
def test_missing_interval_smaller_than_interval_unit(make_snapshot):
388+
snapshot_daily = make_snapshot(
389+
SqlModel(
390+
name="name",
391+
kind=IncrementalByTimeRangeKind(time_column="ds", batch_size=30),
392+
owner="owner",
393+
dialect="spark",
394+
cron="@daily",
395+
start="2020-01-01",
396+
query=parse_one("SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl"),
397+
)
398+
)
399+
400+
assert snapshot_daily.missing_intervals("2020-01-01 00:00:05", "2020-01-01 23:59:59") == []
401+
assert snapshot_daily.missing_intervals("2020-01-01 00:00:00", "2020-01-02 00:00:00") == [
402+
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
403+
]
404+
405+
snapshot_hourly = make_snapshot(
406+
SqlModel(
407+
name="name",
408+
kind=IncrementalByTimeRangeKind(time_column="ds", batch_size=30),
409+
owner="owner",
410+
dialect="spark",
411+
cron="@hourly",
412+
start="2020-01-01",
413+
query=parse_one("SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl"),
414+
)
415+
)
416+
417+
assert snapshot_hourly.missing_intervals("2020-01-01 00:00:00", "2020-01-01 00:59:00") == []
418+
assert snapshot_hourly.missing_intervals("2020-01-01 00:00:00", "2020-01-01 01:00:00") == [
419+
(to_timestamp("2020-01-01"), to_timestamp("2020-01-01 01:00:00"))
420+
]
421+
422+
snapshot_end_categorical = make_snapshot(
423+
SqlModel(
424+
name="name",
425+
kind=IncrementalByTimeRangeKind(time_column="ds", batch_size=30),
426+
owner="owner",
427+
dialect="spark",
428+
cron="@daily",
429+
start="2020-01-01",
430+
query=parse_one("SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl"),
431+
)
432+
)
433+
434+
assert snapshot_end_categorical.missing_intervals("2020-01-01 00:00:00", "2020-01-01") == [
435+
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
436+
]
437+
438+
snapshot_partial = make_snapshot(
439+
SqlModel(
440+
name="name",
441+
kind=IncrementalByTimeRangeKind(time_column="ds", batch_size=30),
442+
owner="owner",
443+
dialect="spark",
444+
cron="@daily",
445+
start="2020-01-01",
446+
query=parse_one("SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl"),
447+
allow_partials=True,
448+
)
449+
)
450+
451+
assert snapshot_partial.missing_intervals("2020-01-01 00:00:05", "2020-01-01 23:59:59") == [
452+
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
453+
]
454+
assert snapshot_partial.missing_intervals("2020-01-01 00:00:00", "2020-01-02 00:00:00") == [
455+
(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))
456+
]
457+
458+
387459
def test_remove_intervals(snapshot: Snapshot):
388460
snapshot.add_interval("2020-01-01", "2020-01-01")
389461
snapshot.remove_interval(snapshot.get_removal_interval("2020-01-01", "2020-01-01"))

0 commit comments

Comments
 (0)