diff --git a/tasks/process_flakes.py b/tasks/process_flakes.py index 8fbfdec99..8edf0fe1c 100644 --- a/tasks/process_flakes.py +++ b/tasks/process_flakes.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Literal +from typing import Any, Literal, cast from redis import Redis from redis.exceptions import LockError @@ -21,21 +21,12 @@ OLD_KEY = "flake_uploads:{}" -def get_redis_val(redis_client: Redis, repo_id: int) -> tuple[list[bytes], bool]: - commit_ids = redis_client.lpop(NEW_KEY.format(repo_id), 10) +def get_redis_val(redis_client: Redis, repo_id: int) -> list[bytes]: + commit_ids = cast(list[bytes], redis_client.lpop(NEW_KEY.format(repo_id), 10)) if commit_ids is None: commit_ids = [] - current_commit = False - with redis_client.pipeline() as pipe: - # can't use getdel because the value of the key is not a string - pipe.get(OLD_KEY.format(repo_id)) - pipe.delete(OLD_KEY.format(repo_id)) - commit_id = pipe.execute() - if commit_id[0] is not None: - current_commit = True - - return commit_ids, current_commit + return commit_ids class ProcessFlakesTask(BaseCodecovTask, name=process_flakes_task_name): @@ -48,7 +39,6 @@ def run_impl( _db_session: Session, *, repo_id: int, - commit_id: str, impl_type: Literal["old", "new", "both"] = "old", **kwargs: Any, ): @@ -75,10 +65,7 @@ def run_impl( The locking scheme is set up such that no upload will be unprocessed. Before queuing up the process flakes task the test results finisher and sync pulls tasks will set the flake_uploads key in redis for that repo. """ - log.info( - "Received process flakes task", - extra=dict(repoid=repo_id, commit=commit_id), - ) + log.info("Received process flakes task") if impl_type == "new" or impl_type == "both": process_flakes_for_repo(repo_id) @@ -97,15 +84,12 @@ def run_impl( blocking_timeout=3, ): while True: - commit_ids, current_commit = get_redis_val(redis_client, repo_id) - if not commit_ids and not current_commit: + commit_shas = get_redis_val(redis_client, repo_id) + if not commit_shas: break - for commitid in commit_ids: - process_func(repo_id, commitid.decode()) - - if current_commit: - process_func(repo_id, commit_id) + for commit_sha in commit_shas: + process_func(repo_id, commit_sha.decode()) except LockError: log.warning("Unable to acquire process flakeslock for key %s.", lock_name)