Skip to content

Commit acd006d

Browse files
committed
Service probes
Allow configuring HTTP probes for services and use probe results to avoid downtime during deployments.
1 parent b58ad40 commit acd006d

File tree

24 files changed

+682
-22
lines changed

24 files changed

+682
-22
lines changed

docs/docs/concepts/services.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ The rolling deployment stops when all replicas are updated or when a new deploym
783783
??? info "Supported properties"
784784
<!-- NOTE: should be in sync with constants in server/services/runs.py -->
785785

786-
Rolling deployment supports changes to the following properties: `port`, `resources`, `volumes`, `docker`, `files`, `image`, `user`, `privileged`, `entrypoint`, `working_dir`, `python`, `nvcc`, `single_branch`, `env`, `shell`, `commands`, as well as changes to [repo](repos.md) or [file](#files) contents.
786+
Rolling deployment supports changes to the following properties: `port`, `probes`, `resources`, `volumes`, `docker`, `files`, `image`, `user`, `privileged`, `entrypoint`, `working_dir`, `python`, `nvcc`, `single_branch`, `env`, `shell`, `commands`, as well as changes to [repo](repos.md) or [file](#files) contents.
787787

788788
Changes to `replicas` and `scaling` can be applied without redeploying replicas.
789789

@@ -792,6 +792,8 @@ The rolling deployment stops when all replicas are updated or when a new deploym
792792
To trigger a rolling deployment when no properties have changed (e.g., after updating [secrets](secrets.md) or to restart all replicas),
793793
make a minor config change, such as adding a dummy [environment variable](#environment-variables).
794794

795+
<!-- TODO: probes -->
796+
795797
--8<-- "docs/concepts/snippets/manage-runs.ext"
796798

797799
!!! info "What's next?"

src/dstack/_internal/cli/utils/run.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,21 @@
55
from rich.table import Table
66

77
from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console
8-
from dstack._internal.core.models.configurations import DevEnvironmentConfiguration
8+
from dstack._internal.core.models.configurations import DevEnvironmentConfiguration, ProbeConfig
99
from dstack._internal.core.models.instances import InstanceAvailability
1010
from dstack._internal.core.models.profiles import (
1111
DEFAULT_RUN_TERMINATION_IDLE_TIME,
1212
TerminationPolicy,
1313
)
1414
from dstack._internal.core.models.runs import (
15+
JobStatus,
16+
Probe,
1517
RunPlan,
1618
)
1719
from dstack._internal.core.services.profiles import get_termination
1820
from dstack._internal.utils.common import (
1921
DateFormatter,
22+
batched,
2023
format_duration_multiunit,
2124
format_pretty_duration,
2225
pretty_date,
@@ -156,6 +159,12 @@ def get_runs_table(
156159
table.add_column("INSTANCE TYPE", no_wrap=True, ratio=1)
157160
table.add_column("PRICE", style="grey58", ratio=1)
158161
table.add_column("STATUS", no_wrap=True, ratio=1)
162+
if verbose or any(
163+
run._run.is_deployment_in_progress()
164+
and any(job.job_submissions[-1].probes for job in run._run.jobs)
165+
for run in runs
166+
):
167+
table.add_column("PROBES", ratio=1)
159168
table.add_column("SUBMITTED", style="grey58", no_wrap=True, ratio=1)
160169
if verbose:
161170
table.add_column("ERROR", no_wrap=True, ratio=2)
@@ -198,6 +207,9 @@ def get_runs_table(
198207
else ""
199208
),
200209
"STATUS": latest_job_submission.status_message,
210+
"PROBES": _format_job_probes(
211+
job.job_spec.probes, latest_job_submission.probes, latest_job_submission.status
212+
),
201213
"SUBMITTED": format_date(latest_job_submission.submitted_at),
202214
"ERROR": latest_job_submission.error,
203215
}
@@ -226,3 +238,21 @@ def get_runs_table(
226238
add_row_from_dict(table, job_row, style="secondary" if len(run.jobs) != 1 else None)
227239

228240
return table
241+
242+
243+
def _format_job_probes(
    probe_configs: list[ProbeConfig], probes: list[Probe], job_status: JobStatus
) -> str:
    """Render a compact per-probe status string for a running job.

    Each probe is shown as one mark: `✓` when its success streak has reached
    the configured `ready_after` threshold, `~` when it has some successes but
    is not yet ready, and `×` when it has no current success streak. Returns an
    empty string when there are no probe results or the job is not running.
    """
    if job_status != JobStatus.RUNNING or not probes:
        return ""
    marks = []
    # probe_configs and probes are expected to be parallel lists (one result per config).
    for config, probe in zip(probe_configs, probes):
        if probe.success_streak >= config.ready_after:
            marks.append("[code]✓[/]")
        elif probe.success_streak > 0:
            marks.append("[warning]~[/]")
        else:
            marks.append("[error]×[/]")
    # Emit marks in space-separated groups of 5 so Rich can wrap the column.
    groups = ("".join(group) for group in batched(marks, 5))
    return " ".join(groups)

src/dstack/_internal/core/compatibility/runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType:
120120
profile_excludes.add("startup_order")
121121
if configuration.stop_criteria is None:
122122
configuration_excludes["stop_criteria"] = True
123+
# TODO: probes
123124
if profile is not None and profile.stop_criteria is None:
124125
profile_excludes.add("stop_criteria")
125126
if not configuration.files:
@@ -154,6 +155,7 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> IncludeExcludeDictType:
154155
spec_excludes["file_archives"] = True
155156
if all(s.service_port is None for s in job_specs):
156157
spec_excludes["service_port"] = True
158+
# TODO: probes
157159

158160
return spec_excludes
159161

src/dstack/_internal/core/models/configurations.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from dstack._internal.core.models.files import FilePathMapping
1515
from dstack._internal.core.models.fleets import FleetConfiguration
1616
from dstack._internal.core.models.gateways import GatewayConfiguration
17-
from dstack._internal.core.models.profiles import ProfileParams, parse_off_duration
17+
from dstack._internal.core.models.profiles import ProfileParams, parse_duration, parse_off_duration
1818
from dstack._internal.core.models.resources import Range, ResourcesSpec
1919
from dstack._internal.core.models.services import AnyModel, OpenAIChatModel
2020
from dstack._internal.core.models.unix import UnixUser
@@ -32,6 +32,8 @@
3232
RUN_PRIOTIRY_MAX = 100
3333
RUN_PRIORITY_DEFAULT = 0
3434
DEFAULT_REPO_DIR = "/workflow"
35+
MIN_PROBE_TIMEOUT = 1
36+
MIN_PROBE_INTERVAL = 1
3537

3638

3739
class RunConfigurationType(str, Enum):
@@ -162,6 +164,58 @@ class RateLimit(CoreModel):
162164
] = 0
163165

164166

167+
class ProbeConfig(CoreModel):
    """Configuration of a single HTTP probe used to check service job health.

    `timeout` and `interval` accept either an integer number of seconds or a
    duration string (e.g. `"10s"`); validators normalize both to seconds and
    enforce the module-level minimums.
    """

    type: Literal["http"]  # expect other probe types in the future, namely `exec`
    # Request path; must be absolute (start with `/`), enforced by `validate_url`.
    url: Annotated[str, Field(description="The URL to request")] = "/"
    timeout: Annotated[
        Union[int, str],
        Field(description=("Maximum amount of time the HTTP request is allowed to take")),
    ] = "10s"
    interval: Annotated[
        Union[int, str],
        Field(
            description=(
                "Minimum amount of time between the end of one probe execution"
                " and the start of the next"
            )
        ),
    ] = "15s"
    ready_after: Annotated[
        int,
        Field(
            ge=1,
            description=(
                "The number of consecutive successful probe executions required for the job"
                " to be considered ready. Used during rolling deployments"
            ),
        ),
    ] = 1

    class Config:
        # Immutable and hashable, which also enables `unique_items=True`
        # on list fields holding ProbeConfig.
        frozen = True

    @validator("timeout")
    def parse_timeout(cls, v: Union[int, str]) -> int:
        """Normalize `timeout` to whole seconds; reject values below MIN_PROBE_TIMEOUT."""
        parsed = parse_duration(v)
        if parsed < MIN_PROBE_TIMEOUT:
            raise ValueError(f"Probe timeout cannot be shorter than {MIN_PROBE_TIMEOUT}s")
        return parsed

    @validator("interval")
    def parse_interval(cls, v: Union[int, str]) -> int:
        """Normalize `interval` to whole seconds; reject values below MIN_PROBE_INTERVAL."""
        parsed = parse_duration(v)
        if parsed < MIN_PROBE_INTERVAL:
            raise ValueError(f"Probe interval cannot be shorter than {MIN_PROBE_INTERVAL}s")
        return parsed

    @validator("url")
    def validate_url(cls, v: str) -> str:
        """Require an absolute path so it can be joined with the job's base URL."""
        # TODO: stricter constraints to avoid HTTPX URL parsing errors
        if not v.startswith("/"):
            raise ValueError("Must start with `/`")
        return v
217+
218+
165219
class BaseRunConfiguration(CoreModel):
166220
type: Literal["none"]
167221
name: Annotated[
@@ -448,6 +502,10 @@ class ServiceConfigurationParams(CoreModel):
448502
Field(description="The auto-scaling rules. Required if `replicas` is set to a range"),
449503
] = None
450504
rate_limits: Annotated[list[RateLimit], Field(description="Rate limiting rules")] = []
505+
probes: Annotated[
506+
list[ProbeConfig],
507+
Field(unique_items=True, description="List of probes used to determine job health"),
508+
] = []
451509

452510
@validator("port")
453511
def convert_port(cls, v) -> PortMapping:

src/dstack/_internal/core/models/runs.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from dstack._internal.core.models.configurations import (
1111
DEFAULT_REPO_DIR,
1212
AnyRunConfiguration,
13+
ProbeConfig,
1314
RunConfiguration,
1415
ServiceConfiguration,
1516
)
@@ -256,6 +257,7 @@ class JobSpec(CoreModel):
256257
file_archives: list[FileArchiveMapping] = []
257258
# None for non-services and pre-0.19.19 services. See `get_service_port`
258259
service_port: Optional[int] = None
260+
probes: list[ProbeConfig] = []
259261

260262

261263
class JobProvisioningData(CoreModel):
@@ -325,6 +327,10 @@ class ClusterInfo(CoreModel):
325327
gpus_per_job: int
326328

327329

330+
class Probe(CoreModel):
    """Latest execution state of a single probe for a job submission."""

    # Number of consecutive successful probe executions observed so far;
    # compared against ProbeConfig.ready_after to decide readiness.
    success_streak: int
332+
333+
328334
class JobSubmission(CoreModel):
329335
id: UUID4
330336
submission_num: int
@@ -341,6 +347,7 @@ class JobSubmission(CoreModel):
341347
job_provisioning_data: Optional[JobProvisioningData]
342348
job_runtime_data: Optional[JobRuntimeData]
343349
error: Optional[str] = None
350+
probes: list[Probe] = []
344351

345352
@property
346353
def age(self) -> timedelta:

src/dstack/_internal/core/services/ssh/tunnel.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,13 @@ def __enter__(self):
236236
def __exit__(self, exc_type, exc_val, exc_tb):
237237
self.close()
238238

239+
    async def __aenter__(self):
        """Async context-manager entry: open the tunnel and return self."""
        await self.aopen()
        return self
242+
243+
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context-manager exit: close the tunnel; exceptions are not suppressed."""
        await self.aclose()
245+
239246
def _get_proxy_command(self) -> Optional[str]:
240247
proxy_command: Optional[str] = None
241248
for params, identity_path in self.ssh_proxies:

src/dstack/_internal/server/app.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from dstack._internal.proxy.lib.routers import model_proxy
2222
from dstack._internal.server import settings
2323
from dstack._internal.server.background import start_background_tasks
24+
from dstack._internal.server.background.tasks.process_probes import PROBES_SCHEDULER
2425
from dstack._internal.server.db import get_db, get_session_ctx, migrate
2526
from dstack._internal.server.routers import (
2627
backends,
@@ -155,6 +156,7 @@ async def lifespan(app: FastAPI):
155156
scheduler = start_background_tasks()
156157
else:
157158
logger.info("Background processing is disabled")
159+
PROBES_SCHEDULER.start()
158160
dstack_version = DSTACK_VERSION if DSTACK_VERSION else "(no version)"
159161
logger.info(f"The admin token is {admin.token.get_plaintext_or_error()}", {"show_path": False})
160162
logger.info(
@@ -166,6 +168,7 @@ async def lifespan(app: FastAPI):
166168
yield
167169
if settings.SERVER_BACKGROUND_PROCESSING_ENABLED:
168170
scheduler.shutdown()
171+
PROBES_SCHEDULER.shutdown(wait=False)
169172
await gateway_connections_pool.remove_all()
170173
service_conn_pool = await get_injector_from_app(app).get_service_connection_pool()
171174
await service_conn_pool.remove_all()

src/dstack/_internal/server/background/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from dstack._internal.server.background.tasks.process_placement_groups import (
1919
process_placement_groups,
2020
)
21+
from dstack._internal.server.background.tasks.process_probes import process_probes
2122
from dstack._internal.server.background.tasks.process_prometheus_metrics import (
2223
collect_prometheus_metrics,
2324
delete_prometheus_metrics,
@@ -63,6 +64,7 @@ def start_background_tasks() -> AsyncIOScheduler:
6364
# that the first waiting for the lock will acquire it.
6465
# The jitter is needed to give all tasks a chance to acquire locks.
6566

67+
_scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1))
6668
_scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1)
6769
_scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1)
6870
if settings.ENABLE_PROMETHEUS_METRICS:

0 commit comments

Comments
 (0)