diff --git a/src/dstack/_internal/server/background/tasks/process_metrics.py b/src/dstack/_internal/server/background/tasks/process_metrics.py index 43d8b7838a..b2ad5ca39b 100644 --- a/src/dstack/_internal/server/background/tasks/process_metrics.py +++ b/src/dstack/_internal/server/background/tasks/process_metrics.py @@ -2,7 +2,7 @@ import json from typing import Dict, List, Optional -from sqlalchemy import delete, select +from sqlalchemy import Delete, delete, select from sqlalchemy.orm import joinedload from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT @@ -49,27 +49,29 @@ async def delete_metrics(): finished_timestamp_micro_cutoff = ( now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000 ) + await asyncio.gather( + _execute_delete_statement( + delete(JobMetricsPoint).where( + JobMetricsPoint.job_id.in_( + select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING])) + ), + JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff, + ) + ), + _execute_delete_statement( + delete(JobMetricsPoint).where( + JobMetricsPoint.job_id.in_( + select(JobModel.id).where(JobModel.status.in_(JobStatus.finished_statuses())) + ), + JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff, + ) + ), + ) + + +async def _execute_delete_statement(stmt: Delete) -> None: async with get_session_ctx() as session: - await asyncio.gather( - session.execute( - delete(JobMetricsPoint).where( - JobMetricsPoint.job_id.in_( - select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING])) - ), - JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff, - ) - ), - session.execute( - delete(JobMetricsPoint).where( - JobMetricsPoint.job_id.in_( - select(JobModel.id).where( - JobModel.status.in_(JobStatus.finished_statuses()) - ) - ), - JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff, - ) - ), - ) + await session.execute(stmt) await session.commit()