From 17754e021c4aae23bd7576936aec0b755b0d9055 Mon Sep 17 00:00:00 2001 From: "tejas.sp" <241722411+tejassp-db@users.noreply.github.com> Date: Fri, 10 Apr 2026 18:12:06 +0530 Subject: [PATCH 1/2] perf: skip unnecessary metadata fetch calls for tags when not configured Skip fetch_tags and fetch_column_tags information_schema queries during incremental and view materializations when the model has no tags configured. This avoids unnecessary server roundtrips on every run for models that don't use tags, while preserving full fetch behavior when tags are present or when the model config is unavailable. PECOBLR-2497 --- dbt/adapters/databricks/impl.py | 57 ++++-- .../databricks/relation_configs/base.py | 5 + .../relation_configs/column_tags.py | 6 + .../relation_configs/streaming_table.py | 1 - .../databricks/relation_configs/tags.py | 6 + .../incremental/incremental.sql | 4 +- .../databricks/macros/relations/config.sql | 2 +- .../adapter/incremental/fixtures.py | 41 +++++ .../test_incremental_metadata_fetch_skips.py | 70 +++++++ .../materialized_view_tests/test_changes.py | 11 +- .../adapter/streaming_tables/test_st_basic.py | 4 +- .../streaming_tables/test_st_changes.py | 11 +- tests/functional/adapter/utils/test_utils.py | 11 ++ .../views/test_view_metadata_fetch_skips.py | 72 ++++++++ tests/unit/test_adapter.py | 173 ++++++++++++++++++ 15 files changed, 442 insertions(+), 32 deletions(-) create mode 100644 tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py create mode 100644 tests/functional/adapter/views/test_view_metadata_fetch_skips.py diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 045f4b106..b3554b842 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -79,6 +79,9 @@ from dbt.adapters.databricks.relation_configs.streaming_table import ( StreamingTableConfig, ) +from dbt.adapters.databricks.relation_configs.tags import ( + TagsProcessor, +) from dbt.adapters.databricks.relation_configs.table_format import TableFormat from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig from dbt.adapters.databricks.relation_configs.view import ViewConfig @@ -928,15 +931,15 @@ def parse_columns_and_constraints( return enriched_columns, parsed_constraints @available.parse(lambda *a, **k: {}) - def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase: + def get_relation_config(self, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase) -> DatabricksRelationConfigBase: if relation.type == DatabricksRelationType.MaterializedView: - return MaterializedViewAPI.get_from_relation(self, relation) + return MaterializedViewAPI.get_from_relation(self, relation, relation_config) elif relation.type == DatabricksRelationType.StreamingTable: - return StreamingTableAPI.get_from_relation(self, relation) + return StreamingTableAPI.get_from_relation(self, relation, relation_config) elif relation.type == DatabricksRelationType.Table: - return IncrementalTableAPI.get_from_relation(self, relation) + return IncrementalTableAPI.get_from_relation(self, relation, relation_config) elif relation.type == DatabricksRelationType.View: - return ViewAPI.get_from_relation(self, relation) + return ViewAPI.get_from_relation(self, relation, relation_config) else: raise NotImplementedError(f"Relation type {relation.type} is not supported.") @@ -1017,12 +1020,12 @@ def config_type(cls) -> type[DatabricksRelationConfig]: @classmethod def get_from_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> DatabricksRelationConfig: """Get the relation config from the relation.""" assert relation.type == cls.relation_type - results = cls._describe_relation(adapter, relation) + results = cls._describe_relation(adapter, relation, relation_config) return cls.config_type().from_results(results) @classmethod @@ -1034,7 +1037,7 @@ def get_from_relation_config(cls, relation_config: RelationConfig) -> Databricks @classmethod @abstractmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: """Describe the relation and return the results.""" @@ -1044,11 +1047,11 @@ def _describe_relation( class DeltaLiveTableAPIBase(RelationAPIBase[DatabricksRelationConfig]): @classmethod def get_from_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> DatabricksRelationConfig: """Get the relation config from the relation.""" - relation_config = super().get_from_relation(adapter, relation) + relation_config = super().get_from_relation(adapter, relation, relation_config) # Ensure any current refreshes are completed before returning the relation config tblproperties = cast(TblPropertiesConfig, relation_config.config["tblproperties"]) @@ -1068,7 +1071,7 @@ def config_type(cls) -> type[MaterializedViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: kwargs = {"table_name": relation} results: RelationResults = dict() @@ -1093,7 +1096,7 @@ def config_type(cls) -> type[StreamingTableConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: kwargs = {"table_name": relation} results: RelationResults = dict() @@ -1116,16 +1119,26 @@ def config_type(cls) -> type[IncrementalTableConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: results = {} kwargs = {"relation": relation} if not relation.is_hive_metastore(): - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) - results["information_schema.column_tags"] = adapter.execute_macro( - "fetch_column_tags", kwargs=kwargs - ) + table_tag_config = relation_config.config.get(TagsProcessor.name) if relation_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + + column_tag_config = relation_config.config.get(ColumnTagsProcessor.name) if relation_config else None + if column_tag_config is None or column_tag_config.requires_server_metadata_for_diff(): + results["information_schema.column_tags"] = adapter.execute_macro( + "fetch_column_tags", kwargs=kwargs + ) + else: + results["information_schema.column_tags"] = None + results["non_null_constraint_columns"] = adapter.execute_macro( "fetch_non_null_constraint_columns", kwargs=kwargs ) @@ -1154,7 +1167,7 @@ def config_type(cls) -> type[ViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, adapter: DatabricksAdapter, relation: DatabricksRelation, relation_config: DatabricksRelationConfigBase ) -> RelationResults: results = {} kwargs = {"relation": relation} @@ -1162,7 +1175,13 @@ def _describe_relation( results["information_schema.views"] = get_first_row( adapter.execute_macro("get_view_description", kwargs=kwargs) ) - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + + table_tag_config = relation_config.config.get(TagsProcessor.name) if relation_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) kwargs = {"table_name": relation} diff --git a/dbt/adapters/databricks/relation_configs/base.py b/dbt/adapters/databricks/relation_configs/base.py index 92be6706a..c6b505d58 100644 --- a/dbt/adapters/databricks/relation_configs/base.py +++ b/dbt/adapters/databricks/relation_configs/base.py @@ -34,6 +34,11 @@ def get_diff(self, other: Self) -> Optional[Self]: return self return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return True class DatabricksRelationChangeSet(BaseModel): """Class for encapsulating the changes that need to be applied to a Databricks relation.""" diff --git a/dbt/adapters/databricks/relation_configs/column_tags.py b/dbt/adapters/databricks/relation_configs/column_tags.py index 93d5ea890..16c45141b 100644 --- a/dbt/adapters/databricks/relation_configs/column_tags.py +++ b/dbt/adapters/databricks/relation_configs/column_tags.py @@ -41,6 +41,12 @@ def get_diff(self, other: "ColumnTagsConfig") -> Optional["ColumnTagsConfig"]: return ColumnTagsConfig(set_column_tags=set_column_tags) return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return self.set_column_tags and len(self.set_column_tags) > 0 + class ColumnTagsProcessor(DatabricksComponentProcessor[ColumnTagsConfig]): name: ClassVar[str] = "column_tags" diff --git a/dbt/adapters/databricks/relation_configs/streaming_table.py b/dbt/adapters/databricks/relation_configs/streaming_table.py index b020272cd..c153d30e5 100644 --- a/dbt/adapters/databricks/relation_configs/streaming_table.py +++ b/dbt/adapters/databricks/relation_configs/streaming_table.py @@ -29,7 +29,6 @@ class StreamingTableConfig(DatabricksRelationConfigBase): CommentProcessor, TblPropertiesProcessor, RefreshProcessor, - TagsProcessor, DescribeQueryProcessor, TagsProcessor, ] diff --git a/dbt/adapters/databricks/relation_configs/tags.py b/dbt/adapters/databricks/relation_configs/tags.py index 9286bc9b3..7f306fec5 100644 --- a/dbt/adapters/databricks/relation_configs/tags.py +++ b/dbt/adapters/databricks/relation_configs/tags.py @@ -22,6 +22,12 @@ def get_diff(self, other: "TagsConfig") -> Optional["TagsConfig"]: return TagsConfig(set_tags=self.set_tags) return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return self.set_tags and len(self.set_tags) > 0 + class TagsProcessor(DatabricksComponentProcessor[TagsConfig]): name: ClassVar[str] = "tags" diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index ac717ae45..559812828 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -133,8 +133,8 @@ {{ set_overwrite_mode('DYNAMIC') }} {%- endif -%} {#-- Relation must be merged --#} - {%- set _existing_config = adapter.get_relation_config(existing_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set _existing_config = adapter.get_relation_config(existing_relation, model_config) -%} {%- set _configuration_changes = model_config.get_changeset(_existing_config) -%} {%- call statement('create_temp_relation', language=language) -%} {{ create_table_as(True, temp_relation, compiled_code, language) }} @@ -237,8 +237,8 @@ {% macro process_config_changes(target_relation) %} {% set apply_config_changes = config.get('incremental_apply_config_changes', True) | as_bool %} {% if apply_config_changes %} - {%- set existing_config = adapter.get_relation_config(target_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set existing_config = adapter.get_relation_config(target_relation, model_config) -%} {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {{ apply_config_changeset(target_relation, model, configuration_changes) }} {% endif %} diff --git a/dbt/include/databricks/macros/relations/config.sql b/dbt/include/databricks/macros/relations/config.sql index 4c6ae8910..8a6b016f5 100644 --- a/dbt/include/databricks/macros/relations/config.sql +++ b/dbt/include/databricks/macros/relations/config.sql @@ -1,6 +1,6 @@ {%- macro get_configuration_changes(existing_relation) -%} - {%- set existing_config = adapter.get_relation_config(existing_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set existing_config = adapter.get_relation_config(existing_relation, model_config) -%} {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {% do return(configuration_changes) %} {%- endmacro -%} \ No newline at end of file diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index d50fa71f3..5ff35abc3 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -76,6 +76,47 @@ - name: color """ +metadata_fetch_incremental_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', +) }} + +select cast(1 as bigint) as id +""" + +metadata_fetch_no_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + columns: + - name: id +""" + +metadata_fetch_table_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + config: + databricks_tags: + classification: internal + columns: + - name: id +""" + +metadata_fetch_column_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + columns: + - name: id + databricks_tags: + classification: internal +""" + tblproperties_a = """ version: 2 diff --git a/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py new file mode 100644 index 000000000..86c9031d8 --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py @@ -0,0 +1,70 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.incremental import fixtures + +FAIL_IF_TAG_FETCH_CALLED_MACROS = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} + +{% macro fetch_column_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} +{% endmacro %} +""" + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchSkips: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_no_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + def test_second_incremental_run_succeeds_without_tag_fetches(self, project): + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchRequiresTableTags: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_table_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + def test_second_incremental_run_fails_when_table_tag_fetch_is_required(self, project): + util.run_dbt(["run"]) + _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) + util.assert_message_in_logs("fetch_tags should not be called", logs) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchRequiresColumnTags: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_column_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + def test_second_incremental_run_fails_when_column_tag_fetch_is_required(self, project): + util.run_dbt(["run"]) + _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) + util.assert_message_in_logs("fetch_column_tags should not be called", logs) diff --git a/tests/functional/adapter/materialized_view_tests/test_changes.py b/tests/functional/adapter/materialized_view_tests/test_changes.py index 85f4b08cf..532ebfdb8 100644 --- a/tests/functional/adapter/materialized_view_tests/test_changes.py +++ b/tests/functional/adapter/materialized_view_tests/test_changes.py @@ -15,6 +15,7 @@ ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig from tests.functional.adapter.materialized_view_tests import fixtures +from tests.functional.adapter.utils.test_utils import get_model_config def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): @@ -23,7 +24,6 @@ def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): } assert final_tblproperties == expected - class MaterializedViewChangesMixin(MaterializedViewChanges): @pytest.fixture(scope="class", autouse=True) def models(self): @@ -32,7 +32,8 @@ def models(self): @staticmethod def check_start_state(project, materialized_view): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(materialized_view) + relation_config = get_model_config(project, materialized_view) + results = project.adapter.get_relation_config(materialized_view, relation_config) assert isinstance(results, MaterializedViewConfig) assert results.config["partition_by"].partition_by == ["id"] assert results.config["query"].query.startswith("select * from") @@ -49,7 +50,8 @@ def change_config_via_alter(project, materialized_view): @staticmethod def check_state_alter_change_is_applied(project, materialized_view): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(materialized_view) + relation_config = get_model_config(project, materialized_view) + results = project.adapter.get_relation_config(materialized_view, relation_config) assert isinstance(results, MaterializedViewConfig) assert results.config["refresh"].cron == "0 5 * * * ? *" assert results.config["refresh"].time_zone_value == "Etc/UTC" @@ -67,7 +69,8 @@ def change_config_via_replace(project, materialized_view): @staticmethod def check_state_replace_change_is_applied(project, materialized_view): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(materialized_view) + relation_config = get_model_config(project, materialized_view) + results = project.adapter.get_relation_config(materialized_view, relation_config) assert isinstance(results, MaterializedViewConfig) assert results.config["partition_by"].partition_by == [] assert results.config["query"].query.startswith("select id, value") diff --git a/tests/functional/adapter/streaming_tables/test_st_basic.py b/tests/functional/adapter/streaming_tables/test_st_basic.py index ee2aa1d94..491e29f9c 100644 --- a/tests/functional/adapter/streaming_tables/test_st_basic.py +++ b/tests/functional/adapter/streaming_tables/test_st_basic.py @@ -9,6 +9,7 @@ from dbt.adapters.databricks.relation import DatabricksRelationType from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig from tests.functional.adapter.streaming_tables import fixtures +from tests.functional.adapter.utils.test_utils import get_model_config class TestStreamingTablesMixin: @@ -346,7 +347,8 @@ def test_liquid_clustering_change_is_applied(self, project, liquid_clustered_st) util.run_dbt(["run", "--models", liquid_clustered_st.identifier]) with util.get_connection(project.adapter): - config = project.adapter.get_relation_config(liquid_clustered_st) + relation_config = get_model_config(project, liquid_clustered_st) + config = project.adapter.get_relation_config(liquid_clustered_st, relation_config) assert isinstance(config, StreamingTableConfig) assert config.config["liquid_clustering"].cluster_by == ["id", "value"] diff --git a/tests/functional/adapter/streaming_tables/test_st_changes.py b/tests/functional/adapter/streaming_tables/test_st_changes.py index e8189a260..f6583f978 100644 --- a/tests/functional/adapter/streaming_tables/test_st_changes.py +++ b/tests/functional/adapter/streaming_tables/test_st_changes.py @@ -14,6 +14,7 @@ ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig from tests.functional.adapter.streaming_tables import fixtures +from tests.functional.adapter.utils.test_utils import get_model_config def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): @@ -22,12 +23,12 @@ def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): } assert final_tblproperties == expected - class StreamingTableChanges: @staticmethod def check_start_state(project, streaming_table): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(streaming_table) + relation_config = get_model_config(project, streaming_table) + results = project.adapter.get_relation_config(streaming_table, relation_config) assert isinstance(results, StreamingTableConfig) assert results.config["partition_by"].partition_by == ["id"] _check_tblproperties(results.config["tblproperties"], {"key": "value"}) @@ -46,7 +47,8 @@ def change_config_via_alter(project, streaming_table): @staticmethod def check_state_alter_change_is_applied(project, streaming_table): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(streaming_table) + relation_config = get_model_config(project, streaming_table) + results = project.adapter.get_relation_config(streaming_table, relation_config) assert isinstance(results, StreamingTableConfig) assert results.config["refresh"].cron == "0 5 * * * ? *" assert results.config["refresh"].time_zone_value == "Etc/UTC" @@ -62,7 +64,8 @@ def change_config_via_replace(project, streaming_table): @staticmethod def check_state_replace_change_is_applied(project, streaming_table): with util.get_connection(project.adapter): - results = project.adapter.get_relation_config(streaming_table) + relation_config = get_model_config(project, streaming_table) + results = project.adapter.get_relation_config(streaming_table, relation_config) assert isinstance(results, StreamingTableConfig) assert results.config["partition_by"].partition_by == ["value"] diff --git a/tests/functional/adapter/utils/test_utils.py b/tests/functional/adapter/utils/test_utils.py index 06086f25d..fc70ec0d2 100644 --- a/tests/functional/adapter/utils/test_utils.py +++ b/tests/functional/adapter/utils/test_utils.py @@ -1,3 +1,4 @@ +from dbt.adapters.base.relation import BaseRelation from dbt.tests.adapter.utils.test_any_value import BaseAnyValue from dbt.tests.adapter.utils.test_array_append import BaseArrayAppend from dbt.tests.adapter.utils.test_array_concat import BaseArrayConcat @@ -28,6 +29,16 @@ from dbt.tests.adapter.utils.test_split_part import BaseSplitPart from dbt.tests.adapter.utils.test_string_literal import BaseStringLiteral from dbt.tests.adapter.utils.test_validate_sql import BaseValidateSqlMethod +from dbt.tests.util import get_manifest + + +def get_model_config(project, relation: BaseRelation): + """Return the parsed dbt model config for the given relation fixture.""" + manifest = get_manifest(project.project_root) + node_id = f"model.test.{relation.identifier}" + node = manifest.nodes.get(node_id) + assert node is not None, f"Node {node_id} not found in manifest" + return project.adapter.get_config_from_model(node) class TestAnyValue(BaseAnyValue): diff --git a/tests/functional/adapter/views/test_view_metadata_fetch_skips.py b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py new file mode 100644 index 000000000..73efa452e --- /dev/null +++ b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py @@ -0,0 +1,72 @@ +import pytest +from dbt.tests import util + +FAIL_IF_TAG_FETCH_CALLED_MACROS = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} +""" + +VIEW_WITHOUT_TAGS_SQL = """ +{{ config(materialized='view') }} + +select cast(1 as bigint) as id +""" + +VIEW_WITH_TAGS_SQL = """ +{{ config( + materialized='view', + databricks_tags={'classification': 'internal'} +) }} + +select cast(1 as bigint) as id +""" + + +@pytest.mark.skip_profile("databricks_cluster") +class TestViewMetadataFetchSkips: + @pytest.fixture(scope="class") + def models(self): + return {"view_metadata_fetch.sql": VIEW_WITHOUT_TAGS_SQL} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"use_materialization_v2": True}, + "models": { + "+view_update_via_alter": True, + }, + } + + def test_second_view_run_succeeds_without_tag_fetches(self, project): + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestViewMetadataFetchRequiresTags: + @pytest.fixture(scope="class") + def models(self): + return {"view_metadata_fetch.sql": VIEW_WITH_TAGS_SQL} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"use_materialization_v2": True}, + "models": { + "+view_update_via_alter": True, + }, + } + + def test_second_view_run_fails_when_tag_fetch_is_required(self, project): + util.run_dbt(["run"]) + _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) + util.assert_message_in_logs("fetch_tags should not be called", logs) diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index 23ce774d8..a55f57d3a 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -16,7 +16,10 @@ CATALOG_KEY_IN_SESSION_PROPERTIES, ) from dbt.adapters.databricks.impl import ( + DESCRIBE_TABLE_EXTENDED_MACRO_NAME, DatabricksRelationInfo, + IncrementalTableAPI, + ViewAPI, get_identifier_list_string, ) from dbt.adapters.databricks.relation import ( @@ -24,6 +27,10 @@ DatabricksRelationType, DatabricksTableType, ) +from dbt.adapters.databricks.relation_configs.column_tags import ColumnTagsConfig +from dbt.adapters.databricks.relation_configs.incremental import IncrementalTableConfig +from dbt.adapters.databricks.relation_configs.tags import TagsConfig +from dbt.adapters.databricks.relation_configs.view import ViewConfig from dbt.adapters.databricks.utils import check_not_found_error from tests.unit.utils import config_from_parts_or_dicts @@ -1228,6 +1235,172 @@ def test_get_columns_reraises_other_database_errors( ) +class TestDescribeRelationMetadataFetchPlanning: + @staticmethod + def _create_adapter(): + adapter = Mock() + + def execute_macro(macro_name, kwargs=None): + if macro_name == "get_view_description": + return Mock(rows=[("view_description",)]) + return f"{macro_name}_result" + + adapter.execute_macro.side_effect = execute_macro + return adapter + + @staticmethod + def _create_incremental_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_incremental_model", + type=DatabricksRelationType.Table, + ) + + @staticmethod + def _create_view_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_view_model", + type=DatabricksRelationType.View, + ) + + @staticmethod + def _create_incremental_config( + tags: dict[str, str] | None = None, + column_tags: dict[str, dict[str, str]] | None = None, + ) -> IncrementalTableConfig: + return IncrementalTableConfig( + config={ + "tags": TagsConfig(set_tags=tags or {}), + "column_tags": ColumnTagsConfig(set_column_tags=column_tags or {}), + } + ) + + @staticmethod + def _create_view_config(tags: dict[str, str] | None = None) -> ViewConfig: + return ViewConfig(config={"tags": TagsConfig(set_tags=tags or {})}) + + @staticmethod + def _called_macro_names(adapter: Mock) -> list[str]: + return [call.args[0] for call in adapter.execute_macro.call_args_list] + + def test_incremental_describe_relation_skips_both_tag_queries_without_tags(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config() + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" not in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + assert DESCRIBE_TABLE_EXTENDED_MACRO_NAME in called_macro_names + + def test_incremental_describe_relation_fetches_only_table_tags_when_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config(tags={"classification": "internal"}) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" not in called_macro_names + + def test_incremental_describe_relation_fetches_only_column_tags_when_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config( + column_tags={"id": {"classification": "internal"}} + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_fetches_both_tag_queries_when_both_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config( + tags={"classification": "internal"}, + column_tags={"id": {"classification": "internal"}}, + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_fetches_tag_queries_when_relation_config_is_none(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + + results = IncrementalTableAPI._describe_relation(adapter, relation, None) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_skips_tag_queries_for_hive_metastore(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation(database="hive_metastore") + relation_config = self._create_incremental_config( + tags={"classification": "internal"}, + column_tags={"id": {"classification": "internal"}}, + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert "information_schema.tags" not in results + assert "information_schema.column_tags" not in results + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" not in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + assert DESCRIBE_TABLE_EXTENDED_MACRO_NAME in called_macro_names + + def test_view_describe_relation_skips_tag_query_without_tags(self): + adapter = self._create_adapter() + relation = self._create_view_relation() + relation_config = self._create_view_config() + + results = ViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "get_view_description" in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + + def test_view_describe_relation_fetches_tag_query_when_tags_present(self): + adapter = self._create_adapter() + relation = self._create_view_relation() + relation_config = self._create_view_config(tags={"classification": "internal"}) + + results = ViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "get_view_description" in called_macro_names + + class TestManagedIcebergBehaviorFlag(DatabricksAdapterBase): @pytest.fixture def adapter(self): From 9eca1c4a971b8960f4d3d027029c6044e7953e76 Mon Sep 17 00:00:00 2001 From: "tejas.sp" <241722411+tejassp-db@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:46:57 +0530 Subject: [PATCH 2/2] perf: add metadata fetch coverage for tag-aware config diffs Add unit and functional coverage for skipping tag metadata queries when model config does not require them. This protects the new fetch-planning logic across incremental, view, streaming table, and materialized view test paths without changing unrelated unstaged work. --- tests/functional/adapter/helpers.py | 34 +++++++++++++++++++ .../test_incremental_metadata_fetch_skips.py | 31 ++++++++++------- .../materialized_view_tests/test_changes.py | 2 +- .../adapter/streaming_tables/test_st_basic.py | 2 +- .../streaming_tables/test_st_changes.py | 2 +- tests/functional/adapter/utils/test_utils.py | 13 ------- .../views/test_view_metadata_fetch_skips.py | 10 +++--- tests/unit/test_adapter.py | 8 +++-- 8 files changed, 65 insertions(+), 37 deletions(-) create mode 100644 tests/functional/adapter/helpers.py diff --git a/tests/functional/adapter/helpers.py b/tests/functional/adapter/helpers.py new file mode 100644 index 000000000..f94de5216 --- /dev/null +++ b/tests/functional/adapter/helpers.py @@ -0,0 +1,34 @@ +from dbt.adapters.base.relation import BaseRelation +from dbt.tests.util import get_manifest + +FAIL_IF_TAG_FETCH_CALLED_MACROS = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} +""" + +FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} + +{% macro fetch_column_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} +{% endmacro %} +""" + + +def get_model_config(project, relation: BaseRelation): + """Return the parsed dbt model config for the given relation fixture.""" + manifest = get_manifest(project.project_root) + model_nodes = [ + node + for node in manifest.nodes.values() + if getattr(node, "resource_type", None) == "model" + and getattr(node, "alias", None) == relation.identifier + ] + assert len(model_nodes) == 1, ( + f"Expected exactly one model node for relation {relation.identifier}, " + f"found {len(model_nodes)}" + ) + return project.adapter.get_config_from_model(model_nodes[0]) diff --git a/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py index 86c9031d8..b7d90ba7b 100644 --- a/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py +++ b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py @@ -1,18 +1,11 @@ import pytest from dbt.tests import util +from tests.functional.adapter.helpers import ( + FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS, +) from tests.functional.adapter.incremental import fixtures -FAIL_IF_TAG_FETCH_CALLED_MACROS = """ -{% macro fetch_tags(relation) %} - {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} -{% endmacro %} - -{% macro fetch_column_tags(relation) %} - {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} -{% endmacro %} -""" - @pytest.mark.skip_profile("databricks_cluster") class TestIncrementalMetadataFetchSkips: @@ -25,9 +18,13 @@ def models(self): @pytest.fixture(scope="class") def macros(self): - return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + return { + "fail_if_tag_fetch_called.sql": FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS + } def test_second_incremental_run_succeeds_without_tag_fetches(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. util.run_dbt(["run"]) util.run_dbt(["run"]) @@ -43,9 +40,13 @@ def models(self): @pytest.fixture(scope="class") def macros(self): - return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + return { + "fail_if_tag_fetch_called.sql": FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS + } def test_second_incremental_run_fails_when_table_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. util.run_dbt(["run"]) _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) util.assert_message_in_logs("fetch_tags should not be called", logs) @@ -62,9 +63,13 @@ def models(self): @pytest.fixture(scope="class") def macros(self): - return {"fail_if_tag_fetch_called.sql": FAIL_IF_TAG_FETCH_CALLED_MACROS} + return { + "fail_if_tag_fetch_called.sql": FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS + } def test_second_incremental_run_fails_when_column_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. util.run_dbt(["run"]) _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) util.assert_message_in_logs("fetch_column_tags should not be called", logs) diff --git a/tests/functional/adapter/materialized_view_tests/test_changes.py b/tests/functional/adapter/materialized_view_tests/test_changes.py index 532ebfdb8..b95bb2816 100644 --- a/tests/functional/adapter/materialized_view_tests/test_changes.py +++ b/tests/functional/adapter/materialized_view_tests/test_changes.py @@ -14,8 +14,8 @@ MaterializedViewConfig, ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig +from tests.functional.adapter.helpers import get_model_config from tests.functional.adapter.materialized_view_tests import fixtures -from tests.functional.adapter.utils.test_utils import get_model_config def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): diff --git a/tests/functional/adapter/streaming_tables/test_st_basic.py b/tests/functional/adapter/streaming_tables/test_st_basic.py index 491e29f9c..23d41b8ef 100644 --- a/tests/functional/adapter/streaming_tables/test_st_basic.py +++ b/tests/functional/adapter/streaming_tables/test_st_basic.py @@ -8,8 +8,8 @@ from dbt.adapters.databricks.relation import DatabricksRelationType from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig +from tests.functional.adapter.helpers import get_model_config from tests.functional.adapter.streaming_tables import fixtures -from tests.functional.adapter.utils.test_utils import get_model_config class TestStreamingTablesMixin: diff --git a/tests/functional/adapter/streaming_tables/test_st_changes.py b/tests/functional/adapter/streaming_tables/test_st_changes.py index f6583f978..b5b069029 100644 --- a/tests/functional/adapter/streaming_tables/test_st_changes.py +++ b/tests/functional/adapter/streaming_tables/test_st_changes.py @@ -13,8 +13,8 @@ StreamingTableConfig, ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig +from tests.functional.adapter.helpers import get_model_config from tests.functional.adapter.streaming_tables import fixtures -from tests.functional.adapter.utils.test_utils import get_model_config def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): diff --git a/tests/functional/adapter/utils/test_utils.py b/tests/functional/adapter/utils/test_utils.py index fc70ec0d2..35fda97ce 100644 --- a/tests/functional/adapter/utils/test_utils.py +++ b/tests/functional/adapter/utils/test_utils.py @@ -1,4 +1,3 @@ -from dbt.adapters.base.relation import BaseRelation from dbt.tests.adapter.utils.test_any_value import BaseAnyValue from dbt.tests.adapter.utils.test_array_append import BaseArrayAppend from dbt.tests.adapter.utils.test_array_concat import BaseArrayConcat @@ -29,18 +28,6 @@ from dbt.tests.adapter.utils.test_split_part import BaseSplitPart from dbt.tests.adapter.utils.test_string_literal import BaseStringLiteral from dbt.tests.adapter.utils.test_validate_sql import BaseValidateSqlMethod -from dbt.tests.util import get_manifest - - -def get_model_config(project, relation: BaseRelation): - """Return the parsed dbt model config for the given relation fixture.""" - manifest = get_manifest(project.project_root) - node_id = f"model.test.{relation.identifier}" - node = manifest.nodes.get(node_id) - assert node is not None, f"Node {node_id} not found in manifest" - return project.adapter.get_config_from_model(node) - - class TestAnyValue(BaseAnyValue): pass diff --git a/tests/functional/adapter/views/test_view_metadata_fetch_skips.py b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py index 73efa452e..8bfade86c 100644 --- a/tests/functional/adapter/views/test_view_metadata_fetch_skips.py +++ b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py @@ -1,11 +1,7 @@ import pytest from dbt.tests import util -FAIL_IF_TAG_FETCH_CALLED_MACROS = """ -{% macro fetch_tags(relation) %} - {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} -{% endmacro %} -""" +from tests.functional.adapter.helpers import FAIL_IF_TAG_FETCH_CALLED_MACROS VIEW_WITHOUT_TAGS_SQL = """ {{ config(materialized='view') }} @@ -43,6 +39,8 @@ def project_config_update(self): } def test_second_view_run_succeeds_without_tag_fetches(self, project): + # The first run creates the view; the second run exercises the existing-relation + # alter/config-diff path where adapter.get_relation_config() may fetch tags. util.run_dbt(["run"]) util.run_dbt(["run"]) @@ -67,6 +65,8 @@ def project_config_update(self): } def test_second_view_run_fails_when_tag_fetch_is_required(self, project): + # The first run creates the view; the second run exercises the existing-relation + # alter/config-diff path where adapter.get_relation_config() may fetch tags. util.run_dbt(["run"]) _, logs = util.run_dbt_and_capture(["run"], expect_pass=False) util.assert_message_in_logs("fetch_tags should not be called", logs) diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index a55f57d3a..bc31c7d31 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -28,8 +28,10 @@ DatabricksTableType, ) from dbt.adapters.databricks.relation_configs.column_tags import ColumnTagsConfig +from dbt.adapters.databricks.relation_configs.column_tags import ColumnTagsProcessor from dbt.adapters.databricks.relation_configs.incremental import IncrementalTableConfig from dbt.adapters.databricks.relation_configs.tags import TagsConfig +from dbt.adapters.databricks.relation_configs.tags import TagsProcessor from dbt.adapters.databricks.relation_configs.view import ViewConfig from dbt.adapters.databricks.utils import check_not_found_error from tests.unit.utils import config_from_parts_or_dicts @@ -1273,14 +1275,14 @@ def _create_incremental_config( ) -> IncrementalTableConfig: return IncrementalTableConfig( config={ - "tags": TagsConfig(set_tags=tags or {}), - "column_tags": ColumnTagsConfig(set_column_tags=column_tags or {}), + TagsProcessor.name: TagsConfig(set_tags=tags or {}), + ColumnTagsProcessor.name: ColumnTagsConfig(set_column_tags=column_tags or {}), } ) @staticmethod def _create_view_config(tags: dict[str, str] | None = None) -> ViewConfig: - return ViewConfig(config={"tags": TagsConfig(set_tags=tags or {})}) + return ViewConfig(config={TagsProcessor.name: TagsConfig(set_tags=tags or {})}) @staticmethod def _called_macro_names(adapter: Mock) -> list[str]: