Skip to content

Commit 71d4c8d

Browse files
committed
Forbid new fleets creation with fleets specified
1 parent 28f235a commit 71d4c8d

3 files changed

Lines changed: 50 additions & 9 deletions

File tree

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,16 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
290290
master_job_provisioning_data=master_job_provisioning_data,
291291
volumes=volumes,
292292
)
293+
if fleet_model is None and run_spec.configuration.fleets is not None:
294+
# Run cannot create new fleets when fleets are specified
295+
logger.debug("%s: failed to use specified fleets", fmt(job_model))
296+
job_model.status = JobStatus.TERMINATING
297+
job_model.termination_reason = (
298+
JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
299+
)
300+
job_model.last_processed_at = common_utils.get_current_datetime()
301+
await session.commit()
302+
return
293303
instance = await _assign_job_to_fleet_instance(
294304
session=session,
295305
instances_with_offers=fleet_instances_with_offers,

src/dstack/_internal/server/testing/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ async def create_file_archive(
258258

259259

260260
def get_run_spec(
261-
run_name: str,
262261
repo_id: str,
262+
run_name: str = "test-run",
263263
configuration_path: str = "dstack.yaml",
264264
profile: Union[Profile, Callable[[], Profile], None] = lambda: Profile(name="default"),
265265
configuration: Optional[AnyRunConfiguration] = None,

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -732,12 +732,47 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session:
732732
await session.commit()
733733
await process_submitted_jobs()
734734
await session.refresh(job)
735-
res = await session.execute(select(JobModel).options(joinedload(JobModel.instance)))
736-
job = res.unique().scalar_one()
737735
assert job.status == JobStatus.SUBMITTED
738736
assert job.instance_assigned
739-
assert job.instance is None
740-
assert job.fleet is None
737+
assert job.instance_id is None
738+
assert job.fleet_id is None
739+
740+
@pytest.mark.asyncio
741+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
742+
async def test_fails_with_no_capacity_when_specified_fleets_occupied(
743+
self, test_db, session: AsyncSession
744+
):
745+
project = await create_project(session)
746+
user = await create_user(session)
747+
repo = await create_repo(session=session, project_id=project.id)
748+
fleet = await create_fleet(session=session, project=project, name="test-fleet")
749+
instance = await create_instance(
750+
session=session,
751+
project=project,
752+
fleet=fleet,
753+
instance_num=0,
754+
status=InstanceStatus.BUSY,
755+
)
756+
fleet.instances.append(instance)
757+
run_spec = get_run_spec(repo_id=repo.name)
758+
run_spec.configuration.fleets = ["test-fleet"]
759+
run = await create_run(
760+
session=session,
761+
project=project,
762+
repo=repo,
763+
user=user,
764+
run_spec=run_spec,
765+
)
766+
job = await create_job(
767+
session=session,
768+
run=run,
769+
instance_assigned=False,
770+
)
771+
await session.commit()
772+
await process_submitted_jobs()
773+
await session.refresh(job)
774+
assert job.status == JobStatus.TERMINATING
775+
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
741776

742777
@pytest.mark.asyncio
743778
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@@ -770,8 +805,6 @@ async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncS
770805
)
771806
await process_submitted_jobs()
772807
await session.refresh(job)
773-
res = await session.execute(select(JobModel))
774-
job = res.unique().scalar_one()
775808
assert job.status == JobStatus.SUBMITTED
776809
assert job.instance_assigned
777810

@@ -867,8 +900,6 @@ async def test_assigns_job_to_optimal_fleet(self, test_db, session: AsyncSession
867900
)
868901
await process_submitted_jobs()
869902
await session.refresh(job)
870-
res = await session.execute(select(JobModel))
871-
job = res.unique().scalar_one()
872903
assert job.status == JobStatus.SUBMITTED
873904
assert job.instance_assigned
874905
assert job.fleet_id == fleet2.id

0 commit comments

Comments
 (0)