Skip to content

Commit d2ac510

Browse files
committed
fix(monitors): handle errored monitors
1 parent 507c4bb commit d2ac510

17 files changed

Lines changed: 190 additions & 50 deletions

File tree

testgen/commands/queries/execute_tests_query.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ def has_schema_changes(self) -> tuple[dict]:
180180
# Runs on App database
181181
return self._get_query("has_schema_changes.sql")
182182

183+
def get_errored_autogen_monitors(self) -> tuple[str, dict]:
    """Return the query built from the ``get_errored_autogen_monitors.sql`` template.

    The result is whatever ``self._get_query`` produces for that template name;
    the caller unpacks it into ``fetch_dict_from_db``.
    """
    # Runs on App database
    errored_monitors_query = self._get_query("get_errored_autogen_monitors.sql")
    return errored_monitors_query
186+
183187
def get_active_test_definitions(self) -> tuple[dict]:
184188
# Runs on App database
185189
return self._get_query("get_active_test_definitions.sql")

testgen/commands/run_test_execution.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import subprocess
33
import threading
4+
from collections import defaultdict
45
from datetime import UTC, datetime, timedelta
56
from functools import partial
67
from typing import Literal
@@ -86,12 +87,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
8687
sql_generator = TestExecutionSQL(connection, table_group, test_run)
8788

8889
if test_suite.is_monitor:
89-
schema_changes = fetch_dict_from_db(*sql_generator.has_schema_changes())[0]
90-
if schema_changes["has_table_drops"]:
91-
run_monitor_generation(test_suite_id, ["Freshness_Trend", "Volume_Trend", "Metric_Trend"], mode="delete")
92-
if schema_changes["has_table_adds"]:
93-
# Freshness monitors will be inserted after profiling
94-
run_monitor_generation(test_suite_id, ["Volume_Trend"], mode="insert")
90+
_sync_monitor_definitions(sql_generator)
9591

9692
# Update the thresholds before retrieving the test definitions in the next steps
9793
LOG.info("Updating test thresholds based on history calculations")
@@ -190,6 +186,26 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
190186
"""
191187

192188

189+
def _sync_monitor_definitions(sql_generator: TestExecutionSQL) -> None:
    """Reconcile the monitor suite's definitions before its tests are executed.

    Two steps, in order:
      1. Read the schema-change flags and run monitor generation in "delete"
         mode when tables were dropped and in "insert" mode when tables were added.
      2. Fetch the rows returned by ``get_errored_autogen_monitors`` and re-run
         generation in "upsert" mode, once per test type, limited to the tables
         listed for that type.
    """
    suite_id = sql_generator.test_run.test_suite_id

    changes = fetch_dict_from_db(*sql_generator.has_schema_changes())[0]
    if changes["has_table_drops"]:
        run_monitor_generation(suite_id, ["Freshness_Trend", "Volume_Trend", "Metric_Trend"], mode="delete")
    if changes["has_table_adds"]:
        # Freshness monitors will be inserted after profiling
        run_monitor_generation(suite_id, ["Volume_Trend"], mode="insert")

    # Regenerate monitors that errored in previous run
    tables_by_type: dict[str, list[str]] = {}
    # An empty or missing result simply yields no regeneration calls.
    for errored in fetch_dict_from_db(*sql_generator.get_errored_autogen_monitors()) or ():
        tables_by_type.setdefault(errored["test_type"], []).append(errored["table_name"])

    for monitor_type, tables in tables_by_type.items():
        run_monitor_generation(suite_id, [monitor_type], mode="upsert", table_names=tables)
207+
208+
193209
def _run_tests(
194210
sql_generator: TestExecutionSQL,
195211
run_type: Literal["QUERY", "METADATA"],

testgen/commands/test_generation.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def run_monitor_generation(
7272
monitor_suite_id: str | UUID,
7373
monitors: list[MonitorTestType],
7474
mode: MonitorGenerationMode = "upsert",
75+
table_names: list[str] | None = None,
7576
) -> None:
7677
"""
7778
Modes:
@@ -90,7 +91,7 @@ def run_monitor_generation(
9091
table_group = TableGroup.get(monitor_suite.table_groups_id)
9192
connection = Connection.get(table_group.connection_id)
9293

93-
TestGeneration(connection, table_group, monitor_suite, "Monitor", monitors).monitor_run(mode)
94+
TestGeneration(connection, table_group, monitor_suite, "Monitor", monitors).monitor_run(mode, table_names=table_names)
9495

9596

9697
class TestGeneration:
@@ -123,16 +124,19 @@ def run(self) -> None:
123124
self._get_query("delete_stale_autogen_tests.sql"),
124125
])
125126

126-
def monitor_run(self, mode: MonitorGenerationMode) -> None:
127+
def monitor_run(self, mode: MonitorGenerationMode, table_names: list[str] | None = None) -> None:
    """Run monitor generation for this suite in the requested mode.

    Args:
        mode: ``"delete"`` executes only ``delete_stale_monitors.sql`` and returns.
            ``"insert"`` runs the generation queries with ``INSERT_ONLY`` set to
            True; any other mode runs them with ``INSERT_ONLY`` set to False.
        table_names: Optional table names to restrict generation to. When None
            or empty, no ``TABLE_FILTER`` override is passed to the generation
            queries. Ignored in ``"delete"`` mode.
    """
    if mode == "delete":
        execute_db_queries([self._get_query("delete_stale_monitors.sql")])
        return

    # Holds both a bool flag and an SQL fragment, hence the widened value type.
    extra_params: dict[str, bool | str] = {"INSERT_ONLY": mode == "insert"}
    if table_names:
        # The generation templates reference this value as a `{TABLE_FILTER}`
        # placeholder, i.e. it becomes part of the SQL text instead of a bound
        # parameter. Each name is therefore written as a quoted literal with
        # embedded single quotes doubled, so a name containing "'" can neither
        # break the statement nor inject SQL.
        quoted_names = ", ".join(
            "'" + name.replace("'", "''") + "'" for name in table_names
        )
        extra_params["TABLE_FILTER"] = f"AND table_name IN ({quoted_names})"

    LOG.info("Running monitor generation queries")
    execute_db_queries(
        self._get_generation_queries(extra_params=extra_params),
    )
137141

138142
def _get_generation_queries(self, extra_params: dict | None = None) -> list[tuple[str, dict]]:
@@ -188,6 +192,7 @@ def _get_params(self, test_type: TestTypeParams | None = None) -> dict:
188192
"SQL_FLAVOR": self.flavor,
189193
"QUOTE": self.flavor_service.quote_character,
190194
"INSERT_ONLY": False,
195+
"TABLE_FILTER": "",
191196
})
192197
return params
193198

testgen/common/models/table_group.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ class TableGroupSummary(EntityMinimal):
7171
monitor_schema_anomalies: int | None
7272
monitor_volume_anomalies: int | None
7373
monitor_metric_anomalies: int | None
74+
monitor_freshness_has_errors: bool | None
75+
monitor_volume_has_errors: bool | None
76+
monitor_schema_has_errors: bool | None
77+
monitor_metric_has_errors: bool | None
7478
monitor_freshness_is_training: bool | None
7579
monitor_volume_is_training: bool | None
7680
monitor_metric_is_training: bool | None
@@ -266,6 +270,10 @@ def select_summary(cls, project_code: str, for_dashboard: bool = False) -> Itera
266270
SUM(CASE WHEN results.test_type = 'Schema_Drift' AND results.result_code = 0 THEN 1 ELSE 0 END) AS schema_anomalies,
267271
SUM(CASE WHEN results.test_type = 'Volume_Trend' AND results.result_code = 0 THEN 1 ELSE 0 END) AS volume_anomalies,
268272
SUM(CASE WHEN results.test_type = 'Metric_Trend' AND results.result_code = 0 THEN 1 ELSE 0 END) AS metric_anomalies,
273+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Freshness_Trend' AND ranked_test_runs.position = 1) AS freshness_has_errors,
274+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Volume_Trend' AND ranked_test_runs.position = 1) AS volume_has_errors,
275+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Schema_Drift' AND ranked_test_runs.position = 1) AS schema_has_errors,
276+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Metric_Trend' AND ranked_test_runs.position = 1) AS metric_has_errors,
269277
BOOL_AND(results.result_code = -1) FILTER (WHERE results.test_type = 'Freshness_Trend' AND ranked_test_runs.position = 1) AS freshness_is_training,
270278
BOOL_AND(results.result_code = -1) FILTER (WHERE results.test_type = 'Volume_Trend' AND ranked_test_runs.position = 1) AS volume_is_training,
271279
BOOL_AND(results.result_code = -1) FILTER (WHERE results.test_type = 'Metric_Trend' AND ranked_test_runs.position = 1) AS metric_is_training,
@@ -319,6 +327,10 @@ def select_summary(cls, project_code: str, for_dashboard: bool = False) -> Itera
319327
monitor_tables.schema_anomalies AS monitor_schema_anomalies,
320328
monitor_tables.volume_anomalies AS monitor_volume_anomalies,
321329
monitor_tables.metric_anomalies AS monitor_metric_anomalies,
330+
monitor_tables.freshness_has_errors AS monitor_freshness_has_errors,
331+
monitor_tables.volume_has_errors AS monitor_volume_has_errors,
332+
monitor_tables.schema_has_errors AS monitor_schema_has_errors,
333+
monitor_tables.metric_has_errors AS monitor_metric_has_errors,
322334
monitor_tables.freshness_is_training AS monitor_freshness_is_training,
323335
monitor_tables.volume_is_training AS monitor_volume_is_training,
324336
monitor_tables.metric_is_training AS monitor_metric_is_training,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- Distinct (test_type, table_name) pairs of auto-generated Freshness_Trend and
-- Volume_Trend results with status 'Error' in the latest 'Complete' run of the
-- suite, excluding the current run.
WITH latest_completed_run AS (
    SELECT id
    FROM test_runs
    WHERE test_suite_id = :TEST_SUITE_ID ::UUID
      AND id <> :TEST_RUN_ID ::UUID
      AND status = 'Complete'
    ORDER BY test_starttime DESC
    LIMIT 1
)
SELECT DISTINCT results.test_type, results.table_name
FROM test_results results
WHERE results.test_run_id IN (SELECT id FROM latest_completed_run)
  AND results.result_status = 'Error'
  AND results.auto_gen IS TRUE
  AND results.test_type IN ('Freshness_Trend', 'Volume_Trend')

testgen/template/flavors/bigquery/gen_query_tests/gen_Freshness_Trend.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ FROM selected_tables s
180180
WHERE EXISTS (SELECT 1 FROM test_types WHERE test_type = 'Freshness_Trend' AND active = 'Y')
181181
-- Only insert if test type is included in generation set
182182
AND EXISTS (SELECT 1 FROM generation_sets WHERE test_type = 'Freshness_Trend' AND generation_set = :GENERATION_SET)
183+
{TABLE_FILTER}
183184

184185
-- Match "uix_td_autogen_table" unique index exactly
185186
ON CONFLICT (test_suite_id, test_type, schema_name, table_name)

testgen/template/flavors/databricks/gen_query_tests/gen_Freshness_Trend.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ FROM selected_tables s
180180
WHERE EXISTS (SELECT 1 FROM test_types WHERE test_type = 'Freshness_Trend' AND active = 'Y')
181181
-- Only insert if test type is included in generation set
182182
AND EXISTS (SELECT 1 FROM generation_sets WHERE test_type = 'Freshness_Trend' AND generation_set = :GENERATION_SET)
183+
{TABLE_FILTER}
183184

184185
-- Match "uix_td_autogen_table" unique index exactly
185186
ON CONFLICT (test_suite_id, test_type, schema_name, table_name)

testgen/template/flavors/mssql/gen_query_tests/gen_Freshness_Trend.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ FROM selected_tables s
180180
WHERE EXISTS (SELECT 1 FROM test_types WHERE test_type = 'Freshness_Trend' AND active = 'Y')
181181
-- Only insert if test type is included in generation set
182182
AND EXISTS (SELECT 1 FROM generation_sets WHERE test_type = 'Freshness_Trend' AND generation_set = :GENERATION_SET)
183+
{TABLE_FILTER}
183184

184185
-- Match "uix_td_autogen_table" unique index exactly
185186
ON CONFLICT (test_suite_id, test_type, schema_name, table_name)

testgen/template/gen_query_tests/gen_Freshness_Trend.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ FROM selected_tables s
178178
WHERE EXISTS (SELECT 1 FROM test_types WHERE test_type = 'Freshness_Trend' AND active = 'Y')
179179
-- Only insert if test type is included in generation set
180180
AND EXISTS (SELECT 1 FROM generation_sets WHERE test_type = 'Freshness_Trend' AND generation_set = :GENERATION_SET)
181+
{TABLE_FILTER}
181182

182183
-- Match "uix_td_autogen_table" unique index exactly
183184
ON CONFLICT (test_suite_id, test_type, schema_name, table_name)

testgen/template/gen_query_tests/gen_Volume_Trend.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ WHERE c.table_groups_id = :TABLE_GROUPS_ID ::UUID
2525
AND EXISTS (SELECT 1 FROM test_types WHERE test_type = 'Volume_Trend' AND active = 'Y')
2626
-- Only insert if test type is included in generation set
2727
AND EXISTS (SELECT 1 FROM generation_sets WHERE test_type = 'Volume_Trend' AND generation_set = :GENERATION_SET)
28+
{TABLE_FILTER}
2829

2930
-- Match "uix_td_autogen_table" unique index exactly
3031
ON CONFLICT (test_suite_id, test_type, schema_name, table_name)

0 commit comments

Comments
 (0)