From 4373f12e16036b908fbdaf64b5c6c929a54dfd8a Mon Sep 17 00:00:00 2001 From: "sentry[bot]" <39604003+sentry[bot]@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:09:21 +0000 Subject: [PATCH] perf(worker): Batch fetch testruns for flake processing --- .../test_analytics/ta_process_flakes.py | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/apps/worker/services/test_analytics/ta_process_flakes.py b/apps/worker/services/test_analytics/ta_process_flakes.py index 61cf26df09..4db91b4ae8 100644 --- a/apps/worker/services/test_analytics/ta_process_flakes.py +++ b/apps/worker/services/test_analytics/ta_process_flakes.py @@ -1,4 +1,5 @@ import logging +from collections import defaultdict from datetime import timedelta import sentry_sdk @@ -33,14 +34,20 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]: } -def get_testruns(upload: ReportSession) -> QuerySet[Testrun]: - upload_filter = Q(upload_id=upload.id) - +def get_testruns_for_uploads( + upload_ids: list[int], +) -> dict[int, list[Testrun]]: # we won't process flakes for testruns older than 1 day - return Testrun.objects.filter( - Q(timestamp__gte=timezone.now() - timedelta(days=1)) & upload_filter + testruns = Testrun.objects.filter( + Q(timestamp__gte=timezone.now() - timedelta(days=1)) + & Q(upload_id__in=upload_ids) ).order_by("timestamp") + grouped: dict[int, list[Testrun]] = defaultdict(list) + for testrun in testruns: + grouped[testrun.upload_id].append(testrun) + return grouped + def handle_pass(curr_flakes: dict[bytes, Flake], test_id: bytes): # possible that we expire it and stop caring about it @@ -81,10 +88,11 @@ def handle_failure( @sentry_sdk.trace def process_single_upload( - upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int + upload: ReportSession, + curr_flakes: dict[bytes, Flake], + repo_id: int, + testruns: list[Testrun], ): - testruns = get_testruns(upload) - for testrun in testruns: test_id = bytes(testrun.test_id) match testrun.outcome: @@ -106,7 +114,7 @@ def process_flakes_for_commit(repo_id: int, commit_id: str): log.info( "process_flakes_for_commit: starting processing", ) - uploads = get_relevant_uploads(repo_id, commit_id) + uploads = list(get_relevant_uploads(repo_id, commit_id)) log.info( "process_flakes_for_commit: fetched uploads", @@ -120,8 +128,12 @@ def process_flakes_for_commit(repo_id: int, commit_id: str): extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]}, ) + upload_ids = [upload.id for upload in uploads] + testruns_by_upload = get_testruns_for_uploads(upload_ids) + for upload in uploads: - process_single_upload(upload, curr_flakes, repo_id) + testruns = testruns_by_upload.get(upload.id, []) + process_single_upload(upload, curr_flakes, repo_id, testruns) log.info( "process_flakes_for_commit: processed upload", extra={"upload": upload.id},