From 139138ef2bd2d06e61a830d41ae37192d2adeac1 Mon Sep 17 00:00:00 2001
From: "sentry[bot]" <39604003+sentry[bot]@users.noreply.github.com>
Date: Sat, 18 Apr 2026 16:30:56 +0000
Subject: [PATCH] perf(worker): Bulk fetch and update testruns in flake
 processing

---
 .../test_analytics/ta_process_flakes.py      | 32 ++++++++++++-------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/apps/worker/services/test_analytics/ta_process_flakes.py b/apps/worker/services/test_analytics/ta_process_flakes.py
index 61cf26df09..2d0d07f9de 100644
--- a/apps/worker/services/test_analytics/ta_process_flakes.py
+++ b/apps/worker/services/test_analytics/ta_process_flakes.py
@@ -33,12 +33,11 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
     }
 
 
-def get_testruns(upload: ReportSession) -> QuerySet[Testrun]:
-    upload_filter = Q(upload_id=upload.id)
-
+def get_testruns(upload_ids: list[int]) -> QuerySet[Testrun]:
     # we won't process flakes for testruns older than 1 day
     return Testrun.objects.filter(
-        Q(timestamp__gte=timezone.now() - timedelta(days=1)) & upload_filter
+        Q(timestamp__gte=timezone.now() - timedelta(days=1))
+        & Q(upload_id__in=upload_ids)
     ).order_by("timestamp")
 
 
@@ -81,10 +80,8 @@ def handle_failure(
 
 @sentry_sdk.trace
 def process_single_upload(
-    upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int
+    testruns: list[Testrun], curr_flakes: dict[bytes, Flake], repo_id: int
 ):
-    testruns = get_testruns(upload)
-
     for testrun in testruns:
         test_id = bytes(testrun.test_id)
         match testrun.outcome:
@@ -98,19 +95,18 @@ def process_single_upload(
             case _:
                 continue
 
-    Testrun.objects.bulk_update(testruns, ["outcome"])
-
 
 @sentry_sdk.trace
 def process_flakes_for_commit(repo_id: int, commit_id: str):
     log.info(
         "process_flakes_for_commit: starting processing",
     )
 
-    uploads = get_relevant_uploads(repo_id, commit_id)
+    uploads = list(get_relevant_uploads(repo_id, commit_id))
+    upload_ids = [upload.id for upload in uploads]
     log.info(
         "process_flakes_for_commit: fetched uploads",
-        extra={"uploads": [upload.id for upload in uploads]},
+        extra={"uploads": upload_ids},
     )
 
     curr_flakes = fetch_current_flakes(repo_id)
@@ -120,8 +116,17 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
         extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
     )
 
+    # Fetch all testruns for all uploads in a single query to avoid N+1
+    all_testruns = list(get_testruns(upload_ids))
+
+    # Group testruns by upload_id
+    testruns_by_upload: dict[int, list[Testrun]] = {uid: [] for uid in upload_ids}
+    for testrun in all_testruns:
+        if testrun.upload_id in testruns_by_upload:
+            testruns_by_upload[testrun.upload_id].append(testrun)
+
     for upload in uploads:
-        process_single_upload(upload, curr_flakes, repo_id)
+        process_single_upload(testruns_by_upload[upload.id], curr_flakes, repo_id)
         log.info(
             "process_flakes_for_commit: processed upload",
             extra={"upload": upload.id},
@@ -139,6 +144,9 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
         update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
     )
 
+    # Bulk update all testrun outcomes in a single query
+    Testrun.objects.bulk_update(all_testruns, ["outcome"])
+
 
 @sentry_sdk.trace
 def process_flakes_for_repo(repo_id: int):