-
Notifications
You must be signed in to change notification settings - Fork 134
Expand file tree
/
Copy pathupload_dbt_models.sql
More file actions
128 lines (123 loc) · 5.09 KB
/
upload_dbt_models.sql
File metadata and controls
128 lines (123 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
{%- macro upload_dbt_models(should_commit=false, metadata_hashes=none) -%}
{% set relation = elementary.get_elementary_relation("dbt_models") %}
{% if execute and relation %}
{% set models = elementary.filter_to_current_project_if_needed(
graph.nodes.values() | selectattr("resource_type", "==", "model")
) %}
{% do elementary.upload_artifacts_to_table(
relation,
models,
elementary.flatten_model,
should_commit=should_commit,
metadata_hashes=metadata_hashes,
) %}
{%- endif -%}
{{- return("") -}}
{%- endmacro -%}
{% macro get_dbt_models_empty_table_query() %}
{% set columns = [
("unique_id", "string"),
("alias", "string"),
("checksum", "string"),
("materialization", "string"),
("tags", "long_string"),
("meta", "long_string"),
("owner", "string"),
("database_name", "string"),
("schema_name", "string"),
("depends_on_macros", "long_string"),
("depends_on_nodes", "long_string"),
("description", "long_string"),
("name", "string"),
("package_name", "string"),
("original_path", "long_string"),
("path", "string"),
("patch_path", "string"),
("generated_at", "string"),
("metadata_hash", "string"),
("unique_key", "string"),
("incremental_strategy", "string"),
("group_name", "string"),
("access", "string"),
("deprecation_date", "string"),
] %}
{% if target.type == "bigquery" or elementary.get_config_var(
"include_other_warehouse_specific_columns"
) %}
{% do columns.extend(
[
("bigquery_partition_by", "string"),
("bigquery_cluster_by", "string"),
]
) %}
{% endif %}
{% set dbt_models_empty_table_query = elementary.empty_table(columns) %}
{{ return(dbt_models_empty_table_query) }}
{% endmacro %}
{% macro flatten_model(node_dict) %}
{% set checksum_dict = elementary.safe_get_with_default(
node_dict, "checksum", {}
) %}
{% set config_dict = elementary.safe_get_with_default(node_dict, "config", {}) %}
{% set depends_on_dict = elementary.safe_get_with_default(
node_dict, "depends_on", {}
) %}
{% set config_meta_dict = elementary.safe_get_with_default(
config_dict, "meta", {}
) %}
{% set meta_dict = elementary.safe_get_with_default(node_dict, "meta", {}) %}
{% do meta_dict.update(config_meta_dict) %}
{% set formatted_owner = [] %}
{% set raw_owner = meta_dict.get("owner") or config_dict.get("owner") %}
{% if raw_owner is string %}
{% set owners = raw_owner.split(",") %}
{% for owner in owners %}
{% do formatted_owner.append(owner | trim) %}
{% endfor %}
{% elif raw_owner is iterable %} {% do formatted_owner.extend(raw_owner) %}
{% endif %}
{% set config_tags = elementary.safe_get_with_default(config_dict, "tags", []) %}
{% set global_tags = elementary.safe_get_with_default(node_dict, "tags", []) %}
{% set meta_tags = elementary.safe_get_with_default(meta_dict, "tags", []) %}
{% set tags = elementary.union_lists(config_tags, global_tags) %}
{% set tags = elementary.union_lists(tags, meta_tags) %}
{% set flatten_model_metadata_dict = {
"unique_id": node_dict.get("unique_id"),
"alias": node_dict.get("alias"),
"checksum": checksum_dict.get("checksum"),
"materialization": config_dict.get("materialized"),
"tags": elementary.filter_none_and_sort(tags),
"meta": meta_dict,
"owner": elementary.filter_none_and_sort(formatted_owner),
"database_name": node_dict.get("database"),
"schema_name": node_dict.get("schema"),
"depends_on_macros": elementary.filter_none_and_sort(
depends_on_dict.get("macros", [])
),
"depends_on_nodes": elementary.filter_none_and_sort(
depends_on_dict.get("nodes", [])
),
"description": node_dict.get("description"),
"name": node_dict.get("name"),
"package_name": node_dict.get("package_name"),
"original_path": node_dict.get("original_file_path"),
"path": node_dict.get("path"),
"patch_path": node_dict.get("patch_path"),
"generated_at": elementary.datetime_now_utc_as_string(),
"unique_key": config_dict.get("unique_key"),
"incremental_strategy": config_dict.get("incremental_strategy"),
"bigquery_partition_by": config_dict.get("partition_by"),
"bigquery_cluster_by": config_dict.get("cluster_by"),
"group_name": config_dict.get("group") or node_dict.get("group"),
"access": config_dict.get("access") or node_dict.get("access"),
"deprecation_date": node_dict.get("deprecation_date"),
} %}
{% do flatten_model_metadata_dict.update(
{
"metadata_hash": elementary.get_artifact_metadata_hash(
flatten_model_metadata_dict
)
}
) %}
{{ return(flatten_model_metadata_dict) }}
{% endmacro %}