From b79b2e2ec59d6384328219d2840391ef251ea1b6 Mon Sep 17 00:00:00 2001 From: "sentry[bot]" <39604003+sentry[bot]@users.noreply.github.com> Date: Sun, 19 Apr 2026 14:36:55 +0000 Subject: [PATCH] perf(worker): Batch testrun queries for flake processing --- .../test_analytics/ta_process_flakes.py | 57 ++++++++----------- 1 file changed, 23 insertions(+), 34 deletions(-) diff --git a/apps/worker/services/test_analytics/ta_process_flakes.py b/apps/worker/services/test_analytics/ta_process_flakes.py index 61cf26df09..99ae999c7c 100644 --- a/apps/worker/services/test_analytics/ta_process_flakes.py +++ b/apps/worker/services/test_analytics/ta_process_flakes.py @@ -33,12 +33,11 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]: } -def get_testruns(upload: ReportSession) -> QuerySet[Testrun]: - upload_filter = Q(upload_id=upload.id) - +def get_testruns(upload_ids: list[int]) -> QuerySet[Testrun]: # we won't process flakes for testruns older than 1 day return Testrun.objects.filter( - Q(timestamp__gte=timezone.now() - timedelta(days=1)) & upload_filter + Q(timestamp__gte=timezone.now() - timedelta(days=1)) + & Q(upload_id__in=upload_ids) ).order_by("timestamp") @@ -79,38 +78,18 @@ def handle_failure( testrun.outcome = "flaky_fail" -@sentry_sdk.trace -def process_single_upload( - upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int -): - testruns = get_testruns(upload) - - for testrun in testruns: - test_id = bytes(testrun.test_id) - match testrun.outcome: - case "pass": - if test_id not in curr_flakes: - continue - - handle_pass(curr_flakes, test_id) - case "failure" | "flaky_fail" | "error": - handle_failure(curr_flakes, test_id, testrun, repo_id) - case _: - continue - - Testrun.objects.bulk_update(testruns, ["outcome"]) - - @sentry_sdk.trace def process_flakes_for_commit(repo_id: int, commit_id: str): log.info( "process_flakes_for_commit: starting processing", ) - uploads = get_relevant_uploads(repo_id, commit_id) + upload_ids = list( + get_relevant_uploads(repo_id, commit_id).values_list("id", flat=True) + ) log.info( "process_flakes_for_commit: fetched uploads", - extra={"uploads": [upload.id for upload in uploads]}, + extra={"uploads": upload_ids}, ) curr_flakes = fetch_current_flakes(repo_id) @@ -120,12 +99,22 @@ def process_flakes_for_commit(repo_id: int, commit_id: str): extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]}, ) - for upload in uploads: - process_single_upload(upload, curr_flakes, repo_id) - log.info( - "process_flakes_for_commit: processed upload", - extra={"upload": upload.id}, - ) + testruns = get_testruns(upload_ids) + + for testrun in testruns: + test_id = bytes(testrun.test_id) + match testrun.outcome: + case "pass": + if test_id not in curr_flakes: + continue + + handle_pass(curr_flakes, test_id) + case "failure" | "flaky_fail" | "error": + handle_failure(curr_flakes, test_id, testrun, repo_id) + case _: + continue + + Testrun.objects.bulk_update(testruns, ["outcome"]) log.info( "process_flakes_for_commit: bulk creating flakes",