47 changes: 25 additions & 22 deletions apps/worker/services/test_analytics/ta_process_flakes.py
@@ -102,43 +102,26 @@ def process_single_upload(
 
 
 @sentry_sdk.trace
-def process_flakes_for_commit(repo_id: int, commit_id: str):
+def process_flakes_for_commit(
+    repo_id: int, commit_id: str, curr_flakes: dict[bytes, Flake]
+):
     log.info(
         "process_flakes_for_commit: starting processing",
     )
-    uploads = get_relevant_uploads(repo_id, commit_id)
+    uploads = list(get_relevant_uploads(repo_id, commit_id))
 
     log.info(
         "process_flakes_for_commit: fetched uploads",
         extra={"uploads": [upload.id for upload in uploads]},
     )
 
-    curr_flakes = fetch_current_flakes(repo_id)
-
-    log.info(
-        "process_flakes_for_commit: fetched current flakes",
-        extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
-    )
-
     for upload in uploads:
         process_single_upload(upload, curr_flakes, repo_id)
         log.info(
             "process_flakes_for_commit: processed upload",
             extra={"upload": upload.id},
         )
-
-    log.info(
-        "process_flakes_for_commit: bulk creating flakes",
-        extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
-    )
-
-    Flake.objects.bulk_create(
-        curr_flakes.values(),
-        update_conflicts=True,
-        unique_fields=["id"],
-        update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
-    )
 
 
 @sentry_sdk.trace
 def process_flakes_for_repo(repo_id: int):
@@ -147,10 +130,30 @@ def process_flakes_for_repo(repo_id: int):
     key_name = KEY_NAME.format(repo_id)
     try:
         with redis_client.lock(lock_name, timeout=300, blocking_timeout=3):
+            curr_flakes = fetch_current_flakes(repo_id)
+
+            log.info(
+                "process_flakes_for_repo: fetched current flakes",
+                extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
+            )
+
             while commit_ids := redis_client.lpop(key_name, 10):
                 for commit_id in commit_ids:
                     with process_flakes_summary.labels("new").time():
-                        process_flakes_for_commit(repo_id, commit_id.decode())
+                        process_flakes_for_commit(repo_id, commit_id.decode(), curr_flakes)
+
+            log.info(
+                "process_flakes_for_repo: bulk creating flakes",
+                extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
+            )
+
+            Flake.objects.bulk_create(
+                curr_flakes.values(),
+                update_conflicts=True,
+                unique_fields=["id"],
+                update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
+            )
+

Deferred bulk_create loses all flake data on mid-processing error

Medium Severity

Moving bulk_create to after the while loop means that if any exception occurs during processing (e.g., a DB error in Testrun.objects.bulk_update or get_relevant_uploads), ALL accumulated flake changes across ALL lpop batches are lost. Meanwhile, testrun outcomes are already persisted per-upload via bulk_update in process_single_upload, and commit IDs are already removed from Redis by lpop — so they won't be reprocessed. Previously, flakes were persisted after each commit, so only the failing commit's data was at risk. The sibling implementation in detect_flakes.py handles this correctly by calling bulk_create per iteration inside transaction.atomic().

Additional Locations (1)

Reviewed by Cursor Bugbot for commit 6fff16b.


             return True
     except LockError:
         log.warning("Failed to acquire lock for repo %s", repo_id)
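
A minimal sketch of the per-commit persistence pattern the review comment points to (an upsert inside transaction.atomic() on every iteration, as it says detect_flakes.py does), not the PR's actual code. It reuses the module-level names visible in the diff above (fetch_current_flakes, process_flakes_for_commit, Flake, redis_client, KEY_NAME); the wrapper function name is hypothetical.

from django.db import transaction


def drain_commits_with_per_commit_persist(repo_id: int) -> None:
    # Assumes the module-level helpers from ta_process_flakes.py shown in the diff.
    curr_flakes = fetch_current_flakes(repo_id)
    key_name = KEY_NAME.format(repo_id)

    while commit_ids := redis_client.lpop(key_name, 10):
        for commit_id in commit_ids:
            # Upsert flake rows right after each commit, so a failure on a later
            # commit cannot discard changes for commits already popped off Redis.
            with transaction.atomic():
                process_flakes_for_commit(repo_id, commit_id.decode(), curr_flakes)
                Flake.objects.bulk_create(
                    curr_flakes.values(),
                    update_conflicts=True,
                    unique_fields=["id"],
                    update_fields=[
                        "end_date",
                        "count",
                        "recent_passes_count",
                        "fail_count",
                    ],
                )

The trade-off is more database round-trips per drained batch; the PR's single deferred bulk_create presumably optimizes for fewer writes at the cost of the durability gap the comment describes.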