diff --git a/services/processing/flake_processing.py b/services/processing/flake_processing.py index b6fc40570..f6b6c3ee5 100644 --- a/services/processing/flake_processing.py +++ b/services/processing/flake_processing.py @@ -10,12 +10,15 @@ TestInstance, ) +from services.test_analytics.ta_metrics import process_flakes_summary + log = logging.getLogger(__name__) FLAKE_EXPIRY_COUNT = 30 +@process_flakes_summary.labels("old").time() def process_flake_for_repo_commit( repo_id: int, commit_id: str, diff --git a/services/test_analytics/ta_cache_rollups.py b/services/test_analytics/ta_cache_rollups.py index 15ebb09b9..fdca2c7a4 100644 --- a/services/test_analytics/ta_cache_rollups.py +++ b/services/test_analytics/ta_cache_rollups.py @@ -5,6 +5,10 @@ import shared.storage from django_scaffold import settings +from services.test_analytics.ta_metrics import ( + read_rollups_from_db_summary, + rollup_size_summary, +) from services.test_analytics.ta_timeseries import ( get_branch_summary, get_summary, @@ -39,13 +43,14 @@ def cache_rollups(repoid: int, branch: str | None = None): storage_service = shared.storage.get_appropriate_storage_service(repoid) serialized_table: BytesIO - if branch: - if branch in {"main", "master", "develop"}: - summaries = get_branch_summary(repoid, branch) + with read_rollups_from_db_summary.labels("new").time(): + if branch: + if branch in {"main", "master", "develop"}: + summaries = get_branch_summary(repoid, branch) + else: + summaries = get_testrun_branch_summary_via_testrun(repoid, branch) else: - summaries = get_testrun_branch_summary_via_testrun(repoid, branch) - else: - summaries = get_summary(repoid) + summaries = get_summary(repoid) data = [ { @@ -75,3 +80,4 @@ def cache_rollups(repoid: int, branch: str | None = None): storage_service.write_file( settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table ) + rollup_size_summary.labels("new").observe(serialized_table.tell()) diff --git a/services/test_analytics/ta_metrics.py b/services/test_analytics/ta_metrics.py new file mode 100644 index 000000000..002d5778d --- /dev/null +++ b/services/test_analytics/ta_metrics.py @@ -0,0 +1,39 @@ +from shared.metrics import Summary + +write_tests_summary = Summary( + "write_tests_summary", + "The time it takes to write tests to the database", + ["impl"], +) + +read_tests_totals_summary = Summary( + "read_tests_totals_summary", + "The time it takes to read tests totals from the database", + ["impl"], +) + +read_failures_summary = Summary( + "read_failures_summary", + "The time it takes to read failures from the database", + ["impl"], +) + + +read_rollups_from_db_summary = Summary( + "read_rollups_from_db_summary", + "The time it takes to read rollups from the database", + ["impl"], +) + +rollup_size_summary = Summary( + "rollup_size_summary", + "The size of the rollup", + ["impl"], +) + + +process_flakes_summary = Summary( + "process_flakes_summary", + "The time it takes to process flakes", + ["impl"], +) diff --git a/services/test_analytics/ta_process_flakes.py b/services/test_analytics/ta_process_flakes.py index c8f9728e0..f9cbeddfa 100644 --- a/services/test_analytics/ta_process_flakes.py +++ b/services/test_analytics/ta_process_flakes.py @@ -9,6 +9,7 @@ from shared.django_apps.test_analytics.models import Flake from services.redis import get_redis_connection +from services.test_analytics.ta_metrics import process_flakes_summary log = logging.getLogger(__name__) @@ -76,6 +77,7 @@ def handle_failure( curr_flakes[test_id] = new_flake +@process_flakes_summary.labels("new").time() def process_flakes_for_commit(repo_id: int, commit_id: str): uploads = get_relevant_uploads(repo_id, commit_id) diff --git a/services/test_analytics/ta_processor.py b/services/test_analytics/ta_processor.py index 0b0b91877..0b1dc7cdd 100644 --- a/services/test_analytics/ta_processor.py +++ b/services/test_analytics/ta_processor.py @@ -7,6 +7,7 @@ from services.archive import ArchiveService from services.processing.types import UploadArguments +from services.test_analytics.ta_metrics import write_tests_summary from services.test_analytics.ta_processing import ( get_ta_processing_info, handle_file_not_found, @@ -66,9 +67,10 @@ def ta_processor_impl( handle_parsing_error(upload, exc) return False - insert_testruns_timeseries( - repoid, commitid, ta_proc_info.branch, upload, parsing_infos - ) + with write_tests_summary.labels("new").time(): + insert_testruns_timeseries( + repoid, commitid, ta_proc_info.branch, upload, parsing_infos + ) if update_state: upload.state = "processed" diff --git a/tasks/cache_test_rollups.py b/tasks/cache_test_rollups.py index 3a0d00b8e..d734da5e6 100644 --- a/tasks/cache_test_rollups.py +++ b/tasks/cache_test_rollups.py @@ -13,6 +13,10 @@ from django_scaffold import settings from services.redis import get_redis_connection from services.test_analytics.ta_cache_rollups import cache_rollups +from services.test_analytics.ta_metrics import ( + read_rollups_from_db_summary, + rollup_size_summary, +) from tasks.base import BaseCodecovTask # Reminder: `a BETWEEN x AND y` is equivalent to `a >= x AND a <= y` @@ -168,8 +172,9 @@ def run_impl_within_lock(self, repoid: int, branch: str): "interval_end": f"{interval_end + 1 if interval_end else 0} days", } - cursor.execute(ROLLUP_QUERY, query_params) - aggregation_of_test_results = cursor.fetchall() + with read_rollups_from_db_summary.labels("old").time(): + cursor.execute(ROLLUP_QUERY, query_params) + aggregation_of_test_results = cursor.fetchall() df = pl.DataFrame( aggregation_of_test_results, @@ -203,6 +208,7 @@ def run_impl_within_lock(self, repoid: int, branch: str): storage_service.write_file( settings.GCS_BUCKET_NAME, storage_key, serialized_table ) + rollup_size_summary.labels("old").observe(serialized_table.tell()) RegisteredCacheTestRollupTask = celery_app.register_task(CacheTestRollupsTask()) diff --git a/tasks/test_results_finisher.py b/tasks/test_results_finisher.py index 9555351b5..8b99f354b 100644 --- a/tasks/test_results_finisher.py +++ b/tasks/test_results_finisher.py @@ -21,6 +21,10 @@ get_repo_provider_service, ) from services.seats import ShouldActivateSeat, determine_seat_activation +from services.test_analytics.ta_metrics import ( + read_failures_summary, + read_tests_totals_summary, +) from services.test_results import ( FinisherResult, FlakeInfo, @@ -200,16 +204,18 @@ def old_impl( "test_analytics", "shorten_paths", _else=True ) - test_summary = get_test_summary_for_commit(db_session, repoid, commitid) + with read_tests_totals_summary.labels("old").time(): + test_summary = get_test_summary_for_commit(db_session, repoid, commitid) failed_tests = test_summary.get("error", 0) + test_summary.get("failure", 0) passed_tests = test_summary.get("pass", 0) skipped_tests = test_summary.get("skip", 0) failures = [] if failed_tests: - failed_test_instances = latest_failures_for_commit( - db_session, repoid, commitid - ) + with read_failures_summary.labels("old").time(): + failed_test_instances = latest_failures_for_commit( + db_session, repoid, commitid + ) for test_instance in failed_test_instances: failure_message = test_instance.failure_message diff --git a/tasks/test_results_processor.py b/tasks/test_results_processor.py index c77afb0e0..1b52a8ef2 100644 --- a/tasks/test_results_processor.py +++ b/tasks/test_results_processor.py @@ -24,9 +24,9 @@ TestInstance, Upload, ) -from helpers.metrics import metrics from services.archive import ArchiveService from services.processing.types import UploadArguments +from services.test_analytics.ta_metrics import write_tests_summary from services.test_analytics.ta_processor import ta_processor_impl from services.test_results import generate_flags_hash, generate_test_id from services.yaml import read_yaml_field @@ -288,7 +288,6 @@ def _bulk_write_tests_to_db( # Upsert Tests if len(test_data) > 0: - metrics.gauge("test_results_processor.test_count", len(test_data)) sorted_tests = sorted( test_data.values(), key=lambda x: str(x["id"]), @@ -318,9 +317,6 @@ def _bulk_write_tests_to_db( # Save TestInstances if len(test_instance_data) > 0: - metrics.gauge( - "test_results_processor.test_instance_count", len(test_instance_data) - ) self.save_test_instances(db_session, test_instance_data) log.info( @@ -455,16 +451,17 @@ def process_individual_upload( else: successful = True - self._bulk_write_tests_to_db( - db_session, - repository.repoid, - commitid, - upload_id, - upload.report.commit.branch, - parsing_results, - flaky_test_set, - upload.flag_names, - ) + with write_tests_summary.labels("old").time(): + self._bulk_write_tests_to_db( + db_session, + repository.repoid, + commitid, + upload_id, + upload.report.commit.branch, + parsing_results, + flaky_test_set, + upload.flag_names, + ) upload.state = "processed" db_session.commit()