diff --git a/src/dstack/_internal/server/background/tasks/process_metrics.py b/src/dstack/_internal/server/background/tasks/process_metrics.py index 9ac5685076..43d8b7838a 100644 --- a/src/dstack/_internal/server/background/tasks/process_metrics.py +++ b/src/dstack/_internal/server/background/tasks/process_metrics.py @@ -42,10 +42,33 @@ async def collect_metrics(): async def delete_metrics(): - cutoff = _get_delete_metrics_cutoff() + now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000) + running_timestamp_micro_cutoff = ( + now_timestamp_micro - settings.SERVER_METRICS_RUNNING_TTL_SECONDS * 1_000_000 + ) + finished_timestamp_micro_cutoff = ( + now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000 + ) async with get_session_ctx() as session: - await session.execute( - delete(JobMetricsPoint).where(JobMetricsPoint.timestamp_micro < cutoff) + await asyncio.gather( + session.execute( + delete(JobMetricsPoint).where( + JobMetricsPoint.job_id.in_( + select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING])) + ), + JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff, + ) + ), + session.execute( + delete(JobMetricsPoint).where( + JobMetricsPoint.job_id.in_( + select(JobModel.id).where( + JobModel.status.in_(JobStatus.finished_statuses()) + ) + ), + JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff, + ) + ), ) await session.commit() @@ -134,9 +157,3 @@ def _pull_runner_metrics( ) -> Optional[MetricsResponse]: runner_client = client.RunnerClient(port=ports[DSTACK_RUNNER_HTTP_PORT]) return runner_client.get_metrics() - - -def _get_delete_metrics_cutoff() -> int: - now = int(get_current_datetime().timestamp() * 1_000_000) - cutoff = now - (settings.SERVER_METRICS_TTL_SECONDS * 1_000_000) - return cutoff diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index d2df9eda44..8066063319 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -870,10 +870,10 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): if ( run_spec.merged_profile.utilization_policy is not None and run_spec.merged_profile.utilization_policy.time_window - > settings.SERVER_METRICS_TTL_SECONDS + > settings.SERVER_METRICS_RUNNING_TTL_SECONDS ): raise ServerClientError( - f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s" + f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s" ) set_resources_defaults(run_spec.configuration.resources) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 47eb68a82e..d26fb95b85 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -1,4 +1,5 @@ import os +import warnings from pathlib import Path DSTACK_DIR_PATH = Path("~/.dstack/").expanduser() @@ -45,7 +46,25 @@ SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT") -SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600)) +SERVER_METRICS_RUNNING_TTL_SECONDS: int +_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS") +if _SERVER_METRICS_RUNNING_TTL_SECONDS is None: + _SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS") + if _SERVER_METRICS_RUNNING_TTL_SECONDS is not None: + warnings.warn( + ( + "DSTACK_SERVER_METRICS_TTL_SECONDS is deprecated," + " use DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS instead" + ), + DeprecationWarning, + ) + else: + _SERVER_METRICS_RUNNING_TTL_SECONDS = 3600 +SERVER_METRICS_RUNNING_TTL_SECONDS = int(_SERVER_METRICS_RUNNING_TTL_SECONDS) +del _SERVER_METRICS_RUNNING_TTL_SECONDS +SERVER_METRICS_FINISHED_TTL_SECONDS = int( + os.getenv("DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS", 7 * 24 * 3600) +) DEFAULT_PROJECT_NAME = "main" diff --git a/src/tests/_internal/server/background/tasks/test_process_metrics.py b/src/tests/_internal/server/background/tasks/test_process_metrics.py index e7563e81c1..0be650a223 100644 --- a/src/tests/_internal/server/background/tasks/test_process_metrics.py +++ b/src/tests/_internal/server/background/tasks/test_process_metrics.py @@ -94,7 +94,7 @@ class TestDeleteMetrics: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) - async def test_deletes_old_metrics(self, test_db, session: AsyncSession): + async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSession): user = await create_user(session=session, global_role=GlobalRole.USER) project = await create_project(session=session, owner=user) await add_project_member( @@ -113,6 +113,55 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession): job = await create_job( session=session, run=run, + status=JobStatus.RUNNING, + ) + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 4, 10, tzinfo=timezone.utc), + ) + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 4, 20, tzinfo=timezone.utc), + ) + last_metric = await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc), + ) + with patch.multiple( + settings, SERVER_METRICS_RUNNING_TTL_SECONDS=15, SERVER_METRICS_FINISHED_TTL_SECONDS=0 + ): + await delete_metrics() + res = await session.execute(select(JobMetricsPoint)) + points = res.scalars().all() + assert len(points) == 1 + assert points[0].id == last_metric.id + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc)) + async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSession): + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session, owner=user) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.FAILED, ) await create_job_metrics_point( session=session, @@ -129,7 +178,9 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession): job_model=job, timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc), ) - with patch.object(settings, "SERVER_METRICS_TTL_SECONDS", 15): + with patch.multiple( + settings, SERVER_METRICS_RUNNING_TTL_SECONDS=0, SERVER_METRICS_FINISHED_TTL_SECONDS=15 + ): await delete_metrics() res = await session.execute(select(JobMetricsPoint)) points = res.scalars().all()