Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions services/processing/flake_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
TestInstance,
)

from services.test_analytics.ta_metrics import process_flakes_summary

log = logging.getLogger(__name__)


FLAKE_EXPIRY_COUNT = 30


@process_flakes_summary.labels("old").time()
def process_flake_for_repo_commit(
repo_id: int,
commit_id: str,
Expand Down
18 changes: 12 additions & 6 deletions services/test_analytics/ta_cache_rollups.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import shared.storage

from django_scaffold import settings
from services.test_analytics.ta_metrics import (
read_rollups_from_db_summary,
rollup_size_summary,
)
from services.test_analytics.ta_timeseries import (
get_branch_summary,
get_summary,
Expand Down Expand Up @@ -39,13 +43,14 @@ def cache_rollups(repoid: int, branch: str | None = None):
storage_service = shared.storage.get_appropriate_storage_service(repoid)
serialized_table: BytesIO

if branch:
if branch in {"main", "master", "develop"}:
summaries = get_branch_summary(repoid, branch)
with read_rollups_from_db_summary.labels("new").time():
if branch:
if branch in {"main", "master", "develop"}:
summaries = get_branch_summary(repoid, branch)
else:
summaries = get_testrun_branch_summary_via_testrun(repoid, branch)
else:
summaries = get_testrun_branch_summary_via_testrun(repoid, branch)
else:
summaries = get_summary(repoid)
summaries = get_summary(repoid)

data = [
{
Expand Down Expand Up @@ -75,3 +80,4 @@ def cache_rollups(repoid: int, branch: str | None = None):
storage_service.write_file(
settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table
)
rollup_size_summary.labels("new").observe(serialized_table.tell())
39 changes: 39 additions & 0 deletions services/test_analytics/ta_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from shared.metrics import Summary

# Prometheus summaries instrumenting the test-analytics pipeline.
#
# Every summary carries a single "impl" label (expected values: "old" / "new")
# so timings and sizes of the legacy and rewritten code paths can be compared
# side by side during the migration.


def _impl_summary(metric_name: str, description: str) -> Summary:
    # Factory so each metric declares only its name and description;
    # the shared "impl" label is applied uniformly here.
    return Summary(metric_name, description, ["impl"])


# Time spent writing parsed test results to the database.
write_tests_summary = _impl_summary(
    "write_tests_summary",
    "The time it takes to write tests to the database",
)

# Time spent reading aggregate test totals for a commit.
read_tests_totals_summary = _impl_summary(
    "read_tests_totals_summary",
    "The time it takes to read tests totals from the database",
)

# Time spent fetching the latest failed test instances for a commit.
read_failures_summary = _impl_summary(
    "read_failures_summary",
    "The time it takes to read failures from the database",
)

# Time spent querying rollup aggregations from the database.
read_rollups_from_db_summary = _impl_summary(
    "read_rollups_from_db_summary",
    "The time it takes to read rollups from the database",
)

# Size (in bytes, observed via BytesIO.tell()) of the serialized rollup blob.
rollup_size_summary = _impl_summary(
    "rollup_size_summary",
    "The size of the rollup",
)

# Time spent processing flakes for a repo/commit.
process_flakes_summary = _impl_summary(
    "process_flakes_summary",
    "The time it takes to process flakes",
)
2 changes: 2 additions & 0 deletions services/test_analytics/ta_process_flakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from shared.django_apps.test_analytics.models import Flake

from services.redis import get_redis_connection
from services.test_analytics.ta_metrics import process_flakes_summary

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,6 +77,7 @@ def handle_failure(
curr_flakes[test_id] = new_flake


@process_flakes_summary.labels("new").time()
def process_flakes_for_commit(repo_id: int, commit_id: str):
uploads = get_relevant_uploads(repo_id, commit_id)

Expand Down
8 changes: 5 additions & 3 deletions services/test_analytics/ta_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from services.archive import ArchiveService
from services.processing.types import UploadArguments
from services.test_analytics.ta_metrics import write_tests_summary
from services.test_analytics.ta_processing import (
get_ta_processing_info,
handle_file_not_found,
Expand Down Expand Up @@ -66,9 +67,10 @@ def ta_processor_impl(
handle_parsing_error(upload, exc)
return False

insert_testruns_timeseries(
repoid, commitid, ta_proc_info.branch, upload, parsing_infos
)
with write_tests_summary.labels("new").time():
insert_testruns_timeseries(
repoid, commitid, ta_proc_info.branch, upload, parsing_infos
)

if update_state:
upload.state = "processed"
Expand Down
10 changes: 8 additions & 2 deletions tasks/cache_test_rollups.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from django_scaffold import settings
from services.redis import get_redis_connection
from services.test_analytics.ta_cache_rollups import cache_rollups
from services.test_analytics.ta_metrics import (
read_rollups_from_db_summary,
rollup_size_summary,
)
from tasks.base import BaseCodecovTask

# Reminder: `a BETWEEN x AND y` is equivalent to `a >= x AND a <= y`
Expand Down Expand Up @@ -168,8 +172,9 @@ def run_impl_within_lock(self, repoid: int, branch: str):
"interval_end": f"{interval_end + 1 if interval_end else 0} days",
}

cursor.execute(ROLLUP_QUERY, query_params)
aggregation_of_test_results = cursor.fetchall()
with read_rollups_from_db_summary.labels("old").time():
cursor.execute(ROLLUP_QUERY, query_params)
aggregation_of_test_results = cursor.fetchall()

df = pl.DataFrame(
aggregation_of_test_results,
Expand Down Expand Up @@ -203,6 +208,7 @@ def run_impl_within_lock(self, repoid: int, branch: str):
storage_service.write_file(
settings.GCS_BUCKET_NAME, storage_key, serialized_table
)
rollup_size_summary.labels("old").observe(serialized_table.tell())


RegisteredCacheTestRollupTask = celery_app.register_task(CacheTestRollupsTask())
Expand Down
14 changes: 10 additions & 4 deletions tasks/test_results_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
get_repo_provider_service,
)
from services.seats import ShouldActivateSeat, determine_seat_activation
from services.test_analytics.ta_metrics import (
read_failures_summary,
read_tests_totals_summary,
)
from services.test_results import (
FinisherResult,
FlakeInfo,
Expand Down Expand Up @@ -200,16 +204,18 @@ def old_impl(
"test_analytics", "shorten_paths", _else=True
)

test_summary = get_test_summary_for_commit(db_session, repoid, commitid)
with read_tests_totals_summary.labels("old").time():
test_summary = get_test_summary_for_commit(db_session, repoid, commitid)
failed_tests = test_summary.get("error", 0) + test_summary.get("failure", 0)
passed_tests = test_summary.get("pass", 0)
skipped_tests = test_summary.get("skip", 0)

failures = []
if failed_tests:
failed_test_instances = latest_failures_for_commit(
db_session, repoid, commitid
)
with read_failures_summary.labels("old").time():
failed_test_instances = latest_failures_for_commit(
db_session, repoid, commitid
)

for test_instance in failed_test_instances:
failure_message = test_instance.failure_message
Expand Down
27 changes: 12 additions & 15 deletions tasks/test_results_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
TestInstance,
Upload,
)
from helpers.metrics import metrics
from services.archive import ArchiveService
from services.processing.types import UploadArguments
from services.test_analytics.ta_metrics import write_tests_summary
from services.test_analytics.ta_processor import ta_processor_impl
from services.test_results import generate_flags_hash, generate_test_id
from services.yaml import read_yaml_field
Expand Down Expand Up @@ -288,7 +288,6 @@ def _bulk_write_tests_to_db(

# Upsert Tests
if len(test_data) > 0:
metrics.gauge("test_results_processor.test_count", len(test_data))
sorted_tests = sorted(
test_data.values(),
key=lambda x: str(x["id"]),
Expand Down Expand Up @@ -318,9 +317,6 @@ def _bulk_write_tests_to_db(

# Save TestInstances
if len(test_instance_data) > 0:
metrics.gauge(
"test_results_processor.test_instance_count", len(test_instance_data)
)
self.save_test_instances(db_session, test_instance_data)

log.info(
Expand Down Expand Up @@ -455,16 +451,17 @@ def process_individual_upload(
else:
successful = True

self._bulk_write_tests_to_db(
db_session,
repository.repoid,
commitid,
upload_id,
upload.report.commit.branch,
parsing_results,
flaky_test_set,
upload.flag_names,
)
with write_tests_summary.labels("old").time():
self._bulk_write_tests_to_db(
db_session,
repository.repoid,
commitid,
upload_id,
upload.report.commit.branch,
parsing_results,
flaky_test_set,
upload.flag_names,
)

upload.state = "processed"
db_session.commit()
Expand Down
Loading