32 changes: 20 additions & 12 deletions apps/worker/services/test_analytics/ta_process_flakes.py
@@ -33,12 +33,11 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
     }
 
 
-def get_testruns(upload: ReportSession) -> QuerySet[Testrun]:
-    upload_filter = Q(upload_id=upload.id)
-
+def get_testruns(upload_ids: list[int]) -> QuerySet[Testrun]:
     # we won't process flakes for testruns older than 1 day
     return Testrun.objects.filter(
-        Q(timestamp__gte=timezone.now() - timedelta(days=1)) & upload_filter
+        Q(timestamp__gte=timezone.now() - timedelta(days=1))
+        & Q(upload_id__in=upload_ids)
     ).order_by("timestamp")


@@ -81,10 +80,8 @@ def handle_failure(
 
 @sentry_sdk.trace
 def process_single_upload(
-    upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int
+    testruns: list[Testrun], curr_flakes: dict[bytes, Flake], repo_id: int
 ):
-    testruns = get_testruns(upload)
-
     for testrun in testruns:
         test_id = bytes(testrun.test_id)
         match testrun.outcome:
@@ -98,19 +95,18 @@ def process_single_upload(
             case _:
                 continue
 
-    Testrun.objects.bulk_update(testruns, ["outcome"])
-
 
 @sentry_sdk.trace
 def process_flakes_for_commit(repo_id: int, commit_id: str):
     log.info(
         "process_flakes_for_commit: starting processing",
     )
-    uploads = get_relevant_uploads(repo_id, commit_id)
+    uploads = list(get_relevant_uploads(repo_id, commit_id))
+    upload_ids = [upload.id for upload in uploads]
 
     log.info(
         "process_flakes_for_commit: fetched uploads",
-        extra={"uploads": [upload.id for upload in uploads]},
+        extra={"uploads": upload_ids},
     )
 
     curr_flakes = fetch_current_flakes(repo_id)
@@ -120,8 +116,17 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
         extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
     )
 
+    # Fetch all testruns for all uploads in a single query to avoid N+1
+    all_testruns = list(get_testruns(upload_ids))
+
+    # Group testruns by upload_id
+    testruns_by_upload: dict[int, list[Testrun]] = {uid: [] for uid in upload_ids}
+    for testrun in all_testruns:
+        if testrun.upload_id in testruns_by_upload:
+            testruns_by_upload[testrun.upload_id].append(testrun)
+
     for upload in uploads:
-        process_single_upload(upload, curr_flakes, repo_id)
+        process_single_upload(testruns_by_upload[upload.id], curr_flakes, repo_id)
         log.info(
             "process_flakes_for_commit: processed upload",
             extra={"upload": upload.id},
@@ -139,6 +144,9 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
         update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
     )
 
+    # Bulk update all testrun outcomes in a single query
+    Testrun.objects.bulk_update(all_testruns, ["outcome"])
Comment on lines +147 to +148
Contributor Author

Bug: The bulk_create for Flake and bulk_update for Testrun are not in an atomic transaction, risking data inconsistency if the process fails between them.
Severity: CRITICAL

Suggested Fix

Wrap the Flake.objects.bulk_create and Testrun.objects.bulk_update calls within a single atomic transaction by adding the @transaction.atomic() decorator to the process_flakes_for_commit function. This ensures that both database operations either complete successfully together or are both rolled back in case of a failure.
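A minimal sketch of that decorator form, assuming standard Django's `django.db.transaction`; the function body is elided and stands in for the code in the diff above, so this is illustrative rather than the repository's actual code:

```python
import sentry_sdk
from django.db import transaction


@sentry_sdk.trace
@transaction.atomic
def process_flakes_for_commit(repo_id: int, commit_id: str) -> None:
    # Body elided; see the diff above. With @transaction.atomic, the
    # Flake.objects.bulk_create(...) upsert and the final
    # Testrun.objects.bulk_update(all_testruns, ["outcome"]) execute in one
    # transaction, so a failure between them rolls both writes back instead
    # of leaving the two tables inconsistent.
    ...
```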

Prompt for AI Agent

Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: apps/worker/services/test_analytics/ta_process_flakes.py#L147-L148

Potential issue: The function `process_flakes_for_commit` performs a `Flake.objects.bulk_create` followed by a `Testrun.objects.bulk_update` without wrapping them in a database transaction. If the worker process terminates between these two operations, for instance due to a timeout or out-of-memory error, the `Flake` records will be updated, but the `Testrun` outcomes will not be changed to "flaky_fail". This results in a permanent data inconsistency between the two tables. Furthermore, re-running the process after such a failure would cause data corruption by double-counting flakes, as the logic is not idempotent.




 @sentry_sdk.trace
 def process_flakes_for_repo(repo_id: int):