12 changes: 8 additions & 4 deletions dbt/adapters/databricks/dbr_capabilities.py
@@ -13,13 +13,14 @@
class DBRCapability(Enum):
"""Named capabilities that depend on DBR version."""

TIMESTAMPDIFF = "timestampdiff"
ICEBERG = "iceberg"
COMMENT_ON_COLUMN = "comment_on_column"
JSON_COLUMN_METADATA = "json_column_metadata"
STREAMING_TABLE_JSON_METADATA = "streaming_table_json_metadata"
DESCRIBE_TABLE_EXTENDED_AS_JSON = "describe_table_extended_as_json"
ICEBERG = "iceberg"
INSERT_BY_NAME = "insert_by_name"
JSON_COLUMN_METADATA = "json_column_metadata"
REPLACE_ON = "replace_on"
STREAMING_TABLE_JSON_METADATA = "streaming_table_json_metadata"
TIMESTAMPDIFF = "timestampdiff"


@dataclass
@@ -61,6 +62,9 @@ class DBRCapabilities:
DBRCapability.REPLACE_ON: CapabilitySpec(
min_version=(17, 1),
),
DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON: CapabilitySpec(
min_version=(17, 3),
),
}

def __init__(
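For orientation, here is a minimal sketch of how the new capability gates the metadata path. The `adapter` and `relation` objects are assumed for illustration; the method names are the ones added in `impl.py` below:

# Sketch only: `adapter` is a connected DatabricksAdapter and `relation` a
# DatabricksRelation; both are hypothetical stand-ins for illustration.
if adapter.is_describe_as_json_supported(relation):
    # DBR >= 17.3, Unity Catalog, and not a foreign table: fetch relation
    # metadata in a single DESCRIBE TABLE EXTENDED ... AS JSON round trip.
    json_metadata = adapter.fetch_json_metadata(relation)
else:
    # Fall back to the legacy per-feature macros (fetch_row_filters, etc.).
    json_metadata = None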
245 changes: 226 additions & 19 deletions dbt/adapters/databricks/impl.py
@@ -1,3 +1,4 @@
import json
import posixpath
import re
from abc import ABC, abstractmethod
@@ -11,6 +12,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, Optional, Union, cast
from uuid import uuid4

import agate
from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed, log_code_execution
from dbt.adapters.base.meta import available
@@ -402,6 +404,23 @@ def require_capability(self, capability: DBRCapability) -> None:
f"Current connection does not meet this requirement."
)

def is_describe_as_json_supported(self, relation: DatabricksRelation) -> bool:
"""
Check if DESCRIBE TABLE EXTENDED AS JSON can be used for the relation.
"""
return (
not relation.is_hive_metastore()
and not relation.is_foreign_table
and self.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON)
)

def fetch_json_metadata(self, relation: DatabricksRelation) -> dict[str, Any]:
"""Fetch the JSON metadata for a relation using DESCRIBE TABLE EXTENDED AS JSON."""
kwargs = {"relation": relation}
describe_results = self.execute_macro("describe_table_extended_as_json", kwargs=kwargs)
json_metadata = json.loads(describe_results.rows[0].get("json_metadata"))
return json_metadata

def list_schemas(self, database: Optional[str]) -> list[str]:
results = self.execute_macro(LIST_SCHEMAS_MACRO_NAME, kwargs={"database": database})
return [row[0] for row in results]
@@ -1084,11 +1103,19 @@ def _describe_relation(
)

kwargs = {"relation": relation}
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
results["information_schema.views"] = (
DatabricksDescribeJsonMetadata.parse_view_description(json_metadata)
)
results["row_filters"] = DatabricksDescribeJsonMetadata.parse_row_filter(json_metadata)
else:
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)
return results


@@ -1112,7 +1139,13 @@ def _describe_relation(
kwargs = {"relation": relation}

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)

if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
results["row_filters"] = DatabricksDescribeJsonMetadata.parse_row_filter(json_metadata)
else:
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)

return results


@@ -1135,17 +1168,28 @@ def _describe_relation(
results["information_schema.column_tags"] = adapter.execute_macro(
"fetch_column_tags", kwargs=kwargs
)
results["non_null_constraint_columns"] = adapter.execute_macro(
"fetch_non_null_constraint_columns", kwargs=kwargs
)
results["primary_key_constraints"] = adapter.execute_macro(
"fetch_primary_key_constraints", kwargs=kwargs
)
results["foreign_key_constraints"] = adapter.execute_macro(
"fetch_foreign_key_constraints", kwargs=kwargs
)
results["column_masks"] = adapter.execute_macro("fetch_column_masks", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)

if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
relation_metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata)
results["non_null_constraint_columns"] = relation_metadata.non_null_constraints
results["primary_key_constraints"] = relation_metadata.primary_key_constraints
results["foreign_key_constraints"] = relation_metadata.foreign_key_constraints
results["column_masks"] = relation_metadata.column_masks
results["row_filters"] = relation_metadata.row_filters
else:
results["non_null_constraint_columns"] = adapter.execute_macro(
"fetch_non_null_constraint_columns", kwargs=kwargs
)
results["primary_key_constraints"] = adapter.execute_macro(
"fetch_primary_key_constraints", kwargs=kwargs
)
results["foreign_key_constraints"] = adapter.execute_macro(
"fetch_foreign_key_constraints", kwargs=kwargs
)
results["column_masks"] = adapter.execute_macro("fetch_column_masks", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

kwargs = {"table_name": relation}
@@ -1169,9 +1213,16 @@ def _describe_relation(
results = {}
kwargs = {"relation": relation}

results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
results["information_schema.views"] = (
DatabricksDescribeJsonMetadata.parse_view_description(json_metadata)
)
else:
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)

results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

@@ -1202,3 +1253,159 @@ def _describe_relation(
DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs
)
return results


@dataclass
class DatabricksDescribeJsonMetadata:
column_masks: Optional["agate.Table"] = None
foreign_key_constraints: Optional["agate.Table"] = None
non_null_constraints: Optional["agate.Table"] = None
primary_key_constraints: Optional["agate.Table"] = None
row_filters: Optional["agate.Table"] = None
view_description: Optional["agate.Row"] = None

@classmethod
def from_json_metadata(cls, json_metadata: dict[str, Any]) -> "DatabricksDescribeJsonMetadata":
"""Parse and convert the json metadata into structured metadata for the adapter to use."""
return DatabricksDescribeJsonMetadata(
column_masks=cls.parse_column_masks(json_metadata),
foreign_key_constraints=cls.parse_foreign_key_constraints(json_metadata),
non_null_constraints=cls.parse_non_null_constraints(json_metadata),
primary_key_constraints=cls.parse_primary_key_constraints(json_metadata),
row_filters=cls.parse_row_filter(json_metadata),
view_description=cls.parse_view_description(json_metadata),
)

@classmethod
def parse_column_masks(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of column masks (info_schema format)."""
raw_masks = json_metadata.get("column_masks", [])
rows = []
for mask in raw_masks:
column_name = mask["column_name"]
fn = mask["function_name"]
mask_name = f"{fn['catalog_name']}.{fn['schema_name']}.{fn['function_name']}"
using_columns = ",".join(mask.get("using_column_names", []))
rows.append((column_name, mask_name, using_columns or None))

return agate.Table(
rows=rows,
column_names=["column_name", "mask_name", "using_columns"],
column_types=[agate.Text(), agate.Text(), agate.Text()],
)

@classmethod
def parse_foreign_key_constraints(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of FK constraints (info_schema format)."""
table_constraint = re.sub(r"\s+", " ", json_metadata.get("table_constraints", "").strip())
pairs = re.findall(r"\(\s*(\w+)\s*,(.*?)\)(?=\s*,\s*\(|\s*\])", table_constraint)
fk_rows = []
for name, constraint in pairs:
constraint = constraint.strip()
if re.search(r"FOREIGN\s+KEY", constraint):
fk_part, ref_part = constraint.split("REFERENCES", 1)
from_cols = re.findall(r"`([^`]+)`", fk_part)
ref_parts = re.findall(r"`([^`]+)`", ref_part)
to_catalog = ref_parts[0]
to_schema = ref_parts[1]
to_table = ref_parts[2]
to_cols = ref_parts[3:]
for from_col, to_col in zip(from_cols, to_cols):
fk_rows.append([name, from_col, to_catalog, to_schema, to_table, to_col])

fk_column_names = [
"constraint_name",
"from_column",
"to_catalog",
"to_schema",
"to_table",
"to_column",
]
fk_columns_types = [
agate.Text(),
agate.Text(),
agate.Text(),
agate.Text(),
agate.Text(),
agate.Text(),
]
return agate.Table(fk_rows, fk_column_names, fk_columns_types)

@classmethod
def parse_non_null_constraints(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of non-null constraints (info_schema format)."""
columns = json_metadata.get("columns", [])

non_null_cols = [column["name"] for column in columns if not column.get("nullable")]
return agate.Table(
rows=[[col] for col in non_null_cols],
column_names=["column_name"],
column_types=[agate.Text()],
)

@classmethod
def parse_primary_key_constraints(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of PK constraints (info_schema format)."""
table_constraint = re.sub(r"\s+", " ", json_metadata.get("table_constraints", "").strip())
pairs = re.findall(r"\(\s*(\w+)\s*,(.*?)\)(?=\s*,\s*\(|\s*\])", table_constraint)
pk_rows = []
for name, constraint in pairs:
constraint = constraint.strip()
parts = re.findall(r"`([^`]+)`", constraint)
if re.search(r"PRIMARY\s+KEY", constraint):
for col in parts:
pk_rows.append([name, col])

pk_column_names = ["constraint_name", "column_name"]
pk_columns_types = [agate.Text(), agate.Text()]
return agate.Table(pk_rows, pk_column_names, pk_columns_types)

@classmethod
def parse_view_description(cls, json_metadata: dict[str, Any]) -> "agate.Row":
"""Parse json metadata into an agate Row for the view description (info_schema format)."""
view_text = json_metadata.get("view_text", None)
if view_text is None:
return agate.Row(values=set())
else:
return agate.Row(values=(view_text,), keys=("view_definition",))

@classmethod
def parse_row_filter(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of row filter (info_schema format)."""
row_filter_metadata = json_metadata.get("row_filter")
rows: list[Any] = []
column_names = [
"table_catalog",
"table_schema",
"table_name",
"filter_name",
"target_columns",
]
column_types = [agate.Text(), agate.Text(), agate.Text(), agate.Text(), agate.Text()]

if not row_filter_metadata:
return agate.Table(rows=rows, column_names=column_names, column_types=column_types)

table_catalog = json_metadata["catalog_name"]
table_schema = json_metadata["schema_name"]
table_name = json_metadata["table_name"]

function_name = row_filter_metadata["function_name"]
filter_name = (
function_name["catalog_name"]
+ "."
+ function_name["schema_name"]
+ "."
+ function_name["function_name"]
)
filter_column_names = row_filter_metadata["column_names"]

rows.append(
[table_catalog, table_schema, table_name, filter_name, ",".join(filter_column_names)]
)

return agate.Table(
rows=rows,
column_names=column_names,
column_types=column_types,
)
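
To make the constraint parsing above concrete, here is a small, self-contained sketch. The `table_constraints` string is hypothetical, shaped to match what the regexes expect rather than copied from real engine output:

# Hypothetical payload exercising parse_primary_key_constraints and
# parse_foreign_key_constraints from the class above.
sample = {
    "table_constraints": (
        "[ (orders_pk, PRIMARY KEY (`order_id`)), "
        "(orders_customers_fk, FOREIGN KEY (`customer_id`) "
        "REFERENCES `main`.`sales`.`customers` (`customer_id`)) ]"
    )
}
pk_table = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(sample)
fk_table = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(sample)
# pk_table rows: ('orders_pk', 'order_id')
# fk_table rows: ('orders_customers_fk', 'customer_id', 'main', 'sales',
#                 'customers', 'customer_id')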
4 changes: 4 additions & 0 deletions dbt/adapters/databricks/relation.py
@@ -136,6 +136,10 @@ def is_metric_view(self) -> bool:
def is_streaming_table(self) -> bool:
return self.type == DatabricksRelationType.StreamingTable

@property
def is_foreign_table(self) -> bool:
return self.type == DatabricksRelationType.Foreign

@property
def is_external_table(self) -> bool:
return self.databricks_table_type == DatabricksTableType.External
10 changes: 9 additions & 1 deletion dbt/include/databricks/macros/adapters/metadata.sql
@@ -129,9 +129,17 @@ SELECT
NULL
) AS databricks_table_type
FROM `system`.`information_schema`.`tables`
WHERE table_catalog = '{{ relation.database|lower }}'
WHERE table_catalog = '{{ relation.database|lower }}'
AND table_schema = '{{ relation.schema|lower }}'
{%- if relation.identifier %}
AND table_name = '{{ relation.identifier|lower }}'
{% endif %}
{% endmacro %}

{% macro describe_table_extended_as_json(relation) %}
{{ return(run_query_as(describe_table_extended_as_json_sql(relation), 'describe_table_extended_as_json')) }}
{% endmacro %}

{% macro describe_table_extended_as_json_sql(relation) %}
DESCRIBE TABLE EXTENDED {{ relation.render() }} AS JSON
{% endmacro %}
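
The Python parsers in `impl.py` read only a handful of keys from the `json_metadata` column this macro returns. The sample below (shown as the parsed Python dict) is assembled from those keys for illustration; real `DESCRIBE TABLE EXTENDED ... AS JSON` output carries many more fields and may differ in detail:

# Hypothetical json_metadata payload, trimmed to the keys that
# DatabricksDescribeJsonMetadata reads; all values are illustrative only.
json_metadata = {
    "catalog_name": "main",
    "schema_name": "sales",
    "table_name": "orders",
    "columns": [{"name": "order_id", "nullable": False}],
    "table_constraints": "[ (orders_pk, PRIMARY KEY (`order_id`)) ]",
    "column_masks": [
        {
            "column_name": "order_id",
            "function_name": {
                "catalog_name": "main",
                "schema_name": "security",
                "function_name": "mask_ids",
            },
            "using_column_names": [],
        }
    ],
    "row_filter": {
        "function_name": {
            "catalog_name": "main",
            "schema_name": "security",
            "function_name": "region_filter",
        },
        "column_names": ["region"],
    },
    # "view_text" is present for views; parse_view_description returns an
    # empty Row when it is absent.
}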
7 changes: 7 additions & 0 deletions tests/unit/macros/adapters/test_metadata_macros.py
@@ -197,6 +197,13 @@ def test_check_schema_exists_sql_with_hyphenated_database(self, template_bundle)
expected_sql = "SHOW SCHEMAS IN `data_engineering-uc-dev` LIKE 'my_schema'"
self.assert_sql_equal(result, expected_sql)

def test_describe_table_extended_as_json_sql(self, template_bundle, relation):
result = self.run_macro(
template_bundle.template, "describe_table_extended_as_json_sql", relation
)
expected_sql = "DESCRIBE TABLE EXTENDED `some_database`.`some_schema`.`some_table` AS JSON"
self.assert_sql_equal(result, expected_sql)

def test_case_sensitivity(self, template_bundle):
relation = Mock()
relation.database = "TEST_DB"