
Commit f4e8e6e

Authored by: joostboon, cursoragent, haritamar, devin-ai-integration[bot], claude
Add data_freshness_sla and volume_threshold tests (#932)
* Add data_freshness_sla and volume_threshold tests

  Adds two new Elementary tests:
  - data_freshness_sla: checks whether data was updated before a specified SLA deadline
  - volume_threshold: monitors row-count changes with configurable warn/error thresholds, using Elementary's metric caching to avoid redundant computation

  Fixes applied:
  - volume_threshold: union historical metrics with new metrics for comparison
  - volume_threshold: deterministic dedup with a source_priority tiebreaker
  - volume_threshold: let get_time_bucket handle defaults
  - data_freshness_sla: treat future-dated data as fresh (remove the upper bound)
  - data_freshness_sla: escape single quotes in where_expression
  - data_freshness_sla: simplify the deadline_passed logic
  - data_freshness_sla: rename max_timestamp_utc to max_timestamp (no UTC conversion)
  - data_freshness_sla: fix a macro comment to match actual behavior
  - data_freshness_sla: document the UTC assumption, add an ephemeral model check

* fix: handle missing table in read_table when raise_if_empty=False (BigQuery test_seed_run_results)

* Revert "fix: handle missing table in read_table when raise_if_empty=False (BigQuery test_seed_run_results)". This reverts commit 9fc552e.

* Add integration tests for data_freshness_sla and volume_threshold (#965)
  - Fix sla_time YAML sexagesimal issue: use AM/PM format

* Fix sqlfmt issues and Postgres round() bug in test macros
  - Reformat both test_data_freshness_sla.sql and test_volume_threshold.sql to pass sqlfmt
  - Fix Postgres round() bug: cast the expression to numeric for round(numeric, int) compatibility
  - Restructure Jinja conditionals in data_freshness_sla to be sqlfmt-compatible
  - Extract the where_suffix Jinja set to avoid parentheses inside SQL CASE expressions
  - Use edr_boolean_literal for the is_failure CASE and WHERE clause
  - Remove the 'where severity_level > 0' filter to prevent a NULL fail_calc validation error

* Fix volume_threshold integration tests to use complete buckets

  Elementary only processes complete time buckets (the latest full bucket before run_started_at). With daily buckets, today's bucket is incomplete and gets excluded. Tests were putting anomalous data in today's bucket, so the macro never saw the spike/drop. Fix: shift all test data one day back so the anomalous data lands in yesterday's bucket (the latest complete bucket) and baseline data in the day before.

* Fix cross-database compatibility in volume_threshold macro
  - Replace 'cast(... as numeric)' with edr_type_numeric() for adapter-aware types
  - Replace 'limit 1' with a row_number() pattern (SQL Server/Fabric compatible)
  - Replace '||' string concatenation with edr_concat() (SQL Server/Fabric compatible)

* Fix Dremio: rename 'prev' alias to 'prev_b' (reserved keyword)

* Fix Dremio: rename 'result' CTE to 'volume_result' (reserved keyword)

* Fix fusion pytz issue and SQL Server/Fabric concat in data_freshness_sla
  - Skip pytz.all_timezones validation in dbt-fusion (known discrepancy, dbt-labs/dbt-fusion#143)
  - Replace || concatenation with edr_concat() in test_data_freshness_sla.sql for SQL Server/Fabric

* Fix fusion pytz.localize() producing incorrect results in calculate_sla_deadline_utc

  In dbt-fusion, pytz.localize() produces incorrect timezone-aware datetimes, causing deadline_passed to be False when it should be True. Use datetime.timezone.utc and replace(tzinfo=) in the fusion path instead.

* Fix fusion: use naive UTC datetimes to avoid broken tz-aware operations

  In dbt-fusion, datetime.timezone.utc doesn't exist and timezone-aware datetime comparison produces incorrect results. Use datetime.utcnow() with manual offset calculation so all comparisons use naive datetimes.

* Fix Fabric, BigQuery, and fusion failures in volume_threshold and data_freshness_sla
  - Fabric: rename CTE current_bucket -> curr_bucket (SQL Server rejects the CURRENT_ prefix)
  - BigQuery: use \' instead of '' to escape single quotes in where_suffix string literals
  - fusion: replace pytz.localize() with stdlib datetime.timezone.utc in calculate_sla_deadline_utc to fix deadline_passed always being False

* Fix sqlfmt formatting in test_data_freshness_sla.sql

* Fix Fabric: replace scalar subquery with JOIN in previous_bucket CTE

  SQL Server/Fabric cannot resolve a CTE name inside a nested subquery within another CTE. Replace (select bucket_start from curr_bucket) with an INNER JOIN to curr_bucket.

* Fix sqlfmt formatting in test_volume_threshold.sql

* Fix freshness SLA deadline check and volume threshold Fabric compatibility

  Move the SLA deadline_passed check from compile-time Python (broken in dbt-fusion due to pytz issues) to SQL, using edr_condition_as_boolean with edr_current_timestamp_in_utc. Replace the cross-CTE subquery in volume_threshold with a LAG window function to fix the Fabric/SQL Server "Invalid object name" error.

* Revert fusion path to use naive UTC datetimes instead of datetime.timezone.utc

  dbt-fusion does not support datetime.timezone.utc, causing "undefined value" render errors. Revert to the pytz probe approach.

* Address CodeRabbit review feedback
  - Fix clock-flaky SLA tests: use Etc/GMT-14 for deadline-passed cases so the deadline is always in the past regardless of CI runner time
  - Use exact status assertions (== "error") instead of != "pass" in volume threshold tests to catch severity regressions
  - Add negative threshold and min_row_count validation in volume_threshold
  - Document the DST limitation in the fusion sla_utils path

* Fix volume threshold assertions: dbt reports 'fail', not 'error'

* Fix ClickHouse and DuckDB CI failures
  - ClickHouse: reorder columns in the freshness SLA final_result CTE so is_failure is computed before sla_deadline_utc is cast to string (ClickHouse resolves column refs to already-aliased columns in the same SELECT, causing a DateTime vs String type mismatch).
  - DuckDB: add explicit casts to LAG window function results in volume_threshold to work around a DuckDB internal binder bug that confuses TIMESTAMP and FLOAT types across multiple LAG calls.

* Fix ClickHouse freshness SLA: rename string-cast alias to avoid shadowing

  ClickHouse resolves column references against output aliases regardless of SELECT clause order. The cast(sla_deadline_utc as string) with the same alias name caused the is_failure comparison to use the string version instead of the timestamp, producing a DateTime vs String type mismatch. Renamed to sla_deadline_utc_str internally and re-aliased in the final SELECT.

* Fix DuckDB volume_threshold: replace LAG with ROW_NUMBER + self-join

  DuckDB has an internal binder bug where LAG window functions over UNION ALL sources confuse TIMESTAMP and FLOAT column types, causing "Failed to bind column reference bucket_end: inequal types". Using ROW_NUMBER + a self-join achieves the same result without triggering the bug, and is cross-database compatible.

* Fix fusion: use pytz.utc instead of datetime.timezone.utc; fix brittle test timezone
  - sla_utils.sql: replace datetime.timezone.utc with pytz.utc in the fusion path. dbt-fusion's modules.datetime does not expose datetime.timezone, causing dbt1501 "undefined value" errors in all data_freshness_sla tests on fusion.
  - test_data_freshness_sla.py: change test_deadline_not_passed_does_not_fail from Etc/GMT-14 (UTC+14, deadline = 09:59 UTC) to plain UTC (deadline = 23:59 UTC). Etc/GMT-14 caused the test to fail whenever CI ran after 09:59 UTC.

* Fix Fabric, ClickHouse, and Vertica failures in volume_threshold and freshness_sla
  - volume_threshold (Fabric + ClickHouse): cast row_number() to signed int in the bucket_numbered CTE to fix a ClickHouse NO_COMMON_TYPE error (UInt64 vs Int64) on the JOIN with bucket_num - 1; move max(bucket_num) into a separate max_bucket CTE and use an INNER JOIN instead of a scalar subquery in the WHERE clause to fix the SQL Server/Fabric "Invalid object name 'bucket_numbered'" error.
  - data_freshness_sla (Vertica): replace timezone: "Etc/GMT-14" with timezone: "UTC" for tests that need a deadline in the past. Etc/GMT-14 behaved incorrectly in some pytz versions, causing Vertica tests to return 'pass' instead of 'fail'. 12:01am UTC (= 00:01 UTC) is always in the past when CI runs at 07:00+ UTC.

* Fix Dremio: rename 'prev' alias to 'prev_b' (reserved keyword in Dremio)

* Address CodeRabbit review comments: clarify DATA_FRESH semantics, handle zero-baseline spikes, add timestamp_column validation to volume_threshold

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Itamar Hartstein <haritamar@gmail.com>
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
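The deadline check that several of these commits rework boils down to: build today's SLA deadline in the configured timezone and compare it with the current UTC time. A minimal stdlib sketch of the naive-UTC approach the commits eventually settled on (fixed-offset zones only, no DST handling; the function name and signature are illustrative, not Elementary's actual API):

```python
from datetime import datetime, time, timedelta


def sla_deadline_passed(sla_time: time, utc_offset_hours: float, now_utc: datetime) -> bool:
    """Return True if today's SLA deadline is already in the past.

    Works entirely with naive UTC datetimes and a manual offset, mirroring
    the fallback used for dbt-fusion, where tz-aware operations were
    unreliable. Fixed-offset zones only (no DST handling).
    """
    offset = timedelta(hours=utc_offset_hours)
    local_today = (now_utc + offset).date()          # "today" in the SLA timezone
    local_deadline = datetime.combine(local_today, sla_time)
    return local_deadline - offset <= now_utc        # convert deadline back to UTC


now = datetime(2024, 6, 1, 7, 0)  # pretend CI runs at 07:00 UTC
print(sla_deadline_passed(time(0, 1), 0, now))    # True: 00:01 UTC has passed
print(sla_deadline_passed(time(23, 59), 0, now))  # False: 23:59 UTC still ahead
# Etc/GMT-14 is UTC+14, so "11:59pm" there is 09:59 UTC, which has not yet
# passed at 07:00 UTC. This is exactly what made the tests clock-flaky.
print(sla_deadline_passed(time(23, 59), 14, now))  # False at 07:00 UTC
```

This also shows why the tests standardized on "12:01am" UTC for deadline-in-the-past cases: it is past for any CI run after midnight UTC, with no timezone arithmetic to get wrong.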
1 parent b8c7ab0 commit f4e8e6e

File tree: 5 files changed (+1138, −40 lines)
Lines changed: 124 additions & 0 deletions (new file: test_data_freshness_sla.py, integration tests for data_freshness_sla)

```python
from datetime import datetime, timedelta

from data_generator import DATE_FORMAT
from dbt_project import DbtProject

TEST_NAME = "elementary.data_freshness_sla"
TIMESTAMP_COLUMN = "updated_at"


def test_fresh_data_passes(test_id: str, dbt_project: DbtProject):
    """Data updated today should pass when the SLA deadline has already passed."""
    utc_now = datetime.utcnow()
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (utc_now - timedelta(hours=1)).strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_stale_data_fails(test_id: str, dbt_project: DbtProject):
    """Data only from previous days should fail when today's SLA deadline has passed."""
    utc_now = datetime.utcnow()
    yesterday = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (yesterday - timedelta(hours=5)).strftime(DATE_FORMAT)},
    ]
    # Use 12:01am UTC (= 00:01 UTC) so the deadline is always in the past when
    # CI runs (typically 07:00+ UTC). Etc/GMT-14 was ambiguous in some pytz
    # versions and caused Vertica to return wrong results.
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_no_data_fails(test_id: str, dbt_project: DbtProject):
    """An empty table (after WHERE filter) should fail when deadline has passed."""
    utc_now = datetime.utcnow()
    # Seed with data that will be excluded by the where_expression
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "excluded"},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'included'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_deadline_not_passed_does_not_fail(test_id: str, dbt_project: DbtProject):
    """Even if data is stale, the test should pass if the deadline hasn't passed yet."""
    utc_now = datetime.utcnow()
    yesterday = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)},
    ]
    # Set the deadline to 11:59pm UTC so it reliably hasn't passed yet.
    # (Etc/GMT-14 = UTC+14 means 11:59pm there = 09:59 UTC — not reliably future)
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_with_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows count toward freshness."""
    utc_now = datetime.utcnow()
    yesterday = utc_now - timedelta(days=2)
    data = [
        # Fresh data for category A
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "a"},
        # Stale data for category B
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "b"},
    ]
    # Test with category A (fresh data) -> should pass
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"

    # Test with category B (stale data) and early deadline -> should fail
    test_args_stale = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'b'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args_stale)
    assert test_result["status"] == "fail"


def test_with_timezone(test_id: str, dbt_project: DbtProject):
    """Test that timezone conversion works correctly."""
    utc_now = datetime.utcnow()
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "America/New_York",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"
```
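Every `sla_time` above is an AM/PM string because of the "sla_time YAML sexagesimal issue" noted in the commit message: under YAML 1.1 rules, an unquoted `11:59` resolves to the base-60 integer 719 rather than a time. A small stdlib sketch of that resolver rule (illustrative only, not a real YAML parser):

```python
import re

# YAML 1.1's int resolver treats colon-separated digit groups as base-60
# ("sexagesimal"), so an unquoted sla_time of 11:59 loads as the integer 719.
# Appending am/pm breaks the pattern, so "11:59pm" stays a plain string.
SEXAGESIMAL = re.compile(r"^[-+]?[1-9][0-9_]*(:[0-5]?[0-9])+$")


def yaml11_scalar(value: str):
    """Resolve a scalar the way a YAML 1.1 int resolver would (sketch)."""
    if SEXAGESIMAL.match(value):
        total = 0
        for part in value.lstrip("+-").split(":"):
            total = total * 60 + int(part)
        return -total if value.startswith("-") else total
    return value


print(yaml11_scalar("11:59"))    # 719 (= 11*60 + 59), not a time at all
print(yaml11_scalar("11:59pm"))  # '11:59pm' survives as a string
```

Quoting the value in YAML would also avoid the problem; the AM/PM format sidesteps it regardless of how the test args are serialized.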
Lines changed: 224 additions & 0 deletions (new file: integration tests for volume_threshold)

```python
from datetime import datetime, timedelta

from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

TIMESTAMP_COLUMN = "updated_at"
DBT_TEST_NAME = "elementary.volume_threshold"
DBT_TEST_ARGS = {
    "timestamp_column": TIMESTAMP_COLUMN,
    "time_bucket": {"period": "day", "count": 1},
    "days_back": 14,
    "backfill_days": 14,
}


def _generate_stable_data(rows_per_day=100, days_back=14):
    """Generate data with a consistent number of rows per day bucket.

    Note: Elementary only processes *complete* buckets (the latest full bucket
    before ``run_started_at``). With daily buckets that means "yesterday" is
    the newest bucket the macro will ever look at. We therefore generate data
    up to yesterday only so that all buckets are complete.
    """
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    data = []
    for cur_date in generate_dates(base_date=yesterday, days_back=days_back):
        for _ in range(rows_per_day):
            data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    return data


def test_stable_volume_passes(test_id: str, dbt_project: DbtProject):
    """Consistent row counts across buckets should pass."""
    data = _generate_stable_data(rows_per_day=100)
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_large_spike_fails(test_id: str, dbt_project: DbtProject):
    """A large spike in row count (>10% default error threshold) should fail."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Two days ago (previous bucket): 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)})
    # Yesterday (current bucket — latest complete bucket): 150 rows (50% spike)
    for _ in range(150):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"


def test_large_drop_fails(test_id: str, dbt_project: DbtProject):
    """A large drop in row count (>10% default error threshold) should fail."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Two days ago (previous bucket): 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)})
    # Yesterday (current bucket — latest complete bucket): 50 rows (50% drop)
    for _ in range(50):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"


def test_direction_spike_ignores_drop(test_id: str, dbt_project: DbtProject):
    """With direction=spike, a drop should not trigger a failure."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Two days ago: 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)})
    # Yesterday: 50 rows (50% drop)
    for _ in range(50):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})

    test_args = {**DBT_TEST_ARGS, "direction": "spike"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_direction_drop_ignores_spike(test_id: str, dbt_project: DbtProject):
    """With direction=drop, a spike should not trigger a failure."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Two days ago: 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)})
    # Yesterday: 150 rows (50% spike)
    for _ in range(150):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})

    test_args = {**DBT_TEST_ARGS, "direction": "drop"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_min_row_count_skips_small_baseline(test_id: str, dbt_project: DbtProject):
    """When previous bucket has fewer rows than min_row_count, check is skipped (pass)."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: only 5 rows each (below default min_row_count=100)
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(5):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Two days ago: 5 rows
    for _ in range(5):
        data.append({TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)})
    # Yesterday: 50 rows (huge spike but baseline is too small)
    for _ in range(50):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_custom_thresholds(test_id: str, dbt_project: DbtProject):
    """Custom thresholds should control the sensitivity of the test."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Two days ago: 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT)})
    # Yesterday: 108 rows (8% change)
    for _ in range(108):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})

    # With default thresholds (warn=5, error=10), 8% should warn but not error
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "warn"

    # With high thresholds (warn=20, error=50), 8% should pass
    test_args_high = {
        **DBT_TEST_ARGS,
        "warn_threshold_percent": 20,
        "error_threshold_percent": 50,
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_high,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"


def test_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows are counted."""
    yesterday = datetime.utcnow().date() - timedelta(days=1)
    two_days_ago = yesterday - timedelta(days=1)
    data = []
    # Older days: 100 rows of category A each
    for cur_date in generate_dates(base_date=yesterday, days_back=14):
        if cur_date < two_days_ago:
            for _ in range(100):
                data.append(
                    {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT), "category": "a"}
                )
    # Two days ago: 100 rows of category A
    for _ in range(100):
        data.append(
            {TIMESTAMP_COLUMN: two_days_ago.strftime(DATE_FORMAT), "category": "a"}
        )
    # Yesterday: 100 rows of category A (stable) + 200 rows of category B (noise)
    for _ in range(100):
        data.append(
            {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "a"}
        )
    for _ in range(200):
        data.append(
            {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "b"}
        )

    # Without filter: total yesterday = 300 vs 100 two days ago -> big spike -> error
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "fail"

    # With filter on category A: 100 yesterday vs 100 two days ago -> stable -> pass
    test_args_filtered = {
        **DBT_TEST_ARGS,
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_filtered,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"
```
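The expected statuses in these tests follow a simple percent-change rule against the previous bucket, with the defaults noted in the comments above (warn at 5%, error at 10%, min_row_count 100). A rough sketch of that classification, with illustrative names; the authoritative logic (including the exact boundary handling) lives in the volume_threshold macro itself:

```python
def classify_change(prev_count: int, cur_count: int,
                    warn_pct: float = 5, error_pct: float = 10,
                    min_row_count: int = 100) -> str:
    """Classify the current bucket's row-count change vs the previous bucket.

    Sketch only: parameter names mirror the test args (warn_threshold_percent,
    error_threshold_percent) and the >= boundary choice is an assumption.
    """
    if prev_count < min_row_count:
        return "pass"  # baseline too small: the check is skipped
    change_pct = abs(cur_count - prev_count) / prev_count * 100
    if change_pct >= error_pct:
        return "fail"
    if change_pct >= warn_pct:
        return "warn"
    return "pass"


print(classify_change(100, 150))  # fail: 50% spike beats the 10% error threshold
print(classify_change(100, 108))  # warn: 8% is between warn (5%) and error (10%)
print(classify_change(5, 50))     # pass: baseline below min_row_count, check skipped
print(classify_change(100, 108, warn_pct=20, error_pct=50))  # pass: thresholds raised
```

This mirrors the scenarios above one-for-one: the 50% spike/drop tests expect "fail", the 8% change expects "warn" by default and "pass" with raised thresholds, and the tiny-baseline test expects "pass".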
