Skip to content

Commit 388129f

Browse files
author
ci bot
committed
Merge branch 'aarthy/freshness' into 'enterprise'
feat(monitors): freshness monitor with gap-based thresholds and schedule inference. See merge request dkinternal/testgen/dataops-testgen!416
2 parents 3c27372 + 4a3b798 commit 388129f

35 files changed

Lines changed: 4084 additions & 380 deletions

testgen/__main__.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ def quick_start(
428428
click.echo("loading initial data")
429429
run_quick_start_increment(0)
430430
now_date = datetime.now(UTC)
431-
time_delta = timedelta(days=-30) # 1 month ago
431+
time_delta = timedelta(days=-35) # before the first monitor iteration (~34 days back)
432432
table_group_id = "0ea85e17-acbe-47fe-8394-9970725ad37d"
433433
test_suite_id = "9df7489d-92b3-49f9-95ca-512160d7896f"
434434

@@ -449,16 +449,19 @@ def quick_start(
449449
run_quick_start_increment(iteration)
450450
run_test_execution(test_suite_id, run_date=run_date)
451451

452-
monitor_iterations = 42 # 3 weeks
452+
monitor_iterations = 68 # ~5 weeks
453453
monitor_interval = timedelta(hours=12)
454454
monitor_test_suite_id = "823a1fef-9b6d-48d5-9d0f-2db9812cc318"
455455
# Round down to nearest 12-hour mark (12:00 AM or 12:00 PM UTC)
456456
now = datetime.now(UTC)
457457
nearest_12h_mark = now.replace(hour=12 if now.hour >= 12 else 0, minute=0, second=0, microsecond=0)
458458
monitor_run_date = nearest_12h_mark - monitor_interval * (monitor_iterations - 1)
459+
weekday_morning_count = 0
459460
for iteration in range(1, monitor_iterations + 1):
460461
click.echo(f"Running monitor iteration: {iteration} / {monitor_iterations}")
461-
run_monitor_increment(monitor_run_date, iteration)
462+
if monitor_run_date.weekday() < 5 and monitor_run_date.hour < 12:
463+
weekday_morning_count += 1
464+
run_monitor_increment(monitor_run_date, iteration, weekday_morning_count)
462465
run_test_execution(monitor_test_suite_id, run_date=monitor_run_date)
463466
monitor_run_date += monitor_interval
464467

testgen/commands/queries/execute_tests_query.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
11
import dataclasses
22
from collections.abc import Iterable
3-
from datetime import datetime
3+
from datetime import date, datetime
44
from typing import TypedDict
55
from uuid import UUID
66

7+
import pandas as pd
8+
79
from testgen.common import read_template_sql_file
810
from testgen.common.clean_sql import concat_columns
911
from testgen.common.database.database_service import get_flavor_service, get_tg_schema, replace_params
12+
from testgen.common.freshness_service import (
13+
count_excluded_minutes,
14+
get_schedule_params,
15+
is_excluded_day,
16+
resolve_holiday_dates,
17+
)
1018
from testgen.common.models.connection import Connection
19+
from testgen.common.models.scheduler import JobSchedule
1120
from testgen.common.models.table_group import TableGroup
1221
from testgen.common.models.test_definition import TestRunType, TestScope
1322
from testgen.common.models.test_run import TestRun
23+
from testgen.common.models.test_suite import TestSuite
1424
from testgen.common.read_file import replace_templated_functions
1525
from testgen.utils import to_sql_timestamp
1626

@@ -49,6 +59,7 @@ class TestExecutionDef(InputParameters):
4959
skip_errors: int
5060
history_calculation: str
5161
custom_query: str
62+
prediction: dict | str | None
5263
run_type: TestRunType
5364
test_scope: TestScope
5465
template: str
@@ -88,14 +99,27 @@ class TestExecutionSQL:
8899
"result_measure",
89100
)
90101

91-
def __init__(self, connection: Connection, table_group: TableGroup, test_run: TestRun):
102+
def __init__(self, connection: Connection, table_group: TableGroup, test_suite: TestSuite, test_run: TestRun):
92103
self.connection = connection
93104
self.table_group = table_group
105+
self.test_suite = test_suite
94106
self.test_run = test_run
95107
self.run_date = test_run.test_starttime
96108
self.flavor = connection.sql_flavor
97109
self.flavor_service = get_flavor_service(self.flavor)
98110

111+
self._exclude_weekends = bool(self.test_suite.predict_exclude_weekends)
112+
self._holiday_dates: set[date] | None = None
113+
self._schedule_tz: str | None = None
114+
if test_suite.is_monitor:
115+
schedule = JobSchedule.get(JobSchedule.kwargs["test_suite_id"].astext == str(test_suite.id))
116+
self._schedule_tz = schedule.cron_tz or "UTC" if schedule else None
117+
if test_suite.holiday_codes_list:
118+
self._holiday_dates = resolve_holiday_dates(
119+
test_suite.holiday_codes_list,
120+
pd.DatetimeIndex([datetime(self.run_date.year - 1, 1, 1), datetime(self.run_date.year + 1, 12, 31)]),
121+
)
122+
99123
def _get_input_parameters(self, test_def: TestExecutionDef) -> str:
100124
return "; ".join(
101125
f"{field.name}={getattr(test_def, field.name)}"
@@ -135,8 +159,8 @@ def _get_params(self, test_def: TestExecutionDef | None = None) -> dict:
135159
"BASELINE_SUM": test_def.baseline_sum,
136160
"BASELINE_AVG": test_def.baseline_avg,
137161
"BASELINE_SD": test_def.baseline_sd,
138-
"LOWER_TOLERANCE": test_def.lower_tolerance or "NULL",
139-
"UPPER_TOLERANCE": test_def.upper_tolerance or "NULL",
162+
"LOWER_TOLERANCE": "NULL" if test_def.lower_tolerance in (None, "") else test_def.lower_tolerance,
163+
"UPPER_TOLERANCE": "NULL" if test_def.upper_tolerance in (None, "") else test_def.upper_tolerance,
140164
# SUBSET_CONDITION should be replaced after CUSTOM_QUERY
141165
# since the latter may contain the former
142166
"SUBSET_CONDITION": test_def.subset_condition or "1=1",
@@ -154,6 +178,32 @@ def _get_params(self, test_def: TestExecutionDef | None = None) -> dict:
154178
"COLUMN_TYPE": test_def.column_type,
155179
"INPUT_PARAMETERS": self._get_input_parameters(test_def),
156180
})
181+
182+
# Freshness exclusion params — computed per test at execution time
183+
if test_def.test_type == "Freshness_Trend" and test_def.baseline_sum:
184+
sched = get_schedule_params(test_def.prediction)
185+
has_exclusions = self._exclude_weekends or sched.excluded_days or sched.window_start is not None
186+
if has_exclusions:
187+
last_update = pd.Timestamp(test_def.baseline_sum)
188+
excluded = int(count_excluded_minutes(
189+
last_update, self.run_date, self._exclude_weekends, self._holiday_dates,
190+
tz=self._schedule_tz, excluded_days=sched.excluded_days,
191+
window_start=sched.window_start, window_end=sched.window_end,
192+
))
193+
is_excl = 1 if is_excluded_day(
194+
pd.Timestamp(self.run_date), self._exclude_weekends, self._holiday_dates,
195+
tz=self._schedule_tz, excluded_days=sched.excluded_days,
196+
window_start=sched.window_start, window_end=sched.window_end,
197+
) else 0
198+
params["EXCLUDED_MINUTES"] = excluded
199+
params["IS_EXCLUDED_DAY"] = is_excl
200+
else:
201+
params["EXCLUDED_MINUTES"] = 0
202+
params["IS_EXCLUDED_DAY"] = 0
203+
else:
204+
params["EXCLUDED_MINUTES"] = 0
205+
params["IS_EXCLUDED_DAY"] = 0
206+
157207
return params
158208

159209
def _get_query(
@@ -266,7 +316,7 @@ def aggregate_cat_tests(
266316
td.measure_expression = f"COALESCE(CAST({measure} AS {varchar_type}) {concat_operator} '|', '{self.null_value}|')"
267317

268318
# For prediction mode, return -1 during training period
269-
if td.history_calculation == "PREDICT" and (not td.lower_tolerance or not td.upper_tolerance):
319+
if td.history_calculation == "PREDICT" and (td.lower_tolerance in (None, "") or td.upper_tolerance in (None, "")):
270320
td.condition_expression = "'-1,'"
271321
else:
272322
condition = (

testgen/commands/run_quick_start.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,18 @@ def _metric_cumulative_shift(iteration: int) -> tuple[float, float]:
130130
return discount, price
131131

132132

133-
def _get_monitor_params_mapping(run_date: datetime, iteration: int = 0) -> dict:
133+
def _get_monitor_params_mapping(run_date: datetime, iteration: int = 0, weekday_morning_count: int = 0) -> dict:
134134
# Volume: linear growth with jitter, spike at specific iteration for anomaly
135-
if iteration == 37:
135+
if iteration == 60:
136136
new_sales = 100
137137
else:
138-
new_sales = random.randint(8, 12) # noqa: S311
138+
new_sales = random.randint(5, 15) # noqa: S311
139139

140-
# Freshness: update every other iteration, late update for anomaly
141-
is_update_suppliers_iter = (iteration % 2 == 0 and iteration != 38) or iteration == 39
140+
# Freshness: update on weekday mornings only; skip the 21st weekday-morning run to simulate a 1-day outage after the schedule goes active
141+
is_weekday = run_date.weekday() < 5
142+
is_morning = run_date.hour < 12
143+
is_outage = weekday_morning_count == 21
144+
is_update_suppliers_iter = is_weekday and is_morning and not is_outage
142145

143146
# Metrics: compute deltas for discount and price shifts
144147
curr_discount, curr_price = _metric_cumulative_shift(iteration)
@@ -151,11 +154,11 @@ def _get_monitor_params_mapping(run_date: datetime, iteration: int = 0) -> dict:
151154
"ITERATION_NUMBER": iteration,
152155
"RUN_DATE": run_date,
153156
"NEW_SALES": new_sales,
154-
"IS_ADD_CUSTOMER_COL_ITER": iteration == 29,
155-
"IS_DELETE_CUSTOMER_COL_ITER": iteration == 36,
156-
"IS_UPDATE_PRODUCT_ITER": not 14 < iteration < 18,
157-
"IS_CREATE_RETURNS_TABLE_ITER": iteration == 32,
158-
"IS_DELETE_CUSTOMER_ITER": iteration in (18, 22, 34),
157+
"IS_ADD_CUSTOMER_COL_ITER": iteration == 47,
158+
"IS_DELETE_CUSTOMER_COL_ITER": iteration == 58,
159+
"IS_UPDATE_PRODUCT_ITER": not 24 < iteration < 28,
160+
"IS_CREATE_RETURNS_TABLE_ITER": iteration == 52,
161+
"IS_DELETE_CUSTOMER_ITER": iteration in (29, 36, 55),
159162
"IS_UPDATE_SUPPLIERS_ITER": is_update_suppliers_iter,
160163
"DISCOUNT_DELTA": discount_delta,
161164
"PRICE_DELTA": price_delta,
@@ -234,8 +237,8 @@ def run_quick_start_increment(iteration):
234237
setup_cat_tests(iteration)
235238

236239

237-
def run_monitor_increment(run_date, iteration):
238-
params_mapping = _get_monitor_params_mapping(run_date, iteration)
240+
def run_monitor_increment(run_date, iteration, weekday_morning_count=0):
241+
params_mapping = _get_monitor_params_mapping(run_date, iteration, weekday_morning_count)
239242
_prepare_connection_to_target_database(params_mapping)
240243

241244
target_db_name = params_mapping["PROJECT_DB"]

testgen/commands/run_test_execution.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
8484
data_chars = run_data_chars_refresh(connection, table_group, test_run.test_starttime)
8585
test_run.set_progress("data_chars", "Completed")
8686

87-
sql_generator = TestExecutionSQL(connection, table_group, test_run)
87+
sql_generator = TestExecutionSQL(connection, table_group, test_suite, test_run)
8888

8989
if test_suite.is_monitor:
9090
_sync_monitor_definitions(sql_generator)

0 commit comments

Comments
 (0)