Skip to content

Commit 8657913

Browse files
authored
Implement submitted jobs pipeline (#3670)
* Add submitted jobs pipeline fetcher scaffold * Implement submitted jobs pipeline assignment * Refine submitted jobs pipeline assignment * Implement submitted jobs pipeline provisioning * Clarify submitted jobs pipeline result names * Wire submitted jobs pipeline * Tighten submitted jobs pipeline cleanup * Add FIXME * Fixes after review * Fix missing volumes lock * Lock cluster master fleet in submitted jobs pipeline * Prefer fleet current master in provisioning lookup * Add docstrings * Restore todos * Drop autocreated fleets broken lock * Fix submitted jobs placement group cleanup * Do not lock non-empty fleets * Fix submitted jobs cluster fleet locking * Fix tests * Adjust pipelines params * Fix tasks selects * Fix tests
1 parent 002ea51 commit 8657913

File tree

14 files changed

+3780
-81
lines changed

14 files changed

+3780
-81
lines changed

src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
from dstack._internal.server.background.pipeline_tasks.gateways import GatewayPipeline
77
from dstack._internal.server.background.pipeline_tasks.instances import InstancePipeline
88
from dstack._internal.server.background.pipeline_tasks.jobs_running import JobRunningPipeline
9+
from dstack._internal.server.background.pipeline_tasks.jobs_submitted import (
10+
JobSubmittedPipeline,
11+
)
912
from dstack._internal.server.background.pipeline_tasks.jobs_terminating import (
1013
JobTerminatingPipeline,
1114
)
@@ -24,6 +27,7 @@ def __init__(self) -> None:
2427
ComputeGroupPipeline(),
2528
FleetPipeline(),
2629
GatewayPipeline(),
30+
JobSubmittedPipeline(),
2731
JobRunningPipeline(),
2832
JobTerminatingPipeline(),
2933
InstancePipeline(),
@@ -60,14 +64,17 @@ def hinter(self):
6064
class PipelineHinter:
6165
def __init__(self, pipelines: list[Pipeline]) -> None:
6266
self._pipelines = pipelines
63-
self._hint_fetch_map = {p.hint_fetch_model_name: p for p in self._pipelines}
67+
self._hint_fetch_map: dict[str, list[Pipeline]] = {}
68+
for pipeline in self._pipelines:
69+
self._hint_fetch_map.setdefault(pipeline.hint_fetch_model_name, []).append(pipeline)
6470

6571
def hint_fetch(self, model_name: str):
66-
pipeline = self._hint_fetch_map.get(model_name)
67-
if pipeline is None:
72+
pipelines = self._hint_fetch_map.get(model_name)
73+
if pipelines is None:
6874
logger.warning("Model %s not registered for fetch hints", model_name)
6975
return
70-
pipeline.hint_fetch()
76+
for pipeline in pipelines:
77+
pipeline.hint_fetch()
7178

7279

7380
def start_pipeline_tasks() -> PipelineManager:

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class JobRunningPipelineItem(PipelineItem):
114114
class JobRunningPipeline(Pipeline[JobRunningPipelineItem]):
115115
def __init__(
116116
self,
117-
workers_num: int = 10,
117+
workers_num: int = 20,
118118
queue_lower_limit_factor: float = 0.5,
119119
queue_upper_limit_factor: float = 2.0,
120120
min_processing_interval: timedelta = timedelta(seconds=10),
@@ -462,6 +462,9 @@ async def _refetch_locked_job_model(
462462

463463

464464
async def _fetch_run_model(session: AsyncSession, run_id: uuid.UUID) -> RunModel:
465+
# FIXME: Selecting all run's jobs on every processing iteration is highly inefficient:
466+
# it's quadratic w.r.t. the number of jobs within a run.
467+
# Avoid selecting other jobs as much as possible.
465468
latest_submissions_sq = (
466469
select(
467470
JobModel.run_id.label("run_id"),

0 commit comments

Comments
 (0)