diff --git a/CHANGELOG.md b/CHANGELOG.md index bad579f97..522cb4a10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,9 @@ ### Features +- Allow V1 incremental path to skip unnecessary metadata queries by respecting `incremental_apply_config_changes` config flag, matching V2 behavior ([#1402](https://github.com/databricks/dbt-databricks/issues/1402)) - Enable Notebook scoped python packages installation -- + ### Fixes - Fix `workflow_job` Python model submission method failing with dictionary attribute error ([#1360](https://github.com/databricks/dbt-databricks/issues/1360)) diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index ac717ae45..77c0f1be4 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -133,9 +133,6 @@ {{ set_overwrite_mode('DYNAMIC') }} {%- endif -%} {#-- Relation must be merged --#} - {%- set _existing_config = adapter.get_relation_config(existing_relation) -%} - {%- set model_config = adapter.get_config_from_model(config.model) -%} - {%- set _configuration_changes = model_config.get_changeset(_existing_config) -%} {%- call statement('create_temp_relation', language=language) -%} {{ create_table_as(True, temp_relation, compiled_code, language) }} {%- endcall -%} @@ -173,27 +170,7 @@ Also, why does not either drop_relation or adapter.drop_relation work here?! --#} {%- endif -%} - {% if _configuration_changes is not none %} - {% set tags = _configuration_changes.changes.get("tags", None) %} - {% set tblproperties = _configuration_changes.changes.get("tblproperties", None) %} - {% set liquid_clustering = _configuration_changes.changes.get("liquid_clustering") %} - {% set constraints = _configuration_changes.changes.get("constraints") %} - {% if tags is not none %} - {% do apply_tags(target_relation, tags.set_tags) %} - {%- endif -%} - {% if tblproperties is not none %} - {% do apply_tblproperties(target_relation, tblproperties.tblproperties) %} - {%- endif -%} - {% if liquid_clustering is not none %} - {% do apply_liquid_clustered_cols(target_relation, liquid_clustering) %} - {% endif %} - {#- Incremental constraint application requires information_schema access (see fetch_*_constraints macros) -#} - {% set contract_config = config.get('contract') %} - {% if constraints and contract_config and contract_config.enforced and not target_relation.is_hive_metastore() %} - {{ apply_constraints(target_relation, constraints) }} - {% endif %} - {%- endif -%} - {% do persist_docs(target_relation, model, for_relation=True) %} + {{ process_config_changes(target_relation) }} {%- endif -%} {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} diff --git a/dbt/include/databricks/macros/relations/table/alter.sql b/dbt/include/databricks/macros/relations/table/alter.sql index ca2a7690e..210e26c3f 100644 --- a/dbt/include/databricks/macros/relations/table/alter.sql +++ b/dbt/include/databricks/macros/relations/table/alter.sql @@ -30,7 +30,9 @@ {% if constraints %} {{ apply_constraints(target_relation, constraints) }} {% endif %} - {% if column_masks %} + {#-- Column masks are only applied in V2 to avoid a window where data is unmasked (CTAS in V1 + writes data before masks can be applied, whereas V2 creates an empty table first) --#} + {% if column_masks and adapter.behavior.use_materialization_v2 %} {{ apply_column_masks(target_relation, column_masks) }} {% endif %} {%- endif -%} diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index d50fa71f3..8f70c4a61 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -1118,6 +1118,77 @@ def model(dbt, spark): return spark.createDataFrame(data, schema=['id', 'msg', 'color']) """ +v1_skip_config_changes_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_apply_config_changes = false, +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg + +{% else %} + +select cast(1 as bigint) as id, 'updated' as msg + +{% endif %} +""" + +fail_if_metadata_fetched_macros = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called when incremental_apply_config_changes is false") }} +{% endmacro %} + +{% macro fetch_column_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} +{% endmacro %} + +{% macro fetch_non_null_constraint_columns(relation) %} + {{ exceptions.raise_compiler_error("fetch_non_null_constraint_columns should not be called") }} +{% endmacro %} + +{% macro fetch_primary_key_constraints(relation) %} + {{ exceptions.raise_compiler_error("fetch_primary_key_constraints should not be called") }} +{% endmacro %} + +{% macro fetch_foreign_key_constraints(relation) %} + {{ exceptions.raise_compiler_error("fetch_foreign_key_constraints should not be called") }} +{% endmacro %} + +{% macro fetch_column_masks(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_masks should not be called") }} +{% endmacro %} +""" + +fail_if_column_masks_applied_macro = """ +{% macro apply_column_masks(relation, column_masks) %} + {{ exceptions.raise_compiler_error("apply_column_masks should not be called in V1 — column masks must only be applied in V2 to prevent unmasked data exposure") }} +{% endmacro %} +""" + +v1_column_mask_model_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', +) }} + +select cast(1 as bigint) as id, 'hello' as name +""" + +v1_column_mask_schema = """ +version: 2 + +models: + - name: v1_column_mask_model_sql + columns: + - name: id + - name: name + column_mask: + function: full_mask +""" + warn_unenforced_override_sql = """ select "abc" as id """ diff --git a/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py b/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py new file mode 100644 index 000000000..99dbb8c00 --- /dev/null +++ b/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py @@ -0,0 +1,68 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.incremental import fixtures + + +@pytest.mark.skip_profile("databricks_cluster") +class TestV1IncrementalSkipConfigChanges: + """Test that incremental_apply_config_changes=false skips metadata fetch queries in V1 path.""" + + @pytest.fixture(scope="class") + def models(self): + return { + "v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros, + } + + def test_incremental_run_skips_metadata_queries(self, project): + # First run creates the table + util.run_dbt(["run"]) + # Second run exercises the incremental merge path. + # If metadata fetch macros are called, they will raise errors and the run will fail. + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestV1IncrementalColumnMasksNotApplied: + """Test that column masks are NOT applied in V1 incremental path. + + Column masks must only be applied in V2 where the empty table is created before data + arrives. In V1 (CTAS), data is written immediately, so applying masks after the fact + would leave a window where data is unmasked — a security/privacy vulnerability. + """ + + @pytest.fixture(scope="class") + def models(self): + return { + "v1_column_mask_model_sql.sql": fixtures.v1_column_mask_model_sql, + "schema.yml": fixtures.v1_column_mask_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "fail_if_column_masks_applied.sql": fixtures.fail_if_column_masks_applied_macro, + } + + def test_column_masks_not_applied_in_v1(self, project): + # Create the mask function so the model config is valid + project.run_sql( + f""" + CREATE OR REPLACE FUNCTION + {project.database}.{project.test_schema}.full_mask(val STRING) + RETURNS STRING + RETURN '*****'; + """ + ) + + # First run creates the table + util.run_dbt(["run"]) + # Second run exercises the incremental merge path with config change detection. + # If apply_column_masks is called, the overridden macro raises an error. + util.run_dbt(["run"])