Add data_freshness_sla and volume_threshold tests #932
Status: Merged (29 commits)
Commit history:

- `a51ff11` Add data_freshness_sla and volume_threshold tests (joostboon)
- `9fc552e` fix: handle missing table in read_table when raise_if_empty=False (Bi… (joostboon)
- `9bfc0b7` Revert "fix: handle missing table in read_table when raise_if_empty=F… (joostboon)
- `9c888fa` Merge remote-tracking branch 'origin/master' into feature/volume-thre… (haritamar)
- `be7de95` Add integration tests for data_freshness_sla and volume_threshold (#965) (devin-ai-integration[bot])
- `4bb34e6` Fix sqlfmt issues and Postgres round() bug in test macros (devin-ai-integration[bot])
- `09de7d1` Merge remote-tracking branch 'origin/master' into feature/volume-thre… (devin-ai-integration[bot])
- `6e75bc8` Fix volume_threshold integration tests to use complete buckets (devin-ai-integration[bot])
- `0184a34` Fix cross-database compatibility in volume_threshold macro (devin-ai-integration[bot])
- `c858993` Fix Dremio: rename 'prev' alias to 'prev_b' (reserved keyword) (devin-ai-integration[bot])
- `da156c4` Fix Dremio: rename 'result' CTE to 'volume_result' (reserved keyword) (devin-ai-integration[bot])
- `e83c89c` Fix fusion pytz issue and SQL Server/Fabric concat in data_freshness_sla (devin-ai-integration[bot])
- `b47df2a` Fix fusion pytz.localize() producing incorrect results in calculate_s… (devin-ai-integration[bot])
- `b4d1f39` Fix fusion: use naive UTC datetimes to avoid broken tz-aware operations (devin-ai-integration[bot])
- `cad4331` Fix Fabric, BigQuery, and fusion failures in volume_threshold and dat… (joostboon)
- `c6873c9` Fix sqlfmt formatting in test_data_freshness_sla.sql (joostboon)
- `380a9fa` Fix Fabric: replace scalar subquery with JOIN in previous_bucket CTE (joostboon)
- `5c289d1` Fix sqlfmt formatting in test_volume_threshold.sql (joostboon)
- `75a0779` Fix freshness SLA deadline check and volume threshold Fabric compatib… (haritamar)
- `ccd5be4` Revert fusion path to use naive UTC datetimes instead of datetime.tim… (haritamar)
- `bba307d` Address CodeRabbit review feedback (haritamar)
- `a2f6944` Fix volume threshold assertions: dbt reports 'fail' not 'error' (haritamar)
- `33d17f7` Fix ClickHouse and DuckDB CI failures (haritamar)
- `23db0bf` Fix ClickHouse freshness SLA: rename string-cast alias to avoid shado… (haritamar)
- `b277efa` Fix DuckDB volume_threshold: replace LAG with ROW_NUMBER + self-join (haritamar)
- `ec3220f` Fix fusion: use pytz.utc instead of datetime.timezone.utc; fix brittl… (joostboon)
- `6c5968e` Fix Fabric, ClickHouse, and Vertica failures in volume_threshold and … (joostboon)
- `d809c8b` Fix Dremio: rename 'prev' alias to 'prev_b' (reserved keyword in Dremio) (devin-ai-integration[bot])
- `8c5a3c2` Address CodeRabbit review comments: clarify DATA_FRESH semantics, han… (devin-ai-integration[bot])
Integration tests for `data_freshness_sla` (new file, +123 lines):

```python
from datetime import datetime, timedelta

from data_generator import DATE_FORMAT
from dbt_project import DbtProject

TEST_NAME = "elementary.data_freshness_sla"
TIMESTAMP_COLUMN = "updated_at"


def test_fresh_data_passes(test_id: str, dbt_project: DbtProject):
    """Data updated today should pass when the SLA deadline has already passed."""
    utc_now = datetime.utcnow()
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (utc_now - timedelta(hours=1)).strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_stale_data_fails(test_id: str, dbt_project: DbtProject):
    """Data only from previous days should fail when today's SLA deadline has passed."""
    utc_now = datetime.utcnow()
    yesterday = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)},
        {TIMESTAMP_COLUMN: (yesterday - timedelta(hours=5)).strftime(DATE_FORMAT)},
    ]
    # Use a deadline early in the day so it has certainly passed
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_no_data_fails(test_id: str, dbt_project: DbtProject):
    """An empty table (after WHERE filter) should fail when deadline has passed."""
    utc_now = datetime.utcnow()
    # Seed with data that will be excluded by the where_expression
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "excluded"},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'included'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "fail"


def test_deadline_not_passed_does_not_fail(test_id: str, dbt_project: DbtProject):
    """Even if data is stale, the test should pass if the deadline hasn't passed yet."""
    utc_now = datetime.utcnow()
    yesterday = utc_now - timedelta(days=2)
    data = [
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)},
    ]
    # Set the deadline far in the future so it hasn't passed yet.
    # Etc/GMT-14 is UTC+14, the farthest-ahead timezone, so 11:59pm there
    # is well into the future from UTC's perspective.
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "Etc/GMT-14",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_with_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows count toward freshness."""
    utc_now = datetime.utcnow()
    yesterday = utc_now - timedelta(days=2)
    data = [
        # Fresh data for category A
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT), "category": "a"},
        # Stale data for category B
        {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "b"},
    ]
    # Test with category A (fresh data) -> should pass
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "UTC",
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"

    # Test with category B (stale data) and early deadline -> should fail.
    # The table was already seeded above, so no data is passed this time.
    test_args_stale = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "12:01am",
        "timezone": "UTC",
        "where_expression": "category = 'b'",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args_stale)
    assert test_result["status"] == "fail"


def test_with_timezone(test_id: str, dbt_project: DbtProject):
    """Test that timezone conversion works correctly."""
    utc_now = datetime.utcnow()
    data = [
        {TIMESTAMP_COLUMN: utc_now.strftime(DATE_FORMAT)},
    ]
    test_args = {
        "timestamp_column": TIMESTAMP_COLUMN,
        "sla_time": "11:59pm",
        "timezone": "America/New_York",
    }
    test_result = dbt_project.test(test_id, TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"
```
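The `Etc/GMT-14` trick above deserves a note: in the IANA tz database, the POSIX-style `Etc/GMT±N` names have inverted signs, so `Etc/GMT-14` means UTC+14, the farthest-ahead clock in the database. The sketch below illustrates that fact plus the deadline semantics these tests exercise; `sla_deadline_passed` is a hypothetical model inferred from the docstrings, not the actual dbt macro:

```python
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo  # stdlib since Python 3.9

# Inverted sign convention: Etc/GMT-14 is UTC+14, not UTC-14.
assert ZoneInfo("Etc/GMT-14").utcoffset(datetime(2024, 1, 1)) == timedelta(hours=14)


def sla_deadline_passed(sla_time: time, tz_name: str) -> bool:
    """Hypothetical model: has today's SLA deadline passed in the given zone?"""
    now_local = datetime.now(ZoneInfo(tz_name))
    return now_local.time() >= sla_time
```

This is why the tests pair stale data with a `12:01am` UTC deadline (which has almost always passed, so staleness is reported) and pair `11:59pm` at `Etc/GMT-14` with the not-yet-due case (that deadline is nearly always still in the future, so the test passes regardless of staleness).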
Integration tests for `volume_threshold` (new file, +218 lines):

```python
from datetime import datetime, timedelta

from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject

TIMESTAMP_COLUMN = "updated_at"
DBT_TEST_NAME = "elementary.volume_threshold"
DBT_TEST_ARGS = {
    "timestamp_column": TIMESTAMP_COLUMN,
    "time_bucket": {"period": "day", "count": 1},
    "days_back": 14,
    "backfill_days": 14,
}


def _generate_stable_data(rows_per_day=100, days_back=14):
    """Generate data with a consistent number of rows per day bucket."""
    utc_today = datetime.utcnow().date()
    data = []
    for cur_date in generate_dates(base_date=utc_today, days_back=days_back):
        for _ in range(rows_per_day):
            data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    return data


def test_stable_volume_passes(test_id: str, dbt_project: DbtProject):
    """Consistent row counts across buckets should pass."""
    data = _generate_stable_data(rows_per_day=100)
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_large_spike_fails(test_id: str, dbt_project: DbtProject):
    """A large spike in row count (>10% default error threshold) should fail."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: 100 rows each
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Yesterday (previous bucket): 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
    # Today (current bucket): 150 rows (50% spike)
    for _ in range(150):
        data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] != "pass"


def test_large_drop_fails(test_id: str, dbt_project: DbtProject):
    """A large drop in row count (>10% default error threshold) should fail."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: 100 rows each
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Yesterday (previous bucket): 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
    # Today (current bucket): 50 rows (50% drop)
    for _ in range(50):
        data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] != "pass"


def test_direction_spike_ignores_drop(test_id: str, dbt_project: DbtProject):
    """With direction=spike, a drop should not trigger a failure."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: 100 rows each
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Yesterday: 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
    # Today: 50 rows (50% drop)
    for _ in range(50):
        data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})

    test_args = {**DBT_TEST_ARGS, "direction": "spike"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_direction_drop_ignores_spike(test_id: str, dbt_project: DbtProject):
    """With direction=drop, a spike should not trigger a failure."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: 100 rows each
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Yesterday: 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
    # Today: 150 rows (50% spike)
    for _ in range(150):
        data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})

    test_args = {**DBT_TEST_ARGS, "direction": "drop"}
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args, data=data)
    assert test_result["status"] == "pass"


def test_min_row_count_skips_small_baseline(test_id: str, dbt_project: DbtProject):
    """When previous bucket has fewer rows than min_row_count, check is skipped (pass)."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: only 5 rows each (below default min_row_count=100)
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(5):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Yesterday: 5 rows
    for _ in range(5):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
    # Today: 50 rows (huge spike but baseline is too small)
    for _ in range(50):
        data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})

    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "pass"


def test_custom_thresholds(test_id: str, dbt_project: DbtProject):
    """Custom thresholds should control the sensitivity of the test."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: 100 rows each
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(100):
                data.append({TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)})
    # Yesterday: 100 rows
    for _ in range(100):
        data.append({TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT)})
    # Today: 108 rows (8% change)
    for _ in range(108):
        data.append({TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT)})

    # With default thresholds (warn=5, error=10), 8% should warn but not error
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] == "warn"

    # With high thresholds (warn=20, error=50), 8% should pass
    test_args_high = {
        **DBT_TEST_ARGS,
        "warn_threshold_percent": 20,
        "error_threshold_percent": 50,
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_high,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"


def test_where_expression(test_id: str, dbt_project: DbtProject):
    """The where_expression should filter which rows are counted."""
    utc_today = datetime.utcnow().date()
    yesterday = utc_today - timedelta(days=1)
    data = []
    # Previous days: 100 rows of category A each
    for cur_date in generate_dates(base_date=utc_today, days_back=14):
        if cur_date < yesterday:
            for _ in range(100):
                data.append(
                    {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT), "category": "a"}
                )
    # Yesterday: 100 rows of category A
    for _ in range(100):
        data.append(
            {TIMESTAMP_COLUMN: yesterday.strftime(DATE_FORMAT), "category": "a"}
        )
    # Today: 100 rows of category A (stable) + 200 rows of category B (noise)
    for _ in range(100):
        data.append(
            {TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT), "category": "a"}
        )
    for _ in range(200):
        data.append(
            {TIMESTAMP_COLUMN: utc_today.strftime(DATE_FORMAT), "category": "b"}
        )

    # Without filter: total today = 300 vs 100 yesterday -> big spike -> fail
    test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data)
    assert test_result["status"] != "pass"

    # With filter on category A: 100 today vs 100 yesterday -> stable -> pass
    test_args_filtered = {
        **DBT_TEST_ARGS,
        "where_expression": "category = 'a'",
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args_filtered,
        test_vars={"force_metrics_backfill": True},
    )
    assert test_result["status"] == "pass"
```
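The scenarios above boil down to a percent-change check between consecutive time buckets. The sketch below is an illustrative Python model, not the macro's SQL: the function name and signature are invented here, and the defaults (warn at 5%, error at 10%, `min_row_count` of 100) are taken from the comments in the tests:

```python
def classify_volume_change(
    prev_count: int,
    cur_count: int,
    warn_threshold_percent: float = 5,
    error_threshold_percent: float = 10,
    direction: str = "both",  # "both", "spike", or "drop"
    min_row_count: int = 100,
) -> str:
    """Illustrative model of one bucket's volume check: 'pass', 'warn', or 'fail'."""
    # Baselines below min_row_count are too noisy to judge; skip the check.
    if prev_count < min_row_count:
        return "pass"
    change_percent = (cur_count - prev_count) / prev_count * 100
    # A one-directional test ignores changes in the other direction.
    if direction == "spike" and change_percent <= 0:
        return "pass"
    if direction == "drop" and change_percent >= 0:
        return "pass"
    magnitude = abs(change_percent)
    if magnitude > error_threshold_percent:
        return "fail"
    if magnitude > warn_threshold_percent:
        return "warn"
    return "pass"
```

Under this model, a 50% spike or drop exceeds the 10% error threshold and fails, an 8% change lands between the 5% warn and 10% error thresholds and warns, and a 5-row baseline skips the check entirely, matching the assertions in the test file.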