Skip to content

Commit 4f27c70

Browse files
authored
Fix infinite job retry when fleet is at capacity (#3887)
1 parent 779808f commit 4f27c70

2 files changed

Lines changed: 16 additions & 13 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -588,12 +588,12 @@ async def _apply_assignment_result(
588588
return
589589
fleet_spec = get_fleet_spec(fleet_model)
590590
if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec):
591-
logger.debug(
592-
"%s: fleet %s is full, retrying assignment",
593-
fmt(context.job_model),
594-
fleet_model.name,
591+
await _terminate_submitted_job(
592+
session=session,
593+
job_model=job_model,
594+
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
595+
message="Fleet is at capacity",
595596
)
596-
await _reset_job_lock_for_retry(session=session, item=item)
597597
return
598598
instance_model = _create_placeholder_instance(
599599
fleet_model=fleet_model,

src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py

Lines changed: 11 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1202,14 +1202,18 @@ async def test_assignment_creates_placeholder_instance_for_new_capacity(
12021202
assert placeholder.offer is None
12031203
assert placeholder.instance_num == 0
12041204

1205-
async def test_assignment_retries_when_fleet_is_full(
1206-
self, test_db, session: AsyncSession, worker: JobSubmittedWorker
1205+
@pytest.mark.parametrize("fleet_type", ["cloud", "ssh"])
1206+
async def test_job_fails_when_fleet_is_full(
1207+
self, test_db, session: AsyncSession, worker: JobSubmittedWorker, fleet_type: str
12071208
):
12081209
project = await create_project(session=session)
12091210
user = await create_user(session=session)
12101211
repo = await create_repo(session=session, project_id=project.id)
1211-
fleet_spec = get_fleet_spec()
1212-
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
1212+
if fleet_type == "cloud":
1213+
fleet_spec = get_fleet_spec()
1214+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
1215+
else:
1216+
fleet_spec = get_fleet_spec(get_ssh_fleet_configuration(hosts=["10.0.0.1"]))
12131217
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
12141218
await create_instance(
12151219
session=session,
@@ -1230,10 +1234,9 @@ async def test_assignment_retries_when_fleet_is_full(
12301234
await _process_job(session=session, worker=worker, job_model=job)
12311235

12321236
job = await _get_job(session, job.id)
1233-
# Assignment retried — job not committed as assigned
1234-
assert job.status == JobStatus.SUBMITTED
1235-
assert not job.instance_assigned
1236-
assert job.instance is None
1237+
assert job.status == JobStatus.TERMINATING
1238+
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
1239+
assert job.termination_reason_message == "Fleet is at capacity"
12371240
# No placeholder must be committed when the fleet is full.
12381241
res = await session.execute(
12391242
select(InstanceModel).where(

0 commit comments

Comments
 (0)