Skip to content

Commit 4a3b798

Browse files
committed
test(monitors): add scenarios for freshness monitor
1 parent 064ad0c commit 4a3b798

3 files changed

Lines changed: 787 additions & 1 deletion

File tree

tests/unit/common/conftest.py

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
from datetime import datetime, timedelta
2+
from typing import NamedTuple
3+
14
import pandas as pd
25

6+
from testgen.commands.test_thresholds_prediction import compute_freshness_threshold
7+
from testgen.common.freshness_service import count_excluded_minutes, get_schedule_params, is_excluded_day
8+
from testgen.common.models.test_suite import PredictSensitivity
9+
310

411
def _make_freshness_history(
512
update_timestamps: list[str],
@@ -29,3 +36,308 @@ def _make_freshness_history(
2936
df = pd.DataFrame(rows, columns=["timestamp", "result_signal"])
3037
df = df.set_index("timestamp")
3138
return df
39+
40+
41+
# ─── Scenario test infrastructure ────────────────────────────────────
42+
43+
44+
class ScenarioPoint(NamedTuple):
45+
timestamp: pd.Timestamp
46+
value: float
47+
lower: float | None
48+
upper: float | None
49+
staleness: float | None
50+
prediction_json: str | None
51+
result_code: int # -1 = training, 1 = passed, 0 = failed
52+
result_status: str # "Log", "Passed", "Failed"
53+
54+
55+
def _to_csv_rows(raw: list[tuple[str, str]]) -> list[tuple[pd.Timestamp, float]]:
56+
"""Convert (str, str) tuples from generate_test_data to (Timestamp, float)."""
57+
return [(pd.Timestamp(ts), float(val)) for ts, val in raw]
58+
59+
60+
def _to_history_df(rows: list[tuple[pd.Timestamp, float]]) -> pd.DataFrame:
61+
"""Convert a list of (timestamp, value) tuples to a DataFrame with DatetimeIndex."""
62+
df = pd.DataFrame(rows, columns=["timestamp", "value"])
63+
df["timestamp"] = pd.to_datetime(df["timestamp"])
64+
return df.set_index("timestamp")
65+
66+
67+
def _evaluate_freshness_point(
68+
timestamp: pd.Timestamp,
69+
value: float,
70+
lower: float | None,
71+
upper: float | None,
72+
staleness: float | None,
73+
prediction_json: str | None,
74+
freshness_last_update: pd.Timestamp | None,
75+
exclude_weekends: bool,
76+
tz: str | None,
77+
) -> tuple[int, str]:
78+
"""Evaluate a single freshness observation against thresholds.
79+
80+
Mirrors the 3-branch decision in simulate_monitor.py (lines 421-476)
81+
and the SQL template logic. Returns (result_code, result_status).
82+
"""
83+
effective_staleness = staleness if staleness is not None else upper
84+
sched = get_schedule_params(prediction_json) if prediction_json else None
85+
inferred_excluded = sched.excluded_days if sched else None
86+
win_s = sched.window_start if sched else None
87+
win_e = sched.window_end if sched else None
88+
89+
# Training: thresholds not yet available
90+
if upper is None:
91+
return -1, "Log"
92+
93+
# Update point: check completed gap against [lower, upper]
94+
if value == 0 and freshness_last_update is not None:
95+
completed_gap = (timestamp - freshness_last_update).total_seconds() / 60
96+
has_exclusions = exclude_weekends or inferred_excluded or win_s is not None
97+
if has_exclusions:
98+
excluded = count_excluded_minutes(
99+
freshness_last_update, timestamp, exclude_weekends, holiday_dates=None,
100+
tz=tz, excluded_days=inferred_excluded,
101+
window_start=win_s, window_end=win_e,
102+
)
103+
completed_gap = max(completed_gap - excluded, 0)
104+
if (lower is not None and completed_gap < lower) or completed_gap > upper:
105+
return 0, "Failed"
106+
return 1, "Passed"
107+
108+
# Between updates: check growing interval against staleness
109+
if value > 0:
110+
has_exclusions = exclude_weekends or inferred_excluded or win_s is not None
111+
is_excl = has_exclusions and is_excluded_day(
112+
timestamp, exclude_weekends, holiday_dates=None, tz=tz,
113+
excluded_days=inferred_excluded, window_start=win_s, window_end=win_e,
114+
)
115+
if is_excl:
116+
return 1, "Passed"
117+
118+
excluded = count_excluded_minutes(
119+
freshness_last_update, timestamp, exclude_weekends, holiday_dates=None,
120+
tz=tz, excluded_days=inferred_excluded,
121+
window_start=win_s, window_end=win_e,
122+
) if has_exclusions and freshness_last_update else 0
123+
business_interval = value - excluded
124+
if business_interval > effective_staleness:
125+
return 0, "Failed"
126+
return 1, "Passed"
127+
128+
# First update point (value == 0, no prior update)
129+
return 1, "Passed"
130+
131+
132+
def _run_scenario(
133+
csv_rows: list[tuple[pd.Timestamp, float]],
134+
sensitivity: PredictSensitivity,
135+
exclude_weekends: bool = False,
136+
tz: str | None = None,
137+
) -> list[ScenarioPoint]:
138+
"""Iterate through csv_rows calling compute_freshness_threshold at each step."""
139+
results: list[ScenarioPoint] = []
140+
freshness_last_update: pd.Timestamp | None = None
141+
142+
for i, (timestamp, value) in enumerate(csv_rows):
143+
history_df = _to_history_df(csv_rows[:i])
144+
145+
lower, upper, staleness, prediction_json = compute_freshness_threshold(
146+
history_df, sensitivity, min_lookback=30,
147+
exclude_weekends=exclude_weekends, schedule_tz=tz,
148+
)
149+
150+
result_code, result_status = _evaluate_freshness_point(
151+
timestamp, value, lower, upper, staleness, prediction_json,
152+
freshness_last_update, exclude_weekends, tz,
153+
)
154+
155+
results.append(ScenarioPoint(
156+
timestamp=timestamp,
157+
value=value,
158+
lower=lower,
159+
upper=upper,
160+
staleness=staleness,
161+
prediction_json=prediction_json,
162+
result_code=result_code,
163+
result_status=result_status,
164+
))
165+
166+
if value == 0:
167+
freshness_last_update = timestamp
168+
169+
return results
170+
171+
172+
# ─── Scenario data generators (from generate_test_data.py) ───────────
173+
174+
175+
def _ts(dt: datetime) -> str:
176+
return dt.strftime("%Y-%m-%d %H:%M:%S")
177+
178+
179+
def _make_observations(
180+
start: datetime,
181+
end: datetime,
182+
interval_hours: int | float,
183+
update_times: set[datetime],
184+
) -> list[tuple[str, str]]:
185+
rows: list[tuple[str, str]] = []
186+
last_update: datetime | None = None
187+
current = start
188+
while current <= end:
189+
if current in update_times:
190+
rows.append((_ts(current), "0"))
191+
last_update = current
192+
elif last_update is not None:
193+
minutes = int((current - last_update).total_seconds() / 60)
194+
rows.append((_ts(current), str(minutes)))
195+
current += timedelta(hours=interval_hours)
196+
return rows
197+
198+
199+
def _weekday_updates(
200+
hour: int,
201+
start: datetime,
202+
end: datetime,
203+
skip_dates: set | None = None,
204+
) -> set[datetime]:
205+
updates: set[datetime] = set()
206+
d = start.replace(hour=0, minute=0, second=0)
207+
while d <= end:
208+
if d.weekday() < 5 and (skip_dates is None or d.date() not in skip_dates):
209+
updates.add(d.replace(hour=hour, minute=0, second=0))
210+
d += timedelta(days=1)
211+
return updates
212+
213+
214+
def _gen_daily_regular() -> list[tuple[pd.Timestamp, float]]:
215+
start = datetime(2025, 10, 6, 7, 0)
216+
end = datetime(2025, 11, 9, 19, 0)
217+
updates = _weekday_updates(7, start, end)
218+
return _to_csv_rows(_make_observations(start, end, 12, updates))
219+
220+
221+
def _gen_daily_late_gap_phase() -> list[tuple[pd.Timestamp, float]]:
222+
start = datetime(2025, 10, 6, 7, 0)
223+
end = datetime(2025, 11, 16, 19, 0)
224+
skip = {
225+
datetime(2025, 10, 29).date(),
226+
datetime(2025, 10, 30).date(),
227+
datetime(2025, 10, 31).date(),
228+
}
229+
updates = _weekday_updates(7, start, end, skip_dates=skip)
230+
return _to_csv_rows(_make_observations(start, end, 12, updates))
231+
232+
233+
def _gen_daily_late_schedule_phase() -> list[tuple[pd.Timestamp, float]]:
234+
start = datetime(2025, 10, 6, 7, 0)
235+
end = datetime(2025, 11, 30, 19, 0)
236+
skip = {
237+
datetime(2025, 11, 12).date(),
238+
datetime(2025, 11, 13).date(),
239+
datetime(2025, 11, 14).date(),
240+
}
241+
updates = _weekday_updates(7, start, end, skip_dates=skip)
242+
return _to_csv_rows(_make_observations(start, end, 12, updates))
243+
244+
245+
def _gen_subdaily_regular() -> list[tuple[pd.Timestamp, float]]:
246+
start = datetime(2025, 10, 6, 0, 0)
247+
end = datetime(2025, 11, 2, 23, 0)
248+
updates: set[datetime] = set()
249+
d = start.replace(hour=0)
250+
while d <= end:
251+
if d.weekday() < 5:
252+
for h in range(8, 19, 2):
253+
updates.add(d.replace(hour=h))
254+
d += timedelta(days=1)
255+
return _to_csv_rows(_make_observations(start, end, 2, updates))
256+
257+
258+
def _gen_subdaily_gap_phase() -> list[tuple[pd.Timestamp, float]]:
259+
start = datetime(2025, 10, 6, 0, 0)
260+
end = datetime(2025, 11, 2, 23, 0)
261+
gap_date = datetime(2025, 10, 22).date()
262+
updates: set[datetime] = set()
263+
d = start.replace(hour=0)
264+
while d <= end:
265+
if d.weekday() < 5:
266+
for h in range(8, 19, 2):
267+
dt = d.replace(hour=h)
268+
if dt.date() == gap_date and h >= 12:
269+
continue
270+
updates.add(dt)
271+
d += timedelta(days=1)
272+
return _to_csv_rows(_make_observations(start, end, 2, updates))
273+
274+
275+
def _gen_subdaily_gap_schedule_phase() -> list[tuple[pd.Timestamp, float]]:
276+
start = datetime(2025, 10, 6, 0, 0)
277+
end = datetime(2025, 11, 9, 23, 0)
278+
gap_date = datetime(2025, 10, 29).date()
279+
updates: set[datetime] = set()
280+
d = start.replace(hour=0)
281+
while d <= end:
282+
if d.weekday() < 5:
283+
for h in range(8, 19, 2):
284+
dt = d.replace(hour=h)
285+
if dt.date() == gap_date and h >= 12:
286+
continue
287+
updates.add(dt)
288+
d += timedelta(days=1)
289+
return _to_csv_rows(_make_observations(start, end, 2, updates))
290+
291+
292+
def _gen_weekly_early() -> list[tuple[pd.Timestamp, float]]:
293+
start = datetime(2025, 8, 7, 10, 0)
294+
end = datetime(2025, 11, 6, 22, 0)
295+
updates: set[datetime] = set()
296+
d = start.replace(hour=0)
297+
while d <= end:
298+
if d.weekday() == 3:
299+
updates.add(d.replace(hour=10, minute=0))
300+
d += timedelta(days=1)
301+
updates.add(datetime(2025, 10, 21, 10, 0))
302+
updates.discard(datetime(2025, 10, 23, 10, 0))
303+
return _to_csv_rows(_make_observations(start, end, 12, updates))
304+
305+
306+
def _gen_training_only() -> list[tuple[pd.Timestamp, float]]:
307+
start = datetime(2025, 10, 6, 7, 0)
308+
end = datetime(2025, 11, 2, 19, 0)
309+
updates = {
310+
datetime(2025, 10, 6, 7, 0),
311+
datetime(2025, 10, 13, 7, 0),
312+
datetime(2025, 10, 20, 7, 0),
313+
datetime(2025, 10, 27, 7, 0),
314+
}
315+
return _to_csv_rows(_make_observations(start, end, 12, updates))
316+
317+
318+
def _gen_mwf_regular() -> list[tuple[pd.Timestamp, float]]:
319+
start = datetime(2025, 10, 6, 7, 0)
320+
end = datetime(2025, 12, 1, 19, 0)
321+
updates: set[datetime] = set()
322+
d = start.replace(hour=0)
323+
while d <= end:
324+
if d.weekday() in {0, 2, 4}:
325+
updates.add(d.replace(hour=7, minute=0, second=0))
326+
d += timedelta(days=1)
327+
return _to_csv_rows(_make_observations(start, end, 12, updates))
328+
329+
330+
def _gen_mwf_late() -> list[tuple[pd.Timestamp, float]]:
331+
start = datetime(2025, 10, 6, 7, 0)
332+
end = datetime(2025, 12, 15, 19, 0)
333+
skip = {
334+
datetime(2025, 11, 26).date(),
335+
datetime(2025, 11, 28).date(),
336+
}
337+
updates: set[datetime] = set()
338+
d = start.replace(hour=0)
339+
while d <= end:
340+
if d.weekday() in {0, 2, 4} and d.date() not in skip:
341+
updates.add(d.replace(hour=7, minute=0, second=0))
342+
d += timedelta(days=1)
343+
return _to_csv_rows(_make_observations(start, end, 12, updates))

0 commit comments

Comments
 (0)