diff --git a/apps/worker/services/test_analytics/ta_process_flakes.py b/apps/worker/services/test_analytics/ta_process_flakes.py
index 61cf26df09..25d5afeeca 100644
--- a/apps/worker/services/test_analytics/ta_process_flakes.py
+++ b/apps/worker/services/test_analytics/ta_process_flakes.py
@@ -102,24 +102,19 @@ def process_single_upload(
 
 
 @sentry_sdk.trace
-def process_flakes_for_commit(repo_id: int, commit_id: str):
+def process_flakes_for_commit(
+    repo_id: int, commit_id: str, curr_flakes: dict[bytes, Flake]
+):
     log.info(
         "process_flakes_for_commit: starting processing",
     )
 
-    uploads = get_relevant_uploads(repo_id, commit_id)
+    uploads = list(get_relevant_uploads(repo_id, commit_id))
 
     log.info(
         "process_flakes_for_commit: fetched uploads",
         extra={"uploads": [upload.id for upload in uploads]},
     )
 
-    curr_flakes = fetch_current_flakes(repo_id)
-
-    log.info(
-        "process_flakes_for_commit: fetched current flakes",
-        extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
-    )
-
     for upload in uploads:
         process_single_upload(upload, curr_flakes, repo_id)
         log.info(
@@ -127,18 +122,6 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
             extra={"upload": upload.id},
         )
 
-    log.info(
-        "process_flakes_for_commit: bulk creating flakes",
-        extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
-    )
-
-    Flake.objects.bulk_create(
-        curr_flakes.values(),
-        update_conflicts=True,
-        unique_fields=["id"],
-        update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
-    )
-
 
 @sentry_sdk.trace
 def process_flakes_for_repo(repo_id: int):
@@ -147,10 +130,30 @@ def process_flakes_for_repo(repo_id: int):
 
     key_name = KEY_NAME.format(repo_id)
     try:
         with redis_client.lock(lock_name, timeout=300, blocking_timeout=3):
+            curr_flakes = fetch_current_flakes(repo_id)
+
+            log.info(
+                "process_flakes_for_repo: fetched current flakes",
+                extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
+            )
+
             while commit_ids := redis_client.lpop(key_name, 10):
                 for commit_id in commit_ids:
                     with process_flakes_summary.labels("new").time():
-                        process_flakes_for_commit(repo_id, commit_id.decode())
+                        process_flakes_for_commit(repo_id, commit_id.decode(), curr_flakes)
+
+            log.info(
+                "process_flakes_for_repo: bulk creating flakes",
+                extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
+            )
+
+            Flake.objects.bulk_create(
+                curr_flakes.values(),
+                update_conflicts=True,
+                unique_fields=["id"],
+                update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
+            )
+
+            return True
     except LockError:
         log.warning("Failed to acquire lock for repo %s", repo_id)