Skip to content

Commit d944b64

Browse files
committed
Keep last metrics for finished jobs
The retention window is 1800 seconds (last 30 minutes) by default, configurable via the `DSTACK_SERVER_METRICS_WINDOW_SECONDS` environment variable. Closes: #2618
1 parent 208ed86 commit d944b64

3 files changed

Lines changed: 98 additions & 12 deletions

File tree

src/dstack/_internal/server/background/tasks/process_metrics.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import asyncio
22
import json
3+
from datetime import timedelta
34
from typing import Dict, List, Optional
45

5-
from sqlalchemy import delete, select
6+
from sqlalchemy import BigInteger, delete, func, select
67
from sqlalchemy.orm import joinedload
78

89
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT
910
from dstack._internal.core.models.runs import JobStatus
1011
from dstack._internal.server import settings
11-
from dstack._internal.server.db import get_session_ctx
12+
from dstack._internal.server.db import get_db, get_session_ctx
1213
from dstack._internal.server.models import InstanceModel, JobMetricsPoint, JobModel
1314
from dstack._internal.server.schemas.runner import MetricsResponse
1415
from dstack._internal.server.services.instances import get_instance_ssh_private_keys
@@ -42,10 +43,51 @@ async def collect_metrics():
4243

4344

4445
async def delete_metrics():
45-
cutoff = _get_delete_metrics_cutoff()
46+
now = get_current_datetime()
47+
running_timestamp_micro_cutoff = int(now.timestamp() * 1_000_000) - (
48+
settings.SERVER_METRICS_TTL_SECONDS * 1_000_000
49+
)
50+
retention_window = timedelta(seconds=settings.SERVER_METRICS_WINDOW_SECONDS)
51+
retention_window_micro = int(retention_window.total_seconds() * 1_000_000)
52+
dialect_name = get_db().dialect_name
53+
if dialect_name == "sqlite":
54+
last_processed_at_epoch = func.unixepoch(JobModel.last_processed_at)
55+
elif dialect_name == "postgresql":
56+
last_processed_at_epoch = func.date_part("epoch", JobModel.last_processed_at)
57+
else:
58+
raise ValueError(f"unsupported dialect: {dialect_name}")
59+
# Optimization - only check recently finished jobs,
60+
# where last_processed_at > now() - retention_window * 2
61+
finished_last_processed_cutoff = now - retention_window * 2
4662
async with get_session_ctx() as session:
47-
await session.execute(
48-
delete(JobMetricsPoint).where(JobMetricsPoint.timestamp_micro < cutoff)
63+
await asyncio.gather(
64+
session.execute(
65+
delete(JobMetricsPoint).where(
66+
JobMetricsPoint.job_id.in_(
67+
select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING]))
68+
),
69+
JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff,
70+
)
71+
),
72+
session.execute(
73+
delete(JobMetricsPoint).where(
74+
JobMetricsPoint.job_id.in_(
75+
select(JobModel.id).where(
76+
JobModel.status.in_(JobStatus.finished_statuses()),
77+
JobModel.last_processed_at > finished_last_processed_cutoff,
78+
)
79+
),
80+
JobMetricsPoint.timestamp_micro
81+
< (
82+
select(
83+
last_processed_at_epoch.cast(BigInteger) * 1_000_000
84+
- retention_window_micro
85+
)
86+
.where(JobModel.id == JobMetricsPoint.job_id)
87+
.scalar_subquery()
88+
),
89+
)
90+
),
4991
)
5092
await session.commit()
5193

@@ -134,9 +176,3 @@ def _pull_runner_metrics(
134176
) -> Optional[MetricsResponse]:
135177
runner_client = client.RunnerClient(port=ports[DSTACK_RUNNER_HTTP_PORT])
136178
return runner_client.get_metrics()
137-
138-
139-
def _get_delete_metrics_cutoff() -> int:
140-
now = int(get_current_datetime().timestamp() * 1_000_000)
141-
cutoff = now - (settings.SERVER_METRICS_TTL_SECONDS * 1_000_000)
142-
return cutoff

src/dstack/_internal/server/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT")
4747

4848
SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600))
49+
SERVER_METRICS_WINDOW_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_WINDOW_SECONDS", 1800))
4950

5051
DEFAULT_PROJECT_NAME = "main"
5152

src/tests/_internal/server/background/tasks/test_process_metrics.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class TestDeleteMetrics:
9494
@pytest.mark.asyncio
9595
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
9696
@freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc))
97-
async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
97+
async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSession):
9898
user = await create_user(session=session, global_role=GlobalRole.USER)
9999
project = await create_project(session=session, owner=user)
100100
await add_project_member(
@@ -113,6 +113,8 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
113113
job = await create_job(
114114
session=session,
115115
run=run,
116+
status=JobStatus.RUNNING,
117+
last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc),
116118
)
117119
await create_job_metrics_point(
118120
session=session,
@@ -135,3 +137,50 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
135137
points = res.scalars().all()
136138
assert len(points) == 1
137139
assert points[0].id == last_metric.id
140+
141+
@pytest.mark.asyncio
142+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
143+
@freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc))
144+
async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSession):
145+
user = await create_user(session=session, global_role=GlobalRole.USER)
146+
project = await create_project(session=session, owner=user)
147+
await add_project_member(
148+
session=session, project=project, user=user, project_role=ProjectRole.USER
149+
)
150+
repo = await create_repo(
151+
session=session,
152+
project_id=project.id,
153+
)
154+
run = await create_run(
155+
session=session,
156+
project=project,
157+
repo=repo,
158+
user=user,
159+
)
160+
job = await create_job(
161+
session=session,
162+
run=run,
163+
status=JobStatus.FAILED,
164+
last_processed_at=datetime(2023, 1, 2, 3, 5, 0, tzinfo=timezone.utc),
165+
)
166+
await create_job_metrics_point(
167+
session=session,
168+
job_model=job,
169+
timestamp=datetime(2023, 1, 2, 3, 4, 30, tzinfo=timezone.utc),
170+
)
171+
await create_job_metrics_point(
172+
session=session,
173+
job_model=job,
174+
timestamp=datetime(2023, 1, 2, 3, 4, 40, tzinfo=timezone.utc),
175+
)
176+
last_metric = await create_job_metrics_point(
177+
session=session,
178+
job_model=job,
179+
timestamp=datetime(2023, 1, 2, 3, 4, 50, tzinfo=timezone.utc),
180+
)
181+
with patch.object(settings, "SERVER_METRICS_WINDOW_SECONDS", 15):
182+
await delete_metrics()
183+
res = await session.execute(select(JobMetricsPoint))
184+
points = res.scalars().all()
185+
assert len(points) == 1
186+
assert points[0].id == last_metric.id

0 commit comments

Comments
 (0)