Force catalog metadata columns to agate Text in the Databricks adapter.

`_get_schema_for_catalog` now builds its type tester from
`CATALOG_TEXT_ONLY_COLUMNS` whenever a schema yields at least one column row,
so the per-schema catalog tables agree on column types and can be merged
during `dbt docs generate` (issue #1392). An empty result keeps
`DEFAULT_TYPE_TESTER`. The unit tests cover the forced types, a two-schema
merge, and the empty-schema case.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 97c16aa48..93e313be4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 - Validate relation identifier length at creation time and raise a clear error when it exceeds Databricks' 255-character limit ([#1309](https://github.com/databricks/dbt-databricks/issues/1309))
 - Fix spurious `MicrobatchConcurrency` behavior-change warning firing on every run regardless of whether the project contained microbatch models ([#1406](https://github.com/databricks/dbt-databricks/issues/1406))
 - Fix DBR capability cache being permanently poisoned by a transient version-query failure ([#1398](https://github.com/databricks/dbt-databricks/issues/1398))
+- Fix `dbt docs generate` failing with `RuntimeError: Tables contain columns with the same names ... but different types` during catalog merge across schemas ([#1392](https://github.com/databricks/dbt-databricks/issues/1392))
 
 ## dbt-databricks 1.11.7 (Apr 17, 2026)
 
diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py
index 4d6964495..3d674e4cd 100644
--- a/dbt/adapters/databricks/impl.py
+++ b/dbt/adapters/databricks/impl.py
@@ -740,10 +740,26 @@ def exec() -> AttrDict:
 
         return handle_missing_objects(exec, None)
 
+    # Force Text type for catalog metadata columns. Without this, agate's
+    # value-driven type inference can mark a metadata column as Number in one
+    # schema (e.g. all-numeric column names, all-null comments) and Text in
+    # another, causing `catch_as_completed`'s merge to raise on the conflict.
+    CATALOG_TEXT_ONLY_COLUMNS = (
+        "table_database",
+        "table_schema",
+        "table_name",
+        "table_type",
+        "table_owner",
+        "table_comment",
+        "column_name",
+        "column_type",
+        "comment",
+    )
+
     def _get_schema_for_catalog(self, catalog: str, schema: str, identifier: str) -> "Table":
         # Lazy load to improve startup time
         from agate import Table
-        from dbt_common.clients.agate_helper import DEFAULT_TYPE_TESTER
+        from dbt_common.clients.agate_helper import DEFAULT_TYPE_TESTER, build_type_tester
 
         columns: list[dict[str, Any]] = []
 
@@ -756,7 +772,12 @@ def _get_schema_for_catalog(self, catalog: str, schema: str, identifier: str) ->
         )
         for relation, information in self._list_relations_with_information(schema_relation):
             columns.extend(self._get_columns_for_catalog(relation, information))
-        return Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)
+        # An empty input produces a 0-column table; force-typing absent columns
+        # would emit one RuntimeWarning per name. Skip the override in that case.
+        column_types = (
+            build_type_tester(self.CATALOG_TEXT_ONLY_COLUMNS) if columns else DEFAULT_TYPE_TESTER
+        )
+        return Table.from_object(columns, column_types=column_types)
 
     def _get_columns_for_catalog(  # type: ignore[override]
         self, relation: DatabricksRelation, information: str
diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py
index 23ce774d8..1197834f2 100644
--- a/tests/unit/test_adapter.py
+++ b/tests/unit/test_adapter.py
@@ -1,12 +1,15 @@
 import re
+import warnings
 from multiprocessing import get_context
 from typing import Any, Optional
 from unittest.mock import Mock, patch
 
+import agate
 import dbt.flags as flags
 import pytest
 from agate import Row
 from dbt.config import RuntimeConfig
+from dbt_common.clients.agate_helper import merge_tables
 from dbt_common.exceptions import DbtConfigError, DbtDatabaseError, DbtValidationError
 
 from dbt.adapters.databricks import DatabricksAdapter, __version__, constants
@@ -28,6 +31,22 @@
 from tests.unit.utils import config_from_parts_or_dicts
 
 
+def _catalog_row(column_name: str) -> dict:
+    """Mimics the dict shape `_get_columns_for_catalog` yields per column."""
+    return {
+        "table_database": "cat",
+        "table_schema": "schema",
+        "table_name": "model",
+        "table_type": "table",
+        "table_owner": "root",
+        "table_comment": None,
+        "column_index": 0,
+        "column_name": column_name,
+        "column_type": "bigint",
+        "comment": None,
+    }
+
+
 class DatabricksAdapterBase:
     @pytest.fixture(autouse=True)
     def setUp(self):
@@ -418,6 +437,69 @@ def test_get_schema_for_catalog__some_columns(self, _):
                 assert len(table.rows) == 2
                 assert table.column_names == ("name", "type", "comment")
 
+    @patch("dbt.adapters.databricks.api_client.DatabricksApiClient")
+    def test_get_schema_for_catalog__metadata_columns_are_always_text(self, _):
+        # column_name "12345" forces agate's DEFAULT_TYPE_TESTER to infer Number
+        # for column_name unless it is force-typed Text by the adapter.
+        with patch.object(DatabricksAdapter, "_list_relations_with_information") as list_info:
+            list_info.return_value = [(Mock(), "info")]
+            with patch.object(DatabricksAdapter, "_get_columns_for_catalog") as get_columns:
+                get_columns.return_value = [_catalog_row("12345")]
+                adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn"))
+                table = adapter._get_schema_for_catalog("cat", "schema", "name")
+
+        types = dict(zip(table.column_names, table.column_types))
+        for col in (
+            "table_database",
+            "table_schema",
+            "table_name",
+            "table_type",
+            "table_owner",
+            "table_comment",
+            "column_name",
+            "column_type",
+            "comment",
+        ):
+            assert col in types, f"missing metadata column {col!r}"
+            assert isinstance(types[col], agate.data_types.text.Text), (
+                f"{col!r} must be Text, got {type(types[col]).__name__}"
+            )
+
+    @patch("dbt.adapters.databricks.api_client.DatabricksApiClient")
+    def test_get_schema_for_catalog__results_merge_across_schemas(self, _):
+        # Two schemas whose column_name values would force agate to infer
+        # different types (Text vs Number) without the text_only override.
+        # `catch_as_completed` merges per-schema results via `merge_tables`, so
+        # a type mismatch surfaces there as the user-visible RuntimeError.
+        with patch.object(DatabricksAdapter, "_list_relations_with_information") as list_info:
+            list_info.return_value = [(Mock(), "info")]
+            with patch.object(DatabricksAdapter, "_get_columns_for_catalog") as get_columns:
+                adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn"))
+                get_columns.return_value = [_catalog_row("id")]
+                table_a = adapter._get_schema_for_catalog("cat", "schema_a", "id")
+                get_columns.return_value = [_catalog_row("12345")]
+                table_b = adapter._get_schema_for_catalog("cat", "schema_b", "id")
+
+        merged = merge_tables([table_a, table_b])
+        assert len(merged.rows) == 2
+        types = dict(zip(merged.column_names, merged.column_types))
+        assert isinstance(types["column_name"], agate.data_types.text.Text)
+
+    @patch("dbt.adapters.databricks.api_client.DatabricksApiClient")
+    def test_get_schema_for_catalog__no_columns_emits_no_warnings(self, _):
+        # An empty schema yields a 0-column table, so none of the force-typed
+        # metadata columns exist. Promote RuntimeWarning to an error to prove
+        # the adapter skips the Text override instead of warning per column.
+        with patch.object(DatabricksAdapter, "_list_relations_with_information") as list_info:
+            list_info.return_value = [(Mock(), "info")]
+            with patch.object(DatabricksAdapter, "_get_columns_for_catalog") as get_columns:
+                get_columns.return_value = []
+                adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn"))
+                with warnings.catch_warnings():
+                    warnings.simplefilter("error", RuntimeWarning)
+                    table = adapter._get_schema_for_catalog("database", "schema", "name")
+                assert len(table.rows) == 0
+
     def test_simple_catalog_relation(self):
         self.maxDiff = None
         rel_type = DatabricksRelation.get_relation_type.Table