Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@
from dbt.adapters.databricks.relation_configs.column_tags import (
    ColumnTagsProcessor,
)
from dbt.adapters.databricks.relation_configs.streaming_table import (
    StreamingTableConfig,
)
from dbt.adapters.databricks.relation_configs.table_format import TableFormat
from dbt.adapters.databricks.relation_configs.tags import (
    TagsProcessor,
)
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig
from dbt.adapters.databricks.relation_configs.view import ViewConfig
Expand Down Expand Up @@ -928,15 +931,15 @@ def parse_columns_and_constraints(
return enriched_columns, parsed_constraints

@available.parse(lambda *a, **k: {})
def get_relation_config(
    self,
    relation: DatabricksRelation,
    relation_config: "Optional[DatabricksRelationConfigBase]" = None,
) -> DatabricksRelationConfigBase:
    """Describe an existing relation and return its current configuration.

    Args:
        relation: The existing relation to describe; its ``type`` selects
            which relation API performs the describe.
        relation_config: The model-side configuration, forwarded to the
            relation API so it can decide which metadata needs fetching.
            Optional so that existing single-argument callers (for example
            user-overridden macros) keep working.

    Returns:
        The configuration built from the describe results.

    Raises:
        NotImplementedError: If ``relation.type`` is not a materialized view,
            streaming table, table, or view.
    """
    if relation.type == DatabricksRelationType.MaterializedView:
        return MaterializedViewAPI.get_from_relation(self, relation, relation_config)
    if relation.type == DatabricksRelationType.StreamingTable:
        return StreamingTableAPI.get_from_relation(self, relation, relation_config)
    if relation.type == DatabricksRelationType.Table:
        return IncrementalTableAPI.get_from_relation(self, relation, relation_config)
    if relation.type == DatabricksRelationType.View:
        return ViewAPI.get_from_relation(self, relation, relation_config)
    raise NotImplementedError(f"Relation type {relation.type} is not supported.")

Expand Down Expand Up @@ -1017,12 +1020,12 @@ def config_type(cls) -> type[DatabricksRelationConfig]:

@classmethod
def get_from_relation(
    cls,
    adapter: DatabricksAdapter,
    relation: DatabricksRelation,
    relation_config: "Optional[DatabricksRelationConfigBase]" = None,
) -> DatabricksRelationConfig:
    """Get the relation config from the relation.

    Args:
        adapter: Adapter handed to ``_describe_relation`` to run the describe.
        relation: The relation to describe; its type must equal
            ``cls.relation_type``.
        relation_config: Model-side configuration passed through to
            ``_describe_relation``. Optional so callers that only have the
            relation can still use this method.

    Returns:
        The config built by ``cls.config_type().from_results`` from the
        describe results.
    """

    assert relation.type == cls.relation_type
    described = cls._describe_relation(adapter, relation, relation_config)
    return cls.config_type().from_results(described)

@classmethod
Expand All @@ -1034,7 +1037,7 @@ def get_from_relation_config(cls, relation_config: RelationConfig) -> Databricks
@classmethod
@abstractmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase
) -> RelationResults:
"""Describe the relation and return the results."""

Expand All @@ -1044,11 +1047,11 @@ def _describe_relation(
class DeltaLiveTableAPIBase(RelationAPIBase[DatabricksRelationConfig]):
@classmethod
def get_from_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase
) -> DatabricksRelationConfig:
"""Get the relation config from the relation."""

relation_config = super().get_from_relation(adapter, relation)
relation_config = super().get_from_relation(adapter, relation, relation_config)

# Ensure any current refreshes are completed before returning the relation config
tblproperties = cast(TblPropertiesConfig, relation_config.config["tblproperties"])
Expand All @@ -1068,7 +1071,7 @@ def config_type(cls) -> type[MaterializedViewConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase
) -> RelationResults:
kwargs = {"table_name": relation}
results: RelationResults = dict()
Expand All @@ -1093,7 +1096,7 @@ def config_type(cls) -> type[StreamingTableConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase
) -> RelationResults:
kwargs = {"table_name": relation}
results: RelationResults = dict()
Expand All @@ -1116,16 +1119,26 @@ def config_type(cls) -> type[IncrementalTableConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase
) -> RelationResults:
results = {}
kwargs = {"relation": relation}

if not relation.is_hive_metastore():
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["information_schema.column_tags"] = adapter.execute_macro(
"fetch_column_tags", kwargs=kwargs
)
table_tag_config = relation_config.config.get(TagsProcessor.name) if relation_config else None
if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff():
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
else:
results["information_schema.tags"] = None

column_tag_config = relation_config.config.get(ColumnTagsProcessor.name) if relation_config else None
if column_tag_config is None or column_tag_config.requires_server_metadata_for_diff():
results["information_schema.column_tags"] = adapter.execute_macro(
"fetch_column_tags", kwargs=kwargs
)
else:
results["information_schema.column_tags"] = None

results["non_null_constraint_columns"] = adapter.execute_macro(
"fetch_non_null_constraint_columns", kwargs=kwargs
)
Expand Down Expand Up @@ -1154,15 +1167,21 @@ def config_type(cls) -> type[ViewConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase
) -> RelationResults:
results = {}
kwargs = {"relation": relation}

results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)

table_tag_config = relation_config.config.get(TagsProcessor.name) if relation_config else None
if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff():
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
else:
results["information_schema.tags"] = None

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

kwargs = {"table_name": relation}
Expand Down
5 changes: 5 additions & 0 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def get_diff(self, other: Self) -> Optional[Self]:
return self
return None

def requires_server_metadata_for_diff(self) -> bool:
    """Report whether diffing this component needs metadata from the server.

    This implementation unconditionally answers True.
    """
    needs_server_metadata = True
    return needs_server_metadata

class DatabricksRelationChangeSet(BaseModel):
"""Class for encapsulating the changes that need to be applied to a Databricks relation."""
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/databricks/relation_configs/column_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def get_diff(self, other: "ColumnTagsConfig") -> Optional["ColumnTagsConfig"]:
return ColumnTagsConfig(set_column_tags=set_column_tags)
return None

def requires_server_metadata_for_diff(self) -> bool:
    """Report whether existing column tags are needed to compute the diff.

    Returns:
        True when ``set_column_tags`` holds at least one entry; False when it
        is empty or None.
    """
    # bool() keeps the declared return type: `x and len(x) > 0` would hand
    # back the empty container (or None) itself instead of False.
    return bool(self.set_column_tags)


class ColumnTagsProcessor(DatabricksComponentProcessor[ColumnTagsConfig]):
name: ClassVar[str] = "column_tags"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class StreamingTableConfig(DatabricksRelationConfigBase):
CommentProcessor,
TblPropertiesProcessor,
RefreshProcessor,
TagsProcessor,
DescribeQueryProcessor,
TagsProcessor,
]
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/databricks/relation_configs/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ def get_diff(self, other: "TagsConfig") -> Optional["TagsConfig"]:
return TagsConfig(set_tags=self.set_tags)
return None

def requires_server_metadata_for_diff(self) -> bool:
    """Report whether existing relation tags are needed to compute the diff.

    Returns:
        True when ``set_tags`` holds at least one entry; False when it is
        empty or None.
    """
    # bool() keeps the declared return type: `x and len(x) > 0` would hand
    # back the empty container (or None) itself instead of False.
    return bool(self.set_tags)


class TagsProcessor(DatabricksComponentProcessor[TagsConfig]):
name: ClassVar[str] = "tags"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@
{{ set_overwrite_mode('DYNAMIC') }}
{%- endif -%}
{#-- Relation must be merged --#}
{%- set _existing_config = adapter.get_relation_config(existing_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
{%- set _existing_config = adapter.get_relation_config(existing_relation, model_config) -%}
{%- set _configuration_changes = model_config.get_changeset(_existing_config) -%}
{%- call statement('create_temp_relation', language=language) -%}
{{ create_table_as(True, temp_relation, compiled_code, language) }}
Expand Down Expand Up @@ -237,8 +237,8 @@
{% macro process_config_changes(target_relation) %}
{% set apply_config_changes = config.get('incremental_apply_config_changes', True) | as_bool %}
{% if apply_config_changes %}
{%- set existing_config = adapter.get_relation_config(target_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
{%- set existing_config = adapter.get_relation_config(target_relation, model_config) -%}
{%- set configuration_changes = model_config.get_changeset(existing_config) -%}
{{ apply_config_changeset(target_relation, model, configuration_changes) }}
{% endif %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/relations/config.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{%- macro get_configuration_changes(existing_relation) -%}
  {#- Build the model config first: it is passed to get_relation_config along with the relation. -#}
  {%- set model_config = adapter.get_config_from_model(config.model) -%}
  {%- set existing_config = adapter.get_relation_config(existing_relation, model_config) -%}
  {%- set configuration_changes = model_config.get_changeset(existing_config) -%}
  {% do return(configuration_changes) %}
{%- endmacro -%}
34 changes: 34 additions & 0 deletions tests/functional/adapter/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from dbt.adapters.base.relation import BaseRelation
from dbt.tests.util import get_manifest

# Jinja source defining a `fetch_tags` macro that raises a compiler error
# whenever it is invoked.
FAIL_IF_TAG_FETCH_CALLED_MACROS = """
{% macro fetch_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_tags should not be called") }}
{% endmacro %}
"""

# Same as above, plus a `fetch_column_tags` macro that also raises a compiler
# error whenever it is invoked.
FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS = """
{% macro fetch_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_tags should not be called") }}
{% endmacro %}

{% macro fetch_column_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }}
{% endmacro %}
"""


def get_model_config(project, relation: BaseRelation):
    """Look up the single model node matching ``relation`` and return its adapter config.

    The manifest is read from ``project.project_root``; a node matches when its
    ``resource_type`` is ``"model"`` and its ``alias`` equals
    ``relation.identifier``. Exactly one match is required.
    """

    def _is_target_model(candidate) -> bool:
        return (
            getattr(candidate, "resource_type", None) == "model"
            and getattr(candidate, "alias", None) == relation.identifier
        )

    all_nodes = get_manifest(project.project_root).nodes.values()
    model_nodes = list(filter(_is_target_model, all_nodes))
    assert len(model_nodes) == 1, (
        f"Expected exactly one model node for relation {relation.identifier}, "
        f"found {len(model_nodes)}"
    )
    (only_node,) = model_nodes
    return project.adapter.get_config_from_model(only_node)
41 changes: 41 additions & 0 deletions tests/functional/adapter/incremental/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,47 @@
- name: color
"""

# Incremental model keyed on `id` that selects a single bigint row.
metadata_fetch_incremental_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
) }}

select cast(1 as bigint) as id
"""

# Schema for the model above with no databricks_tags configured.
metadata_fetch_no_tags_schema = """
version: 2

models:
- name: metadata_fetch_incremental
columns:
- name: id
"""

# Schema for the model above with a model-level `databricks_tags` entry.
metadata_fetch_table_tags_schema = """
version: 2

models:
- name: metadata_fetch_incremental
config:
databricks_tags:
classification: internal
columns:
- name: id
"""

# Schema for the model above with a `databricks_tags` entry on the `id` column.
metadata_fetch_column_tags_schema = """
version: 2

models:
- name: metadata_fetch_incremental
columns:
- name: id
databricks_tags:
classification: internal
"""

tblproperties_a = """
version: 2

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import pytest
from dbt.tests import util

from tests.functional.adapter.helpers import (
FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS,
)
from tests.functional.adapter.incremental import fixtures


@pytest.mark.skip_profile("databricks_cluster")
class TestIncrementalMetadataFetchSkips:
    """Untagged incremental model: a re-run must pass with both tag-fetch macros set to raise."""

    @pytest.fixture(scope="class")
    def models(self):
        project_models = {}
        project_models["metadata_fetch_incremental.sql"] = fixtures.metadata_fetch_incremental_sql
        project_models["schema.yml"] = fixtures.metadata_fetch_no_tags_schema
        return project_models

    @pytest.fixture(scope="class")
    def macros(self):
        macro_source = FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS
        return {"fail_if_tag_fetch_called.sql": macro_source}

    def test_second_incremental_run_succeeds_without_tag_fetches(self, project):
        # Run 1 creates the relation. Run 2 takes the existing-relation path,
        # where adapter.get_relation_config() may attempt metadata fetches.
        for _ in range(2):
            util.run_dbt(["run"])


@pytest.mark.skip_profile("databricks_cluster")
class TestIncrementalMetadataFetchRequiresTableTags:
    """Model-level tags configured: the re-run must reach `fetch_tags`, which is set to raise."""

    @pytest.fixture(scope="class")
    def models(self):
        project_models = {}
        project_models["metadata_fetch_incremental.sql"] = fixtures.metadata_fetch_incremental_sql
        project_models["schema.yml"] = fixtures.metadata_fetch_table_tags_schema
        return project_models

    @pytest.fixture(scope="class")
    def macros(self):
        macro_source = FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS
        return {"fail_if_tag_fetch_called.sql": macro_source}

    def test_second_incremental_run_fails_when_table_tag_fetch_is_required(self, project):
        # Run 1 creates the relation. Run 2 takes the existing-relation path,
        # where adapter.get_relation_config() may attempt metadata fetches.
        util.run_dbt(["run"])
        _results, captured_logs = util.run_dbt_and_capture(["run"], expect_pass=False)
        util.assert_message_in_logs("fetch_tags should not be called", captured_logs)


@pytest.mark.skip_profile("databricks_cluster")
class TestIncrementalMetadataFetchRequiresColumnTags:
    """Column tags configured: the re-run must reach `fetch_column_tags`, which is set to raise."""

    @pytest.fixture(scope="class")
    def models(self):
        project_models = {}
        project_models["metadata_fetch_incremental.sql"] = fixtures.metadata_fetch_incremental_sql
        project_models["schema.yml"] = fixtures.metadata_fetch_column_tags_schema
        return project_models

    @pytest.fixture(scope="class")
    def macros(self):
        macro_source = FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS
        return {"fail_if_tag_fetch_called.sql": macro_source}

    def test_second_incremental_run_fails_when_column_tag_fetch_is_required(self, project):
        # Run 1 creates the relation. Run 2 takes the existing-relation path,
        # where adapter.get_relation_config() may attempt metadata fetches.
        util.run_dbt(["run"])
        _results, captured_logs = util.run_dbt_and_capture(["run"], expect_pass=False)
        util.assert_message_in_logs("fetch_column_tags should not be called", captured_logs)
Loading
Loading