-
Notifications
You must be signed in to change notification settings - Fork 134
Expand file tree
/
Copy pathupload_run_results.sql
More file actions
162 lines (153 loc) · 6.54 KB
/
upload_run_results.sql
File metadata and controls
162 lines (153 loc) · 6.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
{% macro upload_run_results() %}
    {#- Sends the invocation's `results` to the "dbt_run_results" relation via
        `elementary.upload_artifacts_to_table` (called with append=True and
        should_commit=True). Nothing is uploaded unless `execute` is set and
        `get_elementary_relation` yields a truthy relation. The macro always
        returns an empty string. -#}
    {% set run_results_relation = elementary.get_elementary_relation("dbt_run_results") %}
    {% if not (execute and run_results_relation) %}
        {{ return("") }}
    {% endif %}

    {% do elementary.file_log("Uploading run results.") %}
    {% do elementary.upload_artifacts_to_table(
        run_results_relation,
        results,
        elementary.flatten_run_result,
        append=True,
        should_commit=True,
        on_query_exceed=elementary.on_run_result_query_exceed
    ) %}
    {% do elementary.file_log("Uploaded run results successfully.") %}
    {{ return("") }}
{% endmacro %}
{% macro normalize_artifact_timestamp_precision(timestamp_value) %}
    {#- For BigQuery targets only: when `timestamp_value` is a string of the
        form "<prefix>.<fraction>Z" whose fraction has more than six
        characters, return it with the fraction cut to its first six
        characters. Every other target, type or shape is returned unchanged. -#}
    {% set is_bigquery = target.type == "bigquery" %}
    {#- `and` short-circuits, so the string methods are only reached for strings. -#}
    {% if is_bigquery and timestamp_value is string and "." in timestamp_value and timestamp_value.endswith("Z") %}
        {#- Drop the trailing "Z" before splitting off the fractional part. -#}
        {% set pieces = timestamp_value[:-1].split(".") %}
        {% if pieces | length == 2 and pieces[1] | length > 6 %}
            {{ return(pieces[0] ~ "." ~ pieces[1][:6] ~ "Z") }}
        {% endif %}
    {% endif %}
    {{ return(timestamp_value) }}
{% endmacro %}
{% macro get_dbt_run_results_empty_table_query() %}
    {#- Returns whatever `elementary.empty_table` produces for the
        dbt_run_results column specification below. Each entry is a
        (column name, Elementary type name) pair; the order is preserved
        exactly as declared. -#}
    {% set run_results_columns = [
        ("model_execution_id", "long_string"),
        ("unique_id", "long_string"),
        ("invocation_id", "string"),
        ("generated_at", "string"),
        ("created_at", "timestamp"),
        ("name", "long_string"),
        ("message", "long_string"),
        ("status", "string"),
        ("resource_type", "string"),
        ("execution_time", "float"),
        ("execute_started_at", "string"),
        ("execute_completed_at", "string"),
        ("compile_started_at", "string"),
        ("compile_completed_at", "string"),
        ("rows_affected", "bigint"),
        ("full_refresh", "boolean"),
        ("compiled_code", "long_string"),
        ("failures", "bigint"),
        ("query_id", "string"),
        ("thread_id", "string"),
        ("materialization", "string"),
        ("adapter_response", "string"),
        ("group_name", "string")
    ] %}
    {{ return(elementary.empty_table(run_results_columns)) }}
{% endmacro %}
{% macro normalize_rows_affected(rows_affected) %}
    {#- Normalizes an adapter-reported `rows_affected` value.

        Returns:
          - none when the input is none;
          - none when the input is the string "-1" (surrounding whitespace
            ignored), which is treated as "count unknown";
          - the integer value when the input is a numeric string;
          - none when the input is a string that cannot be parsed as a
            number (previously this silently became 0, which reads as
            "zero rows affected" rather than "unknown");
          - the input itself, untouched, for any non-string value. -#}
    {% if rows_affected is none %}
        {{ return(none) }}
    {% endif %}

    {% if rows_affected is string %}
        {% if rows_affected | trim == "-1" %}
            {{ return(none) }}
        {% endif %}
        {#- The `int` filter returns its `default` on parse failure; the
            built-in default is 0, so override it with none. -#}
        {{ return(rows_affected | int(default=none)) }}
    {% endif %}

    {{ return(rows_affected) }}
{% endmacro %}
{% macro flatten_run_result(run_result) %}
    {#- Flattens one dbt run result into a single-level dict whose keys match
        the dbt_run_results columns.

        Args:
          run_result: a run result entry; converted to a dict through
            `elementary.get_run_result_dict`.

        Returns:
          A dict with node identity, status, timing, adapter response and
          config-derived fields. The four `*_started_at` / `*_completed_at`
          fields stay none unless a matching "execute" / "compile" entry is
          found in the result's `timing` list. -#}
    {% set run_result_dict = elementary.get_run_result_dict(run_result) %}
    {% set node = elementary.safe_get_with_default(run_result_dict, "node", {}) %}
    {% set config_dict = elementary.safe_get_with_default(node, "config", {}) %}
    {#- Read adapter_response once, using the same helper as `node` and
        `timing`. A plain `.get("adapter_response", {})` still yields none
        when the key is present with a null value, which would break the
        chained lookups below.
        NOTE(review): assumes safe_get_with_default substitutes the default
        for null values as well as missing keys - confirm. -#}
    {% set adapter_response = elementary.safe_get_with_default(
        run_result_dict, "adapter_response", {}
    ) %}
    {% set raw_rows_affected = adapter_response.get("rows_affected") %}
    {% set flatten_run_result_dict = {
        "model_execution_id": elementary.get_node_execution_id(node),
        "invocation_id": invocation_id,
        "unique_id": node.get("unique_id"),
        "name": node.get("name"),
        "message": run_result_dict.get("message"),
        "generated_at": elementary.datetime_now_utc_as_string(),
        "rows_affected": elementary.normalize_rows_affected(
            raw_rows_affected
        ),
        "execution_time": run_result_dict.get("execution_time"),
        "status": run_result_dict.get("status"),
        "resource_type": node.get("resource_type"),
        "execute_started_at": none,
        "execute_completed_at": none,
        "compile_started_at": none,
        "compile_completed_at": none,
        "full_refresh": flags.FULL_REFRESH,
        "compiled_code": elementary.get_compiled_code(
            node, as_column_value=true
        ),
        "failures": run_result_dict.get("failures"),
        "query_id": adapter_response.get("query_id"),
        "thread_id": run_result_dict.get("thread_id"),
        "materialization": config_dict.get("materialized"),
        "adapter_response": adapter_response,
        "group_name": config_dict.get("group")
    } %}

    {#- Fill in the timing columns from the "execute" and "compile" entries;
        non-mapping entries and other timing names are ignored. -#}
    {% set timings = elementary.safe_get_with_default(run_result_dict, "timing", []) %}
    {% if timings %}
        {% for timing in timings %}
            {% if timing is mapping %}
                {% if timing.get("name") == "execute" %}
                    {% do flatten_run_result_dict.update(
                        {
                            "execute_started_at": elementary.normalize_artifact_timestamp_precision(
                                timing.get("started_at")
                            ),
                            "execute_completed_at": elementary.normalize_artifact_timestamp_precision(
                                timing.get("completed_at")
                            )
                        }
                    ) %}
                {% elif timing.get("name") == "compile" %}
                    {% do flatten_run_result_dict.update(
                        {
                            "compile_started_at": elementary.normalize_artifact_timestamp_precision(
                                timing.get("started_at")
                            ),
                            "compile_completed_at": elementary.normalize_artifact_timestamp_precision(
                                timing.get("completed_at")
                            )
                        }
                    ) %}
                {% endif %}
            {% endif %}
        {% endfor %}
    {% endif %}
    {{ return(flatten_run_result_dict) }}
{% endmacro %}
{% macro on_run_result_query_exceed(flattened_node) %}
    {#- Shrinks a flattened run result in place; registered by
        upload_run_results as the `on_query_exceed` callback.

        The compiled code is always replaced by the "too long" error text.
        The message is additionally capped at 4096 characters: on adapters
        with limited string-literal / varchar sizes (e.g. Vertica, 65 000
        bytes) the error message can itself embed the full compiled SQL and
        make the INSERT statement exceed the adapter's limits, so it is
        truncated to let the row still be persisted. -#}
    {% set overrides = {
        "compiled_code": elementary.get_compiled_code_too_long_err_msg()
    } %}
    {% set original_message = flattened_node.get("message", "") %}
    {% if original_message is string and original_message | length > 4096 %}
        {% do overrides.update(
            {"message": original_message[:4096] ~ "... (truncated)"}
        ) %}
    {% endif %}
    {% do flattened_node.update(overrides) %}
{% endmacro %}