Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Validate relation identifier length at creation time and raise a clear error when it exceeds Databricks' 255-character limit ([#1309](https://github.com/databricks/dbt-databricks/issues/1309))
- Fix spurious `MicrobatchConcurrency` behavior-change warning firing on every run regardless of whether the project contained microbatch models ([#1406](https://github.com/databricks/dbt-databricks/issues/1406))
- Fix DBR capability cache being permanently poisoned by a transient version-query failure ([#1398](https://github.com/databricks/dbt-databricks/issues/1398))
- Fix `dbt docs generate` failing with `RuntimeError: Tables contain columns with the same names ... but different types` during catalog merge across schemas ([#1392](https://github.com/databricks/dbt-databricks/issues/1392))

## dbt-databricks 1.11.7 (Apr 17, 2026)

Expand Down
25 changes: 23 additions & 2 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,10 +740,26 @@ def exec() -> AttrDict:

return handle_missing_objects(exec, None)

# Force Text type for catalog metadata columns. Without this, agate's
# value-driven type inference can mark a metadata column as Number in one
# schema (e.g. all-numeric column names, all-null comments) and Text in
# another, causing `catch_as_completed`'s merge to raise on the conflict.
# Consumed by `_get_schema_for_catalog`, which passes it to
# `build_type_tester` so these names are force-typed Text.
CATALOG_TEXT_ONLY_COLUMNS = (
    "table_database",
    "table_schema",
    "table_name",
    "table_type",
    "table_owner",
    "table_comment",
    "column_name",
    "column_type",
    "comment",
)

def _get_schema_for_catalog(self, catalog: str, schema: str, identifier: str) -> "Table":
# Lazy load to improve startup time
from agate import Table
from dbt_common.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt_common.clients.agate_helper import DEFAULT_TYPE_TESTER, build_type_tester

columns: list[dict[str, Any]] = []

Expand All @@ -756,7 +772,12 @@ def _get_schema_for_catalog(self, catalog: str, schema: str, identifier: str) ->
)
for relation, information in self._list_relations_with_information(schema_relation):
columns.extend(self._get_columns_for_catalog(relation, information))
return Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)
# An empty input produces a 0-column table; force-typing absent columns
# would emit one RuntimeWarning per name. Skip the override in that case.
column_types = (
build_type_tester(self.CATALOG_TEXT_ONLY_COLUMNS) if columns else DEFAULT_TYPE_TESTER
)
return Table.from_object(columns, column_types=column_types)

def _get_columns_for_catalog( # type: ignore[override]
self, relation: DatabricksRelation, information: str
Expand Down
82 changes: 82 additions & 0 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import re
import warnings
from multiprocessing import get_context
from typing import Any, Optional
from unittest.mock import Mock, patch

import agate
import dbt.flags as flags
import pytest
from agate import Row
from dbt.config import RuntimeConfig
from dbt_common.clients.agate_helper import merge_tables
from dbt_common.exceptions import DbtConfigError, DbtDatabaseError, DbtValidationError

from dbt.adapters.databricks import DatabricksAdapter, __version__, constants
Expand All @@ -28,6 +31,22 @@
from tests.unit.utils import config_from_parts_or_dicts


def _catalog_row(column_name: str) -> dict:
"""Mimics the dict shape `_get_columns_for_catalog` yields per column."""
return {
"table_database": "cat",
"table_schema": "schema",
"table_name": "model",
"table_type": "table",
"table_owner": "root",
"table_comment": None,
"column_index": 0,
"column_name": column_name,
"column_type": "bigint",
"comment": None,
}


class DatabricksAdapterBase:
@pytest.fixture(autouse=True)
def setUp(self):
Expand Down Expand Up @@ -418,6 +437,69 @@ def test_get_schema_for_catalog__some_columns(self, _):
assert len(table.rows) == 2
assert table.column_names == ("name", "type", "comment")

@patch("dbt.adapters.databricks.api_client.DatabricksApiClient")
def test_get_schema_for_catalog__metadata_columns_are_always_text(self, _):
    # column_name "12345" forces agate's DEFAULT_TYPE_TESTER to infer Number
    # for column_name unless it is force-typed Text by the adapter.
    with (
        patch.object(DatabricksAdapter, "_list_relations_with_information") as list_info,
        patch.object(DatabricksAdapter, "_get_columns_for_catalog") as get_columns,
    ):
        list_info.return_value = [(Mock(), "info")]
        get_columns.return_value = [_catalog_row("12345")]
        adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn"))
        table = adapter._get_schema_for_catalog("cat", "schema", "name")

    inferred = dict(zip(table.column_names, table.column_types))
    metadata_columns = (
        "table_database",
        "table_schema",
        "table_name",
        "table_type",
        "table_owner",
        "table_comment",
        "column_name",
        "column_type",
        "comment",
    )
    for col in metadata_columns:
        assert col in inferred, f"missing metadata column {col!r}"
        assert isinstance(inferred[col], agate.data_types.text.Text), (
            f"{col!r} must be Text, got {type(inferred[col]).__name__}"
        )

@patch("dbt.adapters.databricks.api_client.DatabricksApiClient")
def test_get_schema_for_catalog__results_merge_across_schemas(self, _):
    # Two schemas whose column_name values would force agate to infer
    # different types (Text vs Number) without the text_only override.
    # `catch_as_completed` merges per-schema results via `merge_tables`, so
    # a type mismatch surfaces there as the user-visible RuntimeError.
    with (
        patch.object(DatabricksAdapter, "_list_relations_with_information") as list_info,
        patch.object(DatabricksAdapter, "_get_columns_for_catalog") as get_columns,
    ):
        list_info.return_value = [(Mock(), "info")]
        adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn"))
        per_schema_tables = []
        for schema_name, column_name in (("schema_a", "id"), ("schema_b", "12345")):
            get_columns.return_value = [_catalog_row(column_name)]
            per_schema_tables.append(adapter._get_schema_for_catalog("cat", schema_name, "id"))

    merged = merge_tables(per_schema_tables)
    assert len(merged.rows) == 2
    merged_types = dict(zip(merged.column_names, merged.column_types))
    assert isinstance(merged_types["column_name"], agate.data_types.text.Text)

@patch("dbt.adapters.databricks.api_client.DatabricksApiClient")
def test_get_schema_for_catalog__no_columns_emits_no_warnings(self, _):
    # An empty schema must not spam RuntimeWarnings about forced columns
    # missing from the (empty) table; that's the exact `dbt docs generate`
    # scenario the fix targets.
    with (
        patch.object(DatabricksAdapter, "_list_relations_with_information") as list_info,
        patch.object(DatabricksAdapter, "_get_columns_for_catalog") as get_columns,
    ):
        list_info.return_value = [(Mock(), "info")]
        get_columns.return_value = []
        adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn"))
        # Escalate RuntimeWarning to an error so any forced-column warning fails the test.
        with warnings.catch_warnings():
            warnings.simplefilter("error", RuntimeWarning)
            table = adapter._get_schema_for_catalog("database", "schema", "name")
    assert len(table.rows) == 0

def test_simple_catalog_relation(self):
self.maxDiff = None
rel_type = DatabricksRelation.get_relation_type.Table
Expand Down
Loading