-
Notifications
You must be signed in to change notification settings - Fork 138
Expand file tree
/
Copy pathtest.sql
More file actions
150 lines (135 loc) · 7.08 KB
/
test.sql
File metadata and controls
150 lines (135 loc) · 7.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
{% materialization test, default %}
{% set result = elementary.materialize_test(dbt.materialization_test_default) %}
{% do return(result) %}
{% endmaterialization %}
{% materialization test, adapter="snowflake" %}
{%- if dbt.materialization_test_snowflake -%}
{% set materialization_macro = dbt.materialization_test_snowflake %}
{%- else -%}
{% set materialization_macro = dbt.materialization_test_default %}
{%- endif -%}
{% set result = elementary.materialize_test(materialization_macro) %}
{% do return(result) %}
{% endmaterialization %}
{% macro materialize_test(materialization_macro) %}
{% if not elementary.is_elementary_enabled() %}
{% do return(materialization_macro()) %}
{% endif %}
{% set test_unique_id = model.get('unique_id') %}
{% do elementary.debug_log(test_unique_id ~ ": starting test materialization hook") %}
{% if elementary.get_config_var("tests_use_temp_tables") %}
{% set temp_table_sql = elementary.create_test_result_temp_table() %}
{% do context.update({"sql": temp_table_sql}) %}
{% do elementary.debug_log(test_unique_id ~ ": created test temp table") %}
{% endif %}
{% set test_namespace = model.get('test_metadata', {}).get('namespace') %}
{% if test_namespace == 'elementary' %}
{# Custom test materialization is needed only for non-elementary tests #}
{% do return(materialization_macro()) %}
{% endif %}
{% set flattened_test = elementary.flatten_test(model) %}
{% do elementary.debug_log(test_unique_id ~ ": flattened test node") %}
{% set result = elementary.handle_dbt_test(flattened_test, materialization_macro) %}
{% do elementary.debug_log(test_unique_id ~ ": handler called by test type - " ~ elementary.get_test_type(flattened_test)) %}
{% if elementary.get_config_var("calculate_failed_count") %}
{% set failed_row_count = elementary.get_failed_row_count(flattened_test) %}
{% if failed_row_count is not none %}
{% do elementary.get_cache("elementary_test_failed_row_counts").update({model.unique_id: failed_row_count}) %}
{% do elementary.debug_log(test_unique_id ~ ": calculated failed row count") %}
{% endif %}
{% endif %}
{% do elementary.debug_log(test_unique_id ~ ": finished test materialization hook") %}
{% do return(result) %}
{% endmacro %}
{% macro handle_dbt_test(flattened_test, materialization_macro) %}
{% set result = materialization_macro() %}
{% set result_rows = elementary.query_test_result_rows(sample_limit=elementary.get_config_var('test_sample_row_count'),
ignore_passed_tests=true,
flattened_test=flattened_test) %}
{% set elementary_test_results_row = elementary.get_dbt_test_result_row(flattened_test, result_rows) %}
{% do elementary.cache_elementary_test_results_rows([elementary_test_results_row]) %}
{% do return(result) %}
{% endmacro %}
{% macro get_dbt_test_result_row(flattened_test, result_rows=none) %}
{% if not result_rows %}
{% set result_rows = [] %}
{% endif %}
{% set test_execution_id = elementary.get_node_execution_id(flattened_test) %}
{% set parent_model_unique_id = elementary.insensitive_get_dict_value(flattened_test, 'parent_model_unique_id') %}
{% set parent_model = elementary.get_node(parent_model_unique_id) %}
{% set parent_model_name = elementary.get_table_name_from_node(parent_model) %}
{% set test_result_dict = {
'id': test_execution_id,
'data_issue_id': none,
'test_execution_id': test_execution_id,
'test_unique_id': elementary.insensitive_get_dict_value(flattened_test, 'unique_id'),
'model_unique_id': parent_model_unique_id,
'detected_at': elementary.insensitive_get_dict_value(flattened_test, 'generated_at'),
'database_name': elementary.insensitive_get_dict_value(flattened_test, 'database_name'),
'schema_name': elementary.insensitive_get_dict_value(flattened_test, 'schema_name'),
'table_name': parent_model_name,
'column_name': elementary.insensitive_get_dict_value(flattened_test, 'test_column_name'),
'test_type': elementary.get_test_type(flattened_test),
'test_sub_type': elementary.insensitive_get_dict_value(flattened_test, 'type'),
'other': none,
'owners': elementary.insensitive_get_dict_value(flattened_test, 'model_owners'),
'tags': elementary.insensitive_get_dict_value(flattened_test, 'model_tags') + elementary.insensitive_get_dict_value(flattened_test, 'tags'),
'test_results_query': elementary.get_compiled_code(flattened_test),
'test_name': elementary.insensitive_get_dict_value(flattened_test, 'name'),
'test_params': elementary.insensitive_get_dict_value(flattened_test, 'test_params'),
'severity': elementary.insensitive_get_dict_value(flattened_test, 'severity'),
'test_short_name': elementary.insensitive_get_dict_value(flattened_test, 'short_name'),
'test_alias': elementary.insensitive_get_dict_value(flattened_test, 'alias'),
'result_rows': result_rows
}%}
{% do return(test_result_dict) %}
{% endmacro %}
{% macro create_test_result_temp_table() %}
{% set database, schema = elementary.get_package_database_and_schema() %}
{% set test_id = model["alias"] %}
{% set relation = elementary.create_temp_table(database, schema, test_id, sql) %}
{% set new_sql %}
select * from {{ relation }}
{% endset %}
{% do return(new_sql) %}
{% endmacro %}
{% macro query_test_result_rows(sample_limit=none, ignore_passed_tests=false, flattened_test=none) %}
{% if sample_limit == 0 %} {# performance: no need to run a sql query that we know returns an empty list #}
{% do return([]) %}
{% endif %}
{% if ignore_passed_tests and elementary.did_test_pass() %}
{% do elementary.debug_log("Skipping sample query because the test passed.") %}
{% do return([]) %}
{% endif %}
{% set pii_columns = [] %}
{% if flattened_test %}
{% set pii_columns = elementary.get_pii_columns_from_parent_model(flattened_test) %}
{% endif %}
{% set select_clause = "*" %}
{% if pii_columns %}
{% set query_to_get_columns %}
with test_results as (
{{ sql }}
)
select * from test_results limit 0
{% endset %}
{% set columns_result = elementary.run_query(query_to_get_columns) %}
{% set all_columns = columns_result.column_names %}
{% set safe_columns = all_columns | reject("in", pii_columns) | list %}
{% if safe_columns %}
{% set select_clause = safe_columns | join(", ") %}
{% else %}
{% set select_clause = "1 as _no_non_pii_columns" %}
{% endif %}
{% endif %}
{% set query %}
with test_results as (
{{ sql }}
)
select {{ select_clause }} from test_results {% if sample_limit is not none %} limit {{ sample_limit }} {% endif %}
{% endset %}
{% do return(elementary.agate_to_dicts(elementary.run_query(query))) %}
{% endmacro %}
{% macro cache_elementary_test_results_rows(elementary_test_results_rows) %}
{% do elementary.get_cache("elementary_test_results").update({model.unique_id: elementary_test_results_rows}) %}
{% endmacro %}