Skip to content

Commit 30e90d3

Browse files
authored
Fix pipeline fetcher deadlock (#3704)
* Fix pipeline fetcher deadlock * Prioritize JobSubmittedPipeline over RunPipeline
1 parent 75ff17a commit 30e90d3

File tree

5 files changed

+136
-11
lines changed

5 files changed

+136
-11
lines changed

src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,21 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
166166
)
167167
),
168168
InstanceModel.deleted == False,
169-
or_(
170-
# Do not try to lock instances if the fleet is waiting for the lock.
171-
InstanceModel.fleet_id.is_(None),
172-
FleetModel.lock_owner.is_(None),
173-
),
174169
or_(
175170
InstanceModel.last_processed_at <= now - self._min_processing_interval,
176171
InstanceModel.last_processed_at == InstanceModel.created_at,
177172
),
178173
or_(
179-
InstanceModel.lock_expires_at.is_(None),
174+
and_(
175+
# Do not try to lock instances if the fleet is waiting for the
176+
# lock, but allow retrying instances whose own lock is stale
177+
# because the fleet pipeline cannot reclaim stale instance locks.
178+
or_(
179+
InstanceModel.fleet_id.is_(None),
180+
FleetModel.lock_owner.is_(None),
181+
),
182+
InstanceModel.lock_expires_at.is_(None),
183+
),
180184
InstanceModel.lock_expires_at < now,
181185
),
182186
or_(

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,14 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
197197
),
198198
RunModel.status.not_in([RunStatus.TERMINATING]),
199199
JobModel.last_processed_at <= now - self._min_processing_interval,
200-
# Do not try to lock jobs if the run is waiting for the lock.
201-
RunModel.lock_owner.is_(None),
202200
or_(
203-
JobModel.lock_expires_at.is_(None),
201+
and_(
202+
# Do not try to lock jobs if the run is waiting for the lock,
203+
# but allow retrying jobs whose own lock is stale because
204+
# the run pipeline cannot reclaim stale job locks.
205+
RunModel.lock_owner.is_(None),
206+
JobModel.lock_expires_at.is_(None),
207+
),
204208
JobModel.lock_expires_at < now,
205209
),
206210
or_(

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,9 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
243243
JobModel.last_processed_at <= now - self._min_processing_interval,
244244
JobModel.last_processed_at == JobModel.submitted_at,
245245
),
246-
# Do not try to lock jobs if the run is waiting for the lock.
247-
RunModel.lock_owner.is_(None),
248246
or_(
247+
# This pipeline does not check RunModel.lock_owner
248+
# because we want to provision jobs ASAP and RunPipeline can wait.
249249
JobModel.lock_expires_at.is_(None),
250250
JobModel.lock_expires_at < now,
251251
),

src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from sqlalchemy.ext.asyncio import AsyncSession
77

88
from dstack._internal.core.models.instances import InstanceStatus
9+
from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline
910
from dstack._internal.server.background.pipeline_tasks.instances import (
1011
InstanceFetcher,
1112
InstancePipeline,
@@ -192,6 +193,64 @@ async def test_fetch_respects_order_and_limit(
192193
assert middle.lock_owner == InstancePipeline.__name__
193194
assert newest.lock_owner is None
194195

196+
async def test_fetch_allows_stale_instance_locks_if_fleet_is_waiting_for_instance_locks(
197+
self, test_db, session: AsyncSession, fetcher: InstanceFetcher
198+
):
199+
project = await create_project(session=session)
200+
fleet = await create_fleet(session=session, project=project)
201+
stale = get_current_datetime() - dt.timedelta(minutes=1)
202+
203+
fleet.lock_owner = FleetPipeline.__name__
204+
fleet.lock_token = None
205+
fleet.lock_expires_at = None
206+
207+
instance = await create_instance(
208+
session=session,
209+
project=project,
210+
fleet=fleet,
211+
status=InstanceStatus.IDLE,
212+
name="stale-locked",
213+
last_processed_at=stale - dt.timedelta(seconds=1),
214+
)
215+
lock_instance(instance)
216+
instance.lock_expires_at = stale
217+
await session.commit()
218+
219+
items = await fetcher.fetch(limit=10)
220+
221+
assert [item.id for item in items] == [instance.id]
222+
223+
await session.refresh(instance)
224+
assert instance.lock_owner == InstancePipeline.__name__
225+
226+
async def test_fetch_excludes_fresh_instances_when_fleet_is_waiting_for_instance_locks(
227+
self, test_db, session: AsyncSession, fetcher: InstanceFetcher
228+
):
229+
project = await create_project(session=session)
230+
fleet = await create_fleet(session=session, project=project)
231+
stale = get_current_datetime() - dt.timedelta(minutes=1)
232+
233+
fleet.lock_owner = FleetPipeline.__name__
234+
fleet.lock_token = None
235+
fleet.lock_expires_at = None
236+
237+
instance = await create_instance(
238+
session=session,
239+
project=project,
240+
fleet=fleet,
241+
status=InstanceStatus.IDLE,
242+
name="fresh-unlocked",
243+
last_processed_at=stale - dt.timedelta(seconds=1),
244+
)
245+
await session.commit()
246+
247+
items = await fetcher.fetch(limit=10)
248+
249+
assert items == []
250+
251+
await session.refresh(instance)
252+
assert instance.lock_owner is None
253+
195254

196255
@pytest.mark.asyncio
197256
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)

src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
_RunnerAvailability,
4141
_SubmitJobToRunnerResult,
4242
)
43+
from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline
4344
from dstack._internal.server.models import JobModel, ProbeModel
4445
from dstack._internal.server.schemas.runner import (
4546
HealthcheckResponse,
@@ -307,6 +308,63 @@ async def test_fetch_excludes_jobs_from_terminating_runs(
307308
assert active_job.lock_owner == JobRunningPipeline.__name__
308309
assert terminating_run_job.lock_owner is None
309310

311+
async def test_fetch_allows_stale_job_locks_even_if_run_is_waiting_for_job_locks(
312+
self, test_db, session: AsyncSession, fetcher: JobRunningFetcher
313+
):
314+
project = await create_project(session=session)
315+
user = await create_user(session=session)
316+
repo = await create_repo(session=session, project_id=project.id)
317+
run = await create_run(session=session, project=project, repo=repo, user=user)
318+
stale = get_current_datetime() - timedelta(minutes=1)
319+
320+
run.lock_owner = RunPipeline.__name__
321+
run.lock_token = None
322+
run.lock_expires_at = None
323+
324+
job = await create_job(
325+
session=session,
326+
run=run,
327+
status=JobStatus.RUNNING,
328+
last_processed_at=stale - timedelta(seconds=1),
329+
)
330+
_lock_job_expired_same_owner(job)
331+
await session.commit()
332+
333+
items = await fetcher.fetch(limit=10)
334+
335+
assert [item.id for item in items] == [job.id]
336+
337+
await session.refresh(job)
338+
assert job.lock_owner == JobRunningPipeline.__name__
339+
340+
async def test_fetch_excludes_jobs_when_run_is_waiting_for_related_job_locks(
341+
self, test_db, session: AsyncSession, fetcher: JobRunningFetcher
342+
):
343+
project = await create_project(session=session)
344+
user = await create_user(session=session)
345+
repo = await create_repo(session=session, project_id=project.id)
346+
run = await create_run(session=session, project=project, repo=repo, user=user)
347+
stale = get_current_datetime() - timedelta(minutes=1)
348+
349+
run.lock_owner = RunPipeline.__name__
350+
run.lock_token = None
351+
run.lock_expires_at = None
352+
353+
job = await create_job(
354+
session=session,
355+
run=run,
356+
status=JobStatus.RUNNING,
357+
last_processed_at=stale - timedelta(seconds=1),
358+
)
359+
await session.commit()
360+
361+
items = await fetcher.fetch(limit=10)
362+
363+
assert items == []
364+
365+
await session.refresh(job)
366+
assert job.lock_owner is None
367+
310368
async def test_fetch_returns_oldest_jobs_first_up_to_limit(
311369
self, test_db, session: AsyncSession, fetcher: JobRunningFetcher
312370
):

0 commit comments

Comments
 (0)