From 76a8a8ec2151ff3f9bf7de9c4d3178c505a7b7d1 Mon Sep 17 00:00:00 2001 From: Joseph Sawaya Date: Tue, 11 Feb 2025 16:47:34 -0500 Subject: [PATCH] feat: implement flake processing using timeseries models --- services/processing/flake_processing.py | 2 +- services/test_analytics/ta_process_flakes.py | 121 +++++++ ...ta_process_flakes__testrun_filters__0.json | 22 ++ .../tests/test_ta_process_flakes.py | 312 ++++++++++++++++++ tasks/process_flakes.py | 11 +- tasks/tests/unit/test_process_flakes.py | 48 ++- 6 files changed, 496 insertions(+), 20 deletions(-) create mode 100644 services/test_analytics/ta_process_flakes.py create mode 100644 services/test_analytics/tests/snapshots/ta_process_flakes__testrun_filters__0.json create mode 100644 services/test_analytics/tests/test_ta_process_flakes.py diff --git a/services/processing/flake_processing.py b/services/processing/flake_processing.py index 286890fde..b6fc40570 100644 --- a/services/processing/flake_processing.py +++ b/services/processing/flake_processing.py @@ -39,7 +39,7 @@ def process_flake_in_transaction( report__report_type=CommitReport.ReportType.TEST_RESULTS.value, report__commit__repository__repoid=repo_id, report__commit__commitid=commit_id, - state__in=["processed", "v2_finished"], + state__in=["processed"], ) curr_flakes = fetch_curr_flakes(repo_id) diff --git a/services/test_analytics/ta_process_flakes.py b/services/test_analytics/ta_process_flakes.py new file mode 100644 index 000000000..c8f9728e0 --- /dev/null +++ b/services/test_analytics/ta_process_flakes.py @@ -0,0 +1,121 @@ +import logging +from datetime import datetime + +from django.db import transaction +from django.db.models import Q, QuerySet +from redis.exceptions import LockError +from shared.django_apps.reports.models import CommitReport, ReportSession +from shared.django_apps.ta_timeseries.models import Testrun +from shared.django_apps.test_analytics.models import Flake + +from services.redis import get_redis_connection + +log = logging.getLogger(__name__) + +FAIL_FILTER = Q(outcome="failure") | Q(outcome="flaky_failure") | Q(outcome="error") + +LOCK_NAME = "ta_flake_lock:{}" +KEY_NAME = "ta_flake_key:{}" + + +def get_relevant_uploads(repo_id: int, commit_id: str) -> QuerySet[ReportSession]: + return ReportSession.objects.filter( + report__report_type=CommitReport.ReportType.TEST_RESULTS.value, + report__commit__repository__repoid=repo_id, + report__commit__commitid=commit_id, + state__in=["processed"], + ) + + +def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]: + return { + bytes(flake.test_id): flake for flake in Flake.objects.filter(repoid=repo_id) + } + + +def get_testruns( + upload: ReportSession, curr_flakes: dict[bytes, Flake] +) -> QuerySet[Testrun]: + upload_filter = Q(upload_id=upload.id) + flaky_pass_filter = Q(outcome="pass") & Q(test_id__in=curr_flakes.keys()) + return Testrun.objects.filter(upload_filter & (FAIL_FILTER | flaky_pass_filter)) + + +def handle_pass(curr_flakes: dict[bytes, Flake], test_id: bytes): + # possible that we expire it and stop caring about it + if test_id not in curr_flakes: + return + + curr_flakes[test_id].recent_passes_count += 1 + curr_flakes[test_id].count += 1 + if curr_flakes[test_id].recent_passes_count == 30: + curr_flakes[test_id].end_date = datetime.now() + curr_flakes[test_id].save() + del curr_flakes[test_id] + + +def handle_failure( + curr_flakes: dict[bytes, Flake], test_id: bytes, testrun: Testrun, repo_id: int +): + existing_flake = curr_flakes.get(test_id) + if existing_flake: + existing_flake.fail_count += 1 + existing_flake.count += 1 + existing_flake.recent_passes_count = 0 + else: + if testrun.outcome != "flaky_failure": + testrun.outcome = "flaky_failure" + new_flake = Flake( + repoid=repo_id, + test_id=test_id, + count=1, + fail_count=1, + recent_passes_count=0, + start_date=datetime.now(), + ) + curr_flakes[test_id] = new_flake + + +def process_flakes_for_commit(repo_id: int, commit_id: str): + uploads = get_relevant_uploads(repo_id, commit_id) + + curr_flakes = fetch_current_flakes(repo_id) + + for upload in uploads: + testruns = get_testruns(upload, curr_flakes) + + for testrun in testruns: + test_id = bytes(testrun.test_id) + match testrun.outcome: + case "pass": + handle_pass(curr_flakes, test_id) + case "failure" | "flaky_failure" | "error": + handle_failure(curr_flakes, test_id, testrun, repo_id) + case _: + continue + + Testrun.objects.bulk_update(testruns, ["outcome"]) + + Flake.objects.bulk_create( + curr_flakes.values(), + update_conflicts=True, + unique_fields=["id"], + update_fields=["end_date", "count", "recent_passes_count", "fail_count"], + ) + + transaction.commit() + + +def process_flakes_for_repo(repo_id: int): + redis_client = get_redis_connection() + lock_name = LOCK_NAME.format(repo_id) + key_name = KEY_NAME.format(repo_id) + try: + with redis_client.lock(lock_name, timeout=300, blocking_timeout=3): + while commit_ids := redis_client.lpop(key_name, 10): + for commit_id in commit_ids: + process_flakes_for_commit(repo_id, commit_id.decode()) + return True + except LockError: + log.warning("Failed to acquire lock for repo %s", repo_id) + return False diff --git a/services/test_analytics/tests/snapshots/ta_process_flakes__testrun_filters__0.json b/services/test_analytics/tests/snapshots/ta_process_flakes__testrun_filters__0.json new file mode 100644 index 000000000..2a136b9c4 --- /dev/null +++ b/services/test_analytics/tests/snapshots/ta_process_flakes__testrun_filters__0.json @@ -0,0 +1,22 @@ +{ + "test1": { + "count": 6, + "fail_count": 2, + "recent_passes_count": 1 + }, + "test3": { + "count": 1, + "fail_count": 1, + "recent_passes_count": 0 + }, + "test4": { + "count": 1, + "fail_count": 1, + "recent_passes_count": 0 + }, + "test5": { + "count": 1, + "fail_count": 1, + "recent_passes_count": 0 + } +} diff --git a/services/test_analytics/tests/test_ta_process_flakes.py b/services/test_analytics/tests/test_ta_process_flakes.py new file mode 100644 index 000000000..ef9356fd0 --- /dev/null +++ b/services/test_analytics/tests/test_ta_process_flakes.py @@ -0,0 +1,312 @@ +from typing import TypedDict + +import pytest +from django.utils import timezone +from shared.django_apps.reports.models import CommitReport, ReportSession +from shared.django_apps.reports.tests.factories import CommitReportFactory +from shared.django_apps.ta_timeseries.models import Testrun +from shared.django_apps.test_analytics.models import Flake + +from services.redis import get_redis_connection +from services.test_analytics.ta_process_flakes import KEY_NAME, process_flakes_for_repo + + +class TestrunData(TypedDict): + test_id: str + outcome: str + + +class UploadData(TypedDict): + state: str + testruns: list[TestrunData] + + +class FlakeDataRequired(TypedDict): + test_id: str + + +class FlakeDataOptional(FlakeDataRequired, total=False): + count: int + fail_count: int + recent_passes_count: int + start_date: timezone.datetime + end_date: timezone.datetime | None + + +class SetupResult(TypedDict): + repoid: int + commitid: str + + +pytestmark = pytest.mark.django_db( + databases=["default", "ta_timeseries"], transaction=True +) + + +@pytest.fixture +def setup_test_data(db): + def _create_test_data( + uploads: list[UploadData], + existing_flakes: list[FlakeDataOptional], + ) -> SetupResult: + report = CommitReportFactory( + report_type=CommitReport.ReportType.TEST_RESULTS.value, + ) + report.save() + repo_id = report.commit.repository.repoid + commit_id = report.commit.commitid + + redis = get_redis_connection() + redis.lpush(KEY_NAME.format(repo_id), commit_id) + + sessions = [] + testruns = [] + for upload in uploads: + session = ReportSession.objects.create( + report=report, + state=upload["state"], + ) + sessions.append(session) + + for testrun_data in upload.get("testruns", []): + testrun = Testrun.objects.create( + timestamp=timezone.now(), + test_id=testrun_data["test_id"].encode(), + outcome=testrun_data["outcome"], + repo_id=repo_id, + commit_sha=commit_id, + upload_id=session.id, + ) + testruns.append(testrun) + + created_flakes = [] + for flake_data in existing_flakes: + flake = Flake.objects.create( + repoid=repo_id, + test_id=flake_data["test_id"].encode(), + recent_passes_count=flake_data.get("recent_passes_count", 0), + count=flake_data.get("count", 0), + fail_count=flake_data.get("fail_count", 0), + start_date=flake_data.get("start_date", timezone.now()), + end_date=flake_data.get("end_date", None), + ) + created_flakes.append(flake) + + return { + "repoid": repo_id, + "commitid": commit_id, + } + + return _create_test_data + + +def test_process_flakes_valid_states_only(setup_test_data): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [{"test_id": "test1", "outcome": "failure"}], + }, + { + "state": "finished", + "testruns": [{"test_id": "test3", "outcome": "failure"}], + }, + { + "state": "started", + "testruns": [{"test_id": "test4", "outcome": "failure"}], + }, + ], + existing_flakes=[], + ) + + process_flakes_for_repo(result["repoid"]) + + assert Flake.objects.count() == 1 + + +def test_testrun_filters(setup_test_data, snapshot): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [ + {"test_id": "test1", "outcome": "pass"}, + {"test_id": "test2", "outcome": "pass"}, + {"test_id": "test3", "outcome": "failure"}, + {"test_id": "test4", "outcome": "flaky_failure"}, + {"test_id": "test5", "outcome": "error"}, + {"test_id": "test6", "outcome": "skip"}, + ], + } + ], + existing_flakes=[ + {"test_id": "test1", "count": 5, "fail_count": 2}, + ], + ) + + process_flakes_for_repo(result["repoid"]) + + flakes = { + bytes(flake.test_id).decode(): { + "count": flake.count, + "fail_count": flake.fail_count, + "recent_passes_count": flake.recent_passes_count, + } + for flake in Flake.objects.all() + } + + assert snapshot("json") == flakes + + +def test_update_existing_flakes(setup_test_data): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [ + {"test_id": "test1", "outcome": "pass"}, + {"test_id": "test1", "outcome": "failure"}, + ], + } + ], + existing_flakes=[ + { + "test_id": "test1", + "count": 5, + "fail_count": 2, + "recent_passes_count": 0, + }, + ], + ) + + process_flakes_for_repo(result["repoid"]) + + flake = Flake.objects.get(test_id=b"test1") + assert flake.count == 7 + assert flake.fail_count == 3 + assert flake.recent_passes_count == 0 + + +def test_create_new_flakes(setup_test_data): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [ + {"test_id": "test1", "outcome": "failure"}, + {"test_id": "test2", "outcome": "flaky_failure"}, + {"test_id": "test3", "outcome": "error"}, + ], + } + ], + existing_flakes=[], + ) + + process_flakes_for_repo(result["repoid"]) + + assert Flake.objects.count() == 3 + for test_id in [b"test1", b"test2", b"test3"]: + flake = Flake.objects.get(test_id=test_id) + assert flake.count == 1 + assert flake.fail_count == 1 + assert flake.recent_passes_count == 0 + assert flake.start_date is not None + assert flake.end_date is None + + +def test_flake_expiry_and_recreation(setup_test_data): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [ + {"test_id": "test1", "outcome": "pass"}, + {"test_id": "test1", "outcome": "failure"}, + ], + } + ], + existing_flakes=[ + { + "test_id": "test1", + "count": 29, + "fail_count": 1, + "recent_passes_count": 29, + }, + ], + ) + + process_flakes_for_repo(result["repoid"]) + + flakes = Flake.objects.filter(test_id=b"test1").order_by("start_date") + assert len(flakes) == 2 + + expired_flake = flakes[0] + assert expired_flake.end_date is not None + assert expired_flake.recent_passes_count == 30 + + new_flake = flakes[1] + assert new_flake.end_date is None + assert new_flake.count == 1 + assert new_flake.fail_count == 1 + assert new_flake.recent_passes_count == 0 + + +def test_flake_expiry_and_more_passes(setup_test_data): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [ + {"test_id": "test1", "outcome": "pass"}, + {"test_id": "test1", "outcome": "pass"}, + {"test_id": "test1", "outcome": "pass"}, + ], + } + ], + existing_flakes=[ + { + "test_id": "test1", + "count": 29, + "fail_count": 1, + "recent_passes_count": 29, + }, + ], + ) + + process_flakes_for_repo(result["repoid"]) + + flakes = Flake.objects.filter(test_id=b"test1").order_by("start_date") + assert len(flakes) == 1 + + expired_flake = flakes[0] + assert expired_flake.end_date is not None + assert expired_flake.recent_passes_count == 30 + + +def test_testrun_outcome_updates(setup_test_data): + result = setup_test_data( + uploads=[ + { + "state": "processed", + "testruns": [ + {"test_id": "test1", "outcome": "failure"}, + {"test_id": "test2", "outcome": "error"}, + {"test_id": "test3", "outcome": "flaky_failure"}, + ], + } + ], + existing_flakes=[], + ) + + process_flakes_for_repo(result["repoid"]) + + testruns = { + bytes(testrun.test_id).decode(): testrun.outcome + for testrun in Testrun.objects.all() + } + + assert testruns == { + "test1": "flaky_failure", # Updated from failure + "test2": "flaky_failure", # Updated from error + "test3": "flaky_failure", # Already flaky_failure, unchanged + } diff --git a/tasks/process_flakes.py b/tasks/process_flakes.py index fd90b128d..8fbfdec99 100644 --- a/tasks/process_flakes.py +++ b/tasks/process_flakes.py @@ -1,5 +1,5 @@ import logging -from typing import Any +from typing import Any, Literal from redis import Redis from redis.exceptions import LockError @@ -9,6 +9,7 @@ from app import celery_app from services.processing.flake_processing import process_flake_for_repo_commit from services.redis import get_redis_connection +from services.test_analytics.ta_process_flakes import process_flakes_for_repo from tasks.base import BaseCodecovTask log = logging.getLogger(__name__) @@ -48,6 +49,7 @@ def run_impl( *, repo_id: int, commit_id: str, + impl_type: Literal["old", "new", "both"] = "old", **kwargs: Any, ): """ @@ -78,8 +80,13 @@ def run_impl( extra=dict(repoid=repo_id, commit=commit_id), ) + if impl_type == "new" or impl_type == "both": + process_flakes_for_repo(repo_id) + if impl_type == "new": + return {"successful": True} + redis_client = get_redis_connection() - lock_name = f"flake_lock:{repo_id}" + lock_name = LOCK_NAME.format(repo_id) process_func = process_flake_for_repo_commit diff --git a/tasks/tests/unit/test_process_flakes.py b/tasks/tests/unit/test_process_flakes.py index 4d27fb8a0..9386c6caa 100644 --- a/tasks/tests/unit/test_process_flakes.py +++ b/tasks/tests/unit/test_process_flakes.py @@ -34,8 +34,6 @@ ) from tests.helpers import mock_all_plans_and_tiers -pytestmark = pytest.mark.django_db - class RepoSimulator: def __init__(self): @@ -134,7 +132,8 @@ def reset(self): self.test_count = 0 -def test_generate_flake_dict(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_generate_flake_dict(): repo = RepositoryFactory() flake_dict = fetch_curr_flakes(repo.repoid) @@ -150,7 +149,8 @@ def test_generate_flake_dict(transactional_db): assert "id" in flake_dict -def test_get_test_instances_when_test_is_flaky(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_get_test_instances_when_test_is_flaky(): repo = RepositoryFactory() commit = CommitFactory() upload = UploadFactory(report__commit=commit) @@ -169,7 +169,8 @@ def test_get_test_instances_when_test_is_flaky(transactional_db): assert tis[0].commitid -def test_get_test_instances_when_instance_is_failure(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_get_test_instances_when_instance_is_failure(): repo = RepositoryFactory() commit = CommitFactory() upload = UploadFactory(report__commit=commit) @@ -188,7 +189,8 @@ def test_get_test_instances_when_instance_is_failure(transactional_db): assert tis[0].commitid -def test_get_test_instances_when_test_is_flaky_and_instance_is_skip(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_get_test_instances_when_test_is_flaky_and_instance_is_skip(): repo = RepositoryFactory() commit = CommitFactory() upload = UploadFactory(report__commit=commit) @@ -206,7 +208,8 @@ def test_get_test_instances_when_test_is_flaky_and_instance_is_skip(transactiona assert len(tis) == 0 -def test_get_test_instances_when_instance_is_pass(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_get_test_instances_when_instance_is_pass(): repo = RepositoryFactory() commit = CommitFactory() upload = UploadFactory(report__commit=commit) @@ -224,7 +227,8 @@ def test_get_test_instances_when_instance_is_pass(transactional_db): assert len(tis) == 0 -def test_update_flake_pass(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_update_flake_pass(): rs = RepoSimulator() c = rs.create_commit() ti = rs.add_test_instance(c, outcome=TestInstance.Outcome.PASS.value) @@ -240,7 +244,8 @@ def test_update_flake_pass(transactional_db): assert f.recent_passes_count == 1 -def test_update_flake_fail(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_update_flake_fail(): rs = RepoSimulator() c = rs.create_commit() ti = rs.add_test_instance(c, outcome=TestInstance.Outcome.FAILURE.value) @@ -257,7 +262,8 @@ def test_update_flake_fail(transactional_db): assert f.fail_count == 1 -def test_upsert_failed_flakes(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_upsert_failed_flakes(): repo = RepositoryFactory() repo.save() commit = CommitFactory() @@ -286,6 +292,7 @@ def test_upsert_failed_flakes(transactional_db): assert r.flaky_fail_count == 1 +@pytest.mark.django_db(transaction=False) def test_upsert_failed_flakes_rollup_is_none(): repo = RepositoryFactory() repo.save() @@ -305,7 +312,8 @@ def test_upsert_failed_flakes_rollup_is_none(): assert r is None -def test_it_handles_only_passes(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_it_handles_only_passes(): rs = RepoSimulator() c1 = rs.create_commit() rs.add_test_instance(c1) @@ -317,10 +325,11 @@ def test_it_handles_only_passes(transactional_db): @time_machine.travel(dt.datetime.now(tz=dt.UTC), tick=False) -def test_it_creates_flakes_from_processed_uploads(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_it_creates_flakes_from_processed_uploads(): rs = RepoSimulator() c1 = rs.create_commit() - rs.add_test_instance(c1, state="v2_finished") + rs.add_test_instance(c1, state="finished") rs.add_test_instance( c1, outcome=TestInstance.Outcome.FAILURE.value, state="processed" ) @@ -337,10 +346,11 @@ def test_it_creates_flakes_from_processed_uploads(transactional_db): @time_machine.travel(dt.datetime.now(tz=dt.UTC), tick=False) +@pytest.mark.django_db(transaction=True) def test_it_does_not_create_flakes_from_flake_processed_uploads(): rs = RepoSimulator() c1 = rs.create_commit() - rs.add_test_instance(c1, state="v2_processed") + rs.add_test_instance(c1, state="processed") rs.add_test_instance( c1, outcome=TestInstance.Outcome.FAILURE.value, state="flake_processed" ) @@ -351,7 +361,8 @@ def test_it_does_not_create_flakes_from_flake_processed_uploads(): @time_machine.travel(dt.datetime.now(tz=dt.UTC), tick=False) -def test_it_processes_two_commits_separately(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_it_processes_two_commits_separately(): rs = RepoSimulator() c1 = rs.create_commit() rs.add_test_instance(c1, outcome=TestInstance.Outcome.FAILURE.value) @@ -373,7 +384,8 @@ def test_it_processes_two_commits_separately(transactional_db): assert flake.start_date == dt.datetime.now(dt.UTC) -def test_it_creates_flakes_expires(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_it_creates_flakes_expires(): with time_machine.travel(dt.datetime.now(tz=dt.UTC), tick=False) as traveller: rs = RepoSimulator() commits: list[str] = [] @@ -419,7 +431,8 @@ def test_it_creates_flakes_expires(transactional_db): assert flake.end_date == new_time -def test_it_creates_rollups(transactional_db): +@pytest.mark.django_db(transaction=True) +def test_it_creates_rollups(): with time_machine.travel("1970-1-1T00:00:00Z"): rs = RepoSimulator() c1 = rs.create_commit() @@ -456,6 +469,7 @@ def test_it_creates_rollups(transactional_db): assert rollups[3].date == dt.date.today() +@pytest.mark.django_db(transaction=False) def test_it_locks(mocker): mock_all_plans_and_tiers() result2 = None