Skip to content

Commit 3142111

Browse files
authored
Merge branch 'main' into eric-jang_data/simplify-connection-mgmt
2 parents e36a9e5 + f1d45a3 commit 3142111

8 files changed

Lines changed: 219 additions & 57 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
## dbt-databricks 1.10.8 (TBD)
22

3+
### Features
4+
- Support insert_overwrite incremental strategy for SQL warehouses ([1025](https://github.com/databricks/dbt-databricks/issues/1025))
5+
6+
### Fixes
7+
- Add fallback logic for known error types for `DESCRIBE TABLE EXTENDED .. AS JSON` for better reliability ([1128](https://github.com/databricks/dbt-databricks/issues/1128))
8+
39
### Under the Hood
410
- Simplify connection management to align with base adapter. Connections are no longer cached per-thread
511

dbt/adapters/databricks/behaviors/columns.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from abc import ABC, abstractmethod
22

3+
from dbt_common.exceptions import DbtDatabaseError
34
from dbt_common.utils.dict import AttrDict
45

56
from dbt.adapters.databricks.column import DatabricksColumn
@@ -37,13 +38,20 @@ def get_columns_in_relation(
3738
rows = cls._get_columns_with_comments(adapter, relation, "get_columns_comments")
3839
return cls._parse_columns(rows)
3940
else:
40-
result = cls._get_columns_with_comments(
41-
adapter, relation, "get_columns_comments_as_json"
42-
)
43-
if not result:
44-
return []
45-
json_metadata = result[0]["json_metadata"]
46-
return DatabricksColumn.from_json_metadata(json_metadata)
41+
try:
42+
result = cls._get_columns_with_comments(
43+
adapter, relation, "get_columns_comments_as_json"
44+
)
45+
if not result:
46+
return []
47+
json_metadata = result[0]["json_metadata"]
48+
return DatabricksColumn.from_json_metadata(json_metadata)
49+
except DbtDatabaseError as ex:
50+
# Fall back to legacy logic if the error is due to AS JSON not being supported
51+
# for the current runtime or relation type (e.g. foreign table)
52+
if "PARSE_SYNTAX_ERROR" in ex.msg or "UNSUPPORTED_FEATURE" in ex.msg:
53+
return cls.get_columns_in_relation(adapter, relation, True)
54+
raise ex
4755

4856
@classmethod
4957
def _parse_columns(cls, rows: list[AttrDict]) -> list[DatabricksColumn]:

dbt/include/databricks/macros/materializations/incremental/incremental.sql

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
{%- else -%}
5353
{{ log("Existing relation found, proceeding with incremental work")}}
5454
{#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#}
55-
{%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
55+
{%- if incremental_strategy == 'insert_overwrite' and partition_by and adapter.compare_dbr_version(16, 3) < 0 -%}
5656
{{ set_overwrite_mode('DYNAMIC') }}
5757
{%- endif -%}
5858
{#-- Relation must be merged --#}
@@ -118,7 +118,7 @@
118118
{% do persist_docs(target_relation, model, for_relation=language=='python') %}
119119
{%- else -%}
120120
{#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#}
121-
{%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
121+
{%- if incremental_strategy == 'insert_overwrite' and partition_by and adapter.compare_dbr_version(16, 3) < 0 -%}
122122
{{ set_overwrite_mode('DYNAMIC') }}
123123
{%- endif -%}
124124
{#-- Relation must be merged --#}
@@ -177,7 +177,7 @@
177177
{{ run_hooks(post_hooks) }}
178178
{%- endif -%}
179179

180-
{%- if incremental_strategy == 'insert_overwrite' and not full_refresh -%}
180+
{%- if incremental_strategy == 'insert_overwrite' and not full_refresh and adapter.compare_dbr_version(16, 3) < 0 -%}
181181
{{ set_overwrite_mode('STATIC') }}
182182
{%- endif -%}
183183

@@ -191,7 +191,7 @@
191191
set spark.sql.sources.partitionOverwriteMode = {{ value }}
192192
{%- endcall -%}
193193
{% else %}
194-
{{ exceptions.warn("INSERT OVERWRITE is only properly supported on all-purpose clusters. On SQL Warehouses, this strategy would be equivalent to using the table materialization.") }}
194+
{{ exceptions.warn("INSERT OVERWRITE is supported on SQL warehouses with DBR 16.3+. On older DBR versions, this strategy would be equivalent to using the table materialization.") }}
195195
{% endif %}
196196
{% endmacro %}
197197

@@ -216,4 +216,4 @@
216216
{%- set configuration_changes = model_config.get_changeset(existing_config) -%}
217217
{{ apply_config_changeset(target_relation, model, configuration_changes) }}
218218
{% endif %}
219-
{% endmacro %}
219+
{% endmacro %}

dbt/include/databricks/macros/materializations/incremental/strategies.sql

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,32 @@
3838
{%- endfor -%}
3939
{%- set dest_cols_csv = dest_columns | join(', ') -%}
4040
{%- set source_cols_csv = common_columns | join(', ') -%}
41-
insert overwrite table {{ target_relation }}
42-
{{ partition_cols(label="partition") }}
43-
select {{source_cols_csv}} from {{ source_relation }}
41+
42+
{%- if adapter.compare_dbr_version(16, 3) >= 0 -%}
43+
{{ get_insert_replace_using_sql(source_relation, target_relation, source_cols_csv) }}
44+
{%- else -%}
45+
{#-- Use traditional INSERT OVERWRITE for older DBR versions --#}
46+
insert overwrite table {{ target_relation }}
47+
{{ partition_cols(label="partition") }}
48+
select {{ source_cols_csv }} from {{ source_relation }}
49+
{%- endif -%}
50+
{% endmacro %}
51+
52+
{% macro get_insert_replace_using_sql(source_relation, target_relation, source_cols_csv) %}
53+
{%- set partition_by = config.get('partition_by') -%}
54+
{%- if partition_by -%}
55+
{%- if partition_by is string -%}
56+
{%- set partition_by = [partition_by] -%}
57+
{%- endif -%}
58+
{%- set partition_cols_csv = partition_by | join(', ') -%}
59+
insert into table {{ target_relation }}
60+
replace using ({{ partition_cols_csv }})
61+
select {{ source_cols_csv }} from {{ source_relation }}
62+
{%- else -%}
63+
{#-- Fallback to regular insert if no partitions defined --#}
64+
insert overwrite table {{ target_relation }}
65+
select {{ source_cols_csv }} from {{ source_relation }}
66+
{%- endif -%}
4467
{% endmacro %}
4568

4669
{% macro get_replace_where_sql(args_dict) -%}

dbt/include/databricks/macros/materializations/incremental/validate.sql

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,6 @@
2424
You can only choose this strategy when file_format is set to 'delta'
2525
{%- endset %}
2626

27-
{% set invalid_insert_overwrite_endpoint_msg -%}
28-
Invalid incremental strategy provided: {{ raw_strategy }}
29-
You cannot use this strategy when connecting via warehouse
30-
Use the 'merge' or 'replace_where' strategy instead
31-
{%- endset %}
32-
3327
{% if raw_strategy not in adapter.valid_incremental_strategies() %}
3428
{{ log("WARNING - You are using an unsupported incremental strategy: " ~ raw_strategy) }}
3529
{{ log("You can ignore this warning if you are using a custom incremental strategy") }}

tests/functional/adapter/incremental/test_incremental_strategies.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ def project_config_update(self):
7070
}
7171

7272

73-
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
7473
class InsertOverwriteBase(IncrementalBase):
7574
@pytest.fixture(scope="class")
7675
def seeds(self):
@@ -93,12 +92,10 @@ def test_incremental(self, project):
9392
util.check_relations_equal(project.adapter, ["overwrite_model", "overwrite_expected"])
9493

9594

96-
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
9795
class TestInsertOverwriteDelta(InsertOverwriteBase):
9896
pass
9997

10098

101-
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
10299
class TestInsertOverwriteWithPartitionsDelta(InsertOverwriteBase):
103100
@pytest.fixture(scope="class")
104101
def project_config_update(self):
@@ -120,7 +117,6 @@ def test_incremental(self, project):
120117
util.check_relations_equal(project.adapter, ["overwrite_model", "upsert_expected"])
121118

122119

123-
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
124120
class TestInsertOverwriteChangeSchema(InsertOverwriteBase):
125121
@pytest.fixture(scope="class")
126122
def models(self):
@@ -178,37 +174,6 @@ def test_incremental(self, project):
178174
util.check_relations_equal(project.adapter, ["overwrite_model", "upsert_expected"])
179175

180176

181-
# Insert overwrite in SQL warehouse is expected to behave like a table materialization
182-
# We support this as a short term hack for customers who want the side effect of reusing
183-
# the same table on subsequent runs
184-
@pytest.mark.skip_profile("databricks_uc_cluster", "databricks_cluster")
185-
class TestInsertOverwriteSqlWarehouse(IncrementalBase):
186-
@pytest.fixture(scope="class")
187-
def project_config_update(self):
188-
return {
189-
"models": {
190-
"+incremental_strategy": "insert_overwrite",
191-
"+partition_by": "id",
192-
},
193-
}
194-
195-
@pytest.fixture(scope="class")
196-
def seeds(self):
197-
return {
198-
"overwrite_expected.csv": fixtures.overwrite_expected,
199-
}
200-
201-
@pytest.fixture(scope="class")
202-
def models(self):
203-
return {
204-
"overwrite_model.sql": fixtures.base_model,
205-
}
206-
207-
def test_incremental(self, project):
208-
self.seed_and_run_twice()
209-
util.check_relations_equal(project.adapter, ["overwrite_model", "overwrite_expected"])
210-
211-
212177
@pytest.mark.external
213178
@pytest.mark.skip("This test is not repeatable due to external location")
214179
class TestInsertOverwriteParquet(InsertOverwriteBase):
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
from unittest.mock import Mock
2+
3+
import pytest
4+
5+
from tests.unit.macros.base import MacroTestBase
6+
7+
8+
class TestInsertOverwriteMacros(MacroTestBase):
9+
@pytest.fixture(scope="class")
10+
def template_name(self) -> str:
11+
return "strategies.sql"
12+
13+
@pytest.fixture(scope="class")
14+
def macro_folders_to_load(self) -> list:
15+
return ["macros/materializations/incremental"]
16+
17+
@pytest.fixture(scope="class")
18+
def spark_template_names(self) -> list:
19+
# Need spark templates for partition_cols macro
20+
return ["adapters.sql"]
21+
22+
@pytest.fixture(autouse=True)
23+
def setup_mock_columns(self, context):
24+
"""Mock the adapter methods needed for the macro"""
25+
# Mock get_columns_in_relation to return some test columns
26+
mock_column_a = Mock()
27+
mock_column_a.quoted = "a"
28+
mock_column_b = Mock()
29+
mock_column_b.quoted = "b"
30+
31+
context["adapter"].get_columns_in_relation.return_value = [mock_column_a, mock_column_b]
32+
33+
def test_get_insert_overwrite_sql__legacy_dbr_version(self, template, context, config):
34+
"""Test that DBR < 16.3 uses traditional INSERT OVERWRITE syntax"""
35+
# Negative return value means DBR < 16.3
36+
context["adapter"].compare_dbr_version.return_value = -1
37+
config["partition_by"] = ["partition_col"]
38+
39+
source_relation = Mock()
40+
source_relation.__str__ = lambda self: "source_table"
41+
target_relation = Mock()
42+
target_relation.__str__ = lambda self: "target_table"
43+
44+
result = self.run_macro_raw(
45+
template,
46+
"get_insert_overwrite_sql",
47+
source_relation,
48+
target_relation,
49+
)
50+
51+
# Verify it uses legacy INSERT OVERWRITE syntax
52+
expected_sql = """insert overwrite table target_table
53+
partition (partition_col)
54+
select a, b from source_table"""
55+
56+
self.assert_sql_equal(result, expected_sql)
57+
58+
def test_get_insert_overwrite_sql__modern_dbr_version(self, template, context, config):
59+
"""Test that DBR >= 16.3 uses REPLACE USING syntax"""
60+
# Positive return value means DBR > 16.3
61+
context["adapter"].compare_dbr_version.return_value = 1
62+
config["partition_by"] = ["partition_col"]
63+
64+
source_relation = Mock()
65+
source_relation.__str__ = lambda self: "source_table"
66+
target_relation = Mock()
67+
target_relation.__str__ = lambda self: "target_table"
68+
69+
result = self.run_macro_raw(
70+
template,
71+
"get_insert_overwrite_sql",
72+
source_relation,
73+
target_relation,
74+
)
75+
76+
# Verify it uses REPLACE USING syntax
77+
expected_sql = """insert into table target_table
78+
replace using (partition_col)
79+
select a, b from source_table"""
80+
81+
self.assert_sql_equal(result, expected_sql)
82+
83+
@pytest.mark.parametrize("dbr_version_return", [-1, 0, 1])
84+
def test_get_insert_overwrite_sql__no_partitions(
85+
self, template, context, config, dbr_version_return
86+
):
87+
"""Test that empty partition_by falls back to INSERT OVERWRITE regardless of DBR version"""
88+
context["adapter"].compare_dbr_version.return_value = dbr_version_return
89+
# No partition_by set in config
90+
91+
source_relation = Mock()
92+
source_relation.__str__ = lambda self: "source_table"
93+
target_relation = Mock()
94+
target_relation.__str__ = lambda self: "target_table"
95+
96+
# Run the macro
97+
result = self.run_macro_raw(
98+
template,
99+
"get_insert_overwrite_sql",
100+
source_relation,
101+
target_relation,
102+
)
103+
104+
# Verify it uses regular INSERT OVERWRITE syntax
105+
expected_sql = """insert overwrite table target_table select a, b from source_table"""
106+
107+
self.assert_sql_equal(result, expected_sql)

tests/unit/test_adapter.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import pytest
77
from agate import Row
8-
from dbt_common.exceptions import DbtConfigError, DbtValidationError
8+
from dbt_common.exceptions import DbtConfigError, DbtDatabaseError, DbtValidationError
99

1010
import dbt.flags as flags
1111
from dbt.adapters.databricks import DatabricksAdapter, __version__
@@ -1173,3 +1173,62 @@ def test_get_columns_materialized_view(self, mock_get_columns, adapter, unity_re
11731173
assert result[0].column == "mv_col"
11741174
assert result[0].dtype == "string"
11751175
assert result[0].comment == "mv col"
1176+
1177+
@patch(
1178+
"dbt.adapters.databricks.behaviors.columns.GetColumnsByDescribe._get_columns_with_comments"
1179+
)
1180+
def test_get_columns_fallback_on_known_error(self, mock_get_columns, adapter, unity_relation):
1181+
"""Test that UNSUPPORTED_FEATURE in DbtDatabaseError triggers fallback to legacy logic"""
1182+
with patch.object(adapter, "compare_dbr_version", return_value=1):
1183+
# Mock the first call to raise PARSE_SYNTAX_ERROR
1184+
# Mock the second call (fallback) to return legacy data
1185+
mock_get_columns.side_effect = [
1186+
DbtDatabaseError(
1187+
"[UNSUPPORTED_FEATURE.DESC_JSON_TABLE_TYPE] The feature is not supported: "
1188+
"DESCRIBE AS JSON not supported for table type of [table_name]. "
1189+
"Please try again without the AS JSON clause."
1190+
),
1191+
[
1192+
{
1193+
"col_name": "fallback_col",
1194+
"data_type": "string",
1195+
"comment": "fallback comment",
1196+
}
1197+
],
1198+
]
1199+
1200+
result = adapter.get_columns_in_relation(unity_relation)
1201+
1202+
# Verify two calls were made: first with AS JSON, then with legacy
1203+
assert mock_get_columns.call_count == 2
1204+
mock_get_columns.assert_any_call(
1205+
adapter, unity_relation, "get_columns_comments_as_json"
1206+
)
1207+
mock_get_columns.assert_any_call(adapter, unity_relation, "get_columns_comments")
1208+
1209+
# Verify the result comes from the fallback (legacy) logic
1210+
assert len(result) == 1
1211+
assert result[0].column == "fallback_col"
1212+
assert result[0].dtype == "string"
1213+
assert result[0].comment == "fallback comment"
1214+
1215+
@patch(
1216+
"dbt.adapters.databricks.behaviors.columns.GetColumnsByDescribe._get_columns_with_comments"
1217+
)
1218+
def test_get_columns_reraises_other_database_errors(
1219+
self, mock_get_columns, adapter, unity_relation
1220+
):
1221+
"""Test that unknown types of DbtDatabaseError is re-raised"""
1222+
with patch.object(adapter, "compare_dbr_version", return_value=1):
1223+
# Mock to raise a different database error that should be re-raised
1224+
mock_get_columns.side_effect = DbtDatabaseError("Some other database error")
1225+
1226+
# Verify the exception is re-raised
1227+
with pytest.raises(DbtDatabaseError, match="Some other database error"):
1228+
adapter.get_columns_in_relation(unity_relation)
1229+
1230+
# Verify only one call was made (no fallback)
1231+
assert mock_get_columns.call_count == 1
1232+
mock_get_columns.assert_called_with(
1233+
adapter, unity_relation, "get_columns_comments_as_json"
1234+
)

0 commit comments

Comments
 (0)