Skip to content

Commit e6d60f3

Browse files
committed
Fix tasks selects
1 parent edbd759 commit e6d60f3

3 files changed

Lines changed: 12 additions & 14 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,9 @@ async def _refetch_locked_job_model(
462462

463463

464464
async def _fetch_run_model(session: AsyncSession, run_id: uuid.UUID) -> RunModel:
465+
# FIXME: Selecting all of a run's jobs on every processing iteration is highly inefficient:
466+
# it's quadratic w.r.t. the number of jobs within a run.
467+
# Avoid selecting other jobs as much as possible.
465468
latest_submissions_sq = (
466469
select(
467470
JobModel.run_id.label("run_id"),

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,11 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
234234
.where(
235235
JobModel.status == JobStatus.SUBMITTED,
236236
JobModel.waiting_master_job.is_not(True),
237+
or_(
238+
# Non-master jobs must wait for the run to have the fleet assigned.
239+
JobModel.job_num == 0,
240+
RunModel.fleet_id.is_not(None),
241+
),
237242
or_(
238243
JobModel.last_processed_at <= now - self._min_processing_interval,
239244
JobModel.last_processed_at == JobModel.submitted_at,

src/dstack/_internal/server/background/scheduled_tasks/runs.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,7 @@ async def _process_next_run():
114114
),
115115
# Scaled-to-zero runs:
116116
# Such runs cannot be scheduled, thus we check next_triggered_at.
117-
# If we allow scheduled services with downscaling to zero
118-
# This check won't pass.
117+
# If we allow scheduled services with downscaling to zero, this check won't pass.
119118
and_(
120119
RunModel.status == RunStatus.PENDING,
121120
RunModel.resubmission_attempt == 0,
@@ -144,19 +143,10 @@ async def _process_next_run():
144143
.where(
145144
JobModel.run_id == run_model.id,
146145
JobModel.id.not_in(job_lockset),
147-
or_(
148-
JobModel.lock_expires_at.is_(None),
149-
JobModel.lock_expires_at < now,
150-
),
151-
)
152-
.options(
153-
load_only(JobModel.id),
154-
with_loader_criteria(
155-
JobModel,
156-
JobModel.status.not_in(JOB_STATUSES_EXCLUDED_FOR_LOCKING),
157-
include_aliases=True,
158-
),
146+
JobModel.status.not_in(JOB_STATUSES_EXCLUDED_FOR_LOCKING),
147+
JobModel.lock_expires_at.is_(None),
159148
)
149+
.options(load_only(JobModel.id))
160150
.order_by(JobModel.id) # take locks in order
161151
.with_for_update(skip_locked=True, key_share=True)
162152
)

0 commit comments

Comments
 (0)