Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
- Add support for row filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294))
- Add support for Python UDFs ([#1336](https://github.com/databricks/dbt-databricks/pull/1336))
- Add support for key-only `databricks_tags` for table and column tagging. This can now be configured by setting tag values to empty strings `""` or `None`. ([#1339](https://github.com/databricks/dbt-databricks/pull/1339))
- Support `SCHEDULE EVERY` and `TRIGGER ON UPDATE` refresh modes for materialized views and streaming tables, with parser and diff coverage so relations whose actual refresh is not CRON no longer crash on subsequent runs ([#1293](https://github.com/databricks/dbt-databricks/issues/1293))

### Fixes

- Fix `RefreshConfig.__eq__` self/other typo where two configs with the same `cron` but different `time_zone_value` compared equal
- Fix streaming-table DROP-SCHEDULE path that was silently filtered out of the changeset

### Under the Hood

Expand Down
222 changes: 179 additions & 43 deletions dbt/adapters/databricks/relation_configs/refresh.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,169 @@
import re
from enum import Enum
from typing import Any, ClassVar, Optional

from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs.config_base import RelationResults
from dbt_common.exceptions import DbtRuntimeError
from pydantic import model_validator

from dbt.adapters.databricks.relation_configs import base
from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksComponentProcessor,
)

SCHEDULE_REGEX = re.compile(r"CRON '(.*)' AT TIME ZONE '(.*)'")

class RefreshMode(str, Enum):
MANUAL = "manual"
CRON = "cron"
EVERY = "every"
ON_UPDATE = "on_update"


CRON_REGEX = re.compile(r"^CRON '(.*)' AT TIME ZONE '(.*)'$")
EVERY_REGEX = re.compile(r"^EVERY (\d+) (HOURS?|DAYS?|WEEKS?)$", re.IGNORECASE)
TRIGGER_REGEX = re.compile(
r"^TRIGGER ON UPDATE(?: AT MOST EVERY INTERVAL (\d+) SECONDS?)?$",
re.IGNORECASE,
)

_QUANTITY_RE = re.compile(r"^\s*(\d+)\s+([A-Z]+)\s*$", re.IGNORECASE)
_SECONDS_PER_UNIT = {
"SECOND": 1,
"MINUTE": 60,
"HOUR": 3600,
"DAY": 86400,
"WEEK": 604800,
}
_EVERY_UNITS = {"HOUR", "DAY", "WEEK"}


def _parse_quantity(value: str) -> tuple[int, str]:
"""Parse '<n> <unit>' (case-insensitive, singular or plural) into (n, singular_unit)."""
match = _QUANTITY_RE.match(value)
if not match:
raise DbtRuntimeError(f"Cannot parse interval {value!r}; expected '<integer> <unit>'.")
n, unit = int(match.group(1)), match.group(2).upper()
singular = unit[:-1] if unit.endswith("S") else unit
return n, singular


def _interval_seconds(value: str) -> int:
n, singular = _parse_quantity(value)
if singular not in _SECONDS_PER_UNIT:
raise DbtRuntimeError(
f"Unknown interval unit in {value!r};"
f" supported: SECOND, MINUTE, HOUR, DAY, WEEK (singular or plural)."
)
return n * _SECONDS_PER_UNIT[singular]


def _every_canonical(value: str) -> tuple[int, str]:
"""Return (n, plural_unit) for an EVERY clause, e.g. '1 DAY' -> (1, 'DAYS')."""
n, singular = _parse_quantity(value)
if singular not in _EVERY_UNITS:
raise DbtRuntimeError(
f"Cannot parse `every` value {value!r}; expected '<integer> {{HOURS|DAYS|WEEKS}}'."
)
return n, singular + "S"


class RefreshConfig(DatabricksComponentConfig):
"""Component encapsulating the refresh schedule of a relation."""
"""Component encapsulating the refresh schedule of a relation.

The mode is derived from which discriminator field is set:
- MANUAL - no fields set
- CRON - `cron` set, optional `time_zone_value`
- EVERY - `every` set, e.g. "2 HOURS"
- ON_UPDATE - `on_update=True`, optional `at_most_every` (e.g. "15 MINUTES")
"""

cron: Optional[str] = None
time_zone_value: Optional[str] = None
every: Optional[str] = None
on_update: bool = False
at_most_every: Optional[str] = None

# Property indicating whether the schedule change should be accomplished by an ADD SCHEDULE
# vs an ALTER SCHEDULE. This is only True when modifying an existing schedule, rather than
# switching from manual refresh to scheduled or vice versa.
# Render-time hint for the alter macro: True when both old and new states are scheduled
# (emit ALTER); False for ADD or DROP. Excluded from __eq__ / __hash__ so it doesn't
# affect identity.
is_altered: bool = False

@model_validator(mode="after")
def _validate_mode_fields(self) -> "RefreshConfig":
modes_set = [name for name, value in self._mode_signals() if value]
if len(modes_set) > 1:
raise DbtRuntimeError(
f"Refresh schedule must specify at most one of cron / every / on_update;"
f" got {modes_set}."
)
if self.time_zone_value is not None and self.cron is None:
raise DbtRuntimeError("`time_zone_value` is only valid when `cron` is set.")
if self.at_most_every is not None:
if not self.on_update:
raise DbtRuntimeError("`at_most_every` is only valid when `on_update` is True.")
seconds = _interval_seconds(self.at_most_every)
if seconds < 60:
raise DbtRuntimeError(
f"`at_most_every` must be at least 60 seconds (1 minute);"
f" got {self.at_most_every!r} ({seconds}s)."
)
return self

def _mode_signals(self) -> tuple[tuple[str, Any], ...]:
return (
("cron", self.cron),
("every", self.every),
("on_update", self.on_update),
)

@property
def mode(self) -> RefreshMode:
if self.cron is not None:
return RefreshMode.CRON
if self.every is not None:
return RefreshMode.EVERY
if self.on_update:
return RefreshMode.ON_UPDATE
return RefreshMode.MANUAL

@property
def auto_refreshed(self) -> bool:
"""True for modes where Databricks auto-manages refresh and a manual REFRESH is a no-op."""
return self.mode in (RefreshMode.EVERY, RefreshMode.ON_UPDATE)

def __eq__(self, other: Any) -> bool:
if not isinstance(other, RefreshConfig):
if not isinstance(other, RefreshConfig) or self.mode != other.mode:
return False
return self.cron == other.cron and (
(
self.time_zone_value is None
and other.time_zone_value
and "utc" in other.time_zone_value.lower()
)
or (other.time_zone_value == other.time_zone_value)
if self.mode == RefreshMode.MANUAL:
return True
if self.mode == RefreshMode.CRON:
# Databricks treats no time zone as UTC.
tz_self = (self.time_zone_value or "UTC").upper()
tz_other = (other.time_zone_value or "UTC").upper()
return self.cron == other.cron and tz_self == tz_other
if self.mode == RefreshMode.EVERY:
assert self.every is not None and other.every is not None
return _every_canonical(self.every) == _every_canonical(other.every)
if (self.at_most_every is None) != (other.at_most_every is None):
return False
if self.at_most_every is None:
return True
assert other.at_most_every is not None
return _interval_seconds(self.at_most_every) == _interval_seconds(other.at_most_every)

def __hash__(self) -> int:
return hash(
(self.cron, self.time_zone_value, self.every, self.on_update, self.at_most_every)
)

def get_diff(self, other: "RefreshConfig") -> Optional["RefreshConfig"]:
if self != other:
return RefreshConfig(
cron=self.cron,
time_zone_value=self.time_zone_value,
is_altered=self.cron is not None and other.cron is not None,
)
return None
if self == other:
return None
is_altered = self.mode != RefreshMode.MANUAL and other.mode != RefreshMode.MANUAL
# model_construct skips re-validation; only is_altered changes, other fields stay valid.
return self.model_construct(**{**self.model_dump(), "is_altered": is_altered})


class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]):
Expand All @@ -54,35 +173,52 @@ class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]):
def from_relation_results(cls, results: RelationResults) -> RefreshConfig:
table = results["describe_extended"]
for row in table.rows:
if row[0] == "Refresh Schedule":
if row[1] == "MANUAL":
return RefreshConfig()

match = SCHEDULE_REGEX.match(row[1])

if match:
cron, time_zone_value = match.groups()
return RefreshConfig(cron=cron, time_zone_value=time_zone_value)
if row[0] != "Refresh Schedule":
continue
return cls._parse_schedule(row[1])

raise DbtRuntimeError(
f"Could not parse schedule from description: {row[1]}."
" This is most likely a bug in the dbt-databricks adapter,"
" so please file an issue!"
)
raise DbtRuntimeError(
"Could not find Refresh Schedule in describe extended."
" Please file an issue at https://github.com/databricks/dbt-databricks/issues."
)

@staticmethod
def _parse_schedule(value: str) -> RefreshConfig:
if value == "MANUAL":
return RefreshConfig()
if (m := CRON_REGEX.match(value)) is not None:
cron, time_zone_value = m.groups()
return RefreshConfig(cron=cron, time_zone_value=time_zone_value)
if (m := EVERY_REGEX.match(value)) is not None:
n, unit = m.groups()
return RefreshConfig(every=f"{n} {unit.upper()}")
if (m := TRIGGER_REGEX.match(value)) is not None:
seconds = m.group(1)
if seconds is None:
return RefreshConfig(on_update=True)
return RefreshConfig(on_update=True, at_most_every=f"{seconds} SECOND")
raise DbtRuntimeError(
"Could not parse schedule for table."
" This is most likely a bug in the dbt-databricks adapter, so please file an issue!"
f"Could not parse refresh schedule from describe extended: {value!r}."
" Please file an issue at https://github.com/databricks/dbt-databricks/issues."
)

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> RefreshConfig:
schedule = base.get_config_value(relation_config, "schedule")
if schedule:
if "cron" not in schedule:
raise DbtRuntimeError(f"Schedule config must contain a 'cron' key, got {schedule}")
return RefreshConfig(
cron=schedule["cron"], time_zone_value=schedule.get("time_zone_value")
)
else:
if not schedule:
return RefreshConfig()
if not isinstance(schedule, dict):
raise DbtRuntimeError(f"`schedule` must be a dict; got {schedule!r}.")

kwargs: dict[str, Any] = {
field: schedule[field]
for field in ("cron", "time_zone_value", "every", "on_update", "at_most_every")
if field in schedule
}

if not kwargs:
raise DbtRuntimeError(
"Schedule config must contain one of `cron`, `every`, or `on_update`;"
f" got {schedule}"
)
return RefreshConfig(**kwargs)
29 changes: 15 additions & 14 deletions dbt/adapters/databricks/relation_configs/streaming_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
PartitionedByProcessor,
)
from dbt.adapters.databricks.relation_configs.query import DescribeQueryProcessor
from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig, RefreshProcessor
from dbt.adapters.databricks.relation_configs.refresh import RefreshProcessor
from dbt.adapters.databricks.relation_configs.row_filter import RowFilterProcessor
from dbt.adapters.databricks.relation_configs.tags import TagsProcessor
from dbt.adapters.databricks.relation_configs.tblproperties import (
Expand All @@ -42,22 +42,23 @@ def get_changeset(
current state of the dbt project.
"""
changes: dict[str, DatabricksComponentConfig] = {}
requires_refresh = False
requires_replace = False
requires_full_refresh = False
has_changes = False

for component in self.config_components:
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if key == "partition_by" and diff is not None:
requires_refresh = True
if diff and diff != RefreshConfig():
requires_replace = True
diff = diff or value
if diff != RefreshConfig():
if diff is not None:
has_changes = True
if key == "partition_by":
requires_full_refresh = True
changes[key] = diff
if requires_replace:
return DatabricksRelationChangeSet(
changes=changes, requires_full_refresh=requires_refresh
)
return None
else:
changes[key] = value

if not has_changes:
return None
return DatabricksRelationChangeSet(
changes=changes, requires_full_refresh=requires_full_refresh
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_configuration_changes(existing_relation) %}

{# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. #}
{% if configuration_changes is none %}
{% set build_sql = refresh_materialized_view(target_relation) %}
{%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%}
{%- if refresh.auto_refreshed -%}
{% set build_sql = '' %}
{%- else -%}
{% set build_sql = refresh_materialized_view(target_relation) %}
{%- endif -%}

{% elif on_configuration_change == 'apply' %}
{% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@
-- get config options
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_configuration_changes(existing_relation) %}
{# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. #}
{% if configuration_changes is none %}
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
{%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%}
{%- if refresh.auto_refreshed -%}
{% set build_sql = '' %}
{%- else -%}
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
{%- endif -%}

{% elif on_configuration_change == 'apply' %}
{% set build_sql = get_alter_streaming_table_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{% macro get_create_sql_refresh_schedule(cron, time_zone_value) %}
{%- if cron -%}
SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{% macro get_create_sql_refresh_schedule(refresh) %}
{%- if refresh.cron -%}
SCHEDULE CRON '{{ refresh.cron }}'{%- if refresh.time_zone_value %} AT TIME ZONE '{{ refresh.time_zone_value }}'{%- endif -%}
{%- elif refresh.every -%}
SCHEDULE EVERY {{ refresh.every }}
{%- elif refresh.on_update -%}
TRIGGER ON UPDATE{%- if refresh.at_most_every %} AT MOST EVERY INTERVAL {{ refresh.at_most_every }}{%- endif -%}
{%- endif -%}
{% endmacro %}

{% macro get_alter_sql_refresh_schedule(cron, time_zone_value, is_altered) %}
{%- if cron -%}
{%- if is_altered -%}
ALTER SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{%- else -%}
ADD SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{%- endif -%}
{%- else -%}
{% macro get_alter_sql_refresh_schedule(refresh) %}
{%- if not (refresh.cron or refresh.every or refresh.on_update) -%}
DROP SCHEDULE
{%- else -%}
{{- 'ALTER ' if refresh.is_altered else 'ADD ' -}}{{- get_create_sql_refresh_schedule(refresh) -}}
{%- endif -%}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@
{%- if refresh -%}
-- Currently only schedule can be altered
ALTER MATERIALIZED VIEW {{ relation.render() }}
{{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}}
{{ get_alter_sql_refresh_schedule(refresh) -}}
{%- endif -%}
{% endmacro %}
Loading
Loading