Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- Validate relation identifier length at creation time and raise a clear error when it exceeds Databricks' 255-character limit ([#1309](https://github.com/databricks/dbt-databricks/issues/1309))
- Fix spurious `MicrobatchConcurrency` behavior-change warning firing on every run regardless of whether the project contained microbatch models ([#1406](https://github.com/databricks/dbt-databricks/issues/1406))
- Fix DBR capability cache being permanently poisoned by a transient version-query failure ([#1398](https://github.com/databricks/dbt-databricks/issues/1398))

## dbt-databricks 1.11.7 (Apr 17, 2026)

Expand Down
35 changes: 13 additions & 22 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ def is_cluster(self) -> bool:
databricks_conn = cast(DatabricksDBTConnection, conn)
return is_cluster_http_path(databricks_conn.http_path, conn.credentials.cluster_id)

def _get_capabilities_for_http_path(self, http_path: str) -> DBRCapabilities:
return self._dbr_capabilities_cache.get(http_path, DBRCapabilities())
@classmethod
def _get_capabilities_for_http_path(cls, http_path: str) -> DBRCapabilities:
    """Return the capabilities cached for ``http_path``.

    When the class-level cache holds no entry for the path, a
    default-constructed ``DBRCapabilities`` is returned instead; the cache
    itself is not modified by this lookup.
    """
    fallback = DBRCapabilities()
    return cls._dbr_capabilities_cache.get(http_path, fallback)

@classmethod
def _query_dbr_version(
Expand All @@ -193,29 +194,19 @@ def _query_dbr_version(
result = cursor.fetchone()
if result:
return SqlUtils.extract_dbr_version(result[1])
except Exception:
pass
except Exception as e:
logger.debug(f"Failed to query DBR version for http_path={http_path}: {e}")

return None

@classmethod
def _cache_dbr_capabilities(cls, creds: DatabricksCredentials, http_path: str) -> None:
if http_path not in cls._dbr_capabilities_cache:
is_cluster = is_cluster_http_path(http_path, creds.cluster_id)
dbr_version = cls._query_dbr_version(creds, http_path)
"""Cache DBR capabilities for an http_path on first successful version query.

cls._dbr_capabilities_cache[http_path] = DBRCapabilities(
dbr_version=dbr_version,
is_sql_warehouse=not is_cluster,
)

@classmethod
def _try_cache_dbr_capabilities(cls, creds: DatabricksCredentials, http_path: str) -> None:
"""Like _cache_dbr_capabilities, but only writes to the cache when the version query
actually succeeds. This prevents a failed eager lookup (e.g. credentials_manager not yet
set, cluster still spinning up) from storing a None-version entry that the idempotency
guard in open() would later treat as authoritative, causing all capability checks to
return False.
Only writes when the version query succeeds: a failed lookup (credentials_manager
not yet set, cluster spinning up) must not store a None-version entry, since the
idempotency guard would then treat it as authoritative and disable every
capability check for the rest of the process.
"""
if http_path not in cls._dbr_capabilities_cache:
is_cluster = is_cluster_http_path(http_path, creds.cluster_id)
Expand Down Expand Up @@ -306,7 +297,7 @@ def _create_fresh_connection(
conn.http_path = QueryConfigUtils.get_http_path(query_header_context, creds)
conn.thread_identifier = cast(tuple[int, int], self.get_thread_identifier())
conn._query_header_context = query_header_context
self._try_cache_dbr_capabilities(creds, conn.http_path)
self._cache_dbr_capabilities(creds, conn.http_path)
conn.capabilities = self._get_capabilities_for_http_path(conn.http_path)
conn.handle = LazyHandle(self.open)

Expand Down Expand Up @@ -507,9 +498,9 @@ def connect() -> DatabricksHandle:
if conn:
databricks_connection.session_id = conn.session_id
cls._cache_dbr_capabilities(creds, databricks_connection.http_path)
databricks_connection.capabilities = cls._dbr_capabilities_cache[
databricks_connection.capabilities = cls._get_capabilities_for_http_path(
databricks_connection.http_path
]
)
return conn
else:
raise DbtDatabaseError("Failed to create connection")
Expand Down
31 changes: 22 additions & 9 deletions tests/unit/test_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ def test_open_calls_is_cluster_http_path_for_warehouse(
assert args[1] is True


class TestTryCacheDbr:
"""Unit tests for _try_cache_dbr_capabilities."""
class TestCacheDbr:
"""Unit tests for _cache_dbr_capabilities."""

HTTP_PATH = "sql/protocolv1/o/1234567890123456/cluster-abc"

Expand All @@ -109,14 +109,11 @@ def clear_cache(self):

@patch.object(DatabricksConnectionManager, "_query_dbr_version", return_value=None)
def test_does_not_write_to_cache_when_version_is_none(self, mock_query):
"""When the version query returns None, the cache must not be written.

This prevents a poisoned None entry from blocking the authoritative write in open().
"""
"""Regression for #1398: a None version query result must not poison the cache."""
creds = Mock(spec=DatabricksCredentials)
creds.cluster_id = None

DatabricksConnectionManager._try_cache_dbr_capabilities(creds, self.HTTP_PATH)
DatabricksConnectionManager._cache_dbr_capabilities(creds, self.HTTP_PATH)

assert self.HTTP_PATH not in DatabricksConnectionManager._dbr_capabilities_cache
mock_query.assert_called_once_with(creds, self.HTTP_PATH)
Expand All @@ -127,7 +124,7 @@ def test_writes_to_cache_when_version_is_known(self, mock_query):
creds = Mock(spec=DatabricksCredentials)
creds.cluster_id = None

DatabricksConnectionManager._try_cache_dbr_capabilities(creds, self.HTTP_PATH)
DatabricksConnectionManager._cache_dbr_capabilities(creds, self.HTTP_PATH)

mock_query.assert_called_once_with(creds, self.HTTP_PATH)
caps = DatabricksConnectionManager._dbr_capabilities_cache.get(self.HTTP_PATH)
Expand All @@ -144,7 +141,23 @@ def test_skips_write_when_already_cached(self, mock_query):
existing = DBRCapabilities(dbr_version=(14, 3), is_sql_warehouse=False)
DatabricksConnectionManager._dbr_capabilities_cache[self.HTTP_PATH] = existing

DatabricksConnectionManager._try_cache_dbr_capabilities(creds, self.HTTP_PATH)
DatabricksConnectionManager._cache_dbr_capabilities(creds, self.HTTP_PATH)

mock_query.assert_not_called()
assert DatabricksConnectionManager._dbr_capabilities_cache[self.HTTP_PATH] is existing

def test_retry_succeeds_after_transient_failure(self):
    """Regression for #1398: a None version result must not block a later successful query."""
    manager = DatabricksConnectionManager
    creds = Mock(spec=DatabricksCredentials)
    creds.cluster_id = None

    # First pass simulates a failed version query (None); the second pass
    # returns a real version, which is what must end up in the cache.
    for queried_version in (None, (16, 2)):
        with patch.object(manager, "_query_dbr_version", return_value=queried_version):
            manager._cache_dbr_capabilities(creds, self.HTTP_PATH)

    caps = manager._dbr_capabilities_cache.get(self.HTTP_PATH)
    assert caps is not None
    assert caps.dbr_version == (16, 2)
    assert caps.has_capability(DBRCapability.ICEBERG)
Loading