Skip to content

Commit 6ed56a5

Browse files
authored
Feat!: add more interval units and make the smallest unit 5 minutes (#1471)
additional performance boosts
1 parent 905192d commit 6ed56a5

11 files changed

Lines changed: 146 additions & 54 deletions

File tree

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def _create_table_properties(
429429
if (
430430
isinstance(this, exp.Column)
431431
and partition_interval_unit is not None
432-
and partition_interval_unit != IntervalUnit.MINUTE
432+
and not partition_interval_unit.is_minute
433433
):
434434
column_type: t.Optional[exp.DataType] = (columns_to_types or {}).get(this.name)
435435

sqlmesh/core/node.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
from sqlmesh.core.snapshot import Node
2222

2323

24+
DEFAULT_SAMPLE_SIZE = 5
25+
26+
2427
class IntervalUnit(str, Enum):
2528
"""IntervalUnit is the inferred granularity of an incremental node.
2629
@@ -33,17 +36,19 @@ class IntervalUnit(str, Enum):
3336
MONTH = "month"
3437
DAY = "day"
3538
HOUR = "hour"
36-
MINUTE = "minute"
39+
HALF_HOUR = "half_hour"
40+
QUARTER_HOUR = "quarter_hour"
41+
FIVE_MINUTE = "five_minute"
3742

3843
@classmethod
39-
def from_cron(klass, cron: str, sample_size: int = 10) -> IntervalUnit:
44+
def from_cron(klass, cron: str, sample_size: int = DEFAULT_SAMPLE_SIZE) -> IntervalUnit:
4045
croniter = CroniterCache(cron)
4146
samples = [croniter.get_next() for _ in range(sample_size)]
4247
min_interval = min(b - a for a, b in zip(samples, samples[1:]))
4348
for unit, seconds in sorted(INTERVAL_SECONDS.items(), key=lambda x: x[1], reverse=True):
44-
if seconds <= min_interval:
49+
if seconds <= min_interval.total_seconds():
4550
return unit
46-
raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 1 minute or more.")
51+
raise ConfigError(f"Invalid cron '{cron}': must have a cadence of 5 minutes or more.")
4752

4853
@property
4954
def is_date_granularity(self) -> bool:
@@ -67,12 +72,16 @@ def is_hour(self) -> bool:
6772

6873
@property
6974
def is_minute(self) -> bool:
70-
return self == IntervalUnit.MINUTE
75+
return self in (IntervalUnit.FIVE_MINUTE, IntervalUnit.QUARTER_HOUR, IntervalUnit.HALF_HOUR)
7176

7277
@property
7378
def _cron_expr(self) -> str:
74-
if self == IntervalUnit.MINUTE:
75-
return "* * * * *"
79+
if self == IntervalUnit.FIVE_MINUTE:
80+
return "*/5 * * * *"
81+
if self == IntervalUnit.QUARTER_HOUR:
82+
return "*/15 * * * *"
83+
if self == IntervalUnit.HALF_HOUR:
84+
return "*/30 * * * *"
7685
if self == IntervalUnit.HOUR:
7786
return "0 * * * *"
7887
if self == IntervalUnit.DAY:
@@ -132,7 +141,9 @@ def seconds(self) -> int:
132141
IntervalUnit.MONTH: 60 * 60 * 24 * 28,
133142
IntervalUnit.DAY: 60 * 60 * 24,
134143
IntervalUnit.HOUR: 60 * 60,
135-
IntervalUnit.MINUTE: 60,
144+
IntervalUnit.HALF_HOUR: 60 * 30,
145+
IntervalUnit.QUARTER_HOUR: 60 * 15,
146+
IntervalUnit.FIVE_MINUTE: 60 * 5,
136147
}
137148

138149

@@ -265,7 +276,7 @@ def croniter(self, value: TimeLike) -> CroniterCache:
265276
if self._croniter is None:
266277
self._croniter = CroniterCache(self.cron, value)
267278
else:
268-
self._croniter.curr = value
279+
self._croniter.curr = to_datetime(value)
269280
return self._croniter
270281

271282
def cron_next(self, value: TimeLike) -> TimeLike:
@@ -315,7 +326,7 @@ def text_diff(self, other: Node) -> str:
315326
"""
316327
raise NotImplementedError
317328

318-
def _inferred_interval_unit(self, sample_size: int = 10) -> IntervalUnit:
329+
def _inferred_interval_unit(self, sample_size: int = DEFAULT_SAMPLE_SIZE) -> IntervalUnit:
319330
"""Infers the interval unit from the cron expression.
320331
321332
The interval unit is used to determine the lag applied to start_date and end_date for node rendering and intervals.

sqlmesh/core/plan/definition.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
import typing as t
66
from collections import defaultdict
7+
from dataclasses import dataclass
78
from datetime import datetime
89
from enum import Enum
910

@@ -39,7 +40,6 @@
3940
yesterday_ds,
4041
)
4142
from sqlmesh.utils.errors import NoChangesPlanError, PlanError, SQLMeshError
42-
from sqlmesh.utils.pydantic import PydanticModel
4343

4444
logger = logging.getLogger(__name__)
4545

@@ -783,7 +783,9 @@ def is_finished(self) -> bool:
783783
return self == PlanStatus.FINISHED
784784

785785

786-
class SnapshotIntervals(PydanticModel, frozen=True):
786+
# Millions of these can be created and pydantic has significant overhead, so this is a plain dataclass.
787+
@dataclass
788+
class SnapshotIntervals:
787789
snapshot_name: str
788790
intervals: Intervals
789791

@@ -795,11 +797,12 @@ def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str:
795797
return format_intervals(self.merged_intervals, unit)
796798

797799

800+
@dataclass
798801
class LoadedSnapshotIntervals(SnapshotIntervals):
802+
change_category: SnapshotChangeCategory
799803
interval_unit: t.Optional[IntervalUnit]
800804
node_name: str
801805
view_name: t.Optional[str] = None
802-
change_category: SnapshotChangeCategory
803806

804807
@classmethod
805808
def from_snapshot(cls, snapshot: Snapshot) -> LoadedSnapshotIntervals:

sqlmesh/core/snapshot/definition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1217,7 +1217,7 @@ def missing_intervals(
12171217
snapshot_start_date = start_dt
12181218
snapshot_end_date = end_date
12191219
if interval:
1220-
snapshot_start_date, snapshot_end_date = interval
1220+
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in interval)
12211221
snapshot = snapshot.copy()
12221222
snapshot.intervals = snapshot.intervals.copy()
12231223
snapshot.remove_interval(interval, execution_time)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Change any interval unit of minute to five_minute."""
2+
import json
3+
4+
import pandas as pd
5+
from sqlglot import exp
6+
7+
from sqlmesh.utils.migration import index_text_type
8+
9+
10+
def migrate(state_sync): # type: ignore
11+
engine_adapter = state_sync.engine_adapter
12+
schema = state_sync.schema
13+
snapshots_table = "_snapshots"
14+
if schema:
15+
snapshots_table = f"{schema}.{snapshots_table}"
16+
17+
new_snapshots = []
18+
19+
for name, identifier, version, snapshot, kind_name in engine_adapter.fetchall(
20+
exp.select("name", "identifier", "version", "snapshot", "kind_name").from_(snapshots_table),
21+
quote_identifiers=True,
22+
):
23+
parsed_snapshot = json.loads(snapshot)
24+
25+
node = parsed_snapshot["node"]
26+
27+
if node.get("interval_unit") == "minute":
28+
node["interval_unit"] = "five_minute"
29+
30+
new_snapshots.append(
31+
{
32+
"name": name,
33+
"identifier": identifier,
34+
"version": version,
35+
"snapshot": json.dumps(parsed_snapshot),
36+
"kind_name": kind_name,
37+
}
38+
)
39+
40+
if new_snapshots:
41+
engine_adapter.delete_from(snapshots_table, "TRUE")
42+
43+
text_type = index_text_type(engine_adapter.dialect)
44+
45+
engine_adapter.insert_append(
46+
snapshots_table,
47+
pd.DataFrame(new_snapshots),
48+
columns_to_types={
49+
"name": exp.DataType.build(text_type),
50+
"identifier": exp.DataType.build(text_type),
51+
"version": exp.DataType.build(text_type),
52+
"snapshot": exp.DataType.build("text"),
53+
"kind_name": exp.DataType.build(text_type),
54+
},
55+
contains_json=True,
56+
)

sqlmesh/utils/cron.py

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,20 @@
11
from __future__ import annotations
22

33
import typing as t
4-
from datetime import timedelta
4+
from datetime import datetime, timedelta
55

66
from croniter import croniter
77
from sqlglot.helper import first
88

9-
from sqlmesh.utils import ttl_cache
109
from sqlmesh.utils.date import TimeLike, now, to_datetime
1110

1211

13-
@ttl_cache()
14-
def cron_next(cron: str, time: TimeLike) -> float:
15-
return croniter(cron, to_datetime(time)).get_next()
16-
17-
18-
@ttl_cache()
19-
def cron_prev(cron: str, time: TimeLike) -> float:
20-
return croniter(cron, to_datetime(time)).get_prev()
21-
22-
2312
class CroniterCache:
24-
ESTIMATE_SAMPLES_NUM = 10
13+
ESTIMATE_SAMPLES_NUM = 5
2514

2615
def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
2716
self.cron = cron
28-
self.curr: TimeLike = now() if time is None else time
17+
self.curr: datetime = to_datetime(now() if time is None else time)
2918
self._interval_seconds: t.Optional[int] = None
3019

3120
@property
@@ -39,30 +28,30 @@ def interval_seconds(self) -> int:
3928
"""
4029
if self._interval_seconds is None:
4130
seconds = set()
42-
curr = to_datetime(self.curr)
31+
curr = self.curr
4332

4433
for _ in range(self.ESTIMATE_SAMPLES_NUM):
4534
prev = curr
46-
curr = to_datetime(cron_next(self.cron, curr))
35+
curr = to_datetime(croniter(self.cron, curr).get_next())
4736
seconds.add(curr - prev)
4837

4938
if len(seconds) == 1:
50-
self._interval_seconds = first(seconds).seconds
39+
self._interval_seconds = int(first(seconds).total_seconds())
5140
else:
5241
self._interval_seconds = 0
5342

5443
return self._interval_seconds
5544

56-
def get_next(self, estimate: bool = False) -> float:
45+
def get_next(self, estimate: bool = False) -> datetime:
5746
if estimate and self.interval_seconds:
58-
self.curr = to_datetime(self.curr) + timedelta(seconds=self.interval_seconds)
47+
self.curr = self.curr + timedelta(seconds=self.interval_seconds)
5948
else:
60-
self.curr = cron_next(self.cron, self.curr)
61-
return t.cast(float, self.curr)
49+
self.curr = to_datetime(croniter(self.cron, self.curr).get_next())
50+
return self.curr
6251

63-
def get_prev(self, estimate: bool = False) -> float:
52+
def get_prev(self, estimate: bool = False) -> datetime:
6453
if estimate and self.interval_seconds:
65-
self.curr = to_datetime(self.curr) - timedelta(seconds=self.interval_seconds)
54+
self.curr = self.curr - timedelta(seconds=self.interval_seconds)
6655
else:
67-
self.curr = cron_prev(self.cron, self.curr)
68-
return t.cast(float, self.curr)
56+
self.curr = to_datetime(croniter(self.cron, self.curr).get_prev())
57+
return self.curr

sqlmesh/utils/date.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import dateparser
1414
from sqlglot import exp
1515

16-
from sqlmesh.utils import ttl_cache
17-
1816
UTC = timezone.utc
1917
TimeLike = t.Union[date, datetime, str, int, float]
2018
MILLIS_THRESHOLD = time.time() + 100 * 365 * 24 * 3600
@@ -109,7 +107,6 @@ def to_timestamp(value: TimeLike, relative_base: t.Optional[datetime] = None) ->
109107
return int(to_datetime(value, relative_base=relative_base).timestamp() * 1000)
110108

111109

112-
@ttl_cache()
113110
def to_datetime(value: TimeLike, relative_base: t.Optional[datetime] = None) -> datetime:
114111
"""Converts a value into a UTC datetime object.
115112
@@ -150,7 +147,7 @@ def to_datetime(value: TimeLike, relative_base: t.Optional[datetime] = None) ->
150147
raise ValueError(f"Could not convert `{value}` to datetime.")
151148

152149
if dt.tzinfo:
153-
return dt.astimezone(UTC)
150+
return dt if dt.tzinfo == UTC else dt.astimezone(UTC)
154151
return dt.replace(tzinfo=UTC)
155152

156153

tests/core/engine_adapter/test_bigquery.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ def test_create_table_date_partition(
305305
@pytest.mark.parametrize(
306306
"partition_by_cols, partition_column_type, partition_interval_unit, partition_by_statement",
307307
[
308-
([exp.to_column("ds")], "date", IntervalUnit.MINUTE, "`ds`"),
308+
([exp.to_column("ds")], "date", IntervalUnit.FIVE_MINUTE, "`ds`"),
309309
([exp.to_column("ds")], "date", IntervalUnit.HOUR, "`ds`"),
310310
([exp.to_column("ds")], "date", IntervalUnit.DAY, "`ds`"),
311311
([exp.to_column("ds")], "date", IntervalUnit.MONTH, "DATE_TRUNC(`ds`, MONTH)"),
@@ -333,7 +333,7 @@ def test_create_table_date_partition(
333333
(
334334
[d.parse_one("TIMESTAMP_TRUNC(ds, DAY)", dialect="bigquery")],
335335
"timestamp",
336-
IntervalUnit.MINUTE,
336+
IntervalUnit.FIVE_MINUTE,
337337
"TIMESTAMP_TRUNC(`ds`, DAY)",
338338
),
339339
],

tests/core/test_model.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,9 +1798,9 @@ def test_incremental_unmanaged_validation():
17981798
def test_custom_interval_unit():
17991799
assert (
18001800
load_sql_based_model(
1801-
d.parse("MODEL (name db.table, interval_unit MINUTE); SELECT a FROM tbl;")
1801+
d.parse("MODEL (name db.table, interval_unit FIVE_MINUTE); SELECT a FROM tbl;")
18021802
).interval_unit
1803-
== IntervalUnit.MINUTE
1803+
== IntervalUnit.FIVE_MINUTE
18041804
)
18051805

18061806
assert (
@@ -1834,10 +1834,10 @@ def test_custom_interval_unit():
18341834
assert (
18351835
load_sql_based_model(
18361836
d.parse(
1837-
"MODEL (name db.table, cron '0 5 * * *', interval_unit 'minute'); SELECT a FROM tbl;"
1837+
"MODEL (name db.table, cron '0 5 * * *', interval_unit 'quarter_hour'); SELECT a FROM tbl;"
18381838
)
18391839
).interval_unit
1840-
== IntervalUnit.MINUTE
1840+
== IntervalUnit.QUARTER_HOUR
18411841
)
18421842

18431843
with pytest.raises(
@@ -2134,7 +2134,7 @@ def test_scd_type_2_defaults():
21342134
unique_key id,
21352135
),
21362136
);
2137-
SELECT
2137+
SELECT
21382138
1 as id,
21392139
'2020-01-01' as ds,
21402140
'2020-01-01' as test_updated_at,
@@ -2172,7 +2172,7 @@ def test_scd_type_2_overrides():
21722172
disable_restatement False,
21732173
),
21742174
);
2175-
SELECT
2175+
SELECT
21762176
1 as id,
21772177
'2020-01-01' as ds,
21782178
'2020-01-01' as test_updated_at,

0 commit comments

Comments
 (0)