Skip to content

Commit 5270171

Browse files
author
ci bot
committed
Merge branch 'aarthy/fixes' into 'enterprise'
fix(monitors): handle errored monitors See merge request dkinternal/testgen/dataops-testgen!407
2 parents beb6749 + d2ac510 commit 5270171

25 files changed

Lines changed: 216 additions & 57 deletions

testgen/__main__.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from testgen.commands.run_test_execution import run_test_execution
3333
from testgen.commands.run_test_metadata_exporter import run_test_metadata_exporter
3434
from testgen.commands.run_upgrade_db_config import get_schema_revision, is_db_revision_up_to_date, run_upgrade_db_config
35-
from testgen.commands.test_generation import run_test_generation
35+
from testgen.commands.test_generation import run_monitor_generation, run_test_generation
3636
from testgen.common import (
3737
configure_logging,
3838
display_service,
@@ -176,6 +176,20 @@ def run_generation(test_suite_id: str | None = None, table_group_id: str | None
176176
click.echo("\n" + message)
177177

178178

179+
@cli.command("run-monitor-generation", help="Generates or refreshes the monitors for a table group.")
@click.option("-t", "--test-suite-id", required=True, type=click.STRING, help="ID of the monitor suite to generate")
@with_database_session
def generate_monitors(test_suite_id: str):
    """CLI command ``run-monitor-generation``: generate or refresh a suite's monitors.

    Echoes the suite ID being processed, then calls ``run_monitor_generation``
    for the Freshness_Trend, Volume_Trend and Schema_Drift monitor types,
    leaving the generation mode at that function's default.

    Args:
        test_suite_id: ID of the monitor suite, taken from the required
            ``-t`` / ``--test-suite-id`` option.
    """
    monitor_types = ["Freshness_Trend", "Volume_Trend", "Schema_Drift"]
    click.echo(f"run-monitor-generation for suite: {test_suite_id}")
    run_monitor_generation(test_suite_id, monitor_types)
191+
192+
179193
@register_scheduler_job
180194
@cli.command("run-tests", help="Performs tests defined for a test suite.")
181195
@click.option(

testgen/commands/queries/execute_tests_query.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ def has_schema_changes(self) -> tuple[dict]:
180180
# Runs on App database
181181
return self._get_query("has_schema_changes.sql")
182182

183+
def get_errored_autogen_monitors(self) -> tuple[str, dict]:
    """Return the query built from the ``get_errored_autogen_monitors.sql`` template.

    The result is whatever ``_get_query`` produces for that template; the
    query is meant to be run on the App database.
    """
    template_name = "get_errored_autogen_monitors.sql"
    return self._get_query(template_name)
186+
183187
def get_active_test_definitions(self) -> tuple[dict]:
184188
# Runs on App database
185189
return self._get_query("get_active_test_definitions.sql")

testgen/commands/run_test_execution.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import subprocess
33
import threading
4+
from collections import defaultdict
45
from datetime import UTC, datetime, timedelta
56
from functools import partial
67
from typing import Literal
@@ -86,12 +87,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
8687
sql_generator = TestExecutionSQL(connection, table_group, test_run)
8788

8889
if test_suite.is_monitor:
89-
schema_changes = fetch_dict_from_db(*sql_generator.has_schema_changes())[0]
90-
if schema_changes["has_table_drops"]:
91-
run_monitor_generation(test_suite_id, ["Freshness_Trend", "Volume_Trend", "Metric_Trend"], mode="delete")
92-
if schema_changes["has_table_adds"]:
93-
# Freshness monitors will be inserted after profiling
94-
run_monitor_generation(test_suite_id, ["Volume_Trend"], mode="insert")
90+
_sync_monitor_definitions(sql_generator)
9591

9692
# Update the thresholds before retrieving the test definitions in the next steps
9793
LOG.info("Updating test thresholds based on history calculations")
@@ -190,6 +186,26 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
190186
"""
191187

192188

189+
def _sync_monitor_definitions(sql_generator: TestExecutionSQL) -> None:
    """Bring a monitor suite's definitions up to date before its tests run.

    Steps, in order:
      1. Look up schema changes. When tables were dropped, run monitor
         generation in "delete" mode for the Freshness/Volume/Metric trend
         monitors; when tables were added, run it in "insert" mode for
         Volume_Trend only.
      2. Fetch the auto-generated monitors reported as errored by
         ``get_errored_autogen_monitors`` and regenerate them in "upsert"
         mode, one call per test type, restricted to the affected tables.

    Args:
        sql_generator: Query builder for the current test run; its
            ``test_run.test_suite_id`` identifies the monitor suite.
    """
    suite_id = sql_generator.test_run.test_suite_id

    drift = fetch_dict_from_db(*sql_generator.has_schema_changes())[0]
    if drift["has_table_drops"]:
        run_monitor_generation(suite_id, ["Freshness_Trend", "Volume_Trend", "Metric_Trend"], mode="delete")
    if drift["has_table_adds"]:
        # Freshness monitors will be inserted after profiling
        run_monitor_generation(suite_id, ["Volume_Trend"], mode="insert")

    # Regenerate monitors that errored in previous run
    errored_rows = fetch_dict_from_db(*sql_generator.get_errored_autogen_monitors())
    if not errored_rows:
        return

    # Group table names by test type, keeping first-seen order of the types.
    tables_by_type: dict[str, list[str]] = {}
    for row in errored_rows:
        tables_by_type.setdefault(row["test_type"], []).append(row["table_name"])

    for monitor_type, tables in tables_by_type.items():
        run_monitor_generation(suite_id, [monitor_type], mode="upsert", table_names=tables)
207+
208+
193209
def _run_tests(
194210
sql_generator: TestExecutionSQL,
195211
run_type: Literal["QUERY", "METADATA"],

testgen/commands/test_generation.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def run_monitor_generation(
7272
monitor_suite_id: str | UUID,
7373
monitors: list[MonitorTestType],
7474
mode: MonitorGenerationMode = "upsert",
75+
table_names: list[str] | None = None,
7576
) -> None:
7677
"""
7778
Modes:
@@ -90,7 +91,7 @@ def run_monitor_generation(
9091
table_group = TableGroup.get(monitor_suite.table_groups_id)
9192
connection = Connection.get(table_group.connection_id)
9293

93-
TestGeneration(connection, table_group, monitor_suite, "Monitor", monitors).monitor_run(mode)
94+
TestGeneration(connection, table_group, monitor_suite, "Monitor", monitors).monitor_run(mode, table_names=table_names)
9495

9596

9697
class TestGeneration:
@@ -123,16 +124,19 @@ def run(self) -> None:
123124
self._get_query("delete_stale_autogen_tests.sql"),
124125
])
125126

126-
def monitor_run(self, mode: MonitorGenerationMode, table_names: list[str] | None = None) -> None:
    """Run the monitor generation queries for this suite.

    Args:
        mode: "delete" executes only ``delete_stale_monitors.sql`` and
            returns. Any other mode executes the generation queries, with
            the ``INSERT_ONLY`` parameter set to True only for "insert".
        table_names: Optional list of table names. When non-empty, a
            ``TABLE_FILTER`` parameter of the form
            ``AND table_name IN ('a', 'b')`` is passed to the generation
            queries; when omitted or empty, no ``TABLE_FILTER`` override
            is passed.
    """
    if mode == "delete":
        execute_db_queries([self._get_query("delete_stale_monitors.sql")])
        return

    extra_params = {"INSERT_ONLY": mode == "insert"}
    if table_names:
        # The names are spliced into the SQL text as string literals, so any
        # embedded single quote must be doubled; otherwise a table name
        # containing "'" would end the literal early and corrupt the query.
        table_list = ", ".join("'" + table.replace("'", "''") + "'" for table in table_names)
        extra_params["TABLE_FILTER"] = f"AND table_name IN ({table_list})"

    LOG.info("Running monitor generation queries")
    execute_db_queries(self._get_generation_queries(extra_params=extra_params))
137141

138142
def _get_generation_queries(self, extra_params: dict | None = None) -> list[tuple[str, dict]]:
@@ -188,6 +192,7 @@ def _get_params(self, test_type: TestTypeParams | None = None) -> dict:
188192
"SQL_FLAVOR": self.flavor,
189193
"QUOTE": self.flavor_service.quote_character,
190194
"INSERT_ONLY": False,
195+
"TABLE_FILTER": "",
191196
})
192197
return params
193198

testgen/common/models/table_group.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ class TableGroupSummary(EntityMinimal):
7171
monitor_schema_anomalies: int | None
7272
monitor_volume_anomalies: int | None
7373
monitor_metric_anomalies: int | None
74+
monitor_freshness_has_errors: bool | None
75+
monitor_volume_has_errors: bool | None
76+
monitor_schema_has_errors: bool | None
77+
monitor_metric_has_errors: bool | None
7478
monitor_freshness_is_training: bool | None
7579
monitor_volume_is_training: bool | None
7680
monitor_metric_is_training: bool | None
@@ -266,6 +270,10 @@ def select_summary(cls, project_code: str, for_dashboard: bool = False) -> Itera
266270
SUM(CASE WHEN results.test_type = 'Schema_Drift' AND results.result_code = 0 THEN 1 ELSE 0 END) AS schema_anomalies,
267271
SUM(CASE WHEN results.test_type = 'Volume_Trend' AND results.result_code = 0 THEN 1 ELSE 0 END) AS volume_anomalies,
268272
SUM(CASE WHEN results.test_type = 'Metric_Trend' AND results.result_code = 0 THEN 1 ELSE 0 END) AS metric_anomalies,
273+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Freshness_Trend' AND ranked_test_runs.position = 1) AS freshness_has_errors,
274+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Volume_Trend' AND ranked_test_runs.position = 1) AS volume_has_errors,
275+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Schema_Drift' AND ranked_test_runs.position = 1) AS schema_has_errors,
276+
BOOL_OR(results.result_status = 'Error') FILTER (WHERE results.test_type = 'Metric_Trend' AND ranked_test_runs.position = 1) AS metric_has_errors,
269277
BOOL_AND(results.result_code = -1) FILTER (WHERE results.test_type = 'Freshness_Trend' AND ranked_test_runs.position = 1) AS freshness_is_training,
270278
BOOL_AND(results.result_code = -1) FILTER (WHERE results.test_type = 'Volume_Trend' AND ranked_test_runs.position = 1) AS volume_is_training,
271279
BOOL_AND(results.result_code = -1) FILTER (WHERE results.test_type = 'Metric_Trend' AND ranked_test_runs.position = 1) AS metric_is_training,
@@ -319,6 +327,10 @@ def select_summary(cls, project_code: str, for_dashboard: bool = False) -> Itera
319327
monitor_tables.schema_anomalies AS monitor_schema_anomalies,
320328
monitor_tables.volume_anomalies AS monitor_volume_anomalies,
321329
monitor_tables.metric_anomalies AS monitor_metric_anomalies,
330+
monitor_tables.freshness_has_errors AS monitor_freshness_has_errors,
331+
monitor_tables.volume_has_errors AS monitor_volume_has_errors,
332+
monitor_tables.schema_has_errors AS monitor_schema_has_errors,
333+
monitor_tables.metric_has_errors AS monitor_metric_has_errors,
322334
monitor_tables.freshness_is_training AS monitor_freshness_is_training,
323335
monitor_tables.volume_is_training AS monitor_volume_is_training,
324336
monitor_tables.metric_is_training AS monitor_metric_is_training,

testgen/common/notifications/monitor_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ def send_monitor_notifications(test_run: TestRun, result_list_ct=20):
230230
str(table_group.id),
231231
"&table_name_filter=" if table_name else "",
232232
table_name if table_name else "",
233+
"&source=email",
233234
)
234235
)
235236
try:

testgen/common/notifications/profiling_run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def send_profiling_run_notifications(profiling_run: ProfilingRun, result_list_ct
258258
return
259259

260260
profiling_run_issues_url = "".join(
261-
(PersistedSetting.get("BASE_URL", ""), "/profiling-runs:hygiene?run_id=", str(profiling_run.id))
261+
(PersistedSetting.get("BASE_URL", ""), "/profiling-runs:hygiene?run_id=", str(profiling_run.id), "&source=email")
262262
)
263263

264264
hygiene_issues_summary = []
@@ -304,7 +304,7 @@ def send_profiling_run_notifications(profiling_run: ProfilingRun, result_list_ct
304304
"id": str(profiling_run.id),
305305
"issues_url": profiling_run_issues_url,
306306
"results_url": "".join(
307-
(PersistedSetting.get("BASE_URL", ""), "/profiling-runs:results?run_id=", str(profiling_run.id))
307+
(PersistedSetting.get("BASE_URL", ""), "/profiling-runs:results?run_id=", str(profiling_run.id), "&source=email")
308308
),
309309
"start_time": profiling_run.profiling_starttime,
310310
"end_time": profiling_run.profiling_endtime,

testgen/common/notifications/score_drop.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ def send_score_drop_notifications(notification_data: list[tuple[ScoreDefinition,
180180
PersistedSetting.get("BASE_URL", ""),
181181
"/quality-dashboard:score-details?definition_id=",
182182
str(definition.id),
183+
"&source=email",
183184
)
184185
),
185186
"diff": context_diff,

testgen/common/notifications/test_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ def send_test_run_notifications(test_run: TestRun, result_list_ct=20, result_sta
328328
PersistedSetting.get("BASE_URL", ""),
329329
"/test-runs:results?run_id=",
330330
str(test_run.id),
331+
"&source=email",
331332
)
332333
)
333334

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- Auto-generated Freshness_Trend / Volume_Trend monitors that errored in the
-- most recent completed run of the suite, not counting the current run.
-- Returns one row per distinct (test_type, table_name).
SELECT DISTINCT results.test_type, results.table_name
  FROM test_results results
 WHERE results.test_run_id = (
          -- Latest completed run of this suite other than the current one
          SELECT id
            FROM test_runs
           WHERE test_suite_id = :TEST_SUITE_ID ::UUID
             AND id <> :TEST_RUN_ID ::UUID
             AND status = 'Complete'
           ORDER BY test_starttime DESC
           LIMIT 1
       )
   AND results.result_status = 'Error'
   AND results.auto_gen IS TRUE
   AND results.test_type IN ('Freshness_Trend', 'Volume_Trend')

0 commit comments

Comments
 (0)