Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/dstack/_internal/server/background/tasks/process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,33 @@ async def collect_metrics():


async def delete_metrics():
cutoff = _get_delete_metrics_cutoff()
now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000)
running_timestamp_micro_cutoff = (
now_timestamp_micro - settings.SERVER_METRICS_RUNNING_TTL_SECONDS * 1_000_000
)
finished_timestamp_micro_cutoff = (
now_timestamp_micro - settings.SERVER_METRICS_FINISHED_TTL_SECONDS * 1_000_000
)
async with get_session_ctx() as session:
await session.execute(
delete(JobMetricsPoint).where(JobMetricsPoint.timestamp_micro < cutoff)
await asyncio.gather(
session.execute(
delete(JobMetricsPoint).where(
JobMetricsPoint.job_id.in_(
select(JobModel.id).where(JobModel.status.in_([JobStatus.RUNNING]))
),
JobMetricsPoint.timestamp_micro < running_timestamp_micro_cutoff,
)
),
session.execute(
delete(JobMetricsPoint).where(
JobMetricsPoint.job_id.in_(
select(JobModel.id).where(
JobModel.status.in_(JobStatus.finished_statuses())
)
),
JobMetricsPoint.timestamp_micro < finished_timestamp_micro_cutoff,
)
),
)
await session.commit()

Expand Down Expand Up @@ -134,9 +157,3 @@ def _pull_runner_metrics(
) -> Optional[MetricsResponse]:
runner_client = client.RunnerClient(port=ports[DSTACK_RUNNER_HTTP_PORT])
return runner_client.get_metrics()


def _get_delete_metrics_cutoff() -> int:
now = int(get_current_datetime().timestamp() * 1_000_000)
cutoff = now - (settings.SERVER_METRICS_TTL_SECONDS * 1_000_000)
return cutoff
4 changes: 2 additions & 2 deletions src/dstack/_internal/server/services/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,10 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
if (
run_spec.merged_profile.utilization_policy is not None
and run_spec.merged_profile.utilization_policy.time_window
> settings.SERVER_METRICS_TTL_SECONDS
> settings.SERVER_METRICS_RUNNING_TTL_SECONDS
):
raise ServerClientError(
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s"
f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s"
)
set_resources_defaults(run_spec.configuration.resources)

Expand Down
21 changes: 20 additions & 1 deletion src/dstack/_internal/server/settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import warnings
from pathlib import Path

DSTACK_DIR_PATH = Path("~/.dstack/").expanduser()
Expand Down Expand Up @@ -45,7 +46,25 @@

SERVER_GCP_LOGGING_PROJECT = os.getenv("DSTACK_SERVER_GCP_LOGGING_PROJECT")

SERVER_METRICS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS", 3600))
SERVER_METRICS_RUNNING_TTL_SECONDS: int
_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS")
if _SERVER_METRICS_RUNNING_TTL_SECONDS is None:
_SERVER_METRICS_RUNNING_TTL_SECONDS = os.getenv("DSTACK_SERVER_METRICS_TTL_SECONDS")
if _SERVER_METRICS_RUNNING_TTL_SECONDS is not None:
warnings.warn(
(
"DSTACK_SERVER_METRICS_TTL_SECONDS is deprecated,"
" use DSTACK_SERVER_METRICS_RUNNING_TTL_SECONDS instead"
),
DeprecationWarning,
)
else:
_SERVER_METRICS_RUNNING_TTL_SECONDS = 3600
SERVER_METRICS_RUNNING_TTL_SECONDS = int(_SERVER_METRICS_RUNNING_TTL_SECONDS)
del _SERVER_METRICS_RUNNING_TTL_SECONDS
SERVER_METRICS_FINISHED_TTL_SECONDS = int(
os.getenv("DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS", 7 * 24 * 3600)
)

DEFAULT_PROJECT_NAME = "main"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TestDeleteMetrics:
@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc))
async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
async def test_deletes_old_metrics_running_job(self, test_db, session: AsyncSession):
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
Expand All @@ -113,6 +113,55 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
job = await create_job(
session=session,
run=run,
status=JobStatus.RUNNING,
)
await create_job_metrics_point(
session=session,
job_model=job,
timestamp=datetime(2023, 1, 2, 3, 4, 10, tzinfo=timezone.utc),
)
await create_job_metrics_point(
session=session,
job_model=job,
timestamp=datetime(2023, 1, 2, 3, 4, 20, tzinfo=timezone.utc),
)
last_metric = await create_job_metrics_point(
session=session,
job_model=job,
timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc),
)
with patch.multiple(
settings, SERVER_METRICS_RUNNING_TTL_SECONDS=15, SERVER_METRICS_FINISHED_TTL_SECONDS=0
):
await delete_metrics()
res = await session.execute(select(JobMetricsPoint))
points = res.scalars().all()
assert len(points) == 1
assert points[0].id == last_metric.id

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@freeze_time(datetime(2023, 1, 2, 3, 5, 20, tzinfo=timezone.utc))
async def test_deletes_old_metrics_finished_job(self, test_db, session: AsyncSession):
user = await create_user(session=session, global_role=GlobalRole.USER)
project = await create_project(session=session, owner=user)
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
repo = await create_repo(
session=session,
project_id=project.id,
)
run = await create_run(
session=session,
project=project,
repo=repo,
user=user,
)
job = await create_job(
session=session,
run=run,
status=JobStatus.FAILED,
)
await create_job_metrics_point(
session=session,
Expand All @@ -129,7 +178,9 @@ async def test_deletes_old_metrics(self, test_db, session: AsyncSession):
job_model=job,
timestamp=datetime(2023, 1, 2, 3, 5, 10, tzinfo=timezone.utc),
)
with patch.object(settings, "SERVER_METRICS_TTL_SECONDS", 15):
with patch.multiple(
settings, SERVER_METRICS_RUNNING_TTL_SECONDS=0, SERVER_METRICS_FINISHED_TTL_SECONDS=15
):
await delete_metrics()
res = await session.execute(select(JobMetricsPoint))
points = res.scalars().all()
Expand Down
Loading