Skip to content

Commit 42397ab

Browse files
committed
Replace retention window with TTL
1 parent d944b64 commit 42397ab

4 files changed

Lines changed: 41 additions & 40 deletions

File tree

src/dstack/_internal/server/background/tasks/process_metrics.py

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import asyncio
22
import json
3-
from datetime import timedelta
43
from typing import Dict, List, Optional
54

6-
from sqlalchemy import BigInteger, delete, func, select
5+
from sqlalchemy import delete, select
76
from sqlalchemy.orm import joinedload
87

98
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT
109
from dstack._internal.core.models.runs import JobStatus
1110
from dstack._internal.server import settings
12-
from dstack._internal.server.db import get_db, get_session_ctx
11+
from dstack._internal.server.db import get_session_ctx
1312
from dstack._internal.server.models import InstanceModel, JobMetricsPoint, JobModel
1413
from dstack._internal.server.schemas.runner import MetricsResponse
1514
from dstack._internal.server.services.instances import get_instance_ssh_private_keys
@@ -43,22 +42,13 @@ async def collect_metrics():
4342

4443

4544
async def delete_metrics():
46-
now = get_current_datetime()
47-
running_timestamp_micro_cutoff = int(now.timestamp() * 1_000_000) - (
48-
settings.SERVER_METRICS_TTL_SECONDS * 1_000_000
45+
now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000)
46+
running_timestamp_micro_cutoff = (
47+
now_timestamp_micro - settings.SERVER_METRICS_RUNNING_TTL_SECONDS * 1_000_000
48+
)
49+
finished_timestamp_micro_cutoff = (
50+
now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000
4951
)
50-
retention_window = timedelta(seconds=settings.SERVER_METRICS_WINDOW_SECONDS)
51-
retention_window_micro = int(retention_window.total_seconds() * 1_000_000)
52-
dialect_name = get_db().dialect_name
53-
if dialect_name == "sqlite":
54-
last_processed_at_epoch = func.unixepoch(JobModel.last_processed_at)
55-
elif dialect_name == "postgresql":
56-
last_processed_at_epoch = func.date_part("epoch", JobModel.last_processed_at)
57-
else:
58-
raise ValueError(f"unsupported dialect: {dialect_name}")
59-
# Optimization - only check recently finished jobs,
60-
# where last_processed_at > now() - retention_window * 2
61-
finished_last_processed_cutoff = now - retention_window * 2
6252
async with get_session_ctx() as session:
6353
await asyncio.gather(
6454
session.execute(
@@ -73,19 +63,10 @@ async def delete_metrics():
7363
delete(JobMetricsPoint).where(
7464
JobMetricsPoint.job_id.in_(
7565
select(JobModel.id).where(
76-
JobModel.status.in_(JobStatus.finished_statuses()),
77-
JobModel.last_processed_at > finished_last_processed_cutoff,
78-
)
79-
),
80-
JobMetricsPoint.timestamp_micro
81-
< (
82-
select(
83-
last_processed_at_epoch.cast(BigInteger) * 1_000_000
84-
- retention_window_micro
66+
JobModel.status.in_(JobStatus.finished_statuses())
8567
)
86-
.where(JobModel.id == JobMetricsPoint.job_id)
87-
.scalar_subquery()
8868
),
69+
JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff,
8970
)
9071
),
9172
)

src/dstack/_internal/server/services/runs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -870,10 +870,10 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
870870
if (
871871
run_spec.merged_profile.utilization_policy is not None
872872
and run_spec.merged_profile.utilization_policy.time_window
873-
> settings.SERVER_METRICS_TTL_SECONDS
873+
> settings.SERVER_METRICS_RUNNING_TTL_SECONDS
874874
):
875875
raise ServerClientError(
876-
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s"
876+
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s"
877877
)
878878
set_resources_defaults(run_spec.configuration.resources)
879879

src/dstack/_internal/server/settings.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import warnings
23
from pathlib import Path
34

45
DSTACK_DIR_PATH = Path("~/.dstack/").expanduser()
@@ -45,8 +46,25 @@
4546

4647
SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT")
4748

48-
SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600))
49-
SERVER_METRICS_WINDOW_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_WINDOW_SECONDS", 1800))
49+
SERVER_METRICS_RUNNING_TTL_SECONDS: int
50+
_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS")
51+
if _SERVER_METRICS_RUNNING_TTL_SECONDS is None:
52+
_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS")
53+
if _SERVER_METRICS_RUNNING_TTL_SECONDS is not None:
54+
warnings.warn(
55+
(
56+
"DSTACK_SERVER_METRICS_TTL_SECONDS is deprecated,"
57+
" use DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS instead"
58+
),
59+
DeprecationWarning,
60+
)
61+
else:
62+
_SERVER_METRICS_RUNNING_TTL_SECONDS = 3600
63+
SERVER_METRICS_RUNNING_TTL_SECONDS = int(_SERVER_METRICS_RUNNING_TTL_SECONDS)
64+
del _SERVER_METRICS_RUNNING_TTL_SECONDS
65+
SERVER_METRICS_FINISHED_TTL_SECONDS = int(
66+
os.getenv("DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS", 7 * 24 * 3600)
67+
)
5068

5169
DEFAULT_PROJECT_NAME = "main"
5270

src/tests/_internal/server/background/tasks/test_process_metrics.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSess
114114
session=session,
115115
run=run,
116116
status=JobStatus.RUNNING,
117-
last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc),
118117
)
119118
await create_job_metrics_point(
120119
session=session,
@@ -131,7 +130,9 @@ async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSess
131130
job_model=job,
132131
timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc),
133132
)
134-
with patch.object(settings, "SERVER_METRICS_TTL_SECONDS", 15):
133+
with patch.multiple(
134+
settings, SERVER_METRICS_RUNNING_TTL_SECONDS=15, SERVER_METRICS_FINISHED_TTL_SECONDS=0
135+
):
135136
await delete_metrics()
136137
res = await session.execute(select(JobMetricsPoint))
137138
points = res.scalars().all()
@@ -161,24 +162,25 @@ async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSes
161162
session=session,
162163
run=run,
163164
status=JobStatus.FAILED,
164-
last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc),
165165
)
166166
await create_job_metrics_point(
167167
session=session,
168168
job_model=job,
169-
timestamp=datetime(2023, 1, 2, 3, 4, 30, tzinfo=timezone.utc),
169+
timestamp=datetime(2023, 1, 2, 3, 4, 10, tzinfo=timezone.utc),
170170
)
171171
await create_job_metrics_point(
172172
session=session,
173173
job_model=job,
174-
timestamp=datetime(2023, 1, 2, 3, 4, 40, tzinfo=timezone.utc),
174+
timestamp=datetime(2023, 1, 2, 3, 4, 20, tzinfo=timezone.utc),
175175
)
176176
last_metric = await create_job_metrics_point(
177177
session=session,
178178
job_model=job,
179-
timestamp=datetime(2023, 1, 2, 3, 4, 50, tzinfo=timezone.utc),
179+
timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc),
180180
)
181-
with patch.object(settings, "SERVER_METRICS_WINDOW_SECONDS", 15):
181+
with patch.multiple(
182+
settings, SERVER_METRICS_RUNNING_TTL_SECONDS=0, SERVER_METRICS_FINISHED_TTL_SECONDS=15
183+
):
182184
await delete_metrics()
183185
res = await session.execute(select(JobMetricsPoint))
184186
points = res.scalars().all()

0 commit comments

Comments
 (0)