Skip to content

Commit 064ad0c

Browse files
committed
fix(monitors): constrain volume lower bound for cumulative tables
1 parent 225d268 commit 064ad0c

3 files changed

Lines changed: 97 additions & 4 deletions

File tree

testgen/commands/test_thresholds_prediction.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,18 @@ def run(self) -> None:
110110
)
111111
test_prediction.extend([lower, upper, staleness, prediction])
112112
else:
113+
functional_table_type = group["functional_table_type"].iloc[0]
114+
is_cumulative = bool(
115+
functional_table_type and str(functional_table_type).startswith("cumulative")
116+
)
113117
lower, upper, prediction = compute_sarimax_threshold(
114118
history,
115119
sensitivity=self.test_suite.predict_sensitivity or PredictSensitivity.medium,
116120
min_lookback=self.test_suite.predict_min_lookback or 1,
117121
exclude_weekends=self.test_suite.predict_exclude_weekends,
118122
holiday_codes=self.test_suite.holiday_codes_list,
119123
schedule_tz=self.tz,
124+
is_cumulative=is_cumulative,
120125
)
121126
test_prediction.extend([lower, upper, None, prediction])
122127

@@ -258,10 +263,13 @@ def compute_sarimax_threshold(
258263
exclude_weekends: bool = False,
259264
holiday_codes: list[str] | None = None,
260265
schedule_tz: str | None = None,
266+
is_cumulative: bool = False,
261267
) -> tuple[float | None, float | None, str | None]:
262268
"""Compute SARIMAX-based thresholds for the next forecast point.
263269
264270
Returns (lower, upper, forecast_json) or (None, None, None) if insufficient data.
271+
For cumulative tables, the lower tolerance is floored at the last observed value
272+
so that any decrease in row count is detected as an anomaly.
265273
"""
266274
if len(history) < min_lookback:
267275
return None, None, None
@@ -291,7 +299,12 @@ def compute_sarimax_threshold(
291299

292300
if pd.isna(lower_tolerance) or pd.isna(upper_tolerance):
293301
return None, None, None
294-
else:
295-
return float(lower_tolerance), float(upper_tolerance), forecast.to_json()
302+
303+
lower_tolerance = float(lower_tolerance)
304+
if is_cumulative:
305+
last_observed = float(history["result_signal"].iloc[-1])
306+
lower_tolerance = max(lower_tolerance, last_observed)
307+
308+
return lower_tolerance, float(upper_tolerance), forecast.to_json()
296309
except NotEnoughData:
297310
return None, None, None

testgen/template/prediction/get_historical_test_results.sql

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ WITH filtered_defs AS (
22
-- Filter definitions first to minimize join surface area
33
SELECT id,
44
test_suite_id,
5+
table_groups_id,
56
schema_name,
67
table_name,
78
column_name,
@@ -17,8 +18,13 @@ SELECT r.test_definition_id,
1718
CASE
1819
WHEN r.result_signal ~ '^-?[0-9]*\.?[0-9]+$' THEN r.result_signal::NUMERIC
1920
ELSE NULL
20-
END AS result_signal
21+
END AS result_signal,
22+
dtc.functional_table_type
2123
FROM test_results r
2224
JOIN filtered_defs d ON d.id = r.test_definition_id
25+
LEFT JOIN data_table_chars dtc
26+
ON dtc.table_groups_id = d.table_groups_id
27+
AND dtc.schema_name = d.schema_name
28+
AND dtc.table_name = d.table_name
2329
WHERE r.test_suite_id = :TEST_SUITE_ID
2430
ORDER BY r.test_time;

tests/unit/common/test_time_series_service.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pandas as pd
55
import pytest
66

7-
from testgen.commands.test_thresholds_prediction import compute_freshness_threshold
7+
from testgen.commands.test_thresholds_prediction import compute_freshness_threshold, compute_sarimax_threshold
88
from testgen.common.freshness_service import (
99
MIN_FRESHNESS_GAPS,
1010
FreshnessThreshold,
@@ -634,3 +634,77 @@ def test_without_exclusions_timezone_has_no_effect(self):
634634
forecast_with_tz = get_sarimax_forecast(history, num_forecast=3, exclude_weekends=False, tz="America/New_York")
635635

636636
pd.testing.assert_frame_equal(forecast_no_tz, forecast_with_tz)
637+
638+
639+
class Test_ComputeSarimaxThreshold_CumulativeFloor:
    """Tests for the cumulative table floor constraint in compute_sarimax_threshold.

    When ``is_cumulative=True`` the returned lower tolerance is expected to be
    ``max(forecast_lower, last_observed)``; the upper tolerance and the
    insufficient-data behavior are expected to be unaffected.
    """

    @staticmethod
    def _make_monotonic_history(n_days: int = 30, start_value: int = 1000, daily_growth: int = 100) -> pd.DataFrame:
        """Create a monotonically increasing row count history (cumulative table).

        Returns a daily-indexed frame starting 2026-01-01 with a single
        ``result_signal`` column growing by ``daily_growth`` per day.
        """
        dates = pd.date_range("2026-01-01", periods=n_days, freq="1D")
        values = [start_value + i * daily_growth for i in range(n_days)]
        return pd.DataFrame({"result_signal": values}, index=dates)

    @staticmethod
    def _make_noisy_history(n_days: int = 30, seed: int = 42) -> pd.DataFrame:
        """Create an upward-trending history with large, seeded Gaussian noise.

        The noise is wide enough that the unconstrained lower bound is expected
        to fall below the last observed value, which is the situation the
        cumulative floor exists for. The seed keeps the series reproducible.
        """
        rng = np.random.default_rng(seed)
        dates = pd.date_range("2026-01-01", periods=n_days, freq="1D")
        # Trending up but with large noise — lower bound should be below last value
        values = [1000 + i * 50 + rng.normal(0, 200) for i in range(n_days)]
        return pd.DataFrame({"result_signal": values}, index=dates)

    def test_cumulative_floors_lower_at_last_observed(self):
        history = self._make_monotonic_history(n_days=30, start_value=1000, daily_growth=100)
        last_observed = float(history["result_signal"].iloc[-1])

        lower, upper, prediction = compute_sarimax_threshold(
            history, PredictSensitivity.medium, is_cumulative=True,
        )

        assert lower is not None
        assert upper is not None
        assert prediction is not None
        assert lower >= last_observed

    def test_cumulative_floor_clamps_lower_to_last_observed(self):
        # The monotonic-history test above can pass even without the floor if the
        # forecast's own lower bound already sits above the last observation.
        # Here the unconstrained lower bound is first shown to be below the last
        # value, so the floor must actually engage for the final assert to hold.
        history = self._make_noisy_history()
        last_observed = float(history["result_signal"].iloc[-1])

        unconstrained_lower, _, _ = compute_sarimax_threshold(
            history, PredictSensitivity.low, is_cumulative=False,
        )
        floored_lower, _, _ = compute_sarimax_threshold(
            history, PredictSensitivity.low, is_cumulative=True,
        )

        assert unconstrained_lower is not None
        assert unconstrained_lower < last_observed
        # max(lower, last_observed) returns last_observed itself, so equality is exact
        assert floored_lower == last_observed

    def test_non_cumulative_allows_lower_below_last_observed(self):
        # With high variance, SARIMAX lower bound can drop below last observed
        history = self._make_noisy_history()
        last_observed = float(history["result_signal"].iloc[-1])

        lower, upper, prediction = compute_sarimax_threshold(
            history, PredictSensitivity.low, is_cumulative=False,
        )

        assert lower is not None
        # With low sensitivity (z=-3.0) and high noise, lower should be below last value
        # This is the behavior we're protecting against with the cumulative floor
        assert lower < last_observed

    def test_cumulative_does_not_affect_upper_tolerance(self):
        history = self._make_monotonic_history(n_days=30)

        _, upper_cumulative, _ = compute_sarimax_threshold(
            history, PredictSensitivity.medium, is_cumulative=True,
        )
        _, upper_normal, _ = compute_sarimax_threshold(
            history, PredictSensitivity.medium, is_cumulative=False,
        )

        assert upper_cumulative == upper_normal

    def test_cumulative_with_insufficient_data_returns_none(self):
        # Only 2 points against min_lookback=5: the early return must win over the floor
        history = self._make_monotonic_history(n_days=2)

        lower, upper, prediction = compute_sarimax_threshold(
            history, PredictSensitivity.medium, min_lookback=5, is_cumulative=True,
        )

        assert lower is None
        assert upper is None
        assert prediction is None

    def test_cumulative_default_is_false(self):
        history = self._make_monotonic_history(n_days=30)

        # Without is_cumulative param, should behave as non-cumulative
        lower_default, _, _ = compute_sarimax_threshold(history, PredictSensitivity.medium)
        lower_explicit, _, _ = compute_sarimax_threshold(history, PredictSensitivity.medium, is_cumulative=False)

        assert lower_default == lower_explicit

0 commit comments

Comments
 (0)