Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

### Features

- Allow V1 incremental path to skip unnecessary metadata queries by respecting `incremental_apply_config_changes` config flag, matching V2 behavior ([#1402](https://github.com/databricks/dbt-databricks/issues/1402))
- Enable installation of notebook-scoped Python packages
-

### Fixes

- Fix `workflow_job` Python model submission method failing with dictionary attribute error ([#1360](https://github.com/databricks/dbt-databricks/issues/1360))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@
{{ set_overwrite_mode('DYNAMIC') }}
{%- endif -%}
{#-- Relation must be merged --#}
{%- set _existing_config = adapter.get_relation_config(existing_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
{%- set _configuration_changes = model_config.get_changeset(_existing_config) -%}
{%- call statement('create_temp_relation', language=language) -%}
{{ create_table_as(True, temp_relation, compiled_code, language) }}
{%- endcall -%}
Expand Down Expand Up @@ -173,27 +170,7 @@
Also, why does not either drop_relation or adapter.drop_relation work here?!
--#}
{%- endif -%}
{% if _configuration_changes is not none %}
{% set tags = _configuration_changes.changes.get("tags", None) %}
{% set tblproperties = _configuration_changes.changes.get("tblproperties", None) %}
{% set liquid_clustering = _configuration_changes.changes.get("liquid_clustering") %}
{% set constraints = _configuration_changes.changes.get("constraints") %}
{% if tags is not none %}
{% do apply_tags(target_relation, tags.set_tags) %}
{%- endif -%}
{% if tblproperties is not none %}
{% do apply_tblproperties(target_relation, tblproperties.tblproperties) %}
{%- endif -%}
{% if liquid_clustering is not none %}
{% do apply_liquid_clustered_cols(target_relation, liquid_clustering) %}
{% endif %}
{#- Incremental constraint application requires information_schema access (see fetch_*_constraints macros) -#}
{% set contract_config = config.get('contract') %}
{% if constraints and contract_config and contract_config.enforced and not target_relation.is_hive_metastore() %}
{{ apply_constraints(target_relation, constraints) }}
{% endif %}
{%- endif -%}
{% do persist_docs(target_relation, model, for_relation=True) %}
{{ process_config_changes(target_relation) }}
{%- endif -%}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
Expand Down
4 changes: 3 additions & 1 deletion dbt/include/databricks/macros/relations/table/alter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
{% if constraints %}
{{ apply_constraints(target_relation, constraints) }}
{% endif %}
{% if column_masks %}
{#-- Column masks are only applied in V2 to avoid a window where data is unmasked (CTAS in V1
writes data before masks can be applied, whereas V2 creates an empty table first) --#}
{% if column_masks and adapter.behavior.use_materialization_v2 %}
{{ apply_column_masks(target_relation, column_masks) }}
{% endif %}
{%- endif -%}
Expand Down
71 changes: 71 additions & 0 deletions tests/functional/adapter/incremental/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,77 @@ def model(dbt, spark):
return spark.createDataFrame(data, schema=['id', 'msg', 'color'])
"""

v1_skip_config_changes_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_apply_config_changes = false,
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg

{% else %}

select cast(1 as bigint) as id, 'updated' as msg

{% endif %}
"""

fail_if_metadata_fetched_macros = """
{% macro fetch_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_tags should not be called when incremental_apply_config_changes is false") }}
{% endmacro %}

{% macro fetch_column_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }}
{% endmacro %}

{% macro fetch_non_null_constraint_columns(relation) %}
{{ exceptions.raise_compiler_error("fetch_non_null_constraint_columns should not be called") }}
{% endmacro %}

{% macro fetch_primary_key_constraints(relation) %}
{{ exceptions.raise_compiler_error("fetch_primary_key_constraints should not be called") }}
{% endmacro %}

{% macro fetch_foreign_key_constraints(relation) %}
{{ exceptions.raise_compiler_error("fetch_foreign_key_constraints should not be called") }}
{% endmacro %}

{% macro fetch_column_masks(relation) %}
{{ exceptions.raise_compiler_error("fetch_column_masks should not be called") }}
{% endmacro %}
"""

fail_if_column_masks_applied_macro = """
{% macro apply_column_masks(relation, column_masks) %}
{{ exceptions.raise_compiler_error("apply_column_masks should not be called in V1 — column masks must only be applied in V2 to prevent unmasked data exposure") }}
{% endmacro %}
"""

v1_column_mask_model_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
) }}

select cast(1 as bigint) as id, 'hello' as name
"""

v1_column_mask_schema = """
version: 2

models:
- name: v1_column_mask_model_sql
columns:
- name: id
- name: name
column_mask:
function: full_mask
"""

warn_unenforced_override_sql = """
select "abc" as id
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import pytest
from dbt.tests import util

from tests.functional.adapter.incremental import fixtures


@pytest.mark.skip_profile("databricks_cluster")
class TestV1IncrementalSkipConfigChanges:
    """Verify the V1 incremental path issues no metadata fetch queries when
    ``incremental_apply_config_changes`` is false.
    """

    @pytest.fixture(scope="class")
    def models(self):
        # One incremental model configured with incremental_apply_config_changes=false.
        model_files = {"v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql}
        return model_files

    @pytest.fixture(scope="class")
    def macros(self):
        # Shadow every fetch_* macro so any metadata query aborts the run.
        macro_files = {"fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros}
        return macro_files

    def test_incremental_run_skips_metadata_queries(self, project):
        # Run twice: the first run creates the table, the second takes the
        # incremental merge path. The overridden fetch_* macros raise a
        # compiler error if any metadata is queried, failing the run.
        for _ in range(2):
            util.run_dbt(["run"])


@pytest.mark.skip_profile("databricks_cluster")
class TestV1IncrementalColumnMasksNotApplied:
    """Verify column masks are never applied on the V1 incremental path.

    Masks may only be applied in V2, where an empty table exists before any
    data arrives. In V1 the CTAS writes data immediately, so applying masks
    afterwards would expose unmasked rows for a window — a security/privacy
    vulnerability.
    """

    @pytest.fixture(scope="class")
    def models(self):
        # Incremental model plus a schema that declares a mask on `name`.
        model_files = {
            "v1_column_mask_model_sql.sql": fixtures.v1_column_mask_model_sql,
            "schema.yml": fixtures.v1_column_mask_schema,
        }
        return model_files

    @pytest.fixture(scope="class")
    def macros(self):
        # Make apply_column_masks fatal so any attempt to apply masks fails the run.
        macro_files = {"fail_if_column_masks_applied.sql": fixtures.fail_if_column_masks_applied_macro}
        return macro_files

    def test_column_masks_not_applied_in_v1(self, project):
        # The masking function must exist for the model config to be valid.
        mask_fn_ddl = f"""
            CREATE OR REPLACE FUNCTION
            {project.database}.{project.test_schema}.full_mask(val STRING)
            RETURNS STRING
            RETURN '*****';
            """
        project.run_sql(mask_fn_ddl)

        # Run twice: first run creates the table; second run exercises the
        # incremental merge path with config-change detection. If
        # apply_column_masks is ever invoked, the override raises an error.
        for _ in range(2):
            util.run_dbt(["run"])