Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions tasks/process_flakes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Literal
from typing import Any, Literal, cast

from redis import Redis
from redis.exceptions import LockError
Expand All @@ -21,21 +21,12 @@
OLD_KEY = "flake_uploads:{}"


def get_redis_val(redis_client: Redis, repo_id: int) -> tuple[list[bytes], bool]:
commit_ids = redis_client.lpop(NEW_KEY.format(repo_id), 10)
def get_redis_val(redis_client: Redis, repo_id: int) -> list[bytes]:
commit_ids = cast(list[bytes], redis_client.lpop(NEW_KEY.format(repo_id), 10))
if commit_ids is None:
commit_ids = []

current_commit = False
with redis_client.pipeline() as pipe:
# can't use getdel because the value of the key is not a string
pipe.get(OLD_KEY.format(repo_id))
pipe.delete(OLD_KEY.format(repo_id))
commit_id = pipe.execute()
if commit_id[0] is not None:
current_commit = True

return commit_ids, current_commit
return commit_ids


class ProcessFlakesTask(BaseCodecovTask, name=process_flakes_task_name):
Expand All @@ -48,7 +39,6 @@ def run_impl(
_db_session: Session,
*,
repo_id: int,
commit_id: str,
impl_type: Literal["old", "new", "both"] = "old",
**kwargs: Any,
):
Expand All @@ -75,10 +65,7 @@ def run_impl(
The locking scheme is set up such that no upload will be unprocessed. Before queuing up the process flakes task the
test results finisher and sync pulls tasks will set the flake_uploads key in redis for that repo.
"""
log.info(
"Received process flakes task",
extra=dict(repoid=repo_id, commit=commit_id),
)
log.info("Received process flakes task")

if impl_type == "new" or impl_type == "both":
process_flakes_for_repo(repo_id)
Expand All @@ -97,15 +84,12 @@ def run_impl(
blocking_timeout=3,
):
while True:
commit_ids, current_commit = get_redis_val(redis_client, repo_id)
if not commit_ids and not current_commit:
commit_shas = get_redis_val(redis_client, repo_id)
if not commit_shas:
break

for commitid in commit_ids:
process_func(repo_id, commitid.decode())

if current_commit:
process_func(repo_id, commit_id)
for commit_sha in commit_shas:
process_func(repo_id, commit_sha.decode())

except LockError:
log.warning("Unable to acquire process flakeslock for key %s.", lock_name)
Expand Down
Loading