Skip to content

Commit 05f81c2

Browse files
committed
Fix insert overwrite compatibility check for model-level compute override
1 parent 94b9da9 commit 05f81c2

3 files changed

Lines changed: 45 additions & 2 deletions

File tree

dbt/adapters/databricks/connections.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,13 @@ def api_client(self) -> DatabricksApiClient:
205205
return self._api_client
206206

207207
def is_cluster(self) -> bool:
208-
return self.get_thread_connection().credentials.cluster_id is not None
208+
conn = self.get_thread_connection()
209+
return (
210+
conn.credentials.cluster_id is not None
211+
# Credentials field is not updated when overriding the compute at model level.
212+
# This secondary check is a workaround for that case
213+
or "/warehouses/" not in cast(DatabricksDBTConnection, conn).http_path
214+
)
209215

210216
def cancel_open(self) -> list[str]:
211217
cancelled = super().cancel_open()

tests/conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ def dbt_profile_data(unique_schema, dbt_profile_target, profiles_config_update):
4848
}
4949
target = dbt_profile_target
5050
target["schema"] = unique_schema
51+
52+
# For testing model-level compute override
53+
target["compute"] = {
54+
"alternate_uc_cluster": {
55+
"http_path": get_databricks_cluster_target("databricks_uc_cluster")["http_path"]
56+
}
57+
}
5158
profile["test"]["outputs"]["default"] = target
5259

5360
alternate_warehouse = target.copy()

tests/functional/adapter/incremental/test_incremental_strategies.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,31 @@ def project_config_update(self):
106106
"models": {
107107
"+incremental_strategy": "insert_overwrite",
108108
"+partition_by": "id",
109-
}
109+
},
110+
}
111+
112+
@pytest.fixture(scope="class")
113+
def seeds(self):
114+
return {
115+
"upsert_expected.csv": fixtures.upsert_expected,
116+
}
117+
118+
def test_incremental(self, project):
119+
self.seed_and_run_twice()
120+
util.check_relations_equal(project.adapter, ["overwrite_model", "upsert_expected"])
121+
122+
123+
# Only runs under SQL warehouse profile, but overrides compute at model level
124+
@pytest.mark.skip_profile("databricks_uc_cluster", "databricks_cluster")
125+
class TestInsertOverwriteWithModelComputeOverride(IncrementalBase):
126+
@pytest.fixture(scope="class")
127+
def project_config_update(self):
128+
return {
129+
"models": {
130+
"+incremental_strategy": "insert_overwrite",
131+
"+partition_by": "id",
132+
"+databricks_compute": "alternate_uc_cluster",
133+
},
110134
}
111135

112136
@pytest.fixture(scope="class")
@@ -115,6 +139,12 @@ def seeds(self):
115139
"upsert_expected.csv": fixtures.upsert_expected,
116140
}
117141

142+
@pytest.fixture(scope="class")
143+
def models(self):
144+
return {
145+
"overwrite_model.sql": fixtures.base_model,
146+
}
147+
118148
def test_incremental(self, project):
119149
self.seed_and_run_twice()
120150
util.check_relations_equal(project.adapter, ["overwrite_model", "upsert_expected"])

0 commit comments

Comments (0)