Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.

Commit 85165e6

Browse files
committed
feat: implement flake processing using timeseries models
1 parent 0aa256d commit 85165e6

6 files changed

Lines changed: 431 additions & 5 deletions

File tree

services/processing/flake_processing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def process_flake_for_repo_commit(
2424
report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
2525
report__commit__repository__repoid=repo_id,
2626
report__commit__commitid=commit_id,
27-
state__in=["processed", "v2_finished"],
27+
state__in=["processed"],
2828
)
2929

3030
curr_flakes = fetch_curr_flakes(repo_id)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import logging
2+
from datetime import datetime
3+
4+
from django.db import transaction
5+
from django.db.models import Q, QuerySet
6+
from redis.exceptions import LockError
7+
from shared.django_apps.reports.models import CommitReport, ReportSession
8+
from shared.django_apps.test_analytics.models import Flake
9+
from shared.django_apps.timeseries.models import Testrun
10+
11+
from services.redis import get_redis_connection
12+
13+
log = logging.getLogger(__name__)

# Testrun outcomes that count as a failure when detecting flakes.
FAIL_FILTER = Q(outcome="failure") | Q(outcome="flaky_failure") | Q(outcome="error")

# Per-repo Redis key templates (formatted with the repo id):
# LOCK_NAME guards flake processing so one worker handles a repo at a time;
# KEY_NAME is the list of commit ids queued for flake processing.
LOCK_NAME = "ta_flake_lock:{}"
KEY_NAME = "ta_flake_key:{}"
19+
20+
21+
def get_relevant_uploads(repo_id: int, commit_id: str) -> QuerySet[ReportSession]:
    """Return the processed test-results uploads for one repo/commit pair."""
    criteria = {
        "report__report_type": CommitReport.ReportType.TEST_RESULTS.value,
        "report__commit__repository__repoid": repo_id,
        "report__commit__commitid": commit_id,
        "state__in": ["processed"],
    }
    return ReportSession.objects.filter(**criteria)
28+
29+
30+
def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
    """Map each of the repo's flakes to its row, keyed by test_id as bytes."""
    flakes_by_test: dict[bytes, Flake] = {}
    for flake in Flake.objects.filter(repoid=repo_id):
        flakes_by_test[bytes(flake.test_id)] = flake
    return flakes_by_test
34+
35+
36+
def get_testruns(
    upload: ReportSession, curr_flakes: dict[bytes, Flake]
) -> QuerySet[Testrun]:
    """Fetch this upload's testruns that failed, or that passed for a known flake."""
    passes_of_known_flakes = Q(outcome="pass") & Q(test_id__in=curr_flakes.keys())
    relevant_outcomes = FAIL_FILTER | passes_of_known_flakes
    return Testrun.objects.filter(Q(upload_id=upload.id) & relevant_outcomes)
42+
43+
44+
def handle_pass(
    curr_flakes: dict[bytes, Flake], test_id: bytes, *, pass_threshold: int = 30
):
    """Record a passing run for a currently-flaky test.

    Increments the flake's consecutive-pass streak and total run count.
    Once the streak reaches ``pass_threshold`` passes (default 30, the
    value previously hard-coded), the flake is considered resolved: its
    end date is stamped, it is persisted, and it is dropped from the
    working set so later runs in this batch no longer treat it as flaky.
    """
    flake = curr_flakes[test_id]
    flake.recent_passes_count += 1
    flake.count += 1
    # >= instead of == so a row whose streak somehow already exceeds the
    # threshold still gets closed rather than incrementing forever.
    if flake.recent_passes_count >= pass_threshold:
        # NOTE(review): datetime.now() is naive; confirm the end_date
        # column expects local time rather than an aware UTC timestamp.
        flake.end_date = datetime.now()
        flake.save()
        del curr_flakes[test_id]
51+
52+
53+
def handle_failure(
    curr_flakes: dict[bytes, Flake], test_id: bytes, testrun: Testrun, repo_id: int
):
    """Record a failing run: bump an existing flake or open a new one.

    A failure of a test that is not yet tracked starts a fresh Flake and
    relabels the run as a flaky failure; a failure of a tracked flake
    bumps its counters and resets the consecutive-pass streak.
    """
    flake = curr_flakes.get(test_id)
    if flake is None:
        # Relabel so downstream consumers see this run as a flaky failure.
        if testrun.outcome != "flaky_failure":
            testrun.outcome = "flaky_failure"
        curr_flakes[test_id] = Flake(
            repoid=repo_id,
            test_id=test_id,
            count=1,
            fail_count=1,
            recent_passes_count=0,
            start_date=datetime.now(),
        )
    else:
        flake.fail_count += 1
        flake.count += 1
        flake.recent_passes_count = 0
73+
74+
75+
def process_flakes_for_commit(repo_id: int, commit_id: str):
76+
uploads = get_relevant_uploads(repo_id, commit_id)
77+
78+
curr_flakes = fetch_current_flakes(repo_id)
79+
80+
for upload in uploads:
81+
testruns = get_testruns(upload, curr_flakes)
82+
83+
for testrun in testruns:
84+
test_id = bytes(testrun.test_id)
85+
match testrun.outcome:
86+
case "pass":
87+
handle_pass(curr_flakes, test_id)
88+
case "failure" | "flaky_failure" | "error":
89+
handle_failure(curr_flakes, test_id, testrun, repo_id)
90+
case _:
91+
continue
92+
93+
Testrun.objects.bulk_update(testruns, ["outcome"])
94+
95+
Flake.objects.bulk_create(
96+
curr_flakes.values(),
97+
update_conflicts=True,
98+
unique_fields=["id"],
99+
update_fields=["end_date", "count", "recent_passes_count", "fail_count"],
100+
)
101+
102+
transaction.commit()
103+
104+
105+
def process_flakes_for_repo(repo_id: int):
    """Drain the repo's queued commit ids and process flakes for each.

    Holds a per-repo Redis lock while draining so only one worker handles
    a repo at a time. Returns True when the queue was drained, False when
    the lock could not be acquired within the blocking timeout.
    """
    redis_client = get_redis_connection()
    try:
        with redis_client.lock(
            LOCK_NAME.format(repo_id), timeout=300, blocking_timeout=3
        ):
            queue_key = KEY_NAME.format(repo_id)
            # Pop queued commit ids in batches of 10 until the list is empty.
            while batch := redis_client.lpop(queue_key, 10):
                for raw_commit_id in batch:
                    process_flakes_for_commit(repo_id, raw_commit_id.decode())
            return True
    except LockError:
        log.warning("Failed to acquire lock for repo %s", repo_id)
        return False
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"test1": {
3+
"count": 6,
4+
"fail_count": 2,
5+
"recent_passes_count": 1
6+
},
7+
"test3": {
8+
"count": 1,
9+
"fail_count": 1,
10+
"recent_passes_count": 0
11+
},
12+
"test4": {
13+
"count": 1,
14+
"fail_count": 1,
15+
"recent_passes_count": 0
16+
},
17+
"test5": {
18+
"count": 1,
19+
"fail_count": 1,
20+
"recent_passes_count": 0
21+
}
22+
}

0 commit comments

Comments
 (0)