-
Notifications
You must be signed in to change notification settings - Fork 138
Expand file tree
/
Copy pathtest_dimension_anomalies.py
More file actions
207 lines (180 loc) · 7.45 KB
/
test_dimension_anomalies.py
File metadata and controls
207 lines (180 loc) · 7.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List
import pytest
from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject
# Name of the column that holds each seeded row's date string; it is also
# passed to the dbt test as its "timestamp_column" argument.
TIMESTAMP_COLUMN = "updated_at"
# Fully qualified name of the dbt test exercised by every test in this module.
DBT_TEST_NAME = "elementary.dimension_anomalies"
# Default test arguments: monitor the "superhero" dimension over TIMESTAMP_COLUMN.
DBT_TEST_ARGS = {"timestamp_column": TIMESTAMP_COLUMN, "dimensions": ["superhero"]}
# This returns data points used in the latest anomaly test
# The template is rendered with str.format(test_id=...), so the braces around
# ref(...) are quadrupled to survive formatting as a literal "{{ ref(...) }}".
# It picks the most recently created elementary_test_results row whose
# table_name matches test_id (case-insensitively) and selects the result_row
# of every test_result_rows entry linked to that result.
ANOMALY_TEST_POINTS_QUERY = """
with latest_elementary_test_result as (
select id
from {{{{ ref("elementary_test_results") }}}}
where lower(table_name) = lower('{test_id}')
order by created_at desc
limit 1
)
select result_row
from {{{{ ref("test_result_rows") }}}}
where elementary_test_results_id in (select * from latest_elementary_test_result)
"""
def get_latest_anomaly_test_points(dbt_project: DbtProject, test_id: str):
    """Return the decoded result rows stored for the latest run of a test.

    Renders ``ANOMALY_TEST_POINTS_QUERY`` for ``test_id``, executes it with
    ``dbt_project.run_query`` and JSON-decodes the ``result_row`` field of
    each returned row.

    Args:
        dbt_project: Project wrapper used to run the query.
        test_id: Identifier substituted into the query's ``table_name`` filter.

    Returns:
        A list with one decoded ``result_row`` value per returned row.
    """
    rendered_query = ANOMALY_TEST_POINTS_QUERY.format(test_id=test_id)
    rows = dbt_project.run_query(rendered_query)
    return [json.loads(row["result_row"]) for row in rows]
# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomalyless_dimension_anomalies(test_id: str, dbt_project: DbtProject):
    """Check that evenly distributed dimension values pass and store no rows.

    Every generated date gets exactly one "Superman" row and one "Spiderman"
    row, and the test is expected to report ``pass``.
    """
    yesterday = datetime.utcnow().date() - timedelta(1)
    data: List[Dict[str, Any]] = []
    for day in generate_dates(base_date=yesterday):
        day_str = day.strftime(DATE_FORMAT)
        for hero in ("Superman", "Spiderman"):
            data.append({TIMESTAMP_COLUMN: day_str, "superhero": hero})

    outcome = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert outcome["status"] == "pass"

    # Dimension anomalies only stores anomalous rows (unlike other anomaly tests) -
    # so a passing test should leave nothing to read back.
    stored_points = get_latest_anomaly_test_points(dbt_project, test_id)
    assert not stored_points
# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_dimension_anomalies_with_timestamp_as_sql_expression(
    test_id: str, dbt_project: DbtProject
):
    """Check that ``timestamp_column`` may be a SQL expression, not just a name.

    The seeded data is balanced (one row per superhero per date), and the
    timestamp is supplied as a ``case when`` expression over ``updated_at``;
    the test is expected to report ``pass``.
    """
    yesterday = datetime.utcnow().date() - timedelta(1)
    superheroes = ("Superman", "Spiderman")
    data: List[Dict[str, Any]] = [
        {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": hero}
        for day in generate_dates(base_date=yesterday)
        for hero in superheroes
    ]

    timestamp_expression = (
        "case when updated_at is not null then updated_at else updated_at end"
    )
    test_args = {
        "timestamp_column": timestamp_expression,
        "dimensions": ["superhero"],
    }

    outcome = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert outcome["status"] == "pass"
# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject):
    """Check that a skewed dimension distribution fails and is reported.

    The first generated date (the test date) gets three "Superman" rows and
    one "Spiderman" row, while every remaining (training) date gets one row
    of each. The test is expected to fail and to store a single anomalous
    point for the "Superman" value of the "superhero" dimension.
    """

    def make_row(day, hero) -> Dict[str, Any]:
        # One seeded row: the formatted date plus the dimension value.
        return {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": hero}

    yesterday = datetime.utcnow().date() - timedelta(1)
    test_date, *training_dates = generate_dates(base_date=yesterday)

    skewed_heroes = ["Superman", "Superman", "Superman", "Spiderman"]
    data: List[Dict[str, Any]] = [make_row(test_date, hero) for hero in skewed_heroes]
    data.extend(
        make_row(day, hero)
        for day in training_dates
        for hero in ("Superman", "Spiderman")
    )

    outcome = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert outcome["status"] == "fail"

    # Dimension anomalies only stores anomalous rows (unlike other anomaly tests) -
    # so we should only get 1 row with the problematic value
    stored_points = get_latest_anomaly_test_points(dbt_project, test_id)
    assert len(stored_points) == 1
    point = stored_points[0]
    assert point["is_anomalous"]
    assert point["dimension"] == "superhero"
    assert point["dimension_value"] == "Superman"
# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_dimensions_anomalies_with_where_parameter(
    test_id: str, dbt_project: DbtProject
):
    """Check that the ``where`` argument restricts which rows are evaluated.

    The test date holds three DC/"Superman" rows and one Marvel/"Spiderman"
    row; every training date holds one of each. Expected outcomes:

    * no filter: ``fail``
    * ``universe = 'Marvel'``: ``pass``
    * ``universe = 'DC'``: ``fail``
    """

    def make_row(day, universe, superhero) -> Dict[str, Any]:
        # One seeded row: formatted date, universe and dimension value.
        return {
            TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT),
            "universe": universe,
            "superhero": superhero,
        }

    yesterday = datetime.utcnow().date() - timedelta(1)
    test_date, *training_dates = generate_dates(base_date=yesterday)

    skewed_pairs = [("DC", "Superman")] * 3 + [("Marvel", "Spiderman")]
    balanced_pairs = [("DC", "Superman"), ("Marvel", "Spiderman")]
    data: List[Dict[str, Any]] = [
        make_row(test_date, universe, hero) for universe, hero in skewed_pairs
    ]
    data.extend(
        make_row(day, universe, hero)
        for day in training_dates
        for universe, hero in balanced_pairs
    )

    # First run seeds the data and uses the default, unfiltered arguments.
    outcome = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert outcome["status"] == "fail"

    # Subsequent runs pass no data and force a metrics backfill for each filter.
    filtered_runs = (
        ("universe = 'Marvel'", "pass"),
        ("universe = 'DC'", "fail"),
    )
    for where_clause, expected_status in filtered_runs:
        params = {**DBT_TEST_ARGS, "where": where_clause}
        outcome = dbt_project.test(
            test_id, DBT_TEST_NAME, params, test_vars={"force_metrics_backfill": True}
        )
        assert outcome["status"] == expected_status
# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_dimension_anomalies_with_timestamp_exclude_final_results(
    test_id: str, dbt_project: DbtProject
):
    """Check that ``exclude_final_results`` reduces the reported failures.

    Balanced rows are generated from a base date three days back. Rows
    generated with ``days_back=2`` from yesterday are then added 30 times for
    "Spiderman" and 15 times for "Superman". Without an exclusion the test
    is expected to fail with 2 failures; with either exclusion expression
    below it is expected to fail with 1 failure.
    """

    def build_rows(dates, superheroes) -> List[Dict[str, Any]]:
        # One row per (date, superhero) pair, dates varying slowest.
        return [
            {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": hero}
            for day in dates
            for hero in superheroes
        ]

    today = datetime.utcnow().date()
    yesterday = today - timedelta(1)

    data: List[Dict[str, Any]] = build_rows(
        generate_dates(base_date=today - timedelta(3)), ["Superman", "Spiderman"]
    )
    data += (
        build_rows(generate_dates(base_date=yesterday, days_back=2), ["Spiderman"])
        * 30
    )
    data += (
        build_rows(generate_dates(base_date=yesterday, days_back=2), ["Superman"])
        * 15
    )

    outcome = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert outcome["status"] == "fail"
    assert outcome["failures"] == 2

    # Each exclusion expression is expected to leave exactly one failure.
    exclusion_conditions = (
        '{{ elementary.escape_reserved_keywords("value") }} > 15',
        '{{ elementary.escape_reserved_keywords("average") }} > 3',
    )
    for condition in exclusion_conditions:
        test_args = {
            "timestamp_column": TIMESTAMP_COLUMN,
            "dimensions": ["superhero"],
            "exclude_final_results": condition,
        }
        outcome = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
        assert outcome["status"] == "fail"
        assert outcome["failures"] == 1