apps/worker/services/test_analytics/ta_process_flakes.py (30 changes: 18 additions, 12 deletions)
@@ -33,12 +33,11 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
     }
 
 
-def get_testruns(upload: ReportSession) -> QuerySet[Testrun]:
-    upload_filter = Q(upload_id=upload.id)
-
+def get_testruns_for_uploads(upload_ids: list[int]) -> QuerySet[Testrun]:
     # we won't process flakes for testruns older than 1 day
     return Testrun.objects.filter(
-        Q(timestamp__gte=timezone.now() - timedelta(days=1)) & upload_filter
+        Q(timestamp__gte=timezone.now() - timedelta(days=1))
+        & Q(upload_id__in=upload_ids)
     ).order_by("timestamp")
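The point of the new signature is query batching: the old helper implied one query per upload, while upload_id__in fetches the testruns for every upload in a single round trip. A generic, self-contained sketch of the pattern (plain sqlite3 with toy data, not the project's models or schema):

```python
import sqlite3

# Illustration only: one IN (...) query replaces the N per-upload
# queries that the old get_testruns() helper implied.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE testrun (id INTEGER, upload_id INTEGER, outcome TEXT)")
conn.executemany(
    "INSERT INTO testrun VALUES (?, ?, ?)",
    [(1, 10, "pass"), (2, 10, "failure"), (3, 11, "failure")],
)

upload_ids = [10, 11]
placeholders = ", ".join("?" for _ in upload_ids)
rows = conn.execute(
    f"SELECT id, upload_id, outcome FROM testrun"
    f" WHERE upload_id IN ({placeholders}) ORDER BY id",
    upload_ids,
).fetchall()
print(rows)  # all testruns for both uploads, fetched in one round trip
```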


@@ -81,10 +80,8 @@ def handle_failure(
 
 
 @sentry_sdk.trace
 def process_single_upload(
-    upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int
+    testruns: list[Testrun], curr_flakes: dict[bytes, Flake], repo_id: int
 ):
-    testruns = get_testruns(upload)
-
     for testrun in testruns:
         test_id = bytes(testrun.test_id)
         match testrun.outcome:
@@ -98,19 +95,18 @@ def process_single_upload(
         case _:
             continue
 
-    Testrun.objects.bulk_update(testruns, ["outcome"])
-
 
 @sentry_sdk.trace
 def process_flakes_for_commit(repo_id: int, commit_id: str):
     log.info(
         "process_flakes_for_commit: starting processing",
     )
-    uploads = get_relevant_uploads(repo_id, commit_id)
+    uploads = list(get_relevant_uploads(repo_id, commit_id))
+    upload_ids = [upload.id for upload in uploads]
 
     log.info(
         "process_flakes_for_commit: fetched uploads",
-        extra={"uploads": [upload.id for upload in uploads]},
+        extra={"uploads": upload_ids},
     )
 
     curr_flakes = fetch_current_flakes(repo_id)
@@ -120,13 +116,23 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
         extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
     )
 
+    # Fetch all testruns for all uploads in a single query, then group by upload_id
+    all_testruns = list(get_testruns_for_uploads(upload_ids))
+    testruns_by_upload: dict[int, list[Testrun]] = {uid: [] for uid in upload_ids}
+    for testrun in all_testruns:
+        if testrun.upload_id in testruns_by_upload:
+            testruns_by_upload[testrun.upload_id].append(testrun)
+
     for upload in uploads:
-        process_single_upload(upload, curr_flakes, repo_id)
+        process_single_upload(testruns_by_upload[upload.id], curr_flakes, repo_id)
         log.info(
             "process_flakes_for_commit: processed upload",
             extra={"upload": upload.id},
         )
 
+    # Bulk-update all testruns whose outcome may have been changed to "flaky_fail"
+    Testrun.objects.bulk_update(all_testruns, ["outcome"])
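One detail of the grouping step worth calling out: the dict is pre-seeded with every upload id, so uploads whose testruns were all filtered out by the 1-day cutoff still map to an empty list, and the testruns_by_upload[upload.id] lookup in the processing loop cannot raise KeyError. A toy sketch of that behavior (FakeTestrun is a stand-in, not the real Testrun model):

```python
# Toy sketch of the grouping step; FakeTestrun stands in for the Testrun model.
class FakeTestrun:
    def __init__(self, upload_id: int):
        self.upload_id = upload_id

upload_ids = [10, 11, 12]  # upload 12 has no recent testruns
all_testruns = [FakeTestrun(10), FakeTestrun(10), FakeTestrun(11)]

# Pre-seeding every upload id (as the diff does) keeps empty uploads present.
testruns_by_upload: dict[int, list[FakeTestrun]] = {uid: [] for uid in upload_ids}
for testrun in all_testruns:
    if testrun.upload_id in testruns_by_upload:
        testruns_by_upload[testrun.upload_id].append(testrun)

assert len(testruns_by_upload[10]) == 2
assert testruns_by_upload[12] == []  # no KeyError for empty uploads
```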
Comment on lines 128 to +134
Contributor Author

Bug: An exception during the upload processing loop will cause all previously processed data from that batch to be lost, as database updates now only occur after the entire loop finishes.
Severity: MEDIUM

Suggested Fix

To restore the previous fault-tolerant behavior, move the Testrun.objects.bulk_update and Flake.objects.bulk_create calls back inside the for loop that iterates through uploads. This could be done by collecting testruns and flakes per upload and saving them at the end of each loop iteration, or by wrapping each iteration in a transaction.atomic() block to ensure atomicity per upload.
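A minimal sketch of the transaction-per-upload variant, assuming process_single_upload mutates each testrun's outcome in place as in the diff (the Flake.objects.bulk_create call referenced above sits outside the hunk shown here, so it is elided):

```python
from django.db import transaction

for upload in uploads:
    testruns = testruns_by_upload[upload.id]
    # Commit each upload's writes independently: a failure while processing
    # a later upload no longer discards updates already persisted for
    # earlier uploads in the same batch.
    with transaction.atomic():
        process_single_upload(testruns, curr_flakes, repo_id)
        Testrun.objects.bulk_update(testruns, ["outcome"])
```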

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: apps/worker/services/test_analytics/ta_process_flakes.py#L128-L134

Potential issue: The refactoring moves the `Testrun.objects.bulk_update` and `Flake.objects.bulk_create` calls from inside the per-upload processing loop to after the loop completes. In the original code, if an exception occurred while processing one upload, the results from previously completed uploads in the same batch were already persisted. In the new code, if any exception occurs at any point within the loop over uploads, the entire operation is aborted, and all in-memory changes to `Testrun` objects and newly created `Flake` objects from preceding, successfully processed uploads are discarded and never written to the database. While the likelihood of an exception in `process_single_upload` is low, this change represents a regression in fault tolerance.

Did we get this right? 👍 / 👎 to inform future reviews.


     log.info(
         "process_flakes_for_commit: bulk creating flakes",
         extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},