Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.

Commit 76a8a8e

Browse files
committed
feat: implement flake processing using timeseries models
1 parent c9ed88b commit 76a8a8e

6 files changed

Lines changed: 496 additions & 20 deletions

File tree

services/processing/flake_processing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def process_flake_in_transaction(
3939
report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
4040
report__commit__repository__repoid=repo_id,
4141
report__commit__commitid=commit_id,
42-
state__in=["processed", "v2_finished"],
42+
state__in=["processed"],
4343
)
4444

4545
curr_flakes = fetch_curr_flakes(repo_id)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import logging
2+
from datetime import datetime
3+
4+
from django.db import transaction
5+
from django.db.models import Q, QuerySet
6+
from redis.exceptions import LockError
7+
from shared.django_apps.reports.models import CommitReport, ReportSession
8+
from shared.django_apps.ta_timeseries.models import Testrun
9+
from shared.django_apps.test_analytics.models import Flake
10+
11+
from services.redis import get_redis_connection
12+
13+
log = logging.getLogger(__name__)
14+
15+
FAIL_FILTER = Q(outcome="failure") | Q(outcome="flaky_failure") | Q(outcome="error")
16+
17+
LOCK_NAME = "ta_flake_lock:{}"
18+
KEY_NAME = "ta_flake_key:{}"
19+
20+
21+
def get_relevant_uploads(repo_id: int, commit_id: str) -> QuerySet[ReportSession]:
22+
return ReportSession.objects.filter(
23+
report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
24+
report__commit__repository__repoid=repo_id,
25+
report__commit__commitid=commit_id,
26+
state__in=["processed"],
27+
)
28+
29+
30+
def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
31+
return {
32+
bytes(flake.test_id): flake for flake in Flake.objects.filter(repoid=repo_id)
33+
}
34+
35+
36+
def get_testruns(
37+
upload: ReportSession, curr_flakes: dict[bytes, Flake]
38+
) -> QuerySet[Testrun]:
39+
upload_filter = Q(upload_id=upload.id)
40+
flaky_pass_filter = Q(outcome="pass") & Q(test_id__in=curr_flakes.keys())
41+
return Testrun.objects.filter(upload_filter & (FAIL_FILTER | flaky_pass_filter))
42+
43+
44+
def handle_pass(curr_flakes: dict[bytes, Flake], test_id: bytes):
45+
# possible that we expire it and stop caring about it
46+
if test_id not in curr_flakes:
47+
return
48+
49+
curr_flakes[test_id].recent_passes_count += 1
50+
curr_flakes[test_id].count += 1
51+
if curr_flakes[test_id].recent_passes_count == 30:
52+
curr_flakes[test_id].end_date = datetime.now()
53+
curr_flakes[test_id].save()
54+
del curr_flakes[test_id]
55+
56+
57+
def handle_failure(
58+
curr_flakes: dict[bytes, Flake], test_id: bytes, testrun: Testrun, repo_id: int
59+
):
60+
existing_flake = curr_flakes.get(test_id)
61+
if existing_flake:
62+
existing_flake.fail_count += 1
63+
existing_flake.count += 1
64+
existing_flake.recent_passes_count = 0
65+
else:
66+
if testrun.outcome != "flaky_failure":
67+
testrun.outcome = "flaky_failure"
68+
new_flake = Flake(
69+
repoid=repo_id,
70+
test_id=test_id,
71+
count=1,
72+
fail_count=1,
73+
recent_passes_count=0,
74+
start_date=datetime.now(),
75+
)
76+
curr_flakes[test_id] = new_flake
77+
78+
79+
def process_flakes_for_commit(repo_id: int, commit_id: str):
80+
uploads = get_relevant_uploads(repo_id, commit_id)
81+
82+
curr_flakes = fetch_current_flakes(repo_id)
83+
84+
for upload in uploads:
85+
testruns = get_testruns(upload, curr_flakes)
86+
87+
for testrun in testruns:
88+
test_id = bytes(testrun.test_id)
89+
match testrun.outcome:
90+
case "pass":
91+
handle_pass(curr_flakes, test_id)
92+
case "failure" | "flaky_failure" | "error":
93+
handle_failure(curr_flakes, test_id, testrun, repo_id)
94+
case _:
95+
continue
96+
97+
Testrun.objects.bulk_update(testruns, ["outcome"])
98+
99+
Flake.objects.bulk_create(
100+
curr_flakes.values(),
101+
update_conflicts=True,
102+
unique_fields=["id"],
103+
update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
104+
)
105+
106+
transaction.commit()
107+
108+
109+
def process_flakes_for_repo(repo_id: int):
110+
redis_client = get_redis_connection()
111+
lock_name = LOCK_NAME.format(repo_id)
112+
key_name = KEY_NAME.format(repo_id)
113+
try:
114+
with redis_client.lock(lock_name, timeout=300, blocking_timeout=3):
115+
while commit_ids := redis_client.lpop(key_name, 10):
116+
for commit_id in commit_ids:
117+
process_flakes_for_commit(repo_id, commit_id.decode())
118+
return True
119+
except LockError:
120+
log.warning("Failed to acquire lock for repo %s", repo_id)
121+
return False
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"test1": {
3+
"count": 6,
4+
"fail_count": 2,
5+
"recent_passes_count": 1
6+
},
7+
"test3": {
8+
"count": 1,
9+
"fail_count": 1,
10+
"recent_passes_count": 0
11+
},
12+
"test4": {
13+
"count": 1,
14+
"fail_count": 1,
15+
"recent_passes_count": 0
16+
},
17+
"test5": {
18+
"count": 1,
19+
"fail_count": 1,
20+
"recent_passes_count": 0
21+
}
22+
}

0 commit comments

Comments
 (0)