diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 045f4b106..b3554b842 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -79,6 +79,9 @@ from dbt.adapters.databricks.relation_configs.streaming_table import ( StreamingTableConfig, ) +from dbt.adapters.databricks.relation_configs.tags import ( + TagsProcessor, +) from dbt.adapters.databricks.relation_configs.table_format import TableFormat from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig from dbt.adapters.databricks.relation_configs.view import ViewConfig @@ -928,15 +931,15 @@ def parse_columns_and_constraints( return enriched_columns, parsed_constraints @available.parse(lambda *a, **k: {}) - def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase: + def get_relation_config(self, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase) -> DatabricksRelationConfigBase: if relation.type == DatabricksRelationType.MaterializedView: - return MaterializedViewAPI.get_from_relation(self, relation) + return MaterializedViewAPI.get_from_relation(self, relation, relation_config) elif relation.type == DatabricksRelationType.StreamingTable: - return StreamingTableAPI.get_from_relation(self, relation) + return StreamingTableAPI.get_from_relation(self, relation, relation_config) elif relation.type == DatabricksRelationType.Table: - return IncrementalTableAPI.get_from_relation(self, relation) + return IncrementalTableAPI.get_from_relation(self, relation, relation_config) elif relation.type == DatabricksRelationType.View: - return ViewAPI.get_from_relation(self, relation) + return ViewAPI.get_from_relation(self, relation, relation_config) else: raise NotImplementedError(f"Relation type {relation.type} is not supported.") @@ -1017,12 +1020,12 @@ def config_type(cls) -> type[DatabricksRelationConfig]: @classmethod def get_from_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> DatabricksRelationConfig: """Get the relation config from the relation.""" assert relation.type == cls.relation_type - results = cls._describe_relation(adapter, relation) + results = cls._describe_relation(adapter, relation, relation_config) return cls.config_type().from_results(results) @classmethod @@ -1034,7 +1037,7 @@ def get_from_relation_config(cls, relation_config: RelationConfig) -> Databricks @classmethod @abstractmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: """Describe the relation and return the results.""" @@ -1044,11 +1047,11 @@ def _describe_relation( class DeltaLiveTableAPIBase(RelationAPIBase[DatabricksRelationConfig]): @classmethod def get_from_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> DatabricksRelationConfig: """Get the relation config from the relation.""" - relation_config = super().get_from_relation(adapter, relation) + relation_config = super().get_from_relation(adapter, relation, relation_config) # Ensure any current refreshes are completed before returning the relation config tblproperties = cast(TblPropertiesConfig, relation_config.config["tblproperties"]) @@ -1068,7 +1071,7 @@ def config_type(cls) -> type[MaterializedViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: kwargs = {"table_name": relation} results: RelationResults = dict() @@ -1093,7 +1096,7 @@ def config_type(cls) -> type[StreamingTableConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: kwargs = {"table_name": relation} results: RelationResults = dict() @@ -1116,16 +1119,26 @@ def config_type(cls) -> type[IncrementalTableConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: results = {} kwargs = {"relation": relation} if not relation.is_hive_metastore(): - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) - results["information_schema.column_tags"] = adapter.execute_macro( - "fetch_column_tags", kwargs=kwargs - ) + table_tag_config = relation_config.config.get(TagsProcessor.name) if relation_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + + column_tag_config = relation_config.config.get(ColumnTagsProcessor.name) if relation_config else None + if column_tag_config is None or column_tag_config.requires_server_metadata_for_diff(): + results["information_schema.column_tags"] = adapter.execute_macro( + "fetch_column_tags", kwargs=kwargs + ) + else: + results["information_schema.column_tags"] = None + results["non_null_constraint_columns"] = adapter.execute_macro( "fetch_non_null_constraint_columns", kwargs=kwargs ) @@ -1154,7 +1167,7 @@ def config_type(cls) -> type[ViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: results = {} kwargs = {"relation": relation} @@ -1162,7 +1175,13 @@ def _describe_relation( results["information_schema.views"] = get_first_row( adapter.execute_macro("get_view_description", kwargs=kwargs) ) - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + + table_tag_config = relation_config.config.get(TagsProcessor.name) if relation_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) kwargs = {"table_name": relation} diff --git a/dbt/adapters/databricks/relation_configs/base.py b/dbt/adapters/databricks/relation_configs/base.py index 92be6706a..c6b505d58 100644 --- a/dbt/adapters/databricks/relation_configs/base.py +++ b/dbt/adapters/databricks/relation_configs/base.py @@ -34,6 +34,11 @@ def get_diff(self, other: Self) -> Optional[Self]: return self return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return True class DatabricksRelationChangeSet(BaseModel): """Class for encapsulating the changes that need to be applied to a Databricks relation.""" diff --git a/dbt/adapters/databricks/relation_configs/column_tags.py b/dbt/adapters/databricks/relation_configs/column_tags.py index 93d5ea890..16c45141b 100644 --- a/dbt/adapters/databricks/relation_configs/column_tags.py +++ b/dbt/adapters/databricks/relation_configs/column_tags.py @@ -41,6 +41,12 @@ def get_diff(self, other: "ColumnTagsConfig") -> Optional["ColumnTagsConfig"]: return ColumnTagsConfig(set_column_tags=set_column_tags) return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return self.set_column_tags and len(self.set_column_tags) > 0 + class ColumnTagsProcessor(DatabricksComponentProcessor[ColumnTagsConfig]): name: ClassVar[str] = "column_tags" diff --git a/dbt/adapters/databricks/relation_configs/streaming_table.py b/dbt/adapters/databricks/relation_configs/streaming_table.py index b020272cd..c153d30e5 100644 --- a/dbt/adapters/databricks/relation_configs/streaming_table.py +++ b/dbt/adapters/databricks/relation_configs/streaming_table.py @@ -29,7 +29,6 @@ class StreamingTableConfig(DatabricksRelationConfigBase): CommentProcessor, TblPropertiesProcessor, RefreshProcessor, - TagsProcessor, DescribeQueryProcessor, TagsProcessor, ] diff --git a/dbt/adapters/databricks/relation_configs/tags.py b/dbt/adapters/databricks/relation_configs/tags.py index 9286bc9b3..7f306fec5 100644 --- a/dbt/adapters/databricks/relation_configs/tags.py +++ b/dbt/adapters/databricks/relation_configs/tags.py @@ -22,6 +22,12 @@ def get_diff(self, other: "TagsConfig") -> Optional["TagsConfig"]: return TagsConfig(set_tags=self.set_tags) return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return self.set_tags and len(self.set_tags) > 0 + class TagsProcessor(DatabricksComponentProcessor[TagsConfig]): name: ClassVar[str] = "tags" diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index ac717ae45..559812828 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -133,8 +133,8 @@ {{ set_overwrite_mode('DYNAMIC') }} {%- endif -%} {#-- Relation must be merged --#} - {%- set _existing_config = adapter.get_relation_config(existing_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set _existing_config = adapter.get_relation_config(existing_relation, model_config) -%} {%- set _configuration_changes = model_config.get_changeset(_existing_config) -%} {%- call statement('create_temp_relation', language=language) -%} {{ create_table_as(True, temp_relation, compiled_code, language) }} @@ -237,8 +237,8 @@ {% macro process_config_changes(target_relation) %} {% set apply_config_changes = config.get('incremental_apply_config_changes', True) | as_bool %} {% if apply_config_changes %} - {%- set existing_config = adapter.get_relation_config(target_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set existing_config = adapter.get_relation_config(target_relation, model_config) -%} {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {{ apply_config_changeset(target_relation, model, configuration_changes) }} {% endif %} diff --git a/dbt/include/databricks/macros/relations/config.sql b/dbt/include/databricks/macros/relations/config.sql index 4c6ae8910..8a6b016f5 100644 --- a/dbt/include/databricks/macros/relations/config.sql +++ b/dbt/include/databricks/macros/relations/config.sql @@ -1,6 +1,6 @@ {%- macro get_configuration_changes(existing_relation) -%} - {%- set existing_config = adapter.get_relation_config(existing_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set existing_config = adapter.get_relation_config(existing_relation, model_config) -%} {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {% do return(configuration_changes) %} {%- endmacro -%} \ No newline at end of file diff --git a/tests/functional/adapter/helpers.py b/tests/functional/adapter/helpers.py new file mode 100644 index 000000000..f94de5216 --- /dev/null +++ b/tests/functional/adapter/helpers.py @@ -0,0 +1,34 @@ +from dbt.adapters.base.relation import BaseRelation +from dbt.tests.util import get_manifest + +FAIL_IF_TAG_FETCH_CALLED_MACROS = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} +""" + +FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} + +{% macro fetch_column_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} +{% endmacro %} +""" + + +def get_model_config(project, relation: BaseRelation): + """Return the parsed dbt model config for the given relation fixture.""" + manifest = get_manifest(project.project_root) + model_nodes = [ + node + for node in manifest.nodes.values() + if getattr(node, "resource_type", None) == "model" + and getattr(node, "alias", None) == relation.identifier + ] + assert len(model_nodes) == 1, ( + f"Expected exactly one model node for relation {relation.identifier}, " + f"found {len(model_nodes)}" + ) + return project.adapter.get_config_from_model(model_nodes[0]) diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index d50fa71f3..5ff35abc3 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -76,6 +76,47 @@ - name: color """ +metadata_fetch_incremental_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', +) }} + +select cast(1 as bigint) as id +""" + +metadata_fetch_no_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + columns: + - name: id +""" + +metadata_fetch_table_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + config: + databricks_tags: + classification: internal + columns: + - name: id +""" + +metadata_fetch_column_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + columns: + - name: id + databricks_tags: + classification: internal +""" + tblproperties_a = """ version: 2 diff --git a/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py new file mode 100644 index 000000000..b7d90ba7b --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py @@ -0,0 +1,75 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.helpers import ( + FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS, +) +from tests.functional.adapter.incremental import fixtures + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchSkips: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_no_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "fail_if_tag_fetch_called.sql": FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS + } + + def test_second_incremental_run_succeeds_without_tag_fetches(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchRequiresTableTags: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_table_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "fail_if_tag_fetch_called.sql": FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS + } + + def test_second_incremental_run_fails_when_table_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["run"]) + _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) + util.assert_message_in_logs("fetch_tags should not be called", logs) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchRequiresColumnTags: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_column_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "fail_if_tag_fetch_called.sql": FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS + } + + def test_second_incremental_run_fails_when_column_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["run"]) + _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) + util.assert_message_in_logs("fetch_column_tags should not be called", logs) diff --git a/tests/functional/adapter/materialized_view_tests/test_changes.py b/tests/functional/adapter/materialized_view_tests/test_changes.py index 85f4b08cf..b95bb2816 100644 --- a/tests/functional/adapter/materialized_view_tests/test_changes.py +++ b/tests/functional/adapter/materialized_view_tests/test_changes.py @@ -14,6 +14,7 @@ MaterializedViewConfig, ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig +from tests.functional.adapter.helpers import get_model_config from tests.functional.adapter.materialized_view_tests import fixtures @@ -23,7 +24,6 @@ def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): } assert final_tblproperties == expected - class MaterializedViewChangesMixin(MaterializedViewChanges): @pytest.fixture(scope="class", autouse=True) def models(self): @@ -32,7 +32,8 @@ def models(self): @staticmethod def check_start_state(project, materialized_view): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(materialized_view) + relation_config = get_model_config(project, materialized_view) + results = project.adapter.get_relation_config(materialized_view, relation_config) assert isinstance(results, MaterializedViewConfig) assert results.config["partition_by"].partition_by == ["id"] assert results.config["query"].query.startswith("select * from") @@ -49,7 +50,8 @@ def change_config_via_alter(project, materialized_view): @staticmethod def check_state_alter_change_is_applied(project, materialized_view): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(materialized_view) + relation_config = get_model_config(project, materialized_view) + results = project.adapter.get_relation_config(materialized_view, relation_config) assert isinstance(results, MaterializedViewConfig) assert results.config["refresh"].cron == "0 5 * * * ? *" assert results.config["refresh"].time_zone_value == "Etc/UTC" @@ -67,7 +69,8 @@ def change_config_via_replace(project, materialized_view): @staticmethod def check_state_replace_change_is_applied(project, materialized_view): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(materialized_view) + relation_config = get_model_config(project, materialized_view) + results = project.adapter.get_relation_config(materialized_view, relation_config) assert isinstance(results, MaterializedViewConfig) assert results.config["partition_by"].partition_by == [] assert results.config["query"].query.startswith("select id, value") diff --git a/tests/functional/adapter/streaming_tables/test_st_basic.py b/tests/functional/adapter/streaming_tables/test_st_basic.py index ee2aa1d94..23d41b8ef 100644 --- a/tests/functional/adapter/streaming_tables/test_st_basic.py +++ b/tests/functional/adapter/streaming_tables/test_st_basic.py @@ -8,6 +8,7 @@ from dbt.adapters.databricks.relation import DatabricksRelationType from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig +from tests.functional.adapter.helpers import get_model_config from tests.functional.adapter.streaming_tables import fixtures @@ -346,7 +347,8 @@ def test_liquid_clustering_change_is_applied(self, project, liquid_clustered_st) util.run_dbt(["run", "--models", liquid_clustered_st.identifier]) with util.get_connection(project.adapter): - config = project.adapter.get_relation_config(liquid_clustered_st) + relation_config = get_model_config(project, liquid_clustered_st) + config = project.adapter.get_relation_config(liquid_clustered_st, relation_config) assert isinstance(config, StreamingTableConfig) assert config.config["liquid_clustering"].cluster_by == ["id", "value"] diff --git a/tests/functional/adapter/streaming_tables/test_st_changes.py b/tests/functional/adapter/streaming_tables/test_st_changes.py index e8189a260..b5b069029 100644 --- a/tests/functional/adapter/streaming_tables/test_st_changes.py +++ b/tests/functional/adapter/streaming_tables/test_st_changes.py @@ -13,6 +13,7 @@ StreamingTableConfig, ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig +from tests.functional.adapter.helpers import get_model_config from tests.functional.adapter.streaming_tables import fixtures @@ -22,12 +23,12 @@ def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): } assert final_tblproperties == expected - class StreamingTableChanges: @staticmethod def check_start_state(project, streaming_table): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(streaming_table) + relation_config = get_model_config(project, streaming_table) + results = project.adapter.get_relation_config(streaming_table, relation_config) assert isinstance(results, StreamingTableConfig) assert results.config["partition_by"].partition_by == ["id"] _check_tblproperties(results.config["tblproperties"], {"key": "value"}) @@ -46,7 +47,8 @@ def change_config_via_alter(project, streaming_table): @staticmethod def check_state_alter_change_is_applied(project, streaming_table): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(streaming_table) + relation_config = get_model_config(project, streaming_table) + results = project.adapter.get_relation_config(streaming_table, relation_config) assert isinstance(results, StreamingTableConfig) assert results.config["refresh"].cron == "0 5 * * * ? *" assert results.config["refresh"].time_zone_value == "Etc/UTC" @@ -62,7 +64,8 @@ def change_config_via_replace(project, streaming_table): @staticmethod def check_state_replace_change_is_applied(project, streaming_table): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(streaming_table) + relation_config = get_model_config(project, streaming_table) + results = project.adapter.get_relation_config(streaming_table, relation_config) assert isinstance(results, StreamingTableConfig) assert results.config["partition_by"].partition_by == ["value"] diff --git a/tests/functional/adapter/utils/test_utils.py b/tests/functional/adapter/utils/test_utils.py index 06086f25d..35fda97ce 100644 --- a/tests/functional/adapter/utils/test_utils.py +++ b/tests/functional/adapter/utils/test_utils.py @@ -28,8 +28,6 @@ from dbt.tests.adapter.utils.test_split_part import BaseSplitPart from dbt.tests.adapter.utils.test_string_literal import BaseStringLiteral from dbt.tests.adapter.utils.test_validate_sql import BaseValidateSqlMethod - - class TestAnyValue(BaseAnyValue): pass diff --git a/tests/functional/adapter/views/test_view_metadata_fetch_skips.py b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py new file mode 100644 index 000000000..8bfade86c --- /dev/null +++ b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py @@ -0,0 +1,72 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.helpers import FAIL_IF_TAG_FETCH_CALLED_MACROS + +VIEW_WITHOUT_TAGS_SQL = """ +{{ config(materialized='view') }} + +select cast(1 as bigint) as id +""" + +VIEW_WITH_TAGS_SQL = """ +{{ config( + materialized='view', + databricks_tags={'classification': 'internal'} +) }} + +select cast(1 as bigint) as id +""" + + +@pytest.mark.skip_profile("databricks_cluster") +class TestViewMetadataFetchSkips: + @pytest.fixture(scope="class") + def models(self): + return {"view_metadata_fetch.sql": VIEW_WITHOUT_TAGS_SQL} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"use_materialization_v2": True}, + "models": { + "+view_update_via_alter": True, + }, + } + + def test_second_view_run_succeeds_without_tag_fetches(self, project): + # The first run creates the view; the second run exercises the existing-relation + # alter/config-diff path where adapter.get_relation_config() may fetch tags. + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestViewMetadataFetchRequiresTags: + @pytest.fixture(scope="class") + def models(self): + return {"view_metadata_fetch.sql": VIEW_WITH_TAGS_SQL} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"use_materialization_v2": True}, + "models": { + "+view_update_via_alter": True, + }, + } + + def test_second_view_run_fails_when_tag_fetch_is_required(self, project): + # The first run creates the view; the second run exercises the existing-relation + # alter/config-diff path where adapter.get_relation_config() may fetch tags. + util.run_dbt(["run"]) + _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) + util.assert_message_in_logs("fetch_tags should not be called", logs) diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index 23ce774d8..bc31c7d31 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -16,7 +16,10 @@ CATALOG_KEY_IN_SESSION_PROPERTIES, ) from dbt.adapters.databricks.impl import ( + DESCRIBE_TABLE_EXTENDED_MACRO_NAME, DatabricksRelationInfo, + IncrementalTableAPI, + ViewAPI, get_identifier_list_string, ) from dbt.adapters.databricks.relation import ( @@ -24,6 +27,12 @@ DatabricksRelationType, DatabricksTableType, ) +from dbt.adapters.databricks.relation_configs.column_tags import ColumnTagsConfig +from dbt.adapters.databricks.relation_configs.column_tags import ColumnTagsProcessor +from dbt.adapters.databricks.relation_configs.incremental import IncrementalTableConfig +from dbt.adapters.databricks.relation_configs.tags import TagsConfig +from dbt.adapters.databricks.relation_configs.tags import TagsProcessor +from dbt.adapters.databricks.relation_configs.view import ViewConfig from dbt.adapters.databricks.utils import check_not_found_error from tests.unit.utils import config_from_parts_or_dicts @@ -1228,6 +1237,172 @@ def test_get_columns_reraises_other_database_errors( ) +class TestDescribeRelationMetadataFetchPlanning: + @staticmethod + def _create_adapter(): + adapter = Mock() + + def execute_macro(macro_name, kwargs=None): + if macro_name == "get_view_description": + return Mock(rows=[("view_description",)]) + return f"{macro_name}_result" + + adapter.execute_macro.side_effect = execute_macro + return adapter + + @staticmethod + def _create_incremental_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_incremental_model", + type=DatabricksRelationType.Table, + ) + + @staticmethod + def _create_view_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_view_model", + type=DatabricksRelationType.View, + ) + + @staticmethod + def _create_incremental_config( + tags: dict[str, str] | None = None, + column_tags: dict[str, dict[str, str]] | None = None, + ) -> IncrementalTableConfig: + return IncrementalTableConfig( + config={ + TagsProcessor.name: TagsConfig(set_tags=tags or {}), + ColumnTagsProcessor.name: ColumnTagsConfig(set_column_tags=column_tags or {}), + } + ) + + @staticmethod + def _create_view_config(tags: dict[str, str] | None = None) -> ViewConfig: + return ViewConfig(config={TagsProcessor.name: TagsConfig(set_tags=tags or {})}) + + @staticmethod + def _called_macro_names(adapter: Mock) -> list[str]: + return [call.args[0] for call in adapter.execute_macro.call_args_list] + + def test_incremental_describe_relation_skips_both_tag_queries_without_tags(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config() + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" not in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + assert DESCRIBE_TABLE_EXTENDED_MACRO_NAME in called_macro_names + + def test_incremental_describe_relation_fetches_only_table_tags_when_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config(tags={"classification": "internal"}) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" not in called_macro_names + + def test_incremental_describe_relation_fetches_only_column_tags_when_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config( + column_tags={"id": {"classification": "internal"}} + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_fetches_both_tag_queries_when_both_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config( + tags={"classification": "internal"}, + column_tags={"id": {"classification": "internal"}}, + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_fetches_tag_queries_when_relation_config_is_none(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + + results = IncrementalTableAPI._describe_relation(adapter, relation, None) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_skips_tag_queries_for_hive_metastore(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation(database="hive_metastore") + relation_config = self._create_incremental_config( + tags={"classification": "internal"}, + column_tags={"id": {"classification": "internal"}}, + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert "information_schema.tags" not in results + assert "information_schema.column_tags" not in results + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" not in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + assert DESCRIBE_TABLE_EXTENDED_MACRO_NAME in called_macro_names + + def test_view_describe_relation_skips_tag_query_without_tags(self): + adapter = self._create_adapter() + relation = self._create_view_relation() + relation_config = self._create_view_config() + + results = ViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "get_view_description" in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + + def test_view_describe_relation_fetches_tag_query_when_tags_present(self): + adapter = self._create_adapter() + relation = self._create_view_relation() + relation_config = self._create_view_config(tags={"classification": "internal"}) + + results = ViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "get_view_description" in called_macro_names + + class TestManagedIcebergBehaviorFlag(DatabricksAdapterBase): @pytest.fixture def adapter(self):