Skip to content

Commit be7de95

Browse files
Add integration tests for data_freshness_sla and volume_threshold (#965)
* Add integration tests for data_freshness_sla and volume_threshold Co-Authored-By: Itamar Hartstein <haritamar@gmail.com> * Fix sla_time YAML sexagesimal issue - use AM/PM format Co-Authored-By: Itamar Hartstein <haritamar@gmail.com> --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Itamar Hartstein <haritamar@gmail.com>
1 parent 9c888fa commit be7de95

File tree

2 files changed

+341
-0
lines changed

2 files changed

+341
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
from datetime import datetime, timedelta
2+
3+
from data_generator import DATE_FORMAT
4+
from dbt_project import DbtProject
5+
6+
TEST_NAME = "elementary.data_freshness_sla"
7+
TIMESTAMP_COLUMN = "updated_at"
8+
9+
10+
def test_fresh_data_passes(test_id: str, dbt_project: DbtProject):
11+
"""Data updated today should pass when the SLA deadline has already passed."""
12+
utc_now = datetime.utcnow()
13+
data = [
14+
{TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
15+
{TIMESTAMP_COLUMN: (utc_now - timedelta(hours=1)).strftime(DATE_FORMAT)},
16+
]
17+
test_args = {
18+
"timestamp_column": TIMESTAMP_COLUMN,
19+
"sla_time": "11:59pm",
20+
"timezone": "UTC",
21+
}
22+
test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
23+
assert test_result["status"] == "pass"
24+
25+
26+
def test_stale_data_fails(test_id: str, dbt_project: DbtProject):
27+
"""Data only from previous days should fail when today's SLA deadline has passed."""
28+
utc_now = datetime.utcnow()
29+
yesterday = utc_now - timedelta(days=2)
30+
data = [
31+
{TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)},
32+
{TIMESTAMP_COLUMN: (yesterday - timedelta(hours=5)).strftime(DATE_FORMAT)},
33+
]
34+
# Use a deadline early in the day so it has certainly passed
35+
test_args = {
36+
"timestamp_column": TIMESTAMP_COLUMN,
37+
"sla_time": "12:01am",
38+
"timezone": "UTC",
39+
}
40+
test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
41+
assert test_result["status"] == "fail"
42+
43+
44+
def test_no_data_fails(test_id: str, dbt_project: DbtProject):
45+
"""An empty table (after WHERE filter) should fail when deadline has passed."""
46+
utc_now = datetime.utcnow()
47+
# Seed with data that will be excluded by the where_expression
48+
data = [
49+
{TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "excluded"},
50+
]
51+
test_args = {
52+
"timestamp_column": TIMESTAMP_COLUMN,
53+
"sla_time": "12:01am",
54+
"timezone": "UTC",
55+
"where_expression": "category = 'included'",
56+
}
57+
test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
58+
assert test_result["status"] == "fail"
59+
60+
61+
def test_deadline_not_passed_does_not_fail(test_id: str, dbt_project: DbtProject):
62+
"""Even if data is stale, the test should pass if the deadline hasn't passed yet."""
63+
utc_now = datetime.utcnow()
64+
yesterday = utc_now - timedelta(days=2)
65+
data = [
66+
{TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)},
67+
]
68+
# Set the deadline far in the future so it hasn't passed yet.
69+
# Etc/GMT-14 is UTC+14, the farthest-ahead timezone, so 11:59pm there
70+
# is well into the future from UTC's perspective.
71+
test_args = {
72+
"timestamp_column": TIMESTAMP_COLUMN,
73+
"sla_time": "11:59pm",
74+
"timezone": "Etc/GMT-14",
75+
}
76+
test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
77+
assert test_result["status"] == "pass"
78+
79+
80+
def test_with_where_expression(test_id: str, dbt_project: DbtProject):
81+
"""The where_expression should filter which rows count toward freshness."""
82+
utc_now = datetime.utcnow()
83+
yesterday = utc_now - timedelta(days=2)
84+
data = [
85+
# Fresh data for category A
86+
{TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "a"},
87+
# Stale data for category B
88+
{TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "b"},
89+
]
90+
# Test with category A (fresh data) -> should pass
91+
test_args = {
92+
"timestamp_column": TIMESTAMP_COLUMN,
93+
"sla_time": "11:59pm",
94+
"timezone": "UTC",
95+
"where_expression": "category = 'a'",
96+
}
97+
test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
98+
assert test_result["status"] == "pass"
99+
100+
# Test with category B (stale data) and early deadline -> should fail
101+
test_args_stale = {
102+
"timestamp_column": TIMESTAMP_COLUMN,
103+
"sla_time": "12:01am",
104+
"timezone": "UTC",
105+
"where_expression": "category = 'b'",
106+
}
107+
test_result = dbt_project.test(test_id, TEST_NAME, test_args_stale)
108+
assert test_result["status"] == "fail"
109+
110+
111+
def test_with_timezone(test_id: str, dbt_project: DbtProject):
112+
"""Test that timezone conversion works correctly."""
113+
utc_now = datetime.utcnow()
114+
data = [
115+
{TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
116+
]
117+
test_args = {
118+
"timestamp_column": TIMESTAMP_COLUMN,
119+
"sla_time": "11:59pm",
120+
"timezone": "America/New_York",
121+
}
122+
test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
123+
assert test_result["status"] == "pass"
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
from datetime import datetime, timedelta
2+
3+
from data_generator import DATE_FORMAT, generate_dates
4+
from dbt_project import DbtProject
5+
6+
TIMESTAMP_COLUMN = "updated_at"
7+
DBT_TEST_NAME = "elementary.volume_threshold"
8+
DBT_TEST_ARGS = {
9+
"timestamp_column": TIMESTAMP_COLUMN,
10+
"time_bucket": {"period": "day", "count": 1},
11+
"days_back": 14,
12+
"backfill_days": 14,
13+
}
14+
15+
16+
def _generate_stable_data(rows_per_day=100, days_back=14):
17+
"""Generate data with a consistent number of rows per day bucket."""
18+
utc_today = datetime.utcnow().date()
19+
data = []
20+
for cur_date in generate_dates(base_date=utc_today, days_back=days_back):
21+
for _ in range(rows_per_day):
22+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
23+
return data
24+
25+
26+
def test_stable_volume_passes(test_id: str, dbt_project: DbtProject):
27+
"""Consistent row counts across buckets should pass."""
28+
data = _generate_stable_data(rows_per_day=100)
29+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
30+
assert test_result["status"] == "pass"
31+
32+
33+
def test_large_spike_fails(test_id: str, dbt_project: DbtProject):
34+
"""A large spike in row count (>10% default error threshold) should fail."""
35+
utc_today = datetime.utcnow().date()
36+
yesterday = utc_today - timedelta(days=1)
37+
data = []
38+
# Previous days: 100 rows each
39+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
40+
if cur_date < yesterday:
41+
for _ in range(100):
42+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
43+
# Yesterday (current bucket): 100 rows
44+
for _ in range(100):
45+
data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
46+
# Today (current bucket): 150 rows (50% spike)
47+
for _ in range(150):
48+
data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})
49+
50+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
51+
assert test_result["status"] != "pass"
52+
53+
54+
def test_large_drop_fails(test_id: str, dbt_project: DbtProject):
55+
"""A large drop in row count (>10% default error threshold) should fail."""
56+
utc_today = datetime.utcnow().date()
57+
yesterday = utc_today - timedelta(days=1)
58+
data = []
59+
# Previous days: 100 rows each
60+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
61+
if cur_date < yesterday:
62+
for _ in range(100):
63+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
64+
# Yesterday (previous bucket): 100 rows
65+
for _ in range(100):
66+
data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
67+
# Today (current bucket): 50 rows (50% drop)
68+
for _ in range(50):
69+
data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})
70+
71+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
72+
assert test_result["status"] != "pass"
73+
74+
75+
def test_direction_spike_ignores_drop(test_id: str, dbt_project: DbtProject):
76+
"""With direction=spike, a drop should not trigger a failure."""
77+
utc_today = datetime.utcnow().date()
78+
yesterday = utc_today - timedelta(days=1)
79+
data = []
80+
# Previous days: 100 rows each
81+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
82+
if cur_date < yesterday:
83+
for _ in range(100):
84+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
85+
# Yesterday: 100 rows
86+
for _ in range(100):
87+
data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
88+
# Today: 50 rows (50% drop)
89+
for _ in range(50):
90+
data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})
91+
92+
test_args = {**DBT_TEST_ARGS, "direction": "spike"}
93+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
94+
assert test_result["status"] == "pass"
95+
96+
97+
def test_direction_drop_ignores_spike(test_id: str, dbt_project: DbtProject):
98+
"""With direction=drop, a spike should not trigger a failure."""
99+
utc_today = datetime.utcnow().date()
100+
yesterday = utc_today - timedelta(days=1)
101+
data = []
102+
# Previous days: 100 rows each
103+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
104+
if cur_date < yesterday:
105+
for _ in range(100):
106+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
107+
# Yesterday: 100 rows
108+
for _ in range(100):
109+
data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
110+
# Today: 150 rows (50% spike)
111+
for _ in range(150):
112+
data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})
113+
114+
test_args = {**DBT_TEST_ARGS, "direction": "drop"}
115+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
116+
assert test_result["status"] == "pass"
117+
118+
119+
def test_min_row_count_skips_small_baseline(test_id: str, dbt_project: DbtProject):
120+
"""When previous bucket has fewer rows than min_row_count, check is skipped (pass)."""
121+
utc_today = datetime.utcnow().date()
122+
yesterday = utc_today - timedelta(days=1)
123+
data = []
124+
# Previous days: only 5 rows each (below default min_row_count=100)
125+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
126+
if cur_date < yesterday:
127+
for _ in range(5):
128+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
129+
# Yesterday: 5 rows
130+
for _ in range(5):
131+
data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
132+
# Today: 50 rows (huge spike but baseline is too small)
133+
for _ in range(50):
134+
data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})
135+
136+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
137+
assert test_result["status"] == "pass"
138+
139+
140+
def test_custom_thresholds(test_id: str, dbt_project: DbtProject):
141+
"""Custom thresholds should control the sensitivity of the test."""
142+
utc_today = datetime.utcnow().date()
143+
yesterday = utc_today - timedelta(days=1)
144+
data = []
145+
# Previous days: 100 rows each
146+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
147+
if cur_date < yesterday:
148+
for _ in range(100):
149+
data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
150+
# Yesterday: 100 rows
151+
for _ in range(100):
152+
data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
153+
# Today: 108 rows (8% change)
154+
for _ in range(108):
155+
data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})
156+
157+
# With default thresholds (warn=5, error=10), 8% should warn but not error
158+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
159+
assert test_result["status"] == "warn"
160+
161+
# With high thresholds (warn=20, error=50), 8% should pass
162+
test_args_high = {
163+
**DBT_TEST_ARGS,
164+
"warn_threshold_percent": 20,
165+
"error_threshold_percent": 50,
166+
}
167+
test_result = dbt_project.test(
168+
test_id,
169+
DBT_TEST_NAME,
170+
test_args_high,
171+
test_vars={"force_metrics_backfill": True},
172+
)
173+
assert test_result["status"] == "pass"
174+
175+
176+
def test_where_expression(test_id: str, dbt_project: DbtProject):
177+
"""The where_expression should filter which rows are counted."""
178+
utc_today = datetime.utcnow().date()
179+
yesterday = utc_today - timedelta(days=1)
180+
data = []
181+
# Previous days: 100 rows of category A each
182+
for cur_date in generate_dates(base_date=utc_today, days_back=14):
183+
if cur_date < yesterday:
184+
for _ in range(100):
185+
data.append(
186+
{TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT), "category": "a"}
187+
)
188+
# Yesterday: 100 rows of category A
189+
for _ in range(100):
190+
data.append(
191+
{TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "a"}
192+
)
193+
# Today: 100 rows of category A (stable) + 200 rows of category B (noise)
194+
for _ in range(100):
195+
data.append(
196+
{TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT), "category": "a"}
197+
)
198+
for _ in range(200):
199+
data.append(
200+
{TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT), "category": "b"}
201+
)
202+
203+
# Without filter: total today = 300 vs 100 yesterday -> big spike -> fail
204+
test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
205+
assert test_result["status"] != "pass"
206+
207+
# With filter on category A: 100 today vs 100 yesterday -> stable -> pass
208+
test_args_filtered = {
209+
**DBT_TEST_ARGS,
210+
"where_expression": "category = 'a'",
211+
}
212+
test_result = dbt_project.test(
213+
test_id,
214+
DBT_TEST_NAME,
215+
test_args_filtered,
216+
test_vars={"force_metrics_backfill": True},
217+
)
218+
assert test_result["status"] == "pass"

0 commit comments

Comments
 (0)