39 changes: 27 additions & 12 deletions apps/worker/services/test_analytics/ta_process_flakes.py
@@ -1,4 +1,5 @@
 import logging
+from collections import defaultdict
 from datetime import timedelta
 
 import sentry_sdk
@@ -33,14 +34,21 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
     }
 
 
-def get_testruns(upload: ReportSession) -> QuerySet[Testrun]:
-    upload_filter = Q(upload_id=upload.id)
-
+def get_testruns_for_uploads(
+    upload_ids: list[int],
+) -> dict[int, list[Testrun]]:
     # we won't process flakes for testruns older than 1 day
-    return Testrun.objects.filter(
-        Q(timestamp__gte=timezone.now() - timedelta(days=1)) & upload_filter
+    testruns = Testrun.objects.filter(
+        Q(timestamp__gte=timezone.now() - timedelta(days=1))
+        & Q(upload_id__in=upload_ids)
     ).order_by("timestamp")
+
+    testruns_by_upload: dict[int, list[Testrun]] = defaultdict(list)
+    for testrun in testruns:
+        testruns_by_upload[testrun.upload_id].append(testrun)
+
+    return testruns_by_upload
 
 
 def handle_pass(curr_flakes: dict[bytes, Flake], test_id: bytes):
     # possible that we expire it and stop caring about it
@@ -81,10 +89,11 @@ def handle_failure(
 
 @sentry_sdk.trace
 def process_single_upload(
-    upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int
+    upload: ReportSession,
+    curr_flakes: dict[bytes, Flake],
+    repo_id: int,
+    testruns: list[Testrun],
 ):
-    testruns = get_testruns(upload)
-
     for testrun in testruns:
         test_id = bytes(testrun.test_id)
         match testrun.outcome:
@@ -98,15 +107,13 @@ def process_single_upload(
             case _:
                 continue
 
-    Testrun.objects.bulk_update(testruns, ["outcome"])
-
 
 @sentry_sdk.trace
 def process_flakes_for_commit(repo_id: int, commit_id: str):
     log.info(
         "process_flakes_for_commit: starting processing",
     )
-    uploads = get_relevant_uploads(repo_id, commit_id)
+    uploads = list(get_relevant_uploads(repo_id, commit_id))
 
     log.info(
         "process_flakes_for_commit: fetched uploads",
@@ -120,13 +127,21 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
         extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
     )
 
+    upload_ids = [upload.id for upload in uploads]
+    testruns_by_upload = get_testruns_for_uploads(upload_ids)
+
+    all_testruns: list[Testrun] = []
     for upload in uploads:
-        process_single_upload(upload, curr_flakes, repo_id)
+        testruns = testruns_by_upload.get(upload.id, [])
+        process_single_upload(upload, curr_flakes, repo_id, testruns)
+        all_testruns.extend(testruns)
         log.info(
             "process_flakes_for_commit: processed upload",
             extra={"upload": upload.id},
         )
 
+    Testrun.objects.bulk_update(all_testruns, ["outcome"])
Contributor Author

Bug: An exception during the processing loop in process_flakes_for_commit will cause all pending testrun outcome updates for the commit to be lost, since outcomes are no longer flushed per upload but in a single bulk_update after the loop.
Severity: MEDIUM

Suggested Fix

Wrap the processing for each upload within the main loop of process_flakes_for_commit in a with transaction.atomic(): block. This will ensure that database operations for each upload are treated as a single atomic unit, preventing partial updates and data inconsistency if an error occurs mid-process.
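
A minimal sketch of that shape, reusing the names from this diff (uploads, testruns_by_upload, curr_flakes, repo_id, process_single_upload, Testrun, log) and assuming Django's standard django.db.transaction API; the commit-wide bulk_update moves back inside the loop so each upload is flushed atomically:

    from django.db import transaction

    for upload in uploads:
        testruns = testruns_by_upload.get(upload.id, [])
        with transaction.atomic():
            # Any flake.save() inside process_single_upload and the outcome
            # updates for this upload now commit (or roll back) together.
            process_single_upload(upload, curr_flakes, repo_id, testruns)
            Testrun.objects.bulk_update(testruns, ["outcome"])
        log.info(
            "process_flakes_for_commit: processed upload",
            extra={"upload": upload.id},
        )

This trades the single batched write for per-upload flush granularity, which is what the pre-refactor code provided.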

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: apps/worker/services/test_analytics/ta_process_flakes.py#L143

Potential issue: The function process_flakes_for_commit was refactored to perform a single bulk_update of testrun outcomes after processing all uploads for a commit. However, the processing loop still contains individual database writes, such as flake.save() within handle_pass. If a database exception occurs during one of these individual writes, the function will exit prematurely. As a result, the final bulk_update is never executed, causing all accumulated testrun outcome updates for that commit to be lost. This can lead to data inconsistency, where a flake's state is updated but the corresponding testrun outcome is not.

+
     log.info(
         "process_flakes_for_commit: bulk creating flakes",
         extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},