Commit 450b6a1

Drop deprecated scheduled tasks (#3749)
* Drop deprecated scheduled tasks
* Clean up dead code
* Update AUTOSCALING.md and RUNS-AND-JOBS.md for pipelines
* Make JobSubmittedPipeline wait for master election
* Fix tests
1 parent fc9afa9 commit 450b6a1


48 files changed: +379 −15010 lines

contributing/AUTOSCALING.md

Lines changed: 10 additions & 7 deletions
```diff
@@ -4,13 +4,16 @@
 
 - STEP 1: `dstack-gateway` parses nginx `access.log` to collect per-second statistics about requests to the service and request times.
 - STEP 2: `dstack-gateway` aggregates statistics over a 1-minute window.
-- STEP 3: The dstack server pulls all service statistics in the `process_gateways` background task.
-- STEP 4: The `process_runs` background task passes statistics and current replicas to the autoscaler.
-- STEP 5: The autoscaler (configured via the `dstack.yml` file) returns the replica change as an int.
-- STEP 6: `process_runs` calls `scale_run_replicas` to add or remove replicas.
-- STEP 7: `scale_run_replicas` terminates or starts replicas.
-  - `SUBMITTED` and `PROVISIONING` replicas get terminated before `RUNNING`.
-  - Replicas are terminated by descending `replica_num` and launched by ascending `replica_num`.
+- STEP 3: The server keeps gateway connections alive in the scheduled `process_gateways_connections` task and continuously collects stats from active gateways. This is separate from `GatewayPipeline`, which handles gateway provisioning and deletion.
+- STEP 4: When `RunPipeline` processes a service run, it loads the latest collected gateway stats for that service.
+- STEP 5: The autoscaler (configured via `dstack.yml`) computes the desired replica count for each replica group.
+- STEP 6: `RunPipeline` applies that desired state.
+  - For scale-up, it creates new `SUBMITTED` jobs. `JobSubmittedPipeline` then assigns existing capacity or provisions new capacity for them.
+  - For scale-down, it marks the least-important active replicas as `TERMINATING` with `SCALED_DOWN`. `JobTerminatingPipeline` unregisters and cleans them up.
+- STEP 7: If the service is in rolling deployment, `RunPipeline` handles that in the same active-run processing path.
+  - It allows only a limited surge of replacement replicas.
+  - It delays teardown of old replicas until replacement capacity is available.
+  - It also cleans up replicas that belong to replica groups removed from the configuration.
 
 ## RPSAutoscaler
 
```
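The autoscaling flow described above (collected request stats in, desired replica count out) can be sketched roughly as follows. The names (`ServiceStats`, `desired_replicas`, `target_rps_per_replica`) are illustrative assumptions, not dstack's actual API:

```python
import math
from dataclasses import dataclass


@dataclass
class ServiceStats:
    # Aggregated over the stats window (e.g. the 1-minute gateway window).
    requests_per_second: float


def desired_replicas(
    stats: ServiceStats,
    target_rps_per_replica: float,
    min_replicas: int,
    max_replicas: int,
) -> int:
    """Return the replica count needed to keep per-replica load near the target."""
    if target_rps_per_replica <= 0:
        raise ValueError("target_rps_per_replica must be positive")
    wanted = math.ceil(stats.requests_per_second / target_rps_per_replica)
    # Clamp to the configured scaling bounds.
    return max(min_replicas, min(max_replicas, wanted))
```

With `min_replicas=0`, a long stretch of zero RPS would drive the desired count to zero, which matches the scale-to-zero behavior described for services.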

contributing/RUNS-AND-JOBS.md

Lines changed: 45 additions & 42 deletions
```diff
@@ -17,31 +17,38 @@ A run can spawn one or multiple jobs, depending on the configuration. A task tha
 
 ## Run's Lifecycle
 
-- STEP 1: The user submits the run. `services.runs.submit_run` creates jobs with status `SUBMITTED`. Now the run has status `SUBMITTED`.
-- STEP 2: `background.tasks.process_runs` periodically pulls unfinished runs and processes them:
-  - If any job is `RUNNING`, the run becomes `RUNNING`.
-  - If any job is `PROVISIONING` or `PULLING`, the run becomes `PROVISIONING`.
-  - If any job fails and cannot be retried, the run becomes `TERMINATING`, and after processing, `FAILED`.
-  - If all jobs are `DONE`, the run becomes `TERMINATING`, and after processing, `DONE`.
-  - If any job fails, can be retried, and there is any other active job, the failed job will be resubmitted in-place.
-  - If any jobs in a replica fail and can be retried and there is other active replicas, the jobs of the failed replica are resubmitted in-place (without stopping other replicas). But if some jobs in a replica fail, then all the jobs in a replica are terminated and resubmitted. This include multi-node tasks that represent one replica with multiple jobs.
-  - If all jobs fail and can be resubmitted, the run becomes `PENDING`.
-- STEP 3: If the run is `TERMINATING`, the server makes all jobs `TERMINATING`. `background.tasks.process_runs` sets their status to `TERMINATING`, assigns `JobTerminationReason`, and sends a graceful stop command to `dstack-runner`. `process_terminating_jobs` then ensures that jobs are terminated and assigns a finished status.
-- STEP 4: Once all jobs are finished, the run becomes `TERMINATED`, `DONE`, or `FAILED` based on `RunTerminationReason`.
-- STEP 0: If the run is `PENDING`, `background.tasks.process_runs` will resubmit jobs. The run becomes `SUBMITTED` again.
-
-> Use `switch_run_status()` for all status transitions. Do not set `RunModel.status` directly.
-
-> No one must assign the finished status to the run, except `services.runs.process_terminating_run`. To terminate the run, assign `TERMINATING` status and `RunTerminationReason`.
+- STEP 1: The user submits the run. `services.runs.submit_run` creates jobs with status `SUBMITTED`. The run starts in `SUBMITTED`.
+- STEP 2: `RunPipeline` continuously processes unfinished runs.
+  - For active runs, it derives the run status from the latest job states in priority order:
+    1. If any non-retryable failure is present, the run becomes `TERMINATING` with the relevant `RunTerminationReason`.
+    2. If `stop_criteria == MASTER_DONE` and the master job is done, the run becomes `TERMINATING` with `ALL_JOBS_DONE`.
+    3. Otherwise, if any job is `RUNNING`, the run becomes `RUNNING`.
+    4. Otherwise, if any job is `PROVISIONING` or `PULLING`, the run becomes `PROVISIONING`.
+    5. Otherwise, if jobs are still waiting for placement or provisioning, the run stays `SUBMITTED`.
+    6. Otherwise, if all contributing jobs are `DONE`, the run becomes `TERMINATING` with `ALL_JOBS_DONE`.
+    7. Otherwise, if no active replicas remain and the run should be retried, the run becomes `PENDING`.
+  - Retryable replica failures are handled before the final transition is applied:
+    - If a replica fails with a retryable reason while other replicas are still active, `RunPipeline` creates a new `SUBMITTED` submission for that replica and terminates the old jobs in that replica.
+    - If all remaining work is retryable, the run ends up in `PENDING`.
+- STEP 3: If the run is `PENDING`, `RunPipeline` processes it in the pending phase.
+  - For retrying runs, it waits for an exponential backoff before resubmitting.
+  - For scheduled runs, it waits until `next_triggered_at`.
+  - For scaled-to-zero services, it can keep the run in `PENDING` until autoscaling wants replicas again.
+  - Once the run is ready to continue, `RunPipeline` creates new `SUBMITTED` jobs and moves the run back to `SUBMITTED`.
+- STEP 4: If the run is `TERMINATING`, `RunPipeline` marks active jobs as `TERMINATING` and assigns the corresponding `JobTerminationReason`.
+- STEP 5: Once all jobs are finished, the terminating phase of `RunPipeline` either:
+  - assigns the final run status (`TERMINATED`, `DONE`, or `FAILED`), or
+  - for scheduled runs that were not stopped or aborted by the user, returns the run to `PENDING` and computes a new `next_triggered_at`.
 
 ### Services
 
-Services' lifecycle has some modifications:
+Services' run lifecycle has some modifications:
 
-- During STEP 1, the service is registered on the gateway. If the gateway is not accessible or the domain name is taken, the run submission fails.
-- During STEP 2, downscaled jobs are ignored.
-- During STEP 4, the service is unregistered on the gateway.
-- During STEP 0, the service can stay in `PENDING` status if it was downscaled to zero (WIP).
+- During STEP 1, the service itself is registered on the gateway or the in-server proxy. If the gateway is not accessible or the domain name is taken, submission fails.
+- During STEP 2, active run processing also computes desired replica counts from gateway stats and handles scale-up, scale-down, rolling deployment, and cleanup of removed replica groups.
+- During STEP 2, jobs already marked `SCALED_DOWN` do not contribute to the run status.
+- During STEP 3, a service can stay in `PENDING` when autoscaling currently wants zero replicas.
+- During STEP 5, the terminating phase of `RunPipeline` unregisters the service from the gateway.
 
 ### When can the job be retried?
 
```
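The priority-ordered status derivation described in STEP 2 can be sketched as a small self-contained function. This is a simplification with invented names (`derive_run_status`, `failure_retryable`); the real `RunPipeline` works on job models, replicas, and termination reasons:

```python
from enum import Enum


class JobStatus(Enum):
    SUBMITTED = "submitted"
    PROVISIONING = "provisioning"
    PULLING = "pulling"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"


def derive_run_status(jobs: list[JobStatus], failure_retryable: bool = False) -> str:
    """Derive a run's status from its jobs, highest-priority rule first."""
    if any(j is JobStatus.FAILED for j in jobs) and not failure_retryable:
        return "TERMINATING"  # non-retryable failure wins over everything else
    if any(j is JobStatus.RUNNING for j in jobs):
        return "RUNNING"
    if any(j in (JobStatus.PROVISIONING, JobStatus.PULLING) for j in jobs):
        return "PROVISIONING"
    if any(j is JobStatus.SUBMITTED for j in jobs):
        return "SUBMITTED"  # still waiting for placement or provisioning
    if jobs and all(j is JobStatus.DONE for j in jobs):
        return "TERMINATING"  # with ALL_JOBS_DONE
    return "PENDING"  # no active jobs left; retry if allowed
```

The ordering matters: a single running job keeps the run `RUNNING` even while siblings are still provisioning, and a non-retryable failure preempts every other rule.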

```diff
@@ -54,29 +61,25 @@ Services' lifecycle has some modifications:
 ## Job's Lifecycle
 
 - STEP 1: A newly submitted job has status `SUBMITTED`. It is not assigned to any instance yet.
-- STEP 2: `background.tasks.process_submitted_jobs` tries to assign an existing instance or provision a new one.
-  - On success, the job becomes `PROVISIONING`.
-  - On failure, the job becomes `TERMINATING`, and after processing, `FAILED` because of `FAILED_TO_START_DUE_TO_NO_CAPACITY`.
-- STEP 3: `background.tasks.process_running_jobs` periodically pulls unfinished jobs and processes them.
-  - While `dstack-shim`/`dstack-runner` is not responding, the job stays `PROVISIONING`.
-  - Once `dstack-shim` (for VM-featured backends) becomes available, it submits the docker image name, and the job becomes `PULLING`.
-  - Once `dstack-runner` inside a docker container becomes available, it submits the code and the job spec, and the job becomes `RUNNING`.
-  - If `dstack-shim` or `dstack-runner` don't respond for a long time or fail to respond after successful connection and multiple retries, the job becomes `TERMINATING`, and after processing, `FAILED`.
-- STEP 4: `background.tasks.process_running_jobs` processes `RUNNING` jobs, pulling job logs, runner logs, and job status.
-  - If the pulled status is `DONE`, the job becomes `TERMINATING`, and after processing, `DONE`.
-  - Otherwise, the job becomes `TERMINATING`, and after processing, `FAILED`.
-- STEP 5: `background.tasks.process_terminating_jobs` processes `TERMINATING` jobs.
-  - If the job has `remove_at` in the future, nothing happens. This is to give the job some time for a graceful stop.
-  - Once `remove_at` is in the past, it stops the container via `dstack-shim`, detaches instance volumes, and releases the instance. The job becomes `TERMINATED`, `DONE`, `FAILED`, or `ABORTED` based on `JobTerminationReason`.
-  - If some volumes fail to detach, it keeps the job `TERMINATING` and checks volumes attachment status.
-
-> Use `switch_job_status()` for all status transitions. Do not set `JobModel.status` directly.
-
-> No one must assign the finished status to the job, except `services.jobs.process_terminating_job`. To terminate the job, assign `TERMINATING` status and `JobTerminationReason`.
+- STEP 2: `JobSubmittedPipeline` tries to assign an existing instance or provision new capacity.
+  - On success, the job becomes `PROVISIONING`.
+  - On failure, the job becomes `TERMINATING`. `JobTerminatingPipeline` later assigns the final failed status.
+- STEP 3: `JobRunningPipeline` processes `PROVISIONING`, `PULLING`, and `RUNNING` jobs.
+  - While `dstack-shim` / `dstack-runner` is not responding, the job stays `PROVISIONING`.
+  - Once `dstack-shim` (for VM-featured backends) becomes available, the pipeline submits the image and the job becomes `PULLING`.
+  - Once `dstack-runner` inside the container becomes available, the pipeline uploads the code and job spec, and the job becomes `RUNNING`.
+  - While the job is `RUNNING`, the pipeline keeps collecting logs and runner status.
+  - If startup, runner communication, or replica registration fails, the job becomes `TERMINATING`.
+- STEP 4: Once the job is actually ready, `JobRunningPipeline` initializes probes.
+- STEP 5: `JobTerminatingPipeline` processes `TERMINATING` jobs.
+  - If the job has `remove_at` in the future, it waits. This gives the job time for a graceful stop.
+  - Once `remove_at` is in the past, it stops the container, detaches volumes, unregisters service replicas if needed, and releases the instance assignment.
+  - If some volumes are not detached yet, the job stays `TERMINATING` and is retried.
+  - When cleanup is complete, the job becomes `TERMINATED`, `DONE`, `FAILED`, or `ABORTED` based on `JobTerminationReason`.
 
 ### Services' Jobs
 
 Services' jobs lifecycle has some modifications:
 
-- During STEP 3, once the job becomes `RUNNING`, it is registered on the gateway as a replica. If the gateway is not accessible, the job fails.
-- During STEP 5, the job is unregistered on the gateway (WIP).
+- During STEP 3, once the primary job of a replica is `RUNNING` and ready to receive traffic, `JobRunningPipeline` registers that replica on the gateway. If the gateway is not accessible, the job fails with a gateway-related termination reason.
+- During STEP 5, `JobTerminatingPipeline` unregisters the replica from receiving requests before the job is fully cleaned up.
```
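The `remove_at` grace-period logic in STEP 5 reduces to a small decision function. This is a sketch with invented names (`process_terminating_job`, the string actions), not the actual `JobTerminatingPipeline` code:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def process_terminating_job(
    remove_at: Optional[datetime],
    volumes_detached: bool,
    now: Optional[datetime] = None,
) -> str:
    """Decide the next action for a TERMINATING job: wait, retry, or finalize."""
    now = now or datetime.now(timezone.utc)
    if remove_at is not None and remove_at > now:
        return "wait"  # grace period so the workload can stop gracefully
    if not volumes_detached:
        return "retry"  # stay TERMINATING; re-check volume attachment later
    # Cleanup is complete; the final status comes from JobTerminationReason.
    return "finalize"
```

Because the pipeline re-processes `TERMINATING` jobs until `finalize` is reached, a slow volume detach simply keeps the job in the retry loop rather than failing it.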

docs/docs/reference/environment-variables.md

Lines changed: 0 additions & 1 deletion
```diff
@@ -130,7 +130,6 @@ For more details on the options below, refer to the [server deployment](../guide
 - `DSTACK_SERVER_GCS_BUCKET`{ #DSTACK_SERVER_GCS_BUCKET } - The bucket that repo diffs will be uploaded to if set. If unset, diffs are uploaded to the database.
 - `DSTACK_DB_POOL_SIZE`{ #DSTACK_DB_POOL_SIZE } - The client DB connections pool size. Defaults to `20`,
 - `DSTACK_DB_MAX_OVERFLOW`{ #DSTACK_DB_MAX_OVERFLOW } - The client DB connections pool allowed overflow. Defaults to `20`.
-- `DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR`{ #DSTACK_SERVER_BACKGROUND_PROCESSING_FACTOR } - The number of background jobs for processing server resources. Increase if you need to process more resources per server replica quickly. Defaults to `1`.
 - `DSTACK_SERVER_BACKGROUND_PROCESSING_DISABLED`{ #DSTACK_SERVER_BACKGROUND_PROCESSING_DISABLED } - Disables background processing if set to any value. Useful to run only web frontend and API server.
 - `DSTACK_SERVER_MAX_PROBES_PER_JOB`{ #DSTACK_SERVER_MAX_PROBES_PER_JOB } - Maximum number of probes allowed in a run configuration. Validated at apply time.
 - `DSTACK_SERVER_MAX_PROBE_TIMEOUT`{ #DSTACK_SERVER_MAX_PROBE_TIMEOUT } - Maximum allowed timeout for a probe. Validated at apply time.
```
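The conventions in this file (integer settings with defaults, set-to-any-value flags like `DSTACK_SERVER_BACKGROUND_PROCESSING_DISABLED`) could be read roughly like this; the helpers below are illustrative, not dstack's actual settings module:

```python
import os


def int_env(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to a default."""
    raw = os.environ.get(name)
    return int(raw) if raw is not None else default


def flag_env(name: str) -> bool:
    """Flags documented as 'if set to any value' are true even when set to ''."""
    return os.environ.get(name) is not None


# Examples from this file's documented defaults.
DB_POOL_SIZE = int_env("DSTACK_DB_POOL_SIZE", 20)
DB_MAX_OVERFLOW = int_env("DSTACK_DB_MAX_OVERFLOW", 20)
```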

src/dstack/_internal/server/app.py

Lines changed: 2 additions & 3 deletions
```diff
@@ -171,9 +171,8 @@ async def lifespan(app: FastAPI):
     pipeline_manager = None
     if settings.SERVER_BACKGROUND_PROCESSING_ENABLED:
         scheduler = start_scheduled_tasks()
-        if core_settings.FeatureFlags.PIPELINE_PROCESSING_ENABLED:
-            pipeline_manager = start_pipeline_tasks()
-            app.state.pipeline_manager = pipeline_manager
+        pipeline_manager = start_pipeline_tasks()
+        app.state.pipeline_manager = pipeline_manager
     else:
         logger.info("Background processing is disabled")
     PROBES_SCHEDULER.start()
```
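The effect of this `app.py` change — pipeline tasks now start whenever background processing is enabled, with no separate feature flag — can be sketched with stand-in types (no FastAPI dependency; `AppState`, `start_pipeline_tasks`, and `demo` are stubs, not the real code):

```python
import asyncio
from contextlib import asynccontextmanager


class AppState:
    """Stand-in for FastAPI's app.state."""

    pipeline_manager = None


def start_pipeline_tasks():
    # Stand-in for the real pipeline task manager startup.
    return object()


@asynccontextmanager
async def lifespan(app: AppState, background_enabled: bool):
    # With the PIPELINE_PROCESSING_ENABLED feature flag removed, the pipeline
    # manager starts unconditionally when background processing is enabled.
    if background_enabled:
        app.pipeline_manager = start_pipeline_tasks()
    try:
        yield
    finally:
        pass  # shutdown would stop the scheduler and pipeline manager here


async def demo() -> bool:
    app = AppState()
    async with lifespan(app, background_enabled=True):
        return app.pipeline_manager is not None
```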

src/dstack/_internal/server/background/scheduled_tasks/common.py renamed to src/dstack/_internal/server/background/pipeline_tasks/common.py

Lines changed: 1 addition & 2 deletions
```diff
@@ -5,8 +5,7 @@
 
 def get_provisioning_timeout(backend_type: BackendType, instance_type_name: str) -> timedelta:
     """
-    This timeout is used in a few places, but roughly refers to the max time between
-    requesting instance creation and the instance becoming ready to accept jobs.
+    This timeout refers to the max time between requesting instance creation and the instance becoming ready to accept jobs.
     For container-based backends, this also includes the image pulling time.
     """
     if backend_type == BackendType.LAMBDA:
```

src/dstack/_internal/server/background/pipeline_tasks/instances/common.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -20,7 +20,7 @@
     ItemUpdateMap,
     UpdateMapDateTime,
 )
-from dstack._internal.server.background.scheduled_tasks.common import get_provisioning_timeout
+from dstack._internal.server.background.pipeline_tasks.common import get_provisioning_timeout
 from dstack._internal.server.models import FleetModel, InstanceModel, PlacementGroupModel
 from dstack._internal.server.services.fleets import get_fleet_spec
 from dstack._internal.utils.common import UNSET, Unset, get_current_datetime
```

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -48,7 +48,7 @@
     set_processed_update_map_fields,
     set_unlock_update_map_fields,
 )
-from dstack._internal.server.background.scheduled_tasks.common import get_provisioning_timeout
+from dstack._internal.server.background.pipeline_tasks.common import get_provisioning_timeout
 from dstack._internal.server.db import get_db, get_session_ctx
 from dstack._internal.server.models import (
     FleetModel,
```

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 31 additions & 0 deletions
```diff
@@ -319,6 +319,10 @@ async def process(self, item: JobSubmittedPipelineItem):
         if context.job_model.instance_assigned:
             logger.debug("%s: provisioning has started", fmt(context.job_model))
             provisioning = await _process_provisioning(item=item, context=context)
+            _hint_pipelines_fetch(
+                pipeline_hinter=self._pipeline_hinter,
+                result=provisioning,
+            )
             await _apply_provisioning_result(
                 item=item,
                 provisioning=provisioning,
@@ -327,6 +331,10 @@ async def process(self, item: JobSubmittedPipelineItem):
 
         logger.debug("%s: assignment has started", fmt(context.job_model))
         assignment = await _process_assignment(context=context)
+        _hint_pipelines_fetch(
+            pipeline_hinter=self._pipeline_hinter,
+            result=assignment,
+        )
         await _apply_assignment_result(
             item=item,
             context=context,
@@ -365,6 +373,7 @@ class _DeferSubmittedJobResult:
     """The job is not ready yet, so apply should just mark it processed and unlock it."""
 
     log_message: str
+    hint_fleet_pipeline: bool = False
 
 
 @dataclass
@@ -1179,6 +1188,17 @@ async def _process_new_capacity_provisioning(
             job=context.job,
         )
     )
+    if (
+        is_master_job(context.job)
+        and fleet_model is not None
+        and _get_cluster_fleet_spec(fleet_model) is not None
+        and any(not instance.deleted for instance in fleet_model.instances)
+        and master_provisioning_data is None
+    ):
+        return _DeferSubmittedJobResult(
+            log_message="waiting for fleet master instance election",
+            hint_fleet_pipeline=True,
+        )
     provision_new_capacity_result = await _provision_new_capacity(
         project=context.project,
         fleet_model=fleet_model,
@@ -1834,6 +1854,17 @@ def _get_fleet_master_provisioning_data(
     )
 
 
+def _hint_pipelines_fetch(
+    pipeline_hinter: PipelineHinterProtocol,
+    result: Union[_AssignmentResult, _ProvisioningResult],
+) -> None:
+    if not isinstance(result, _DeferSubmittedJobResult):
+        return
+
+    if result.hint_fleet_pipeline:
+        pipeline_hinter.hint_fetch(FleetModel.__name__)
+
+
 def _select_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]:
     jobs_to_provision = [job]
     if is_multinode_job(job) and is_master_job(job) and job_model.waiting_master_job is not None:
```
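The new master-election guard and the `hint_fleet_pipeline` flag boil down to the following simplified check, flattened to plain booleans; the real code inspects `context.job` and `fleet_model`, so treat the names here as illustrative:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeferResult:
    """The job is not ready yet; mark it processed, unlock it, and revisit later."""

    log_message: str
    hint_fleet_pipeline: bool = False


def should_wait_for_master_election(
    is_master_job: bool,
    fleet_has_live_instances: bool,
    master_provisioning_data: Optional[dict],
) -> Optional[DeferResult]:
    # Before provisioning new capacity in a cluster fleet, defer while live
    # instances exist but master election has not yet produced provisioning data.
    if (
        is_master_job
        and fleet_has_live_instances
        and master_provisioning_data is None
    ):
        return DeferResult(
            log_message="waiting for fleet master instance election",
            hint_fleet_pipeline=True,  # nudge the fleet pipeline to re-fetch soon
        )
    return None
```

Returning a defer result instead of blocking keeps the pipeline non-blocking: the job is re-processed on a later pass, and the hint lets the fleet pipeline pick up the pending election sooner.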
