Skip to content

Commit e69902f

Browse files
authored
Add finished job metrics TTL setting (#2628)
* Introduce `DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS` (1 week by default) * Rename `DSTACK_SERVER_METRICS_TTL_SECONDS` to `DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS` Closes: #2618
1 parent 4ff4d63 commit e69902f

File tree

4 files changed

+101
-14
lines changed

4 files changed

+101
-14
lines changed

src/dstack/_internal/server/background/tasks/process_metrics.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,33 @@ async def collect_metrics():
4242

4343

4444
async def delete_metrics():
45-
cutoff = _get_delete_metrics_cutoff()
45+
now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000)
46+
running_timestamp_micro_cutoff = (
47+
now_timestamp_micro - settings.SERVER_METRICS_RUNNING_TTL_SECONDS * 1_000_000
48+
)
49+
finished_timestamp_micro_cutoff = (
50+
now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000
51+
)
4652
async with get_session_ctx() as session:
47-
await session.execute(
48-
delete(JobMetricsPoint).where(JobMetricsPoint.timestamp_micro < cutoff)
53+
await asyncio.gather(
54+
session.execute(
55+
delete(JobMetricsPoint).where(
56+
JobMetricsPoint.job_id.in_(
57+
select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING]))
58+
),
59+
JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff,
60+
)
61+
),
62+
session.execute(
63+
delete(JobMetricsPoint).where(
64+
JobMetricsPoint.job_id.in_(
65+
select(JobModel.id).where(
66+
JobModel.status.in_(JobStatus.finished_statuses())
67+
)
68+
),
69+
JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff,
70+
)
71+
),
4972
)
5073
await session.commit()
5174

@@ -134,9 +157,3 @@ def _pull_runner_metrics(
134157
) -> Optional[MetricsResponse]:
135158
runner_client = client.RunnerClient(port=ports[DSTACK_RUNNER_HTTP_PORT])
136159
return runner_client.get_metrics()
137-
138-
139-
def _get_delete_metrics_cutoff() -> int:
140-
now = int(get_current_datetime().timestamp() * 1_000_000)
141-
cutoff = now - (settings.SERVER_METRICS_TTL_SECONDS * 1_000_000)
142-
return cutoff

src/dstack/_internal/server/services/runs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -870,10 +870,10 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
870870
if (
871871
run_spec.merged_profile.utilization_policy is not None
872872
and run_spec.merged_profile.utilization_policy.time_window
873-
> settings.SERVER_METRICS_TTL_SECONDS
873+
> settings.SERVER_METRICS_RUNNING_TTL_SECONDS
874874
):
875875
raise ServerClientError(
876-
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s"
876+
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s"
877877
)
878878
set_resources_defaults(run_spec.configuration.resources)
879879

src/dstack/_internal/server/settings.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import warnings
23
from pathlib import Path
34

45
DSTACK_DIR_PATH = Path("~/.dstack/").expanduser()
@@ -45,7 +46,25 @@
4546

4647
SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT")
4748

48-
SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600))
49+
SERVER_METRICS_RUNNING_TTL_SECONDS: int
50+
_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS")
51+
if _SERVER_METRICS_RUNNING_TTL_SECONDS is None:
52+
_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS")
53+
if _SERVER_METRICS_RUNNING_TTL_SECONDS is not None:
54+
warnings.warn(
55+
(
56+
"DSTACK_SERVER_METRICS_TTL_SECONDS is deprecated,"
57+
" use DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS instead"
58+
),
59+
DeprecationWarning,
60+
)
61+
else:
62+
_SERVER_METRICS_RUNNING_TTL_SECONDS = 3600
63+
SERVER_METRICS_RUNNING_TTL_SECONDS = int(_SERVER_METRICS_RUNNING_TTL_SECONDS)
64+
del _SERVER_METRICS_RUNNING_TTL_SECONDS
65+
SERVER_METRICS_FINISHED_TTL_SECONDS = int(
66+
os.getenv("DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS", 7 * 24 * 3600)
67+
)
4968

5069
DEFAULT_PROJECT_NAME = "main"
5170

src/tests/_internal/server/background/tasks/test_process_metrics.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class TestDeleteMetrics:
9494
@pytest.mark.asyncio
9595
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
9696
@freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc))
97-
async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
97+
async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSession):
9898
user = await create_user(session=session, global_role=GlobalRole.USER)
9999
project = await create_project(session=session, owner=user)
100100
await add_project_member(
@@ -113,6 +113,55 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
113113
job = await create_job(
114114
session=session,
115115
run=run,
116+
status=JobStatus.RUNNING,
117+
)
118+
await create_job_metrics_point(
119+
session=session,
120+
job_model=job,
121+
timestamp=datetime(2023, 1, 2, 3, 4, 10, tzinfo=timezone.utc),
122+
)
123+
await create_job_metrics_point(
124+
session=session,
125+
job_model=job,
126+
timestamp=datetime(2023, 1, 2, 3, 4, 20, tzinfo=timezone.utc),
127+
)
128+
last_metric = await create_job_metrics_point(
129+
session=session,
130+
job_model=job,
131+
timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc),
132+
)
133+
with patch.multiple(
134+
settings, SERVER_METRICS_RUNNING_TTL_SECONDS=15, SERVER_METRICS_FINISHED_TTL_SECONDS=0
135+
):
136+
await delete_metrics()
137+
res = await session.execute(select(JobMetricsPoint))
138+
points = res.scalars().all()
139+
assert len(points) == 1
140+
assert points[0].id == last_metric.id
141+
142+
@pytest.mark.asyncio
143+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
144+
@freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc))
145+
async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSession):
146+
user = await create_user(session=session, global_role=GlobalRole.USER)
147+
project = await create_project(session=session, owner=user)
148+
await add_project_member(
149+
session=session, project=project, user=user, project_role=ProjectRole.USER
150+
)
151+
repo = await create_repo(
152+
session=session,
153+
project_id=project.id,
154+
)
155+
run = await create_run(
156+
session=session,
157+
project=project,
158+
repo=repo,
159+
user=user,
160+
)
161+
job = await create_job(
162+
session=session,
163+
run=run,
164+
status=JobStatus.FAILED,
116165
)
117166
await create_job_metrics_point(
118167
session=session,
@@ -129,7 +178,9 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
129178
job_model=job,
130179
timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc),
131180
)
132-
with patch.object(settings, "SERVER_METRICS_TTL_SECONDS", 15):
181+
with patch.multiple(
182+
settings, SERVER_METRICS_RUNNING_TTL_SECONDS=0, SERVER_METRICS_FINISHED_TTL_SECONDS=15
183+
):
133184
await delete_metrics()
134185
res = await session.execute(select(JobMetricsPoint))
135186
points = res.scalars().all()

0 commit comments

Comments
 (0)