From d944b64509aef639e69d5d84a4a91e9e81a6082a Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Tue, 13 May 2025 15:54:28 +0000 Subject: [PATCH 1/2] Keep last metrics for finished jobs The retention window is 1800 seconds (last 30 minutes) by default, configurable via the `DSTACK_SERVER_METRICS_WINDOW_SECONDS` environment variable. Closes: https://github.com/dstackai/dstack/issues/2618 --- .../background/tasks/process_metrics.py | 58 +++++++++++++++---- src/dstack/_internal/server/settings.py | 1 + .../background/tasks/test_process_metrics.py | 51 +++++++++++++++- 3 files changed, 98 insertions(+), 12 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_metrics.py b/src/dstack/_internal/server/background/tasks/process_metrics.py index 9ac5685076..c166bc69df 100644 --- a/src/dstack/_internal/server/background/tasks/process_metrics.py +++ b/src/dstack/_internal/server/background/tasks/process_metrics.py @@ -1,14 +1,15 @@ import asyncio import json +from datetime import timedelta from typing import Dict, List, Optional -from sqlalchemy import delete, select +from sqlalchemy import BigInteger, delete, func, select from sqlalchemy.orm import joinedload from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT from dstack._internal.core.models.runs import JobStatus from dstack._internal.server import settings -from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import InstanceModel, JobMetricsPoint, JobModel from dstack._internal.server.schemas.runner import MetricsResponse from dstack._internal.server.services.instances import get_instance_ssh_private_keys @@ -42,10 +43,51 @@ async def collect_metrics(): async def delete_metrics(): - cutoff = _get_delete_metrics_cutoff() + now = get_current_datetime() + running_timestamp_micro_cutoff = int(now.timestamp() * 1_000_000) - ( + settings.SERVER_METRICS_TTL_SECONDS * 1_000_000 + ) + retention_window = timedelta(seconds=settings.SERVER_METRICS_WINDOW_SECONDS) + retention_window_micro = int(retention_window.total_seconds() * 1_000_000) + dialect_name = get_db().dialect_name + if dialect_name == "sqlite": + last_processed_at_epoch = func.unixepoch(JobModel.last_processed_at) + elif dialect_name == "postgresql": + last_processed_at_epoch = func.date_part("epoch", JobModel.last_processed_at) + else: + raise ValueError(f"unsupported dialect: {dialect_name}") + # Optimization - only check recently finished jobs, + # where last_processed_at > now() - retention_window * 2 + finished_last_processed_cutoff = now - retention_window * 2 async with get_session_ctx() as session: - await session.execute( - delete(JobMetricsPoint).where(JobMetricsPoint.timestamp_micro < cutoff) + await asyncio.gather( + session.execute( + delete(JobMetricsPoint).where( + JobMetricsPoint.job_id.in_( + select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING])) + ), + JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff, + ) + ), + session.execute( + delete(JobMetricsPoint).where( + JobMetricsPoint.job_id.in_( + select(JobModel.id).where( + JobModel.status.in_(JobStatus.finished_statuses()), + JobModel.last_processed_at > finished_last_processed_cutoff, + ) + ), + JobMetricsPoint.timestamp_micro + < ( + select( + last_processed_at_epoch.cast(BigInteger) * 1_000_000 + - retention_window_micro + ) + .where(JobModel.id == JobMetricsPoint.job_id) + .scalar_subquery() + ), + ) + ), ) await session.commit() @@ -134,9 +176,3 @@ def _pull_runner_metrics( ) -> Optional[MetricsResponse]: runner_client = client.RunnerClient(port=ports[DSTACK_RUNNER_HTTP_PORT]) return runner_client.get_metrics() - - -def _get_delete_metrics_cutoff() -> int: - now = int(get_current_datetime().timestamp() * 1_000_000) - cutoff = now - (settings.SERVER_METRICS_TTL_SECONDS * 1_000_000) - return cutoff diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 47eb68a82e..bed9b13c10 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -46,6 +46,7 @@ SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT") SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600)) +SERVER_METRICS_WINDOW_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_WINDOW_SECONDS", 1800)) DEFAULT_PROJECT_NAME = "main" diff --git a/src/tests/_internal/server/background/tasks/test_process_metrics.py b/src/tests/_internal/server/background/tasks/test_process_metrics.py index e7563e81c1..be0c034d3a 100644 --- a/src/tests/_internal/server/background/tasks/test_process_metrics.py +++ b/src/tests/_internal/server/background/tasks/test_process_metrics.py @@ -94,7 +94,7 @@ class TestDeleteMetrics: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) - async def test_deletes_old_metrics(self, test_db, session: AsyncSession): + async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSession): user = await create_user(session=session, global_role=GlobalRole.USER) project = await create_project(session=session, owner=user) await add_project_member( @@ -113,6 +113,8 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession): job = await create_job( session=session, run=run, + status=JobStatus.RUNNING, + last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc), ) await create_job_metrics_point( session=session, @@ -135,3 +137,50 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession): points = res.scalars().all() assert len(points) == 1 assert points[0].id == last_metric.id + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) + async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSession): + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session, owner=user) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.FAILED, + last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc), + ) + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 4, 30, tzinfo=timezone.utc), + ) + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 4, 40, tzinfo=timezone.utc), + ) + last_metric = await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 4, 50, tzinfo=timezone.utc), + ) + with patch.object(settings, "SERVER_METRICS_WINDOW_SECONDS", 15): + await delete_metrics() + res = await session.execute(select(JobMetricsPoint)) + points = res.scalars().all() + assert len(points) == 1 + assert points[0].id == last_metric.id From 42397abeda6691140013e8f19fa13dcff383e746 Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Wed, 14 May 2025 09:10:20 +0000 Subject: [PATCH 2/2] Replace retention window with TTL --- .../background/tasks/process_metrics.py | 39 +++++-------------- src/dstack/_internal/server/services/runs.py | 4 +- src/dstack/_internal/server/settings.py | 22 ++++++++++- .../background/tasks/test_process_metrics.py | 16 ++++---- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_metrics.py b/src/dstack/_internal/server/background/tasks/process_metrics.py index c166bc69df..43d8b7838a 100644 --- a/src/dstack/_internal/server/background/tasks/process_metrics.py +++ b/src/dstack/_internal/server/background/tasks/process_metrics.py @@ -1,15 +1,14 @@ import asyncio import json -from datetime import timedelta from typing import Dict, List, Optional -from sqlalchemy import BigInteger, delete, func, select +from sqlalchemy import delete, select from sqlalchemy.orm import joinedload from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT from dstack._internal.core.models.runs import JobStatus from dstack._internal.server import settings -from dstack._internal.server.db import get_db, get_session_ctx +from dstack._internal.server.db import get_session_ctx from dstack._internal.server.models import InstanceModel, JobMetricsPoint, JobModel from dstack._internal.server.schemas.runner import MetricsResponse from dstack._internal.server.services.instances import get_instance_ssh_private_keys @@ -43,22 +42,13 @@ async def collect_metrics(): async def delete_metrics(): - now = get_current_datetime() - running_timestamp_micro_cutoff = int(now.timestamp() * 1_000_000) - ( - settings.SERVER_METRICS_TTL_SECONDS * 1_000_000 + now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000) + running_timestamp_micro_cutoff = ( + now_timestamp_micro - settings.SERVER_METRICS_RUNNING_TTL_SECONDS * 1_000_000 + ) + finished_timestamp_micro_cutoff = ( + now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000 ) - retention_window = timedelta(seconds=settings.SERVER_METRICS_WINDOW_SECONDS) - retention_window_micro = int(retention_window.total_seconds() * 1_000_000) - dialect_name = get_db().dialect_name - if dialect_name == "sqlite": - last_processed_at_epoch = func.unixepoch(JobModel.last_processed_at) - elif dialect_name == "postgresql": - last_processed_at_epoch = func.date_part("epoch", JobModel.last_processed_at) - else: - raise ValueError(f"unsupported dialect: {dialect_name}") - # Optimization - only check recently finished jobs, - # where last_processed_at > now() - retention_window * 2 - finished_last_processed_cutoff = now - retention_window * 2 async with get_session_ctx() as session: await asyncio.gather( session.execute( @@ -73,19 +63,10 @@ async def delete_metrics(): delete(JobMetricsPoint).where( JobMetricsPoint.job_id.in_( select(JobModel.id).where( - JobModel.status.in_(JobStatus.finished_statuses()), - JobModel.last_processed_at > finished_last_processed_cutoff, - ) - ), - JobMetricsPoint.timestamp_micro - < ( - select( - last_processed_at_epoch.cast(BigInteger) * 1_000_000 - - retention_window_micro + JobModel.status.in_(JobStatus.finished_statuses()) ) - .where(JobModel.id == JobMetricsPoint.job_id) - .scalar_subquery() ), + JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff, ) ), ) diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index d2df9eda44..8066063319 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -870,10 +870,10 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): if ( run_spec.merged_profile.utilization_policy is not None and run_spec.merged_profile.utilization_policy.time_window - > settings.SERVER_METRICS_TTL_SECONDS + > settings.SERVER_METRICS_RUNNING_TTL_SECONDS ): raise ServerClientError( - f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s" + f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s" ) set_resources_defaults(run_spec.configuration.resources) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index bed9b13c10..d26fb95b85 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -1,4 +1,5 @@ import os +import warnings from pathlib import Path DSTACK_DIR_PATH = Path("~/.dstack/").expanduser() @@ -45,8 +46,25 @@ SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT") -SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600)) -SERVER_METRICS_WINDOW_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_WINDOW_SECONDS", 1800)) +SERVER_METRICS_RUNNING_TTL_SECONDS: int +_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS") +if _SERVER_METRICS_RUNNING_TTL_SECONDS is None: + _SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS") + if _SERVER_METRICS_RUNNING_TTL_SECONDS is not None: + warnings.warn( + ( + "DSTACK_SERVER_METRICS_TTL_SECONDS is deprecated," + " use DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS instead" + ), + DeprecationWarning, + ) + else: + _SERVER_METRICS_RUNNING_TTL_SECONDS = 3600 +SERVER_METRICS_RUNNING_TTL_SECONDS = int(_SERVER_METRICS_RUNNING_TTL_SECONDS) +del _SERVER_METRICS_RUNNING_TTL_SECONDS +SERVER_METRICS_FINISHED_TTL_SECONDS = int( + os.getenv("DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS", 7 * 24 * 3600) +) DEFAULT_PROJECT_NAME = "main" diff --git a/src/tests/_internal/server/background/tasks/test_process_metrics.py b/src/tests/_internal/server/background/tasks/test_process_metrics.py index be0c034d3a..0be650a223 100644 --- a/src/tests/_internal/server/background/tasks/test_process_metrics.py +++ b/src/tests/_internal/server/background/tasks/test_process_metrics.py @@ -114,7 +114,6 @@ async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSess session=session, run=run, status=JobStatus.RUNNING, - last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc), ) await create_job_metrics_point( session=session, @@ -131,7 +130,9 @@ async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSess job_model=job, timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc), ) - with patch.object(settings, "SERVER_METRICS_TTL_SECONDS", 15): + with patch.multiple( + settings, SERVER_METRICS_RUNNING_TTL_SECONDS=15, SERVER_METRICS_FINISHED_TTL_SECONDS=0 + ): await delete_metrics() res = await session.execute(select(JobMetricsPoint)) points = res.scalars().all() @@ -161,24 +162,25 @@ async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSes session=session, run=run, status=JobStatus.FAILED, - last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc), ) await create_job_metrics_point( session=session, job_model=job, - timestamp=datetime(2023, 1, 2, 3, 4, 30, tzinfo=timezone.utc), + timestamp=datetime(2023, 1, 2, 3, 4, 10, tzinfo=timezone.utc), ) await create_job_metrics_point( session=session, job_model=job, - timestamp=datetime(2023, 1, 2, 3, 4, 40, tzinfo=timezone.utc), + timestamp=datetime(2023, 1, 2, 3, 4, 20, tzinfo=timezone.utc), ) last_metric = await create_job_metrics_point( session=session, job_model=job, - timestamp=datetime(2023, 1, 2, 3, 4, 50, tzinfo=timezone.utc), + timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc), ) - with patch.object(settings, "SERVER_METRICS_WINDOW_SECONDS", 15): + with patch.multiple( + settings, SERVER_METRICS_RUNNING_TTL_SECONDS=0, SERVER_METRICS_FINISHED_TTL_SECONDS=15 + ): await delete_metrics() res = await session.execute(select(JobMetricsPoint)) points = res.scalars().all()