Skip to content

Commit c137ea9

Browse files
Fixes #24795: dbt snapshot model patch (#26874)
* Fix dbt snapshot metadata ingestion * address comments * address comments * checkstyle
1 parent d3b2277 commit c137ea9

5 files changed

Lines changed: 296 additions & 13 deletions

File tree

ingestion/src/metadata/ingestion/source/database/dbt/dbt_utils.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
DbtCommonEnum,
3030
RawQueriesEnum,
3131
)
32+
from metadata.ingestion.source.database.dbt.models import SnapshotNodeLocation
3233
from metadata.utils import entity_link
3334
from metadata.utils.logger import ingestion_logger
3435

@@ -786,6 +787,30 @@ def get_data_model_path(manifest_node):
786787
return datamodel_path
787788

788789

790+
def get_snapshot_effective_schema_and_database(
    manifest_node: Any,
) -> SnapshotNodeLocation:
    """
    Resolve the schema and database a dbt snapshot node points at.

    The node-level manifest_node.schema_ and manifest_node.database are the
    starting values. When the node carries a truthy config, a truthy
    config.target_schema replaces the schema and a truthy
    config.target_database replaces the database. A missing or falsy
    config / target attribute leaves the node-level value in place.

    Returns a SnapshotNodeLocation holding the resolved schema and database.
    """
    resolved_schema: str = manifest_node.schema_
    resolved_database: Optional[str] = manifest_node.database

    # No usable config on the node: nothing can override the node-level values
    snapshot_config = getattr(manifest_node, "config", None)
    if not snapshot_config:
        return SnapshotNodeLocation(
            schema_=resolved_schema, database=resolved_database
        )

    # getattr with a None default folds "attribute absent" and "attribute empty" together
    schema_override = getattr(snapshot_config, "target_schema", None)
    if schema_override:
        resolved_schema = schema_override

    database_override = getattr(snapshot_config, "target_database", None)
    if database_override:
        resolved_database = database_override

    return SnapshotNodeLocation(schema_=resolved_schema, database=resolved_database)
812+
813+
789814
def find_entity_by_type_and_fqn(
790815
metadata: OpenMetadata, entity_type: str, entity_fqn: str
791816
) -> Optional[Any]:

ingestion/src/metadata/ingestion/source/database/dbt/metadata.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
get_dbt_model_name,
9999
get_dbt_raw_query,
100100
get_manifest_column_name,
101+
get_snapshot_effective_schema_and_database,
101102
validate_custom_property_value,
102103
)
103104
from metadata.ingestion.source.database.dbt.models import DbtMeta
@@ -615,8 +616,8 @@ def yield_dbt_tags(
615616
if model_tags:
616617
dbt_tags_list.extend(self.filter_tags(model_tags))
617618

618-
# Add the tags from the columns
619-
for _, column in manifest_node.columns.items():
619+
# snapshot nodes may have columns=None (columns are inferred at runtime)
620+
for _, column in (manifest_node.columns or {}).items():
620621
column_tags = column.tags
621622
if column_tags:
622623
dbt_tags_list.extend(self.filter_tags(column_tags))
@@ -903,10 +904,21 @@ def yield_data_models(
903904

904905
model_name = get_dbt_model_name(manifest_node)
905906

906-
# Filter the dbt models based on filter patterns
907+
# snapshots can redirect output to a different schema/database via config.target_schema/target_database
908+
if resource_type == "snapshot":
909+
location = get_snapshot_effective_schema_and_database(
910+
manifest_node
911+
)
912+
node_schema = location.schema_
913+
node_database = location.database
914+
else:
915+
node_schema = manifest_node.schema_
916+
node_database = manifest_node.database
917+
918+
# Filter the dbt models based on filter patterns using effective schema/database
907919
filter_model = self.is_filtered(
908-
database_name=get_corrected_name(manifest_node.database),
909-
schema_name=get_corrected_name(manifest_node.schema_),
920+
database_name=get_corrected_name(node_database),
921+
schema_name=get_corrected_name(node_schema),
910922
table_name=model_name,
911923
)
912924
if filter_model.is_filtered:
@@ -931,13 +943,12 @@ def yield_data_models(
931943
)
932944
or []
933945
)
934-
935946
table_fqn = fqn.build(
936947
self.metadata,
937948
entity_type=Table,
938949
service_name=self.config.serviceName,
939-
database_name=get_corrected_name(manifest_node.database),
940-
schema_name=get_corrected_name(manifest_node.schema_),
950+
database_name=get_corrected_name(node_database),
951+
schema_name=get_corrected_name(node_schema),
941952
table_name=model_name,
942953
)
943954

@@ -1017,9 +1028,24 @@ def parse_upstream_nodes(self, manifest_entities, dbt_node):
10171028
parent_node = manifest_entities[node]
10181029
table_name = get_dbt_model_name(parent_node)
10191030

1031+
parent_resource_type = getattr(
1032+
parent_node.resource_type,
1033+
"value",
1034+
parent_node.resource_type,
1035+
)
1036+
if parent_resource_type == "snapshot":
1037+
parent_location = get_snapshot_effective_schema_and_database(
1038+
parent_node
1039+
)
1040+
parent_database = parent_location.database
1041+
parent_schema = parent_location.schema_
1042+
else:
1043+
parent_database = parent_node.database
1044+
parent_schema = parent_node.schema_
1045+
10201046
filter_model = self.is_filtered(
1021-
database_name=get_corrected_name(parent_node.database),
1022-
schema_name=get_corrected_name(parent_node.schema_),
1047+
database_name=get_corrected_name(parent_database),
1048+
schema_name=get_corrected_name(parent_schema),
10231049
table_name=table_name,
10241050
)
10251051
if filter_model.is_filtered:
@@ -1036,8 +1062,8 @@ def parse_upstream_nodes(self, manifest_entities, dbt_node):
10361062
self.metadata,
10371063
entity_type=Table,
10381064
service_name=self.config.serviceName,
1039-
database_name=get_corrected_name(parent_node.database),
1040-
schema_name=get_corrected_name(parent_node.schema_),
1065+
database_name=get_corrected_name(parent_database),
1066+
schema_name=get_corrected_name(parent_schema),
10411067
table_name=table_name,
10421068
)
10431069

@@ -1060,7 +1086,8 @@ def parse_data_model_columns(
10601086
Method to parse the DBT columns
10611087
"""
10621088
columns = []
1063-
manifest_columns = manifest_node.columns
1089+
# snapshot nodes default columns to None; treat as empty to avoid AttributeError
1090+
manifest_columns = manifest_node.columns or {}
10641091
for key, manifest_column in manifest_columns.items():
10651092
try:
10661093
logger.debug(f"Processing DBT column: {key}")

ingestion/src/metadata/ingestion/source/database/dbt/models.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,10 @@ class DbtMetaOpenmetadata(BaseModel):
4747

4848
class DbtMeta(BaseModel):
4949
openmetadata: Optional[DbtMetaOpenmetadata] = None
50+
51+
52+
class SnapshotNodeLocation(BaseModel):
    """Schema/database pair a dbt snapshot node resolves to once config overrides are applied."""

    # Resolved schema name; required
    schema_: str
    # Resolved database name; optional, None when not set
    database: Optional[str] = None
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
{
2+
"metadata": {
3+
"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v12.json",
4+
"dbt_version": "1.8.0",
5+
"generated_at": "2024-01-01T00:00:00.000000Z",
6+
"invocation_id": "test-snapshot-invocation",
7+
"env": {},
8+
"project_name": "jaffle_shop",
9+
"project_id": "test-project"
10+
},
11+
"nodes": {
12+
"snapshot.jaffle_shop.snap_orders": {
13+
"resource_type": "snapshot",
14+
"database": "dev",
15+
"schema": "jaffle_shop",
16+
"name": "snap_orders",
17+
"alias": "snap_orders",
18+
"package_name": "jaffle_shop",
19+
"path": "snap_orders.sql",
20+
"original_file_path": "snapshots/snap_orders.sql",
21+
"unique_id": "snapshot.jaffle_shop.snap_orders",
22+
"fqn": ["jaffle_shop", "snap_orders"],
23+
"checksum": {"name": "sha256", "checksum": "abc123"},
24+
"config": {
25+
"enabled": true,
26+
"alias": null,
27+
"schema": null,
28+
"database": null,
29+
"tags": [],
30+
"meta": {},
31+
"group": null,
32+
"materialized": "snapshot",
33+
"incremental_strategy": null,
34+
"batch_size": null,
35+
"lookback": 1,
36+
"begin": null,
37+
"persist_docs": {},
38+
"post-hook": [],
39+
"pre-hook": [],
40+
"quoting": {},
41+
"column_types": {},
42+
"full_refresh": null,
43+
"unique_key": "order_id",
44+
"on_schema_change": "ignore",
45+
"on_configuration_change": "apply",
46+
"grants": {},
47+
"packages": [],
48+
"docs": {"show": true, "node_color": null},
49+
"contract": {"enforced": false, "alias_types": true},
50+
"event_time": null,
51+
"concurrent_batches": null,
52+
"strategy": "timestamp",
53+
"target_schema": "snapshots",
54+
"target_database": null,
55+
"updated_at": "updated_at",
56+
"check_cols": null,
57+
"snapshot_meta_column_names": null,
58+
"dbt_valid_to_current": null
59+
},
60+
"tags": [],
61+
"description": "Snapshot of orders table for SCD Type 2",
62+
"columns": null,
63+
"meta": {},
64+
"group": null,
65+
"docs": {"show": true, "node_color": null},
66+
"patch_path": null,
67+
"build_path": null,
68+
"created_at": 1704067200.0,
69+
"relation_name": "\"dev\".\"snapshots\".\"snap_orders\"",
70+
"raw_code": "SELECT * FROM {{ ref('stg_orders') }}",
71+
"compiled_code": "SELECT * FROM dev.jaffle_shop.stg_orders",
72+
"language": "sql",
73+
"depends_on": {
74+
"macros": [],
75+
"nodes": ["model.jaffle_shop.stg_orders"]
76+
},
77+
"refs": [{"name": "stg_orders", "package": null, "version": null}],
78+
"sources": []
79+
}
80+
},
81+
"sources": {},
82+
"exposures": {},
83+
"metrics": {},
84+
"groups": {},
85+
"selectors": {},
86+
"disabled": {},
87+
"docs": {},
88+
"parent_map": {},
89+
"child_map": {},
90+
"group_map": {},
91+
"saved_queries": {},
92+
"semantic_models": {},
93+
"unit_tests": {},
94+
"macros": {}
95+
}

0 commit comments

Comments
 (0)