Skip to content

Commit ce47403

Browse files
authored
fix: apply column_tags on V1 incremental materialization (#1520)
## Description V1 (`use_materialization_v2: false`, the **default**) incremental models silently dropped column-level `databricks_tags`. The V1 branch of `incremental/incremental.sql` never called `apply_column_tags` on its create, full-refresh/recreate, or merge-changeset paths — while V1 `table` create (fixed in #1307) and the entire V2 path both apply them. The model builds successfully but `system.information_schema.column_tags` stays empty, with no error or warning. This applies column tags on all three V1 incremental paths, mirroring `table.sql` and `apply_config_changeset`. Follow-up to #1307, which fixed the identical gap for the `table` materialization only. ## Tests - `column_tags/test_column_tags.py::TestColumnTagsIncrementalV1` — create (run 1) + merge-change (run 2); fails on `main`, passes here. - `column_tags/test_column_tags.py::TestColumnTagsIncrementalV1FullRefresh` — column tags survive a `dbt run --full-refresh`. - `tests/unit/macros/relations/components/test_column_tags_macros.py` — `apply_column_tags` / `alter_set_column_tags` render (incl. the view → `ALTER TABLE` special case) and the empty-diff no-op (idempotency). ## Checklist - [x] Changelog entry added (`### Fixes`) - [x] Functional + unit/macro tests added
1 parent 6307f90 commit ce47403

4 files changed

Lines changed: 142 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- Add catalogs.yml v2 support (requires `use_catalogs_v2: true` in dbt-core) ([1440](https://github.com/databricks/dbt-databricks/pull/1440))
66

77
### Fixes
8+
- Apply column-level `databricks_tags` for incremental models on the V1 materialization path (`use_materialization_v2: false`, the default). They were silently dropped at create and on subsequent tag changes; the V1 incremental materialization now applies them, matching the `table` materialization and the V2 path. ([#1520](https://github.com/databricks/dbt-databricks/pull/1520) closes [#1307](https://github.com/databricks/dbt-databricks/issues/1307))
89
- Raise a `DbtRuntimeError` when a Python model job run terminates with a non-success `result_state` (e.g. `FAILED`/`TIMEDOUT`) instead of returning silently ([#1477](https://github.com/databricks/dbt-databricks/pull/1477))
910

1011
### Under the Hood

dbt/include/databricks/macros/materializations/incremental/incremental.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@
108108
{%- endcall -%}
109109
{% do persist_constraints(target_relation, model) %}
110110
{% do apply_tags(target_relation, tags) %}
111+
{% set column_tags = adapter.get_column_tags_from_model(config.model) %}
112+
{% if column_tags and column_tags.set_column_tags %}
113+
{{ apply_column_tags(target_relation, column_tags) }}
114+
{% endif %}
111115
{%- if language == 'python' -%}
112116
{%- do apply_tblproperties(target_relation, tblproperties) %}
113117
{%- endif -%}
@@ -126,6 +130,10 @@
126130
{% do persist_constraints(target_relation, model) %}
127131
{% endif %}
128132
{% do apply_tags(target_relation, tags) %}
133+
{% set column_tags = adapter.get_column_tags_from_model(config.model) %}
134+
{% if column_tags and column_tags.set_column_tags %}
135+
{{ apply_column_tags(target_relation, column_tags) }}
136+
{% endif %}
129137
{% do persist_docs(target_relation, model, for_relation=language=='python') %}
130138
{%- else -%}
131139
{#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#}
@@ -179,6 +187,7 @@
179187
{% set liquid_clustering = _configuration_changes.changes.get("liquid_clustering") %}
180188
{% set row_filter = _configuration_changes.changes.get("row_filter") %}
181189
{% set constraints = _configuration_changes.changes.get("constraints") %}
190+
{% set column_tags = _configuration_changes.changes.get("column_tags") %}
182191
{% if tags is not none %}
183192
{% do apply_tags(target_relation, tags.set_tags) %}
184193
{%- endif -%}
@@ -191,6 +200,9 @@
191200
{% if row_filter is not none %}
192201
{{ apply_row_filter(target_relation, row_filter) }}
193202
{% endif %}
203+
{% if column_tags %}
204+
{{ apply_column_tags(target_relation, column_tags) }}
205+
{% endif %}
194206
{#- Incremental constraint application requires information_schema access (see fetch_*_constraints macros) -#}
195207
{% set contract_config = config.get('contract') %}
196208
{% if constraints and contract_config and contract_config.enforced and not target_relation.is_hive_metastore() %}

tests/functional/adapter/column_tags/test_column_tags.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
from dbt.tests import util
33

44
from tests.functional.adapter.column_tags import fixtures
5-
from tests.functional.adapter.fixtures import MaterializationV2Mixin, RerunSafeMixin
5+
from tests.functional.adapter.fixtures import (
6+
MaterializationV1Mixin,
7+
MaterializationV2Mixin,
8+
RerunSafeMixin,
9+
)
610

711

812
class ColumnTagsMixin(RerunSafeMixin, MaterializationV2Mixin):
@@ -124,3 +128,66 @@ class TestColumnTagsTableV1(ColumnTagsMixin):
124128
def project_config_update(self):
125129
# Override MaterializationV2Mixin to test the V1 (default) materialization path
126130
return {}
131+
132+
133+
@pytest.mark.skip_profile("databricks_cluster")
134+
class TestColumnTagsIncrementalV1(ColumnTagsMixin):
135+
relation_type = "incremental"
136+
137+
@pytest.fixture(scope="class")
138+
def project_config_update(self):
139+
# Override MaterializationV2Mixin to test the V1 (default) materialization path.
140+
# Exercises both the V1 incremental create path (run 1) and the V1 merge-time
141+
# configuration changeset (run 2 adds a tag to a previously-untagged column).
142+
return {}
143+
144+
145+
@pytest.mark.skip_profile("databricks_cluster")
146+
class TestColumnTagsIncrementalV1FullRefresh(RerunSafeMixin, MaterializationV1Mixin):
147+
"""A `dbt run --full-refresh` of a V1 incremental model must re-apply column tags.
148+
149+
Full-refresh recreates the relation from scratch (the recreate branch of the V1
150+
incremental materialization), so any column tags must be re-applied there or they
151+
are lost on every full refresh.
152+
"""
153+
154+
@pytest.fixture(scope="class")
155+
def relations_to_reset(self):
156+
return ("base_model",)
157+
158+
@pytest.fixture(scope="class")
159+
def models(self):
160+
return {
161+
"base_model.sql": fixtures.base_model_sql,
162+
"schema.yml": fixtures.initial_column_tag_model.replace(
163+
"materialized: table", "materialized: incremental"
164+
),
165+
}
166+
167+
def _column_tags(self, project):
168+
rows = project.run_sql(
169+
f"""
170+
SELECT column_name, tag_name, tag_value
171+
FROM `system`.`information_schema`.`column_tags`
172+
WHERE catalog_name = '{project.database}'
173+
AND schema_name = '{project.test_schema}'
174+
AND table_name = 'base_model'
175+
ORDER BY column_name, tag_name
176+
""",
177+
fetch="all",
178+
)
179+
return {(row[0], row[1], row[2]) for row in rows}
180+
181+
def test_column_tags_survive_full_refresh(self, project):
182+
expected = {
183+
("account_number", "pii", "true"),
184+
("account_number", "sensitive", "true"),
185+
("account_number", "key_only", ""),
186+
("account_number", "null_value", ""),
187+
}
188+
# Initial run creates the relation (create path applies the tags).
189+
util.run_dbt(["run"])
190+
assert self._column_tags(project) == expected
191+
# Full refresh recreates the relation; the recreate path must re-apply the tags.
192+
util.run_dbt(["run", "--full-refresh"])
193+
assert self._column_tags(project) == expected
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import pytest
2+
3+
from dbt.adapters.databricks.relation import DatabricksRelationType
4+
from dbt.adapters.databricks.relation_configs.column_tags import ColumnTagsConfig
5+
from tests.unit.macros.base import MacroTestBase
6+
7+
8+
class TestColumnTagsMacros(MacroTestBase):
9+
@pytest.fixture
10+
def template_name(self) -> str:
11+
return "column_tags.sql"
12+
13+
@pytest.fixture
14+
def macro_folders_to_load(self) -> list:
15+
return ["macros/relations/components", "macros/relations", "macros"]
16+
17+
@pytest.fixture
18+
def passthrough_statement(self, template_bundle):
19+
"""Make `{% call statement('main') %}…{% endcall %}` render its body.
20+
21+
The base mock returns the statement label and discards the body; for the
22+
apply_* wrappers we need the inner ALTER SQL so we can assert on it. Also
23+
force the UC path (apply_column_tags raises on hive_metastore).
24+
"""
25+
template_bundle.context["statement"] = lambda label, caller=None: (
26+
caller() if caller else label
27+
)
28+
template_bundle.relation.is_hive_metastore = lambda: False
29+
return template_bundle
30+
31+
def test_alter_set_column_tags_table(self, template_bundle):
32+
sql = self.render_bundle(
33+
template_bundle, "alter_set_column_tags", "email", {"pii": "true", "team": "growth"}
34+
)
35+
assert "alter table `some_database`.`some_schema`.`some_table`" in sql
36+
assert "alter column `email` set tags ('pii' = 'true', 'team' = 'growth')" in sql
37+
38+
def test_alter_set_column_tags_view_uses_alter_table(self, template_bundle):
39+
# ALTER VIEW cannot set column tags, so the macro must emit ALTER TABLE for views.
40+
template_bundle.relation.type = DatabricksRelationType.View
41+
sql = self.render_bundle(template_bundle, "alter_set_column_tags", "email", {"pii": "true"})
42+
assert "alter table" in sql
43+
assert "alter view" not in sql
44+
assert "alter column `email` set tags ('pii' = 'true')" in sql
45+
46+
def test_apply_column_tags_applies_each_column(self, passthrough_statement):
47+
config = ColumnTagsConfig(
48+
set_column_tags={"email": {"pii": "true"}, "ssn": {"pii": "true"}}
49+
)
50+
sql = self.render_bundle(passthrough_statement, "apply_column_tags", config)
51+
assert "alter column `email` set tags ('pii' = 'true')" in sql
52+
assert "alter column `ssn` set tags ('pii' = 'true')" in sql
53+
54+
def test_apply_column_tags_noop_when_empty(self, passthrough_statement):
55+
# Idempotency safety net: when get_diff returns an empty/None changeset, the
56+
# {% if column_tags %} gate skips apply entirely; and even if apply_column_tags
57+
# is reached with nothing to set, it must emit no ALTER. This pins the latter.
58+
config = ColumnTagsConfig(set_column_tags={})
59+
sql = self.render_bundle(passthrough_statement, "apply_column_tags", config)
60+
assert "alter" not in sql
61+
assert "set tags" not in sql

0 commit comments

Comments
 (0)