Skip to content

Commit 3d210a6

Browse files
authored
feat: support TRIGGER ON UPDATE and SCHEDULE EVERY for MV and ST (#1434)
## Summary Adds the full Databricks refresh-schedule grammar on materialized views and streaming tables: \`SCHEDULE CRON\`, \`SCHEDULE EVERY\`, and \`TRIGGER ON UPDATE [AT MOST EVERY INTERVAL ...]\`. Closes #1293. User-facing config: ```yaml config: schedule: cron: '0 0 * * * ? *' time_zone_value: 'America/Los_Angeles' # OR every: '2 HOURS' # OR on_update: true at_most_every: '15 MINUTES' ``` ## Test plan - [x] Unit tests for parser, validator, diff, and refresh-schedule macros (873 unit tests passing). - [x] Functional round-trip tests per mode for both MV and ST against live UC SQL endpoint. - [x] Functional lifecycle tests walking each relation through MANUAL → CRON → ON_UPDATE → EVERY → (non-refresh component change) → MANUAL. - [x] \`pre-commit run --all-files\` clean (ruff, ruff-format, mypy).
1 parent 578375a commit 3d210a6

17 files changed

Lines changed: 1252 additions & 81 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
- Add support for row filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294))
77
- Add support for Python UDFs ([#1336](https://github.com/databricks/dbt-databricks/pull/1336))
88
- Add support for key-only `databricks_tags` for table and column tagging. This can now be configured by setting tag values to empty strings `""` or `None`. ([#1339](https://github.com/databricks/dbt-databricks/pull/1339))
9+
- Support `SCHEDULE EVERY` and `TRIGGER ON UPDATE` refresh modes for materialized views and streaming tables, with parser and diff coverage so relations whose actual refresh is not CRON no longer crash on subsequent runs ([#1293](https://github.com/databricks/dbt-databricks/issues/1293))
10+
11+
### Fixes
12+
13+
- Fix `RefreshConfig.__eq__` self/other typo where two configs with the same `cron` but different `time_zone_value` compared equal
14+
- Fix streaming-table DROP-SCHEDULE path that was silently filtered out of the changeset
915

1016
### Fixes
1117

Lines changed: 191 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,183 @@
11
import re
2+
from enum import Enum
23
from typing import Any, ClassVar, Optional
34

45
from dbt.adapters.contracts.relation import RelationConfig
56
from dbt.adapters.relation_configs.config_base import RelationResults
67
from dbt_common.exceptions import DbtRuntimeError
8+
from pydantic import model_validator
79

810
from dbt.adapters.databricks.relation_configs import base
911
from dbt.adapters.databricks.relation_configs.base import (
1012
DatabricksComponentConfig,
1113
DatabricksComponentProcessor,
1214
)
1315

14-
SCHEDULE_REGEX = re.compile(r"CRON '(.*)' AT TIME ZONE '(.*)'")
16+
17+
class RefreshMode(str, Enum):
18+
MANUAL = "manual"
19+
CRON = "cron"
20+
EVERY = "every"
21+
ON_UPDATE = "on_update"
22+
23+
24+
CRON_REGEX = re.compile(r"^CRON '(.*)' AT TIME ZONE '(.*)'$")
25+
EVERY_REGEX = re.compile(r"^EVERY (\d+) (HOURS?|DAYS?|WEEKS?)$", re.IGNORECASE)
26+
TRIGGER_REGEX = re.compile(
27+
r"^TRIGGER ON UPDATE(?: AT MOST EVERY INTERVAL (\d+) SECONDS?)?$",
28+
re.IGNORECASE,
29+
)
30+
31+
_QUANTITY_RE = re.compile(r"^\s*(\d+)\s+([A-Z]+)\s*$", re.IGNORECASE)
32+
_SECONDS_PER_UNIT = {
33+
"SECOND": 1,
34+
"MINUTE": 60,
35+
"HOUR": 3600,
36+
"DAY": 86400,
37+
"WEEK": 604800,
38+
}
39+
_EVERY_UNITS = {"HOUR", "DAY", "WEEK"}
40+
# Databricks treats an absent time zone as UTC and emits 'Etc/UTC' back in DESCRIBE EXTENDED;
41+
# these all canonicalize to the same UTC for equality.
42+
_UTC_ALIASES = {"UTC", "ETC/UTC"}
43+
44+
45+
def _canonical_tz(tz: Optional[str]) -> str:
46+
s = (tz or "UTC").upper()
47+
return "UTC" if s in _UTC_ALIASES else s
48+
49+
50+
def _parse_quantity(value: str) -> tuple[int, str]:
51+
"""Parse '<n> <unit>' (case-insensitive, singular or plural) into (n, singular_unit)."""
52+
match = _QUANTITY_RE.match(value)
53+
if not match:
54+
raise DbtRuntimeError(f"Cannot parse interval {value!r}; expected '<integer> <unit>'.")
55+
n, unit = int(match.group(1)), match.group(2).upper()
56+
singular = unit[:-1] if unit.endswith("S") else unit
57+
return n, singular
58+
59+
60+
def _interval_seconds(value: str) -> int:
61+
n, singular = _parse_quantity(value)
62+
if singular not in _SECONDS_PER_UNIT:
63+
raise DbtRuntimeError(
64+
f"Unknown interval unit in {value!r};"
65+
f" supported: SECOND, MINUTE, HOUR, DAY, WEEK (singular or plural)."
66+
)
67+
return n * _SECONDS_PER_UNIT[singular]
68+
69+
70+
def _every_canonical(value: str) -> tuple[int, str]:
71+
"""Return (n, plural_unit) for an EVERY clause, e.g. '1 DAY' -> (1, 'DAYS')."""
72+
n, singular = _parse_quantity(value)
73+
if singular not in _EVERY_UNITS:
74+
raise DbtRuntimeError(
75+
f"Cannot parse `every` value {value!r}; expected '<integer> {{HOURS|DAYS|WEEKS}}'."
76+
)
77+
return n, singular + "S"
1578

1679

1780
class RefreshConfig(DatabricksComponentConfig):
18-
"""Component encapsulating the refresh schedule of a relation."""
81+
"""Component encapsulating the refresh schedule of a relation.
82+
83+
The mode is derived from which discriminator field is set:
84+
- MANUAL - no fields set
85+
- CRON - `cron` set, optional `time_zone_value`
86+
- EVERY - `every` set, e.g. "2 HOURS"
87+
- ON_UPDATE - `on_update=True`, optional `at_most_every` (e.g. "15 MINUTES")
88+
"""
1989

2090
cron: Optional[str] = None
2191
time_zone_value: Optional[str] = None
92+
every: Optional[str] = None
93+
on_update: bool = False
94+
at_most_every: Optional[str] = None
2295

23-
# Property indicating whether the schedule change should be accomplished by an ADD SCHEDULE
24-
# vs an ALTER SCHEDULE. This is only True when modifying an existing schedule, rather than
25-
# switching from manual refresh to scheduled or vice versa.
96+
# Render-time hint for the alter macro: True when both old and new states are scheduled
97+
# (emit ALTER); False for ADD or DROP. Excluded from __eq__ / __hash__ so it doesn't
98+
# affect identity.
2699
is_altered: bool = False
27100

101+
@model_validator(mode="after")
102+
def _validate_mode_fields(self) -> "RefreshConfig":
103+
modes_set = [name for name, value in self._mode_signals() if value]
104+
if len(modes_set) > 1:
105+
raise DbtRuntimeError(
106+
f"Refresh schedule must specify at most one of cron / every / on_update;"
107+
f" got {modes_set}."
108+
)
109+
if self.time_zone_value is not None and self.cron is None:
110+
raise DbtRuntimeError("`time_zone_value` is only valid when `cron` is set.")
111+
if self.at_most_every is not None:
112+
if not self.on_update:
113+
raise DbtRuntimeError("`at_most_every` is only valid when `on_update` is True.")
114+
seconds = _interval_seconds(self.at_most_every)
115+
if seconds < 60:
116+
raise DbtRuntimeError(
117+
f"`at_most_every` must be at least 60 seconds (1 minute);"
118+
f" got {self.at_most_every!r} ({seconds}s)."
119+
)
120+
return self
121+
122+
def _mode_signals(self) -> tuple[tuple[str, Any], ...]:
123+
return (
124+
("cron", self.cron),
125+
("every", self.every),
126+
("on_update", self.on_update),
127+
)
128+
129+
@property
130+
def mode(self) -> RefreshMode:
131+
if self.cron is not None:
132+
return RefreshMode.CRON
133+
if self.every is not None:
134+
return RefreshMode.EVERY
135+
if self.on_update:
136+
return RefreshMode.ON_UPDATE
137+
return RefreshMode.MANUAL
138+
139+
@property
140+
def auto_refreshed(self) -> bool:
141+
"""True for modes where Databricks auto-manages refresh and a manual REFRESH is a no-op."""
142+
return self.mode in (RefreshMode.EVERY, RefreshMode.ON_UPDATE)
143+
28144
def __eq__(self, other: Any) -> bool:
29-
if not isinstance(other, RefreshConfig):
145+
if not isinstance(other, RefreshConfig) or self.mode != other.mode:
146+
return False
147+
if self.mode == RefreshMode.MANUAL:
148+
return True
149+
if self.mode == RefreshMode.CRON:
150+
# Server fills 'Etc/UTC' when no time zone is given, so None/UTC/Etc/UTC all match.
151+
return self.cron == other.cron and _canonical_tz(self.time_zone_value) == _canonical_tz(
152+
other.time_zone_value
153+
)
154+
if self.mode == RefreshMode.EVERY:
155+
assert self.every is not None and other.every is not None
156+
return _every_canonical(self.every) == _every_canonical(other.every)
157+
if (self.at_most_every is None) != (other.at_most_every is None):
30158
return False
31-
return self.cron == other.cron and (
159+
if self.at_most_every is None:
160+
return True
161+
assert other.at_most_every is not None
162+
return _interval_seconds(self.at_most_every) == _interval_seconds(other.at_most_every)
163+
164+
def __hash__(self) -> int:
165+
return hash(
32166
(
33-
self.time_zone_value is None
34-
and other.time_zone_value
35-
and "utc" in other.time_zone_value.lower()
167+
self.cron,
168+
_canonical_tz(self.time_zone_value),
169+
self.every,
170+
self.on_update,
171+
self.at_most_every,
36172
)
37-
or (other.time_zone_value == other.time_zone_value)
38173
)
39174

40175
def get_diff(self, other: "RefreshConfig") -> Optional["RefreshConfig"]:
41-
if self != other:
42-
return RefreshConfig(
43-
cron=self.cron,
44-
time_zone_value=self.time_zone_value,
45-
is_altered=self.cron is not None and other.cron is not None,
46-
)
47-
return None
176+
if self == other:
177+
return None
178+
is_altered = self.mode != RefreshMode.MANUAL and other.mode != RefreshMode.MANUAL
179+
# model_construct skips re-validation; only is_altered changes, other fields stay valid.
180+
return self.model_construct(**{**self.model_dump(), "is_altered": is_altered})
48181

49182

50183
class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]):
@@ -54,35 +187,52 @@ class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]):
54187
def from_relation_results(cls, results: RelationResults) -> RefreshConfig:
55188
table = results["describe_extended"]
56189
for row in table.rows:
57-
if row[0] == "Refresh Schedule":
58-
if row[1] == "MANUAL":
59-
return RefreshConfig()
60-
61-
match = SCHEDULE_REGEX.match(row[1])
190+
if row[0] != "Refresh Schedule":
191+
continue
192+
return cls._parse_schedule(row[1])
62193

63-
if match:
64-
cron, time_zone_value = match.groups()
65-
return RefreshConfig(cron=cron, time_zone_value=time_zone_value)
66-
67-
raise DbtRuntimeError(
68-
f"Could not parse schedule from description: {row[1]}."
69-
" This is most likely a bug in the dbt-databricks adapter,"
70-
" so please file an issue!"
71-
)
194+
raise DbtRuntimeError(
195+
"Could not find Refresh Schedule in describe extended."
196+
" Please file an issue at https://github.com/databricks/dbt-databricks/issues."
197+
)
72198

199+
@staticmethod
200+
def _parse_schedule(value: str) -> RefreshConfig:
201+
if value == "MANUAL":
202+
return RefreshConfig()
203+
if (m := CRON_REGEX.match(value)) is not None:
204+
cron, time_zone_value = m.groups()
205+
return RefreshConfig(cron=cron, time_zone_value=time_zone_value)
206+
if (m := EVERY_REGEX.match(value)) is not None:
207+
n, unit = m.groups()
208+
return RefreshConfig(every=f"{n} {unit.upper()}")
209+
if (m := TRIGGER_REGEX.match(value)) is not None:
210+
seconds = m.group(1)
211+
if seconds is None:
212+
return RefreshConfig(on_update=True)
213+
return RefreshConfig(on_update=True, at_most_every=f"{seconds} SECOND")
73214
raise DbtRuntimeError(
74-
"Could not parse schedule for table."
75-
" This is most likely a bug in the dbt-databricks adapter, so please file an issue!"
215+
f"Could not parse refresh schedule from describe extended: {value!r}."
216+
" Please file an issue at https://github.com/databricks/dbt-databricks/issues."
76217
)
77218

78219
@classmethod
79220
def from_relation_config(cls, relation_config: RelationConfig) -> RefreshConfig:
80221
schedule = base.get_config_value(relation_config, "schedule")
81-
if schedule:
82-
if "cron" not in schedule:
83-
raise DbtRuntimeError(f"Schedule config must contain a 'cron' key, got {schedule}")
84-
return RefreshConfig(
85-
cron=schedule["cron"], time_zone_value=schedule.get("time_zone_value")
86-
)
87-
else:
222+
if not schedule:
88223
return RefreshConfig()
224+
if not isinstance(schedule, dict):
225+
raise DbtRuntimeError(f"`schedule` must be a dict; got {schedule!r}.")
226+
227+
kwargs: dict[str, Any] = {
228+
field: schedule[field]
229+
for field in ("cron", "time_zone_value", "every", "on_update", "at_most_every")
230+
if field in schedule
231+
}
232+
233+
if not kwargs:
234+
raise DbtRuntimeError(
235+
"Schedule config must contain one of `cron`, `every`, or `on_update`;"
236+
f" got {schedule}"
237+
)
238+
return RefreshConfig(**kwargs)

dbt/adapters/databricks/relation_configs/streaming_table.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
PartitionedByProcessor,
1616
)
1717
from dbt.adapters.databricks.relation_configs.query import DescribeQueryProcessor
18-
from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig, RefreshProcessor
18+
from dbt.adapters.databricks.relation_configs.refresh import RefreshProcessor
1919
from dbt.adapters.databricks.relation_configs.row_filter import RowFilterProcessor
2020
from dbt.adapters.databricks.relation_configs.tags import TagsProcessor
2121
from dbt.adapters.databricks.relation_configs.tblproperties import (
@@ -42,22 +42,23 @@ def get_changeset(
4242
current state of the dbt project.
4343
"""
4444
changes: dict[str, DatabricksComponentConfig] = {}
45-
requires_refresh = False
46-
requires_replace = False
45+
requires_full_refresh = False
46+
has_changes = False
4747

4848
for component in self.config_components:
4949
key = component.name
5050
value = self.config[key]
5151
diff = value.get_diff(existing.config[key])
52-
if key == "partition_by" and diff is not None:
53-
requires_refresh = True
54-
if diff and diff != RefreshConfig():
55-
requires_replace = True
56-
diff = diff or value
57-
if diff != RefreshConfig():
52+
if diff is not None:
53+
has_changes = True
54+
if key == "partition_by":
55+
requires_full_refresh = True
5856
changes[key] = diff
59-
if requires_replace:
60-
return DatabricksRelationChangeSet(
61-
changes=changes, requires_full_refresh=requires_refresh
62-
)
63-
return None
57+
else:
58+
changes[key] = value
59+
60+
if not has_changes:
61+
return None
62+
return DatabricksRelationChangeSet(
63+
changes=changes, requires_full_refresh=requires_full_refresh
64+
)

dbt/include/databricks/macros/materializations/materialized_view.sql

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,14 @@
3434
{% set on_configuration_change = config.get('on_configuration_change') %}
3535
{% set configuration_changes = get_configuration_changes(existing_relation) %}
3636

37+
{# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. #}
3738
{% if configuration_changes is none %}
38-
{% set build_sql = refresh_materialized_view(target_relation) %}
39+
{%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%}
40+
{%- if refresh.auto_refreshed -%}
41+
{% set build_sql = '' %}
42+
{%- else -%}
43+
{% set build_sql = refresh_materialized_view(target_relation) %}
44+
{%- endif -%}
3945

4046
{% elif on_configuration_change == 'apply' %}
4147
{% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %}

dbt/include/databricks/macros/materializations/streaming_table.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,15 @@
3434
-- get config options
3535
{% set on_configuration_change = config.get('on_configuration_change') %}
3636
{% set configuration_changes = get_configuration_changes(existing_relation) %}
37+
{# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. #}
3738
{% if configuration_changes is none %}
38-
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
39-
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
39+
{%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%}
40+
{%- if refresh.auto_refreshed -%}
41+
{% set build_sql = '' %}
42+
{%- else -%}
43+
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
44+
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
45+
{%- endif -%}
4046

4147
{% elif on_configuration_change == 'apply' %}
4248
{% set build_sql = get_alter_streaming_table_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %}
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
{% macro get_create_sql_refresh_schedule(cron, time_zone_value) %}
2-
{%- if cron -%}
3-
SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
1+
{% macro get_create_sql_refresh_schedule(refresh) %}
2+
{%- if refresh.cron -%}
3+
SCHEDULE CRON '{{ refresh.cron }}'{%- if refresh.time_zone_value %} AT TIME ZONE '{{ refresh.time_zone_value }}'{%- endif -%}
4+
{%- elif refresh.every -%}
5+
SCHEDULE EVERY {{ refresh.every }}
6+
{%- elif refresh.on_update -%}
7+
TRIGGER ON UPDATE{%- if refresh.at_most_every %} AT MOST EVERY INTERVAL {{ refresh.at_most_every }}{%- endif -%}
48
{%- endif -%}
59
{% endmacro %}
610

7-
{% macro get_alter_sql_refresh_schedule(cron, time_zone_value, is_altered) %}
8-
{%- if cron -%}
9-
{%- if is_altered -%}
10-
ALTER SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
11-
{%- else -%}
12-
ADD SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
13-
{%- endif -%}
14-
{%- else -%}
11+
{% macro get_alter_sql_refresh_schedule(refresh) %}
12+
{%- if not (refresh.cron or refresh.every or refresh.on_update) -%}
1513
DROP SCHEDULE
14+
{%- else -%}
15+
{{- 'ALTER ' if refresh.is_altered else 'ADD ' -}}{{- get_create_sql_refresh_schedule(refresh) -}}
1616
{%- endif -%}
1717
{% endmacro %}

0 commit comments

Comments
 (0)