-
Notifications
You must be signed in to change notification settings - Fork 130
Expand file tree
/
Copy pathupload_source_freshness.sql
More file actions
97 lines (95 loc) · 3.68 KB
/
upload_source_freshness.sql
File metadata and controls
97 lines (95 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
{% macro upload_source_freshness() %}
{% set source_freshness_results_relation = elementary.get_elementary_relation(
"dbt_source_freshness_results"
) %}
{% set source_freshness_results_dicts = [] %}
{% for result in results %}
{% do source_freshness_results_dicts.append(
elementary.process_freshness_result(result)
) %}
{% endfor %}
{% do elementary.upload_artifacts_to_table(
source_freshness_results_relation,
source_freshness_results_dicts,
elementary.flatten_source_freshness,
append=True,
should_commit=True,
) %}
{% endmacro %}
{% macro process_freshness_result(result) %}
{% set result_dict = result.to_dict() %}
{% if result_dict.status == "runtime error" %}
{% do return(
{
"unique_id": result_dict.node.unique_id,
"status": result_dict.status,
"error": result_dict.message,
}
) %}
{% endif %}
{% do return(
{
"unique_id": result_dict.node.unique_id,
"status": result_dict.status,
"max_loaded_at": result_dict.max_loaded_at,
"snapshotted_at": result_dict.snapshotted_at,
"max_loaded_at_time_ago_in_s": result_dict.age,
"criteria": result_dict.node.get("freshness", {}),
"adapter_response": result_dict.adapter_response,
"timing": result_dict.timing,
"thread_id": result_dict.thread_id,
"execution_time": result_dict.execution_time,
}
) %}
{% endmacro %}
{% macro flatten_source_freshness(node_dict) %}
{% set compile_timing = {} %}
{% set execute_timing = {} %}
{% for timing in node_dict["timing"] %}
{% if timing["name"] == "compile" %} {% do compile_timing.update(timing) %}
{% elif timing["name"] == "execute" %} {% do execute_timing.update(timing) %}
{% endif %}
{% endfor %}
{% set metadata_dict = elementary.safe_get_with_default(
node_dict, "metadata", {}
) %}
{% set criteria_dict = elementary.safe_get_with_default(
node_dict, "criteria", {}
) %}
{% set source_freshness_invocation_id = metadata_dict.get(
"invocation_id", invocation_id
) %}
{% set flatten_source_freshness_dict = {
"source_freshness_execution_id": [
source_freshness_invocation_id,
node_dict.get("unique_id"),
]
| join("."),
"unique_id": node_dict.get("unique_id"),
"max_loaded_at": node_dict.get("max_loaded_at"),
"snapshotted_at": node_dict.get("snapshotted_at"),
"max_loaded_at_time_ago_in_s": node_dict.get(
"max_loaded_at_time_ago_in_s"
),
"status": node_dict.get("status"),
"error": node_dict.get("error"),
"warn_after": criteria_dict.get("warn_after"),
"error_after": criteria_dict.get("error_after"),
"filter": criteria_dict.get("filter"),
"generated_at": elementary.datetime_now_utc_as_string(),
"invocation_id": source_freshness_invocation_id,
"compile_started_at": elementary.normalize_artifact_timestamp_precision(
compile_timing.get("started_at")
),
"compile_completed_at": elementary.normalize_artifact_timestamp_precision(
compile_timing.get("completed_at")
),
"execute_started_at": elementary.normalize_artifact_timestamp_precision(
execute_timing.get("started_at")
),
"execute_completed_at": elementary.normalize_artifact_timestamp_precision(
execute_timing.get("completed_at")
),
} %}
{{ return(flatten_source_freshness_dict) }}
{% endmacro %}