Skip to content

Commit c66209d

Browse files
committed
address review: guard column_masks to V2 only, add security test
- Add V2 guard to apply_column_masks in apply_config_changeset to prevent unmasked data exposure in V1 (CTAS writes data before masks can be applied, whereas V2 creates an empty table first) - Add TestV1IncrementalColumnMasksNotApplied: overrides apply_column_masks to raise error, verifying it is never called in V1 incremental path - Remove V1 column_tags/column_masks tests that are no longer applicable - Keep TestV1IncrementalSkipConfigChanges for metadata skip validation Co-authored-by: Isaac
1 parent 2882e9c commit c66209d

3 files changed

Lines changed: 62 additions & 140 deletions

File tree

dbt/include/databricks/macros/relations/table/alter.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
{% if constraints %}
3131
{{ apply_constraints(target_relation, constraints) }}
3232
{% endif %}
33-
{% if column_masks %}
33+
{#-- Column masks are only applied in V2 to avoid a window where data is unmasked (CTAS in V1
34+
writes data before masks can be applied, whereas V2 creates an empty table first) --#}
35+
{% if column_masks and adapter.behavior.use_materialization_v2 %}
3436
{{ apply_column_masks(target_relation, column_masks) }}
3537
{% endif %}
3638
{%- endif -%}

tests/functional/adapter/incremental/fixtures.py

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,24 +1118,6 @@ def model(dbt, spark):
11181118
return spark.createDataFrame(data, schema=['id', 'msg', 'color'])
11191119
"""
11201120

1121-
v1_config_changes_sql = """
1122-
{{ config(
1123-
materialized = 'incremental',
1124-
unique_key = 'id',
1125-
merge_update_columns = ['msg'],
1126-
) }}
1127-
1128-
{% if not is_incremental() %}
1129-
1130-
select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
1131-
1132-
{% else %}
1133-
1134-
select cast(1 as bigint) as id, 'updated' as msg, 'blue' as color
1135-
1136-
{% endif %}
1137-
"""
1138-
11391121
v1_skip_config_changes_sql = """
11401122
{{ config(
11411123
materialized = 'incremental',
@@ -1154,34 +1136,6 @@ def model(dbt, spark):
11541136
{% endif %}
11551137
"""
11561138

1157-
v1_column_tags_a = """
1158-
version: 2
1159-
1160-
models:
1161-
- name: v1_config_changes_sql
1162-
columns:
1163-
- name: id
1164-
databricks_tags:
1165-
pii: "false"
1166-
- name: msg
1167-
- name: color
1168-
"""
1169-
1170-
v1_column_tags_b = """
1171-
version: 2
1172-
1173-
models:
1174-
- name: v1_config_changes_sql
1175-
columns:
1176-
- name: id
1177-
databricks_tags:
1178-
pii: "true"
1179-
- name: msg
1180-
databricks_tags:
1181-
source: "app"
1182-
- name: color
1183-
"""
1184-
11851139
fail_if_metadata_fetched_macros = """
11861140
{% macro fetch_tags(relation) %}
11871141
{{ exceptions.raise_compiler_error("fetch_tags should not be called when incremental_apply_config_changes is false") }}
@@ -1208,6 +1162,33 @@ def model(dbt, spark):
12081162
{% endmacro %}
12091163
"""
12101164

1165+
fail_if_column_masks_applied_macro = """
1166+
{% macro apply_column_masks(relation, column_masks) %}
1167+
{{ exceptions.raise_compiler_error("apply_column_masks should not be called in V1 — column masks must only be applied in V2 to prevent unmasked data exposure") }}
1168+
{% endmacro %}
1169+
"""
1170+
1171+
v1_column_mask_model_sql = """
1172+
{{ config(
1173+
materialized = 'incremental',
1174+
unique_key = 'id',
1175+
) }}
1176+
1177+
select cast(1 as bigint) as id, 'hello' as name
1178+
"""
1179+
1180+
v1_column_mask_schema = """
1181+
version: 2
1182+
1183+
models:
1184+
- name: v1_column_mask_model_sql
1185+
columns:
1186+
- name: id
1187+
- name: name
1188+
column_mask:
1189+
function: full_mask
1190+
"""
1191+
12111192
warn_unenforced_override_sql = """
12121193
select "abc" as id
12131194
"""

tests/functional/adapter/incremental/test_v1_incremental_config_changes.py

Lines changed: 32 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -5,125 +5,64 @@
55

66

77
@pytest.mark.skip_profile("databricks_cluster")
8-
class TestV1IncrementalColumnTags:
9-
"""Test that V1 incremental path applies column tag changes via process_config_changes."""
8+
class TestV1IncrementalSkipConfigChanges:
9+
"""Test that incremental_apply_config_changes=false skips metadata fetch queries in V1 path."""
1010

1111
@pytest.fixture(scope="class")
1212
def models(self):
1313
return {
14-
"v1_config_changes_sql.sql": fixtures.v1_config_changes_sql,
15-
"schema.yml": fixtures.v1_column_tags_a,
16-
}
17-
18-
def test_changing_column_tags(self, project):
19-
# First run creates the table
20-
util.run_dbt(["run"])
21-
22-
# Update column tags
23-
util.write_file(fixtures.v1_column_tags_b, "models", "schema.yml")
24-
util.run_dbt(["run"])
25-
26-
# Verify column tags were applied
27-
results = project.run_sql(
28-
f"""
29-
select column_name, tag_name, tag_value
30-
from `system`.`information_schema`.`column_tags`
31-
where schema_name = '{project.test_schema}'
32-
and table_name = 'v1_config_changes_sql'
33-
order by column_name, tag_name
34-
""",
35-
fetch="all",
36-
)
37-
38-
tags_dict = {}
39-
for row in results:
40-
col = row.column_name
41-
if col not in tags_dict:
42-
tags_dict[col] = {}
43-
tags_dict[col][row.tag_name] = row.tag_value
44-
45-
# Verify expected final state
46-
expected_tags = {
47-
"id": {"pii": "true"},
48-
"msg": {"source": "app"},
14+
"v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql,
4915
}
50-
assert tags_dict == expected_tags
51-
52-
53-
@pytest.mark.skip_profile("databricks_cluster")
54-
class TestV1IncrementalColumnMasks:
55-
"""Test that V1 incremental path applies column mask changes via process_config_changes."""
5616

5717
@pytest.fixture(scope="class")
58-
def models(self):
18+
def macros(self):
5919
return {
60-
"column_mask_sql.sql": fixtures.column_mask_sql,
61-
"schema.yml": fixtures.column_mask_base,
20+
"fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros,
6221
}
6322

64-
def test_changing_column_masks(self, project):
65-
# Create mask functions
66-
project.run_sql(
67-
f"""
68-
CREATE OR REPLACE FUNCTION
69-
{project.database}.{project.test_schema}.full_mask(password STRING)
70-
RETURNS STRING
71-
RETURN '*****';
72-
"""
73-
)
74-
project.run_sql(
75-
f"""
76-
CREATE OR REPLACE FUNCTION
77-
{project.database}.{project.test_schema}.email_mask(value STRING)
78-
RETURNS STRING
79-
RETURN CONCAT(
80-
REPEAT('*', POSITION('@' IN value) - 1),
81-
SUBSTR(value, POSITION('@' IN value))
82-
);
83-
"""
84-
)
85-
86-
# First run with initial masks
23+
def test_incremental_run_skips_metadata_queries(self, project):
24+
# First run creates the table
8725
util.run_dbt(["run"])
88-
masks = project.run_sql(
89-
"SELECT id, name, email, password FROM column_mask_sql",
90-
fetch="all",
91-
)
92-
assert len(masks) == 1
93-
assert masks[0][1] == "*****" # name (masked)
94-
assert masks[0][3] == "password123" # password (unmasked)
95-
96-
# Update masks and verify changes
97-
util.write_file(fixtures.column_mask_valid_mask_updates, "models", "schema.yml")
26+
# Second run exercises the incremental merge path.
27+
# If metadata fetch macros are called, they will raise errors and the run will fail.
9828
util.run_dbt(["run"])
9929

100-
result = project.run_sql(
101-
"SELECT id, name, email, password FROM column_mask_sql", fetch="all"
102-
)
103-
assert len(result) == 1
104-
assert result[0][1] == "hello" # name (unmasked)
105-
assert result[0][3] == "*****" # password (masked)
106-
10730

10831
@pytest.mark.skip_profile("databricks_cluster")
109-
class TestV1IncrementalSkipConfigChanges:
110-
"""Test that incremental_apply_config_changes=false skips metadata fetch queries."""
32+
class TestV1IncrementalColumnMasksNotApplied:
33+
"""Test that column masks are NOT applied in V1 incremental path.
34+
35+
Column masks must only be applied in V2 where the empty table is created before data
36+
arrives. In V1 (CTAS), data is written immediately, so applying masks after the fact
37+
would leave a window where data is unmasked — a security/privacy vulnerability.
38+
"""
11139

11240
@pytest.fixture(scope="class")
11341
def models(self):
11442
return {
115-
"v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql,
43+
"v1_column_mask_model_sql.sql": fixtures.v1_column_mask_model_sql,
44+
"schema.yml": fixtures.v1_column_mask_schema,
11645
}
11746

11847
@pytest.fixture(scope="class")
11948
def macros(self):
12049
return {
121-
"fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros,
50+
"fail_if_column_masks_applied.sql": fixtures.fail_if_column_masks_applied_macro,
12251
}
12352

124-
def test_incremental_run_skips_metadata_queries(self, project):
53+
def test_column_masks_not_applied_in_v1(self, project):
54+
# Create the mask function so the model config is valid
55+
project.run_sql(
56+
f"""
57+
CREATE OR REPLACE FUNCTION
58+
{project.database}.{project.test_schema}.full_mask(val STRING)
59+
RETURNS STRING
60+
RETURN '*****';
61+
"""
62+
)
63+
12564
# First run creates the table
12665
util.run_dbt(["run"])
127-
# Second run exercises the incremental merge path.
128-
# If metadata fetch macros are called, they will raise errors and the run will fail.
66+
# Second run exercises the incremental merge path with config change detection.
67+
# If apply_column_masks is called, the overridden macro raises an error.
12968
util.run_dbt(["run"])

0 commit comments

Comments
 (0)