diff --git a/CHANGELOG.md b/CHANGELOG.md index dd87bc70f..53bf83992 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ - Add support for row filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294)) - Add support for Python UDFs ([#1336](https://github.com/databricks/dbt-databricks/pull/1336)) - Add support for key-only `databricks_tags` for table and column tagging. This can now be configured by setting tag values to empty strings `""` or `None`. ([#1339](https://github.com/databricks/dbt-databricks/pull/1339)) +- Support `SCHEDULE EVERY` and `TRIGGER ON UPDATE` refresh modes for materialized views and streaming tables, with parser and diff coverage so relations whose actual refresh is not CRON no longer crash on subsequent runs ([#1293](https://github.com/databricks/dbt-databricks/issues/1293)) + +### Fixes + +- Fix `RefreshConfig.__eq__` self/other typo where two configs with the same `cron` but different `time_zone_value` compared equal +- Fix streaming-table DROP-SCHEDULE path that was silently filtered out of the changeset ### Under the Hood diff --git a/dbt/adapters/databricks/relation_configs/refresh.py b/dbt/adapters/databricks/relation_configs/refresh.py index 201d3939f..9fce7ee14 100644 --- a/dbt/adapters/databricks/relation_configs/refresh.py +++ b/dbt/adapters/databricks/relation_configs/refresh.py @@ -1,9 +1,11 @@ import re +from enum import Enum from typing import Any, ClassVar, Optional from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.relation_configs.config_base import RelationResults from dbt_common.exceptions import DbtRuntimeError +from pydantic import model_validator from dbt.adapters.databricks.relation_configs import base from dbt.adapters.databricks.relation_configs.base import ( @@ -11,40 +13,157 @@ DatabricksComponentProcessor, ) -SCHEDULE_REGEX = re.compile(r"CRON '(.*)' AT TIME ZONE '(.*)'") + +class RefreshMode(str, Enum): + MANUAL = "manual" + CRON = "cron" + EVERY = "every" + 
ON_UPDATE = "on_update" + + +CRON_REGEX = re.compile(r"^CRON '(.*)' AT TIME ZONE '(.*)'$") +EVERY_REGEX = re.compile(r"^EVERY (\d+) (HOURS?|DAYS?|WEEKS?)$", re.IGNORECASE) +TRIGGER_REGEX = re.compile( + r"^TRIGGER ON UPDATE(?: AT MOST EVERY INTERVAL (\d+) SECONDS?)?$", + re.IGNORECASE, +) + +_QUANTITY_RE = re.compile(r"^\s*(\d+)\s+([A-Z]+)\s*$", re.IGNORECASE) +_SECONDS_PER_UNIT = { + "SECOND": 1, + "MINUTE": 60, + "HOUR": 3600, + "DAY": 86400, + "WEEK": 604800, +} +_EVERY_UNITS = {"HOUR", "DAY", "WEEK"} + + +def _parse_quantity(value: str) -> tuple[int, str]: + """Parse '<n> <unit>' (case-insensitive, singular or plural) into (n, singular_unit).""" + match = _QUANTITY_RE.match(value) + if not match: + raise DbtRuntimeError(f"Cannot parse interval {value!r}; expected '<n> <unit>'.") + n, unit = int(match.group(1)), match.group(2).upper() + singular = unit[:-1] if unit.endswith("S") else unit + return n, singular + + +def _interval_seconds(value: str) -> int: + n, singular = _parse_quantity(value) + if singular not in _SECONDS_PER_UNIT: + raise DbtRuntimeError( + f"Unknown interval unit in {value!r};" + f" supported: SECOND, MINUTE, HOUR, DAY, WEEK (singular or plural)." + ) + return n * _SECONDS_PER_UNIT[singular] + + +def _every_canonical(value: str) -> tuple[int, str]: + """Return (n, plural_unit) for an EVERY clause, e.g. '1 DAY' -> (1, 'DAYS').""" + n, singular = _parse_quantity(value) + if singular not in _EVERY_UNITS: + raise DbtRuntimeError( + f"Cannot parse `every` value {value!r}; expected '<n> {{HOURS|DAYS|WEEKS}}'." + ) + return n, singular + "S" class RefreshConfig(DatabricksComponentConfig): - """Component encapsulating the refresh schedule of a relation.""" + """Component encapsulating the refresh schedule of a relation. + + The mode is derived from which discriminator field is set: + - MANUAL - no fields set + - CRON - `cron` set, optional `time_zone_value` + - EVERY - `every` set, e.g. "2 HOURS" + - ON_UPDATE - `on_update=True`, optional `at_most_every` (e.g. 
"15 MINUTES") + """ cron: Optional[str] = None time_zone_value: Optional[str] = None + every: Optional[str] = None + on_update: bool = False + at_most_every: Optional[str] = None - # Property indicating whether the schedule change should be accomplished by an ADD SCHEDULE - # vs an ALTER SCHEDULE. This is only True when modifying an existing schedule, rather than - # switching from manual refresh to scheduled or vice versa. + # Render-time hint for the alter macro: True when both old and new states are scheduled + # (emit ALTER); False for ADD or DROP. Excluded from __eq__ / __hash__ so it doesn't + # affect identity. is_altered: bool = False + @model_validator(mode="after") + def _validate_mode_fields(self) -> "RefreshConfig": + modes_set = [name for name, value in self._mode_signals() if value] + if len(modes_set) > 1: + raise DbtRuntimeError( + f"Refresh schedule must specify at most one of cron / every / on_update;" + f" got {modes_set}." + ) + if self.time_zone_value is not None and self.cron is None: + raise DbtRuntimeError("`time_zone_value` is only valid when `cron` is set.") + if self.at_most_every is not None: + if not self.on_update: + raise DbtRuntimeError("`at_most_every` is only valid when `on_update` is True.") + seconds = _interval_seconds(self.at_most_every) + if seconds < 60: + raise DbtRuntimeError( + f"`at_most_every` must be at least 60 seconds (1 minute);" + f" got {self.at_most_every!r} ({seconds}s)." 
+ ) + return self + + def _mode_signals(self) -> tuple[tuple[str, Any], ...]: + return ( + ("cron", self.cron), + ("every", self.every), + ("on_update", self.on_update), + ) + + @property + def mode(self) -> RefreshMode: + if self.cron is not None: + return RefreshMode.CRON + if self.every is not None: + return RefreshMode.EVERY + if self.on_update: + return RefreshMode.ON_UPDATE + return RefreshMode.MANUAL + + @property + def auto_refreshed(self) -> bool: + """True for modes where Databricks auto-manages refresh and a manual REFRESH is a no-op.""" + return self.mode in (RefreshMode.EVERY, RefreshMode.ON_UPDATE) + def __eq__(self, other: Any) -> bool: - if not isinstance(other, RefreshConfig): + if not isinstance(other, RefreshConfig) or self.mode != other.mode: return False - return self.cron == other.cron and ( - ( - self.time_zone_value is None - and other.time_zone_value - and "utc" in other.time_zone_value.lower() - ) - or (other.time_zone_value == other.time_zone_value) + if self.mode == RefreshMode.MANUAL: + return True + if self.mode == RefreshMode.CRON: + # Databricks treats no time zone as UTC. 
+ tz_self = (self.time_zone_value or "UTC").upper() + tz_other = (other.time_zone_value or "UTC").upper() + return self.cron == other.cron and tz_self == tz_other + if self.mode == RefreshMode.EVERY: + assert self.every is not None and other.every is not None + return _every_canonical(self.every) == _every_canonical(other.every) + if (self.at_most_every is None) != (other.at_most_every is None): + return False + if self.at_most_every is None: + return True + assert other.at_most_every is not None + return _interval_seconds(self.at_most_every) == _interval_seconds(other.at_most_every) + + def __hash__(self) -> int: + return hash( + (self.cron, self.time_zone_value, self.every, self.on_update, self.at_most_every) ) def get_diff(self, other: "RefreshConfig") -> Optional["RefreshConfig"]: - if self != other: - return RefreshConfig( - cron=self.cron, - time_zone_value=self.time_zone_value, - is_altered=self.cron is not None and other.cron is not None, - ) - return None + if self == other: + return None + is_altered = self.mode != RefreshMode.MANUAL and other.mode != RefreshMode.MANUAL + # model_construct skips re-validation; only is_altered changes, other fields stay valid. + return self.model_construct(**{**self.model_dump(), "is_altered": is_altered}) class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]): @@ -54,35 +173,52 @@ class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]): def from_relation_results(cls, results: RelationResults) -> RefreshConfig: table = results["describe_extended"] for row in table.rows: - if row[0] == "Refresh Schedule": - if row[1] == "MANUAL": - return RefreshConfig() - - match = SCHEDULE_REGEX.match(row[1]) - - if match: - cron, time_zone_value = match.groups() - return RefreshConfig(cron=cron, time_zone_value=time_zone_value) + if row[0] != "Refresh Schedule": + continue + return cls._parse_schedule(row[1]) - raise DbtRuntimeError( - f"Could not parse schedule from description: {row[1]}." 
- " This is most likely a bug in the dbt-databricks adapter," - " so please file an issue!" - ) + raise DbtRuntimeError( + "Could not find Refresh Schedule in describe extended." + " Please file an issue at https://github.com/databricks/dbt-databricks/issues." + ) + @staticmethod + def _parse_schedule(value: str) -> RefreshConfig: + if value == "MANUAL": + return RefreshConfig() + if (m := CRON_REGEX.match(value)) is not None: + cron, time_zone_value = m.groups() + return RefreshConfig(cron=cron, time_zone_value=time_zone_value) + if (m := EVERY_REGEX.match(value)) is not None: + n, unit = m.groups() + return RefreshConfig(every=f"{n} {unit.upper()}") + if (m := TRIGGER_REGEX.match(value)) is not None: + seconds = m.group(1) + if seconds is None: + return RefreshConfig(on_update=True) + return RefreshConfig(on_update=True, at_most_every=f"{seconds} SECOND") raise DbtRuntimeError( - "Could not parse schedule for table." - " This is most likely a bug in the dbt-databricks adapter, so please file an issue!" + f"Could not parse refresh schedule from describe extended: {value!r}." + " Please file an issue at https://github.com/databricks/dbt-databricks/issues." 
) @classmethod def from_relation_config(cls, relation_config: RelationConfig) -> RefreshConfig: schedule = base.get_config_value(relation_config, "schedule") - if schedule: - if "cron" not in schedule: - raise DbtRuntimeError(f"Schedule config must contain a 'cron' key, got {schedule}") - return RefreshConfig( - cron=schedule["cron"], time_zone_value=schedule.get("time_zone_value") - ) - else: + if not schedule: return RefreshConfig() + if not isinstance(schedule, dict): + raise DbtRuntimeError(f"`schedule` must be a dict; got {schedule!r}.") + + kwargs: dict[str, Any] = { + field: schedule[field] + for field in ("cron", "time_zone_value", "every", "on_update", "at_most_every") + if field in schedule + } + + if not kwargs: + raise DbtRuntimeError( + "Schedule config must contain one of `cron`, `every`, or `on_update`;" + f" got {schedule}" + ) + return RefreshConfig(**kwargs) diff --git a/dbt/adapters/databricks/relation_configs/streaming_table.py b/dbt/adapters/databricks/relation_configs/streaming_table.py index 5333d5b2d..1b51195d7 100644 --- a/dbt/adapters/databricks/relation_configs/streaming_table.py +++ b/dbt/adapters/databricks/relation_configs/streaming_table.py @@ -15,7 +15,7 @@ PartitionedByProcessor, ) from dbt.adapters.databricks.relation_configs.query import DescribeQueryProcessor -from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig, RefreshProcessor +from dbt.adapters.databricks.relation_configs.refresh import RefreshProcessor from dbt.adapters.databricks.relation_configs.row_filter import RowFilterProcessor from dbt.adapters.databricks.relation_configs.tags import TagsProcessor from dbt.adapters.databricks.relation_configs.tblproperties import ( @@ -42,22 +42,23 @@ def get_changeset( current state of the dbt project. 
""" changes: dict[str, DatabricksComponentConfig] = {} - requires_refresh = False - requires_replace = False + requires_full_refresh = False + has_changes = False for component in self.config_components: key = component.name value = self.config[key] diff = value.get_diff(existing.config[key]) - if key == "partition_by" and diff is not None: - requires_refresh = True - if diff and diff != RefreshConfig(): - requires_replace = True - diff = diff or value - if diff != RefreshConfig(): + if diff is not None: + has_changes = True + if key == "partition_by": + requires_full_refresh = True changes[key] = diff - if requires_replace: - return DatabricksRelationChangeSet( - changes=changes, requires_full_refresh=requires_refresh - ) - return None + else: + changes[key] = value + + if not has_changes: + return None + return DatabricksRelationChangeSet( + changes=changes, requires_full_refresh=requires_full_refresh + ) diff --git a/dbt/include/databricks/macros/materializations/materialized_view.sql b/dbt/include/databricks/macros/materializations/materialized_view.sql index 979b7ac2b..0ed13fd03 100644 --- a/dbt/include/databricks/macros/materializations/materialized_view.sql +++ b/dbt/include/databricks/macros/materializations/materialized_view.sql @@ -34,8 +34,14 @@ {% set on_configuration_change = config.get('on_configuration_change') %} {% set configuration_changes = get_configuration_changes(existing_relation) %} + {# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. 
#} {% if configuration_changes is none %} - {% set build_sql = refresh_materialized_view(target_relation) %} + {%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%} + {%- if refresh.auto_refreshed -%} + {% set build_sql = '' %} + {%- else -%} + {% set build_sql = refresh_materialized_view(target_relation) %} + {%- endif -%} {% elif on_configuration_change == 'apply' %} {% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %} diff --git a/dbt/include/databricks/macros/materializations/streaming_table.sql b/dbt/include/databricks/macros/materializations/streaming_table.sql index cd89b98ad..7639077bc 100644 --- a/dbt/include/databricks/macros/materializations/streaming_table.sql +++ b/dbt/include/databricks/macros/materializations/streaming_table.sql @@ -34,9 +34,15 @@ -- get config options {% set on_configuration_change = config.get('on_configuration_change') %} {% set configuration_changes = get_configuration_changes(existing_relation) %} + {# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. 
#} {% if configuration_changes is none %} - {{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }} - {% set build_sql = refresh_streaming_table(target_relation, sql) %} + {%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%} + {%- if refresh.auto_refreshed -%} + {% set build_sql = '' %} + {%- else -%} + {{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }} + {% set build_sql = refresh_streaming_table(target_relation, sql) %} + {%- endif -%} {% elif on_configuration_change == 'apply' %} {% set build_sql = get_alter_streaming_table_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %} diff --git a/dbt/include/databricks/macros/relations/components/refresh_schedule.sql b/dbt/include/databricks/macros/relations/components/refresh_schedule.sql index 11f459f6a..e2f348040 100644 --- a/dbt/include/databricks/macros/relations/components/refresh_schedule.sql +++ b/dbt/include/databricks/macros/relations/components/refresh_schedule.sql @@ -1,17 +1,17 @@ -{% macro get_create_sql_refresh_schedule(cron, time_zone_value) %} - {%- if cron -%} - SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%} +{% macro get_create_sql_refresh_schedule(refresh) %} + {%- if refresh.cron -%} + SCHEDULE CRON '{{ refresh.cron }}'{%- if refresh.time_zone_value %} AT TIME ZONE '{{ refresh.time_zone_value }}'{%- endif -%} + {%- elif refresh.every -%} + SCHEDULE EVERY {{ refresh.every }} + {%- elif refresh.on_update -%} + TRIGGER ON UPDATE{%- if refresh.at_most_every %} AT MOST EVERY INTERVAL {{ refresh.at_most_every }}{%- endif -%} {%- endif -%} {% endmacro %} -{% macro get_alter_sql_refresh_schedule(cron, time_zone_value, is_altered) %} - {%- if cron -%} - {%- if is_altered -%} - ALTER SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%} - {%- else -%} - ADD SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ 
time_zone_value }}'{%- endif -%} - {%- endif -%} - {%- else -%} +{% macro get_alter_sql_refresh_schedule(refresh) %} + {%- if not (refresh.cron or refresh.every or refresh.on_update) -%} DROP SCHEDULE + {%- else -%} + {{- 'ALTER ' if refresh.is_altered else 'ADD ' -}}{{- get_create_sql_refresh_schedule(refresh) -}} {%- endif -%} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/materialized_view/alter.sql b/dbt/include/databricks/macros/relations/materialized_view/alter.sql index d5d1aea94..d5bd2be0a 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/alter.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/alter.sql @@ -67,6 +67,6 @@ {%- if refresh -%} -- Currently only schedule can be altered ALTER MATERIALIZED VIEW {{ relation.render() }} - {{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}} + {{ get_alter_sql_refresh_schedule(refresh) -}} {%- endif -%} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/materialized_view/create.sql b/dbt/include/databricks/macros/relations/materialized_view/create.sql index 00483574f..7563a046b 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/create.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/create.sql @@ -38,7 +38,7 @@ {{ liquid_clustered_cols() }} {{ get_create_sql_comment(comment) }} {{ get_create_sql_tblproperties(tblproperties) }} - {{ get_create_sql_refresh_schedule(refresh.cron, refresh.time_zone_value) }} + {{ get_create_sql_refresh_schedule(refresh) }} as {{ sql }} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/streaming_table/alter.sql b/dbt/include/databricks/macros/relations/streaming_table/alter.sql index a8c87648f..ba0d1715d 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/alter.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/alter.sql @@ -80,8 +80,9 @@ {% macro get_alter_st_internal(relation, 
configuration_changes) %} {%- set refresh = configuration_changes.changes["refresh"] -%} - {%- if refresh and refresh.cron -%} + {%- set is_scheduled = refresh and (refresh.cron or refresh.every or refresh.on_update) -%} + {%- if is_scheduled -%} ALTER STREAMING TABLE {{ relation.render() }} - {{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, False) -}} + ADD {{ get_create_sql_refresh_schedule(refresh) -}} {%- endif -%} {% endmacro %} diff --git a/dbt/include/databricks/macros/relations/streaming_table/create.sql b/dbt/include/databricks/macros/relations/streaming_table/create.sql index ab55e10fc..ec18565c4 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/create.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/create.sql @@ -34,6 +34,6 @@ {{ liquid_clustered_cols() }} {{ get_create_sql_comment(comment) }} {{ get_create_sql_tblproperties(tblproperties) }} - {{ get_create_sql_refresh_schedule(refresh.cron, refresh.time_zone_value) }} + {{ get_create_sql_refresh_schedule(refresh) }} AS {{ sql }} {% endmacro %} diff --git a/tests/functional/adapter/materialized_view_tests/fixtures.py b/tests/functional/adapter/materialized_view_tests/fixtures.py index 855e6edcc..ffc0591d7 100644 --- a/tests/functional/adapter/materialized_view_tests/fixtures.py +++ b/tests/functional/adapter/materialized_view_tests/fixtures.py @@ -37,7 +37,7 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]: partition_by='id', schedule = { 'cron': '0 0 * * * ? *', - 'time_zone': 'Etc/UTC' + 'time_zone_value': 'Etc/UTC' }, tblproperties={ 'key': 'value' @@ -75,7 +75,7 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]: materialized='materialized_view', schedule = { 'cron': '0 0 * * * ? 
*', - 'time_zone': 'Etc/UTC' + 'time_zone_value': 'Etc/UTC' }, ) }} SELECT diff --git a/tests/functional/adapter/materialized_view_tests/test_schedule_modes.py b/tests/functional/adapter/materialized_view_tests/test_schedule_modes.py new file mode 100644 index 000000000..52eeaf2b4 --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/test_schedule_modes.py @@ -0,0 +1,354 @@ +"""Functional coverage for the new MV refresh modes (EVERY / TRIGGER ON UPDATE) +and the GH #1293 regression. +""" + +import pytest +from dbt.tests import util +from dbt.tests.adapter.materialized_view.files import MY_SEED + +from dbt.adapters.databricks.relation import DatabricksRelationType +from dbt.adapters.databricks.relation_configs.materialized_view import ( + MaterializedViewConfig, +) + +MV_EVERY_2_HOURS = """ +{{ config( + materialized='materialized_view', + schedule = {'every': '2 HOURS'}, +) }} +select * from {{ ref('my_seed') }} +""" + +MV_ON_UPDATE_BARE = """ +{{ config( + materialized='materialized_view', + schedule = {'on_update': True}, +) }} +select * from {{ ref('my_seed') }} +""" + +MV_ON_UPDATE_RATE_LIMITED = """ +{{ config( + materialized='materialized_view', + schedule = {'on_update': True, 'at_most_every': '15 MINUTES'}, +) }} +select * from {{ ref('my_seed') }} +""" + +MV_CRON = """ +{{ config( + materialized='materialized_view', + schedule = {'cron': '0 0 * * * ? 
*', 'time_zone_value': 'Etc/UTC'}, +) }} +select * from {{ ref('my_seed') }} +""" + +MV_NO_SCHEDULE = """ +{{ config(materialized='materialized_view') }} +select * from {{ ref('my_seed') }} +""" + +MV_EVERY_WITH_TBLPROPS = """ +{{ config( + materialized='materialized_view', + schedule = {'every': '2 HOURS'}, + tblproperties={'lifecycle_marker': 'v1'}, +) }} +select * from {{ ref('my_seed') }} +""" + + +def _get_refresh_config(project, identifier): + relation = project.adapter.Relation.create( + identifier=identifier, + schema=project.test_schema, + database=project.database, + type=DatabricksRelationType.MaterializedView, + ) + with util.get_connection(project.adapter): + results = project.adapter.get_relation_config(relation) + assert isinstance(results, MaterializedViewConfig) + return results.config["refresh"] + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewScheduleModes: + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "mv_every.sql": MV_EVERY_2_HOURS, + "mv_on_update_bare.sql": MV_ON_UPDATE_BARE, + "mv_on_update_rate_limited.sql": MV_ON_UPDATE_RATE_LIMITED, + } + + def test_every_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_every"]) + + refresh = _get_refresh_config(project, "mv_every") + assert refresh.every is not None + # Server returns plural; our parser preserves the canonical form. 
+ assert refresh.mode.value == "every" + assert refresh.cron is None + assert refresh.at_most_every is None + + def test_on_update_bare_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_on_update_bare"]) + + refresh = _get_refresh_config(project, "mv_on_update_bare") + assert refresh.mode.value == "on_update" + assert refresh.on_update is True + assert refresh.at_most_every is None + assert refresh.cron is None + assert refresh.every is None + + def test_on_update_rate_limited_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_on_update_rate_limited"]) + + refresh = _get_refresh_config(project, "mv_on_update_rate_limited") + assert refresh.mode.value == "on_update" + # Server returns "INTERVAL N SECOND"; parser stores as " SECOND". + assert refresh.at_most_every is not None + # 15 MINUTES → 900 SECOND server-side. + assert "900" in refresh.at_most_every + + def test_idempotent_run_no_alter_for_every_mode(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_every"]) + # Second run with no config change: should NOT emit any ALTER or REFRESH. + _, logs = util.run_dbt_and_capture(["--debug", "run", "--models", "mv_every"]) + # Auto-refresh suppression: REFRESH MATERIALIZED VIEW is skipped for every/on_update. + assert "refresh materialized view" not in logs.lower() + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewManualMode: + """Initial-create with no `schedule` config: relation should round-trip as MANUAL. 
+ The drop-and-readd test covers CRON → MANUAL transition; this covers fresh MANUAL.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"mv_manual.sql": MV_NO_SCHEDULE} + + def test_manual_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_manual"]) + + refresh = _get_refresh_config(project, "mv_manual") + assert refresh.mode.value == "manual" + assert refresh.cron is None + assert refresh.every is None + assert refresh.on_update is False + assert refresh.at_most_every is None + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewCronAutoRefreshPositiveControl: + """Positive control for the auto-REFRESH suppression conditional: CRON mode must still + emit REFRESH MATERIALIZED VIEW on idempotent re-runs (existing behavior preserved).""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"mv_cron_idempotent.sql": MV_CRON} + + def test_cron_mode_idempotent_run_still_refreshes(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_cron_idempotent"]) + _, logs = util.run_dbt_and_capture(["--debug", "run", "--models", "mv_cron_idempotent"]) + assert "refresh materialized view" in logs.lower() + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewDropAndReadd: + """Drop schedule (config removed) → DROP SCHEDULE; re-add → ADD SCHEDULE.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"mv_drop_readd.sql": MV_CRON} + + @pytest.fixture(scope="class") + def project_config_update(self): + 
return {"models": {"on_configuration_change": "apply"}} + + def test_drop_then_readd(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_drop_readd"]) + + no_schedule = """ +{{ config(materialized='materialized_view') }} +select * from {{ ref('my_seed') }} +""" + util.write_file(no_schedule, "models", "mv_drop_readd.sql") + _, drop_logs = util.run_dbt_and_capture(["--debug", "run", "--models", "mv_drop_readd"]) + assert "drop schedule" in drop_logs.lower() + + refresh = _get_refresh_config(project, "mv_drop_readd") + assert refresh.mode.value == "manual" + + util.write_file(MV_EVERY_2_HOURS, "models", "mv_drop_readd.sql") + _, add_logs = util.run_dbt_and_capture(["--debug", "run", "--models", "mv_drop_readd"]) + assert "add schedule every" in add_logs.lower() + + refresh = _get_refresh_config(project, "mv_drop_readd") + assert refresh.mode.value == "every" + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewCronToEveryAlter: + """Cross-mode ALTER (CRON → EVERY) preserves the relation, no rebuild.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"mv_cross_mode.sql": MV_CRON} + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + def test_cron_to_every_alter(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_cross_mode"]) + + refresh = _get_refresh_config(project, "mv_cross_mode") + assert refresh.cron == "0 0 * * * ? 
*" + + util.write_file(MV_EVERY_2_HOURS, "models", "mv_cross_mode.sql") + + _, logs = util.run_dbt_and_capture(["--debug", "run", "--models", "mv_cross_mode"]) + + refresh = _get_refresh_config(project, "mv_cross_mode") + assert refresh.mode.value == "every" + assert refresh.cron is None + # Confirm an ALTER (not REPLACE) was applied. + util.assert_message_in_logs("Applying ALTER to:", logs) + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewGH1293Regression: + """GH #1293: a relation with a TRIGGER schedule (set via post_hook) breaks the parser + on the next dbt run. The fix is to have a parser that recognizes the TRIGGER syntax.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"mv_gh_1293.sql": MV_CRON} + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + def test_post_hook_alter_to_trigger_then_run_succeeds(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "mv_gh_1293"]) + + # External ALTER (simulates the post_hook path the original GH #1293 user took). + relation = project.adapter.Relation.create( + identifier="mv_gh_1293", + schema=project.test_schema, + database=project.database, + ) + project.run_sql( + f"ALTER MATERIALIZED VIEW {relation} ALTER TRIGGER ON UPDATE" + " AT MOST EVERY INTERVAL 15 MINUTES" + ) + + # Pre-fix this raised "Could not parse schedule from description: ...". + util.run_dbt(["run", "--models", "mv_gh_1293"]) + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewScheduleLifecycle: + """Walks one MV through the realistic schedule lifecycle: MANUAL → CRON → ON_UPDATE + rate-limited → EVERY → (non-refresh change) → MANUAL. 
Each transition asserts the + post-state via DESCRIBE EXTENDED. Step 5 (tblproperties change) triggers + requires_full_refresh -> REPLACE path on MV; the schedule must survive the re-CREATE.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"mv_lifecycle.sql": MV_NO_SCHEDULE} + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + def test_full_lifecycle(self, project): + util.run_dbt(["seed"]) + + # 1. MANUAL initial create + util.run_dbt(["run", "--models", "mv_lifecycle"]) + refresh = _get_refresh_config(project, "mv_lifecycle") + assert refresh.mode.value == "manual" + + # 2. MANUAL → CRON (in-place ALTER ADD SCHEDULE) + util.write_file(MV_CRON, "models", "mv_lifecycle.sql") + util.run_dbt(["run", "--models", "mv_lifecycle"]) + refresh = _get_refresh_config(project, "mv_lifecycle") + assert refresh.mode.value == "cron" + assert refresh.cron == "0 0 * * * ? *" + + # 3. CRON → ON_UPDATE rate-limited (in-place ALTER TRIGGER) + util.write_file(MV_ON_UPDATE_RATE_LIMITED, "models", "mv_lifecycle.sql") + util.run_dbt(["run", "--models", "mv_lifecycle"]) + refresh = _get_refresh_config(project, "mv_lifecycle") + assert refresh.mode.value == "on_update" + assert refresh.on_update is True + assert refresh.at_most_every is not None + assert "900" in refresh.at_most_every + + # 4. ON_UPDATE → EVERY (in-place ALTER SCHEDULE) + util.write_file(MV_EVERY_2_HOURS, "models", "mv_lifecycle.sql") + util.run_dbt(["run", "--models", "mv_lifecycle"]) + refresh = _get_refresh_config(project, "mv_lifecycle") + assert refresh.mode.value == "every" + assert refresh.cron is None + assert refresh.at_most_every is None + + # 5. 
EVERY + tblproperties (REPLACE path; schedule must survive re-CREATE) + util.write_file(MV_EVERY_WITH_TBLPROPS, "models", "mv_lifecycle.sql") + util.run_dbt(["run", "--models", "mv_lifecycle"]) + refresh = _get_refresh_config(project, "mv_lifecycle") + assert refresh.mode.value == "every", ( + "Schedule lost across REPLACE for tblproperties change" + ) + + # 6. EVERY → MANUAL (ALTER DROP SCHEDULE) + util.write_file(MV_NO_SCHEDULE, "models", "mv_lifecycle.sql") + util.run_dbt(["run", "--models", "mv_lifecycle"]) + refresh = _get_refresh_config(project, "mv_lifecycle") + assert refresh.mode.value == "manual", "Schedule not dropped on return to MANUAL" diff --git a/tests/functional/adapter/streaming_tables/fixtures.py b/tests/functional/adapter/streaming_tables/fixtures.py index f4e20122d..caa88cec0 100644 --- a/tests/functional/adapter/streaming_tables/fixtures.py +++ b/tests/functional/adapter/streaming_tables/fixtures.py @@ -44,7 +44,7 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]: partition_by='id', schedule = { 'cron': '0 0 * * * ? *', - 'time_zone': 'Etc/UTC' + 'time_zone_value': 'Etc/UTC' }, tblproperties={ 'key': 'value' diff --git a/tests/functional/adapter/streaming_tables/test_st_schedule_modes.py b/tests/functional/adapter/streaming_tables/test_st_schedule_modes.py new file mode 100644 index 000000000..2086a046c --- /dev/null +++ b/tests/functional/adapter/streaming_tables/test_st_schedule_modes.py @@ -0,0 +1,272 @@ +"""Functional coverage for the new streaming-table refresh modes (EVERY / TRIGGER ON UPDATE), +plus DROP/re-ADD and CRON-mode auto-REFRESH positive control. 
Mirrors the MV suite.""" + +import pytest +from dbt.tests import util +from dbt.tests.adapter.materialized_view.files import MY_SEED + +from dbt.adapters.databricks.relation import DatabricksRelationType +from dbt.adapters.databricks.relation_configs.streaming_table import ( + StreamingTableConfig, +) + +ST_EVERY_2_HOURS = """ +{{ config( + materialized='streaming_table', + schedule = {'every': '2 HOURS'}, +) }} +select * from stream {{ ref('my_seed') }} +""" + +ST_ON_UPDATE_BARE = """ +{{ config( + materialized='streaming_table', + schedule = {'on_update': True}, +) }} +select * from stream {{ ref('my_seed') }} +""" + +ST_ON_UPDATE_RATE_LIMITED = """ +{{ config( + materialized='streaming_table', + schedule = {'on_update': True, 'at_most_every': '15 MINUTES'}, +) }} +select * from stream {{ ref('my_seed') }} +""" + +ST_CRON = """ +{{ config( + materialized='streaming_table', + schedule = {'cron': '0 0 * * * ? *', 'time_zone_value': 'Etc/UTC'}, +) }} +select * from stream {{ ref('my_seed') }} +""" + +ST_EVERY_WITH_TBLPROPS = """ +{{ config( + materialized='streaming_table', + schedule = {'every': '2 HOURS'}, + tblproperties={'lifecycle_marker': 'v1'}, +) }} +select * from stream {{ ref('my_seed') }} +""" + +ST_NO_SCHEDULE = """ +{{ config(materialized='streaming_table') }} +select * from stream {{ ref('my_seed') }} +""" + + +def _get_refresh_config(project, identifier): + relation = project.adapter.Relation.create( + identifier=identifier, + schema=project.test_schema, + database=project.database, + type=DatabricksRelationType.StreamingTable, + ) + with util.get_connection(project.adapter): + results = project.adapter.get_relation_config(relation) + assert isinstance(results, StreamingTableConfig) + return results.config["refresh"] + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestStreamingTableScheduleModes: + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + 
@pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "st_every.sql": ST_EVERY_2_HOURS, + "st_on_update_bare.sql": ST_ON_UPDATE_BARE, + "st_on_update_rate_limited.sql": ST_ON_UPDATE_RATE_LIMITED, + } + + def test_every_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_every"]) + + refresh = _get_refresh_config(project, "st_every") + assert refresh.mode.value == "every" + assert refresh.cron is None + + def test_on_update_bare_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_on_update_bare"]) + + refresh = _get_refresh_config(project, "st_on_update_bare") + assert refresh.mode.value == "on_update" + assert refresh.on_update is True + assert refresh.at_most_every is None + + def test_on_update_rate_limited_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_on_update_rate_limited"]) + + refresh = _get_refresh_config(project, "st_on_update_rate_limited") + assert refresh.mode.value == "on_update" + assert refresh.at_most_every is not None + assert "900" in refresh.at_most_every + + def test_idempotent_run_no_refresh_for_every_mode(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_every"]) + _, logs = util.run_dbt_and_capture(["--debug", "run", "--models", "st_every"]) + assert "refreshing streaming table" not in logs.lower() + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestStreamingTableManualMode: + """Initial-create with no `schedule` config: relation should round-trip as MANUAL. 
+ The drop-and-readd test covers CRON → MANUAL transition; this covers fresh MANUAL.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"st_manual.sql": ST_NO_SCHEDULE} + + def test_manual_mode_roundtrip(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_manual"]) + + refresh = _get_refresh_config(project, "st_manual") + assert refresh.mode.value == "manual" + assert refresh.cron is None + assert refresh.every is None + assert refresh.on_update is False + assert refresh.at_most_every is None + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestStreamingTableCronAutoRefreshPositiveControl: + """Positive control: CRON-mode ST still emits REFRESH on idempotent run.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"st_cron_idempotent.sql": ST_CRON} + + def test_cron_mode_idempotent_run_still_refreshes(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_cron_idempotent"]) + _, logs = util.run_dbt_and_capture(["--debug", "run", "--models", "st_cron_idempotent"]) + assert "refreshing streaming table" in logs.lower() + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestStreamingTableDropAndReadd: + """Drop ST schedule (config removed) and re-add. 
CREATE OR REFRESH clears the existing + schedule on every run, so drop is implicit (no ALTER emitted) and re-add uses ADD.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"st_drop_readd.sql": ST_CRON} + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + def test_drop_then_readd(self, project): + util.run_dbt(["seed"]) + util.run_dbt(["run", "--models", "st_drop_readd"]) + + no_schedule = """ +{{ config(materialized='streaming_table') }} +select * from stream {{ ref('my_seed') }} +""" + util.write_file(no_schedule, "models", "st_drop_readd.sql") + _, drop_logs = util.run_dbt_and_capture(["--debug", "run", "--models", "st_drop_readd"]) + + refresh = _get_refresh_config(project, "st_drop_readd") + assert refresh.mode.value == "manual" + + util.write_file(ST_EVERY_2_HOURS, "models", "st_drop_readd.sql") + _, add_logs = util.run_dbt_and_capture(["--debug", "run", "--models", "st_drop_readd"]) + assert "add schedule every" in add_logs.lower() + + refresh = _get_refresh_config(project, "st_drop_readd") + assert refresh.mode.value == "every" + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestStreamingTableScheduleLifecycle: + """Walks one ST through the realistic schedule lifecycle: MANUAL → CRON → ON_UPDATE + rate-limited → EVERY → (non-refresh change) → MANUAL. Each transition asserts the + post-state via DESCRIBE EXTENDED. 
Step 5 (tblproperties change with schedule unchanged) + exercises the streaming_table.py:62 fix -- CREATE OR REFRESH clears the schedule as a + side effect, so the alter macro must re-ADD it even when refresh itself didn't diff.""" + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield {"st_lifecycle.sql": ST_NO_SCHEDULE} + + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"on_configuration_change": "apply"}} + + def test_full_lifecycle(self, project): + util.run_dbt(["seed"]) + + # 1. MANUAL initial create + util.run_dbt(["run", "--models", "st_lifecycle"]) + refresh = _get_refresh_config(project, "st_lifecycle") + assert refresh.mode.value == "manual" + + # 2. MANUAL → CRON + util.write_file(ST_CRON, "models", "st_lifecycle.sql") + util.run_dbt(["run", "--models", "st_lifecycle"]) + refresh = _get_refresh_config(project, "st_lifecycle") + assert refresh.mode.value == "cron" + assert refresh.cron == "0 0 * * * ? *" + + # 3. CRON → ON_UPDATE rate-limited + util.write_file(ST_ON_UPDATE_RATE_LIMITED, "models", "st_lifecycle.sql") + util.run_dbt(["run", "--models", "st_lifecycle"]) + refresh = _get_refresh_config(project, "st_lifecycle") + assert refresh.mode.value == "on_update" + assert refresh.on_update is True + assert refresh.at_most_every is not None + assert "900" in refresh.at_most_every + + # 4. ON_UPDATE → EVERY + util.write_file(ST_EVERY_2_HOURS, "models", "st_lifecycle.sql") + util.run_dbt(["run", "--models", "st_lifecycle"]) + refresh = _get_refresh_config(project, "st_lifecycle") + assert refresh.mode.value == "every" + assert refresh.cron is None + assert refresh.at_most_every is None + + # 5. 
EVERY + tblproperties (non-refresh change must preserve schedule) + util.write_file(ST_EVERY_WITH_TBLPROPS, "models", "st_lifecycle.sql") + util.run_dbt(["run", "--models", "st_lifecycle"]) + refresh = _get_refresh_config(project, "st_lifecycle") + assert refresh.mode.value == "every", ( + "Schedule silently dropped when only tblproperties changed" + ) + + # 6. EVERY → MANUAL + util.write_file(ST_NO_SCHEDULE, "models", "st_lifecycle.sql") + util.run_dbt(["run", "--models", "st_lifecycle"]) + refresh = _get_refresh_config(project, "st_lifecycle") + assert refresh.mode.value == "manual", "Schedule not dropped on return to MANUAL" diff --git a/tests/unit/macros/relations/components/__init__.py b/tests/unit/macros/relations/components/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/macros/relations/components/test_refresh_schedule_macros.py b/tests/unit/macros/relations/components/test_refresh_schedule_macros.py new file mode 100644 index 000000000..2f34b2005 --- /dev/null +++ b/tests/unit/macros/relations/components/test_refresh_schedule_macros.py @@ -0,0 +1,122 @@ +import pytest + +from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig +from tests.unit.macros.base import MacroTestBase + + +class _RefreshMacrosBase(MacroTestBase): + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "refresh_schedule.sql" + + @pytest.fixture(scope="class") + def macro_folders_to_load(self) -> list: + return ["macros", "macros/relations/components"] + + +class TestRefreshScheduleCreateMacro(_RefreshMacrosBase): + def test_cron_with_time_zone(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_create_sql_refresh_schedule", + RefreshConfig(cron="0 0 * * * ? *", time_zone_value="UTC"), + ) + self.assert_sql_equal(result, "schedule cron '0 0 * * * ? 
*' at time zone 'utc'") + + def test_cron_without_time_zone(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_create_sql_refresh_schedule", + RefreshConfig(cron="0 0 * * * ? *"), + ) + self.assert_sql_equal(result, "schedule cron '0 0 * * * ? *'") + + def test_every(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_create_sql_refresh_schedule", + RefreshConfig(every="2 HOURS"), + ) + self.assert_sql_equal(result, "schedule every 2 hours") + + def test_on_update_bare(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_create_sql_refresh_schedule", + RefreshConfig(on_update=True), + ) + self.assert_sql_equal(result, "trigger on update") + + def test_on_update_with_at_most_every(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_create_sql_refresh_schedule", + RefreshConfig(on_update=True, at_most_every="15 MINUTES"), + ) + self.assert_sql_equal(result, "trigger on update at most every interval 15 minutes") + + def test_manual_emits_nothing(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_create_sql_refresh_schedule", + RefreshConfig(), + ) + assert result.strip() == "" + + +class TestRefreshScheduleAlterMacro(_RefreshMacrosBase): + def test_add_cron(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(cron="0 0 * * * ? *", time_zone_value="UTC", is_altered=False), + ) + self.assert_sql_equal(result, "add schedule cron '0 0 * * * ? *' at time zone 'utc'") + + def test_alter_cron(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(cron="0 0 * * * ? *", time_zone_value="UTC", is_altered=True), + ) + self.assert_sql_equal(result, "alter schedule cron '0 0 * * * ? 
*' at time zone 'utc'") + + def test_add_every(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(every="2 HOURS", is_altered=False), + ) + self.assert_sql_equal(result, "add schedule every 2 hours") + + def test_alter_every(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(every="4 HOURS", is_altered=True), + ) + self.assert_sql_equal(result, "alter schedule every 4 hours") + + def test_add_trigger_bare(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(on_update=True, is_altered=False), + ) + self.assert_sql_equal(result, "add trigger on update") + + def test_alter_trigger_with_at_most_every(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(on_update=True, at_most_every="15 MINUTES", is_altered=True), + ) + self.assert_sql_equal(result, "alter trigger on update at most every interval 15 minutes") + + def test_drop_schedule_when_no_mode_set(self, template_bundle): + result = self.run_macro( + template_bundle.template, + "get_alter_sql_refresh_schedule", + RefreshConfig(), + ) + self.assert_sql_equal(result, "drop schedule") diff --git a/tests/unit/relation_configs/test_refresh.py b/tests/unit/relation_configs/test_refresh.py index 94180bb08..35cee02f5 100644 --- a/tests/unit/relation_configs/test_refresh.py +++ b/tests/unit/relation_configs/test_refresh.py @@ -3,7 +3,11 @@ import pytest from dbt.exceptions import DbtRuntimeError -from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig, RefreshProcessor +from dbt.adapters.databricks.relation_configs.refresh import ( + RefreshConfig, + RefreshMode, + RefreshProcessor, +) from tests.unit import fixtures @@ -34,7 +38,7 @@ def test_from_results__invalid(self): } with pytest.raises( 
DbtRuntimeError, - match="Could not parse schedule from description: invalid description", + match="Could not parse refresh schedule from describe extended: 'invalid description'", ): RefreshProcessor.from_relation_results(results) @@ -48,8 +52,7 @@ def test_from_model_node__without_cron(self): model = Mock() model.config.extra = {"schedule": {"time_zone_value": "UTC"}} with pytest.raises( - DbtRuntimeError, - match="Schedule config must contain a 'cron' key, got {'time_zone_value': 'UTC'}", + DbtRuntimeError, match=r"`time_zone_value` is only valid when `cron` is set" ): RefreshProcessor.from_relation_config(model) @@ -90,3 +93,331 @@ def test_get_diff__manual_other_manual_refresh(self): other = RefreshConfig() diff = config.get_diff(other) assert diff is None + + def test_eq__different_time_zone_not_equal(self): + # Regression: same cron, different time zones must compare unequal. + a = RefreshConfig(cron="*/5 * * * *", time_zone_value="America/Los_Angeles") + b = RefreshConfig(cron="*/5 * * * *", time_zone_value="America/New_York") + assert a != b + + def test_eq__same_time_zone_equal(self): + a = RefreshConfig(cron="*/5 * * * *", time_zone_value="America/Los_Angeles") + b = RefreshConfig(cron="*/5 * * * *", time_zone_value="America/Los_Angeles") + assert a == b + + def test_eq__implicit_utc_equals_explicit_utc(self): + a = RefreshConfig(cron="*/5 * * * *") + b = RefreshConfig(cron="*/5 * * * *", time_zone_value="UTC") + assert a == b + + +class TestRefreshConfigEquality: + """Direct coverage of __eq__ across every mode and every normalization path.""" + + def test_eq__manual_equals_manual(self): + assert RefreshConfig() == RefreshConfig() + + def test_eq__manual_not_equal_cron(self): + assert RefreshConfig() != RefreshConfig(cron="*/5 * * * *") + + def test_eq__manual_not_equal_every(self): + assert RefreshConfig() != RefreshConfig(every="2 HOURS") + + def test_eq__manual_not_equal_on_update(self): + assert RefreshConfig() != RefreshConfig(on_update=True) + + 
def test_eq__cross_mode_cron_vs_every(self): + assert RefreshConfig(cron="*/5 * * * *") != RefreshConfig(every="2 HOURS") + + def test_eq__cross_mode_cron_vs_on_update(self): + assert RefreshConfig(cron="*/5 * * * *") != RefreshConfig(on_update=True) + + def test_eq__cross_mode_every_vs_on_update(self): + assert RefreshConfig(every="2 HOURS") != RefreshConfig(on_update=True) + + def test_eq__cron_case_insensitive_timezone(self): + # Time-zone canonicalization is case-insensitive (server output is UPPERCASE). + a = RefreshConfig(cron="*/5 * * * *", time_zone_value="utc") + b = RefreshConfig(cron="*/5 * * * *", time_zone_value="UTC") + assert a == b + + def test_eq__cron_different_expr_not_equal(self): + a = RefreshConfig(cron="*/5 * * * *", time_zone_value="UTC") + b = RefreshConfig(cron="0 * * * *", time_zone_value="UTC") + assert a != b + + def test_eq__every_plural_vs_singular_normalized(self): + assert RefreshConfig(every="1 DAY") == RefreshConfig(every="1 DAYS") + + def test_eq__every_case_insensitive(self): + assert RefreshConfig(every="2 hours") == RefreshConfig(every="2 HOURS") + + def test_eq__every_different_count_not_equal(self): + assert RefreshConfig(every="2 HOURS") != RefreshConfig(every="4 HOURS") + + def test_eq__every_different_unit_not_equal(self): + assert RefreshConfig(every="2 HOURS") != RefreshConfig(every="2 DAYS") + + def test_eq__on_update_bare_equals_bare(self): + assert RefreshConfig(on_update=True) == RefreshConfig(on_update=True) + + def test_eq__on_update_bare_not_equal_rate_limited(self): + a = RefreshConfig(on_update=True) + b = RefreshConfig(on_update=True, at_most_every="15 MINUTES") + assert a != b + # Symmetry: a != b ↔ b != a. 
+ assert b != a + + def test_eq__on_update_minutes_vs_seconds_normalized(self): + a = RefreshConfig(on_update=True, at_most_every="15 MINUTES") + b = RefreshConfig(on_update=True, at_most_every="900 SECOND") + assert a == b + + def test_eq__on_update_hour_vs_seconds_normalized(self): + a = RefreshConfig(on_update=True, at_most_every="1 HOUR") + b = RefreshConfig(on_update=True, at_most_every="3600 SECOND") + assert a == b + + def test_eq__on_update_different_intervals_not_equal(self): + a = RefreshConfig(on_update=True, at_most_every="15 MINUTES") + b = RefreshConfig(on_update=True, at_most_every="30 MINUTES") + assert a != b + + def test_eq__non_refreshconfig_not_equal(self): + # __eq__ must return False for foreign types, not raise. + assert RefreshConfig() != object() + assert RefreshConfig(cron="*/5 * * * *") != "*/5 * * * *" + + def test_eq__is_altered_does_not_affect_identity(self): + # is_altered is a render-time hint, not part of identity. + a = RefreshConfig(cron="*/5 * * * *", is_altered=False) + b = RefreshConfig(cron="*/5 * * * *", is_altered=True) + assert a == b + + +class TestRefreshModeDiscriminator: + def test_mode__manual(self): + assert RefreshConfig().mode == RefreshMode.MANUAL + + def test_mode__cron(self): + assert RefreshConfig(cron="*/5 * * * *").mode == RefreshMode.CRON + + def test_mode__every(self): + assert RefreshConfig(every="2 HOURS").mode == RefreshMode.EVERY + + def test_mode__on_update_bare(self): + assert RefreshConfig(on_update=True).mode == RefreshMode.ON_UPDATE + + def test_mode__on_update_rate_limited(self): + assert ( + RefreshConfig(on_update=True, at_most_every="15 MINUTES").mode == RefreshMode.ON_UPDATE + ) + + +class TestRefreshConfigValidation: + def test_validation__multiple_modes_rejected(self): + with pytest.raises(DbtRuntimeError, match="at most one"): + RefreshConfig(cron="*/5 * * * *", every="2 HOURS") + + def test_validation__cron_with_on_update_rejected(self): + with pytest.raises(DbtRuntimeError, match="at most 
one"): + RefreshConfig(cron="*/5 * * * *", on_update=True) + + def test_validation__time_zone_without_cron_rejected(self): + with pytest.raises(DbtRuntimeError, match="time_zone_value"): + RefreshConfig(time_zone_value="UTC", every="2 HOURS") + + def test_validation__at_most_every_below_minimum_rejected(self): + with pytest.raises(DbtRuntimeError, match="at least 60 seconds"): + RefreshConfig(on_update=True, at_most_every="30 SECONDS") + + def test_validation__at_most_every_59_seconds_rejected(self): + with pytest.raises(DbtRuntimeError, match="at least 60 seconds"): + RefreshConfig(on_update=True, at_most_every="59 SECONDS") + + def test_validation__at_most_every_minimum_seconds_ok(self): + c = RefreshConfig(on_update=True, at_most_every="60 SECONDS") + assert c.mode == RefreshMode.ON_UPDATE + + def test_validation__at_most_every_minimum_minutes_ok(self): + c = RefreshConfig(on_update=True, at_most_every="1 MINUTE") + assert c.mode == RefreshMode.ON_UPDATE + + def test_validation__at_most_every_without_on_update_rejected(self): + # Parallel to time_zone_value-without-cron: at_most_every is an option of on_update + # and cannot stand alone. 
+ with pytest.raises(DbtRuntimeError, match="`at_most_every` is only valid"): + RefreshConfig(at_most_every="15 MINUTES") + + +class TestRefreshProcessorNewShapes: + def test_from_results__every_2_hours(self): + results = { + "describe_extended": fixtures.gen_describe_extended( + detailed_table_info=[["Refresh Schedule", "EVERY 2 HOURS"]] + ) + } + spec = RefreshProcessor.from_relation_results(results) + assert spec == RefreshConfig(every="2 HOURS") + + def test_from_results__every_1_days_normalizes_to_1_day(self): + results = { + "describe_extended": fixtures.gen_describe_extended( + detailed_table_info=[["Refresh Schedule", "EVERY 1 DAYS"]] + ) + } + spec = RefreshProcessor.from_relation_results(results) + assert spec.mode == RefreshMode.EVERY + assert spec == RefreshConfig(every="1 DAY") + + def test_from_results__every_8_weeks(self): + results = { + "describe_extended": fixtures.gen_describe_extended( + detailed_table_info=[["Refresh Schedule", "EVERY 8 WEEKS"]] + ) + } + spec = RefreshProcessor.from_relation_results(results) + assert spec == RefreshConfig(every="8 WEEKS") + + def test_from_results__trigger_bare(self): + results = { + "describe_extended": fixtures.gen_describe_extended( + detailed_table_info=[["Refresh Schedule", "TRIGGER ON UPDATE"]] + ) + } + spec = RefreshProcessor.from_relation_results(results) + assert spec.mode == RefreshMode.ON_UPDATE + assert spec.at_most_every is None + assert spec == RefreshConfig(on_update=True) + + def test_from_results__trigger_with_interval_seconds(self): + results = { + "describe_extended": fixtures.gen_describe_extended( + detailed_table_info=[ + ["Refresh Schedule", "TRIGGER ON UPDATE AT MOST EVERY INTERVAL 900 SECOND"] + ] + ) + } + spec = RefreshProcessor.from_relation_results(results) + assert spec.mode == RefreshMode.ON_UPDATE + assert spec.on_update is True + # Server-stored '900 SECOND' must compare equal to user-input '15 MINUTES'. 
+ assert spec == RefreshConfig(on_update=True, at_most_every="15 MINUTES") + + def test_from_results__trigger_with_interval_does_not_raise(self): + results = { + "describe_extended": fixtures.gen_describe_extended( + detailed_table_info=[ + ["Refresh Schedule", "TRIGGER ON UPDATE AT MOST EVERY INTERVAL 900 SECOND"] + ] + ) + } + RefreshProcessor.from_relation_results(results) + + +class TestFromRelationConfigNewShapes: + def test_from_relation_config__every(self): + model = Mock() + model.config.extra = {"schedule": {"every": "2 HOURS"}} + spec = RefreshProcessor.from_relation_config(model) + assert spec == RefreshConfig(every="2 HOURS") + + def test_from_relation_config__on_update_bool(self): + model = Mock() + model.config.extra = {"schedule": {"on_update": True}} + spec = RefreshProcessor.from_relation_config(model) + assert spec == RefreshConfig(on_update=True) + + def test_from_relation_config__on_update_with_at_most_every(self): + model = Mock() + model.config.extra = {"schedule": {"on_update": True, "at_most_every": "15 MINUTES"}} + spec = RefreshProcessor.from_relation_config(model) + assert spec == RefreshConfig(on_update=True, at_most_every="15 MINUTES") + + def test_from_relation_config__at_most_every_without_on_update_rejected(self): + model = Mock() + model.config.extra = {"schedule": {"at_most_every": "15 MINUTES"}} + with pytest.raises(DbtRuntimeError, match="`at_most_every` is only valid"): + RefreshProcessor.from_relation_config(model) + + def test_from_relation_config__schedule_with_cron_and_every_rejected(self): + model = Mock() + model.config.extra = { + "schedule": {"cron": "*/5 * * * *", "every": "2 HOURS"}, + } + with pytest.raises(DbtRuntimeError, match="at most one"): + RefreshProcessor.from_relation_config(model) + + def test_from_relation_config__schedule_with_cron_and_on_update_rejected(self): + model = Mock() + model.config.extra = { + "schedule": {"cron": "*/5 * * * *", "on_update": True}, + } + with pytest.raises(DbtRuntimeError, 
match="at most one"): + RefreshProcessor.from_relation_config(model) + + +class TestRefreshConfigDiffNormalization: + def test_diff__every_1_day_vs_1_days_no_diff(self): + desired = RefreshConfig(every="1 DAY") + existing = RefreshConfig(every="1 DAYS") + assert desired.get_diff(existing) is None + + def test_diff__at_most_every_15_minutes_vs_900_second_no_diff(self): + desired = RefreshConfig(on_update=True, at_most_every="15 MINUTES") + existing = RefreshConfig(on_update=True, at_most_every="900 SECOND") + assert desired.get_diff(existing) is None + + def test_diff__at_most_every_1_hour_vs_3600_second_no_diff(self): + desired = RefreshConfig(on_update=True, at_most_every="1 HOUR") + existing = RefreshConfig(on_update=True, at_most_every="3600 SECOND") + assert desired.get_diff(existing) is None + + def test_diff__cross_mode_cron_to_every_alter(self): + desired = RefreshConfig(every="2 HOURS") + existing = RefreshConfig(cron="*/5 * * * *") + diff = desired.get_diff(existing) + assert diff is not None + assert diff.is_altered is True + + def test_diff__cross_mode_every_to_on_update_alter(self): + desired = RefreshConfig(on_update=True, at_most_every="15 MINUTES") + existing = RefreshConfig(every="2 HOURS") + diff = desired.get_diff(existing) + assert diff is not None + assert diff.is_altered is True + + def test_diff__bare_trigger_vs_rate_limited_alter(self): + desired = RefreshConfig(on_update=True, at_most_every="15 MINUTES") + existing = RefreshConfig(on_update=True) + diff = desired.get_diff(existing) + assert diff is not None + assert diff.is_altered is True + + def test_diff__different_every_alter(self): + desired = RefreshConfig(every="4 HOURS") + existing = RefreshConfig(every="2 HOURS") + diff = desired.get_diff(existing) + assert diff is not None + assert diff.is_altered is True + + def test_diff__manual_to_every_add(self): + desired = RefreshConfig(every="2 HOURS") + existing = RefreshConfig() + diff = desired.get_diff(existing) + assert diff is not None 
+ assert diff.is_altered is False + + def test_diff__every_to_manual_drop(self): + desired = RefreshConfig() + existing = RefreshConfig(every="2 HOURS") + diff = desired.get_diff(existing) + assert diff is not None + assert diff.is_altered is False + + def test_diff__manual_to_on_update_add(self): + desired = RefreshConfig(on_update=True) + existing = RefreshConfig() + diff = desired.get_diff(existing) + assert diff is not None + assert diff.is_altered is False