Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 23 additions & 34 deletions apps/worker/services/test_analytics/ta_process_flakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ def fetch_current_flakes(repo_id: int) -> dict[bytes, Flake]:
}


def get_testruns(upload_ids: list[int]) -> QuerySet[Testrun]:
    """Return the recent testruns belonging to the given uploads, oldest first.

    Only testruns from the last 24 hours are considered: we won't process
    flakes for testruns older than 1 day.

    :param upload_ids: primary keys of the ReportSession uploads to scan
    :return: lazily-evaluated queryset ordered by ascending ``timestamp``
    """
    # NOTE(review): this span showed both the pre- and post-change diff lines
    # interleaved; resolved here to the post-change (batched upload_ids) form.
    return Testrun.objects.filter(
        Q(timestamp__gte=timezone.now() - timedelta(days=1))
        & Q(upload_id__in=upload_ids)
    ).order_by("timestamp")


Expand Down Expand Up @@ -79,38 +78,18 @@ def handle_failure(
testrun.outcome = "flaky_fail"


@sentry_sdk.trace
def process_single_upload(
    upload: ReportSession, curr_flakes: dict[bytes, Flake], repo_id: int
):
    # Re-evaluate flake state for every recent testrun of a single upload.
    # `curr_flakes` maps test_id (bytes) -> Flake and is mutated in place
    # by handle_pass / handle_failure; nothing is returned.
    testruns = get_testruns(upload)

    for testrun in testruns:
        # Normalize test_id to bytes so it can be used as a dict key
        # (presumably the DB driver may hand back a non-bytes buffer — TODO confirm).
        test_id = bytes(testrun.test_id)
        match testrun.outcome:
            case "pass":
                # A pass is only interesting for tests currently tracked as flaky.
                if test_id not in curr_flakes:
                    continue

                handle_pass(curr_flakes, test_id)
            case "failure" | "flaky_fail" | "error":
                # handle_failure may also rewrite testrun.outcome
                # (e.g. to "flaky_fail"), which is why we bulk_update below.
                handle_failure(curr_flakes, test_id, testrun, repo_id)
            case _:
                # Unknown outcomes are deliberately ignored.
                continue

    # Persist any outcome changes made above in a single query.
    Testrun.objects.bulk_update(testruns, ["outcome"])


@sentry_sdk.trace
def process_flakes_for_commit(repo_id: int, commit_id: str):
log.info(
"process_flakes_for_commit: starting processing",
)
uploads = get_relevant_uploads(repo_id, commit_id)
upload_ids = list(
get_relevant_uploads(repo_id, commit_id).values_list("id", flat=True)
)

log.info(
"process_flakes_for_commit: fetched uploads",
extra={"uploads": [upload.id for upload in uploads]},
extra={"uploads": upload_ids},
)

curr_flakes = fetch_current_flakes(repo_id)
Expand All @@ -120,12 +99,22 @@ def process_flakes_for_commit(repo_id: int, commit_id: str):
extra={"flakes": [flake.test_id.hex() for flake in curr_flakes.values()]},
)

for upload in uploads:
process_single_upload(upload, curr_flakes, repo_id)
log.info(
"process_flakes_for_commit: processed upload",
extra={"upload": upload.id},
)
testruns = get_testruns(upload_ids)

for testrun in testruns:
test_id = bytes(testrun.test_id)
match testrun.outcome:
case "pass":
if test_id not in curr_flakes:
continue

handle_pass(curr_flakes, test_id)
case "failure" | "flaky_fail" | "error":
handle_failure(curr_flakes, test_id, testrun, repo_id)
case _:
continue

Testrun.objects.bulk_update(testruns, ["outcome"])

log.info(
"process_flakes_for_commit: bulk creating flakes",
Expand Down
Loading