Skip to content

Commit 8176892

Browse files
committed
extract Databricks patch logic to separate module
1 parent 4a2a8c7 commit 8176892

2 files changed

Lines changed: 127 additions & 114 deletions

File tree

elementary/clients/dbt/command_line_dbt_runner.py

Lines changed: 3 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import os
33
import re
44
from dataclasses import dataclass
5-
from typing import Any, Dict, List, Optional, Union
5+
from typing import Any, Dict, List, Optional
66

77
import yaml
88

99
from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
10+
from elementary.clients.dbt.databricks_patch import apply_databricks_patch
1011
from elementary.clients.dbt.dbt_log import parse_dbt_output
1112
from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError
1213
from elementary.monitor.dbt_project_utils import is_dbt_package_up_to_date
@@ -56,8 +57,7 @@ def __init__(
5657

5758
# Apply databricks compatibility patch for version 1.10.2 only once
5859
if not CommandLineDbtRunner._dbx_patch_applied:
59-
self._apply_databricks_compatibility_patch()
60-
CommandLineDbtRunner._dbx_patch_applied = True
60+
CommandLineDbtRunner._dbx_patch_applied = apply_databricks_patch()
6161

6262
if force_dbt_deps:
6363
self.deps()
@@ -326,114 +326,3 @@ def _run_deps_if_needed(self):
326326

327327
if should_run_deps:
328328
self.deps()
329-
330-
def _apply_databricks_compatibility_patch(self):
331-
"""Apply monkey patch to fix dbt-databricks 1.10.2 compatibility issues"""
332-
try:
333-
from typing import Any, Optional
334-
335-
from dbt.adapters.databricks import parse_model # type: ignore
336-
337-
def is_unsupported_object(model: Any) -> bool:
338-
"""Check if the object is a Macro or other unsupported type"""
339-
return hasattr(model, "__class__") and "Macro" in str(model.__class__)
340-
341-
def safe_catalog_name(model: Any) -> str:
342-
try:
343-
if is_unsupported_object(model):
344-
logger.debug(
345-
"Received unsupported object type for catalog_name, using unity as default"
346-
)
347-
return "unity"
348-
# Handle RelationConfig objects
349-
if (
350-
hasattr(model, "config")
351-
and model.config
352-
and hasattr(model.config, "get")
353-
):
354-
catalog = model.config.get("catalog")
355-
if catalog:
356-
return catalog
357-
# Fallback to unity catalog
358-
return "unity"
359-
except Exception as e:
360-
logger.debug(
361-
f"Failed to parse catalog name from model: {e}. Using unity as default."
362-
)
363-
return "unity"
364-
365-
def safe_file_format(model: Any) -> Optional[str]:
366-
try:
367-
if is_unsupported_object(model):
368-
return None
369-
return safe_get(model, "file_format")
370-
except Exception as e:
371-
logger.debug(f"Failed to get file_format from model: {e}")
372-
return None
373-
374-
def safe_location_path(model: Any) -> Optional[str]:
375-
try:
376-
if is_unsupported_object(model):
377-
return None
378-
if not hasattr(model, "config") or not model.config:
379-
return None
380-
if model.config.get("include_full_name_in_path"):
381-
return f"{model.database}/{model.schema}/{model.identifier}"
382-
return model.identifier if hasattr(model, "identifier") else None
383-
except Exception as e:
384-
logger.debug(f"Failed to get location_path from model: {e}")
385-
return None
386-
387-
def safe_location_root(model: Any) -> Optional[str]:
388-
try:
389-
if is_unsupported_object(model):
390-
return None
391-
return safe_get(model, "location_root")
392-
except Exception as e:
393-
logger.debug(f"Failed to get location_root from model: {e}")
394-
return None
395-
396-
def safe_table_format(model: Any) -> Optional[str]:
397-
try:
398-
if is_unsupported_object(model):
399-
return None
400-
return safe_get(model, "table_format")
401-
except Exception as e:
402-
logger.debug(f"Failed to get table_format from model: {e}")
403-
return None
404-
405-
def safe_get(
406-
model: Any, setting: str, case_sensitive: Union[bool, None] = False
407-
) -> Union[str, None]:
408-
try:
409-
if is_unsupported_object(model):
410-
return None
411-
# Check if model has config attribute
412-
if not hasattr(model, "config") or not model.config:
413-
return None
414-
# Check if config has get method
415-
if not hasattr(model.config, "get"):
416-
return None
417-
value = model.config.get(setting)
418-
if value:
419-
return value if case_sensitive else value.lower()
420-
return None
421-
except Exception as e:
422-
logger.debug(f"Failed to get {setting} from model config: {e}")
423-
return None
424-
425-
# Replace problematic functions with safe versions
426-
parse_model.catalog_name = safe_catalog_name
427-
parse_model.file_format = safe_file_format
428-
parse_model.location_path = safe_location_path
429-
parse_model.location_root = safe_location_root
430-
parse_model.table_format = safe_table_format
431-
parse_model._get = safe_get
432-
433-
logger.debug("Applied dbt-databricks 1.10.2 compatibility patch")
434-
435-
except ImportError:
436-
# parse_model module doesn't exist in older versions
437-
pass
438-
except Exception as e:
439-
logger.debug(f"Failed to apply dbt-databricks compatibility patch: {e}")
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""Databricks compatibility patch for dbt-databricks 1.10.2."""
2+
import logging
3+
from typing import Any, Union
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
def is_unsupported_object(model: Any) -> bool:
9+
"""Check if the object is a Macro or other unsupported type"""
10+
return hasattr(model, "__class__") and "Macro" in str(model.__class__)
11+
12+
13+
def safe_catalog_name(model: Any) -> str:
14+
try:
15+
if is_unsupported_object(model):
16+
logger.debug(
17+
"Received unsupported object type for catalog_name, using unity as default"
18+
)
19+
return "unity"
20+
# Handle RelationConfig objects
21+
if hasattr(model, "config") and model.config and hasattr(model.config, "get"):
22+
catalog = model.config.get("catalog")
23+
if catalog:
24+
return catalog
25+
# Fallback to unity catalog
26+
return "unity"
27+
except Exception as e:
28+
logger.debug(
29+
f"Failed to parse catalog name from model: {e}. Using unity as default."
30+
)
31+
return "unity"
32+
33+
34+
def safe_file_format(model: Any) -> Union[str, None]:
35+
try:
36+
if is_unsupported_object(model):
37+
return None
38+
return safe_get(model, "file_format")
39+
except Exception as e:
40+
logger.debug(f"Failed to get file_format from model: {e}")
41+
return None
42+
43+
44+
def safe_location_path(model: Any) -> Union[str, None]:
45+
try:
46+
if is_unsupported_object(model):
47+
return None
48+
if not hasattr(model, "config") or not model.config:
49+
return None
50+
if model.config.get("include_full_name_in_path"):
51+
return f"{model.database}/{model.schema}/{model.identifier}"
52+
return model.identifier if hasattr(model, "identifier") else None
53+
except Exception as e:
54+
logger.debug(f"Failed to get location_path from model: {e}")
55+
return None
56+
57+
58+
def safe_location_root(model: Any) -> Union[str, None]:
59+
try:
60+
if is_unsupported_object(model):
61+
return None
62+
return safe_get(model, "location_root")
63+
except Exception as e:
64+
logger.debug(f"Failed to get location_root from model: {e}")
65+
return None
66+
67+
68+
def safe_table_format(model: Any) -> Union[str, None]:
69+
try:
70+
if is_unsupported_object(model):
71+
return None
72+
return safe_get(model, "table_format")
73+
except Exception as e:
74+
logger.debug(f"Failed to get table_format from model: {e}")
75+
return None
76+
77+
78+
def safe_get(
79+
model: Any, setting: str, case_sensitive: Union[bool, None] = False
80+
) -> Union[str, None]:
81+
try:
82+
if is_unsupported_object(model):
83+
return None
84+
# Check if model has config attribute
85+
if not hasattr(model, "config") or not model.config:
86+
return None
87+
# Check if config has get method
88+
if not hasattr(model.config, "get"):
89+
return None
90+
value = model.config.get(setting)
91+
if value:
92+
return value if case_sensitive else value.lower()
93+
return None
94+
except Exception as e:
95+
logger.debug(f"Failed to get {setting} from model config: {e}")
96+
return None
97+
98+
99+
def apply_databricks_patch() -> bool:
100+
"""Apply monkey patch to fix dbt-databricks 1.10.2 compatibility issues.
101+
102+
Returns:
103+
bool: True if patch was successfully applied, False otherwise.
104+
"""
105+
try:
106+
from dbt.adapters.databricks import parse_model # type: ignore
107+
108+
# Replace problematic functions with safe versions
109+
parse_model.catalog_name = safe_catalog_name
110+
parse_model.file_format = safe_file_format
111+
parse_model.location_path = safe_location_path
112+
parse_model.location_root = safe_location_root
113+
parse_model.table_format = safe_table_format
114+
parse_model._get = safe_get
115+
116+
logger.debug("Applied dbt-databricks 1.10.2 compatibility patch")
117+
return True
118+
119+
except ImportError:
120+
# parse_model module doesn't exist in older versions
121+
return False
122+
except Exception as e:
123+
logger.debug(f"Failed to apply dbt-databricks compatibility patch: {e}")
124+
return False

0 commit comments

Comments
 (0)