Skip to content

Commit c9c84bc

Browse files
committed
Use fleets with no instances if fleets specified
1 parent 605985b commit c9c84bc

2 files changed

Lines changed: 63 additions & 13 deletions

File tree

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -460,13 +460,11 @@ async def _refetch_fleet_models(
460460
*fleet_filters,
461461
)
462462
.where(
463-
and_(
464-
InstanceModel.id.in_(instances_ids),
465-
or_(
466-
InstanceModel.id.is_(None),
467-
and_(
468-
*instance_filters,
469-
),
463+
or_(
464+
InstanceModel.id.is_(None),
465+
and_(
466+
InstanceModel.id.in_(instances_ids),
467+
*instance_filters,
470468
),
471469
)
472470
)
@@ -535,8 +533,12 @@ def _find_optimal_fleet_with_offers(
535533
fleet_priority,
536534
)
537535
)
538-
if all(t[2] == 0 for t in candidate_fleets_with_offers):
539-
# If no fleets have available offers, create a new fleet.
536+
if run_spec.configuration.fleets is None and all(
537+
t[2] == 0 for t in candidate_fleets_with_offers
538+
):
539+
# If fleets are not specified and no fleets have available offers, create a new fleet.
540+
# This is for compatibility with non-fleet-first UX when runs created new fleets
541+
# if there are no instances to reuse.
540542
return None, []
541543
candidate_fleets_with_offers.sort(key=lambda t: t[-1])
542544
return candidate_fleets_with_offers[0][:2]

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -776,15 +776,19 @@ async def test_fails_with_no_capacity_when_specified_fleets_occupied(
776776

777777
@pytest.mark.asyncio
778778
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
779-
async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncSession):
779+
async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified(
780+
self, test_db, session: AsyncSession
781+
):
780782
project = await create_project(session)
781783
user = await create_user(session)
782784
repo = await create_repo(session=session, project_id=project.id)
783785
fleet_spec = get_fleet_spec()
784786
fleet_spec.configuration.nodes = Range(min=0, max=1)
785-
await create_fleet(session=session, project=project, spec=fleet_spec)
787+
await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet")
786788
# Need a second non-empty fleet to have two-stage processing
787-
fleet2 = await create_fleet(session=session, project=project, spec=fleet_spec)
789+
fleet2 = await create_fleet(
790+
session=session, project=project, spec=fleet_spec, name="fleet2"
791+
)
788792
await create_instance(
789793
session=session,
790794
project=project,
@@ -807,6 +811,51 @@ async def test_assigns_job_to_elastic_empty_fleet(self, test_db, session: AsyncS
807811
await session.refresh(job)
808812
assert job.status == JobStatus.SUBMITTED
809813
assert job.instance_assigned
814+
assert job.instance_id is None
815+
assert job.fleet_id is None
816+
817+
@pytest.mark.asyncio
818+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
819+
async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified(
820+
self, test_db, session: AsyncSession
821+
):
822+
project = await create_project(session)
823+
user = await create_user(session)
824+
repo = await create_repo(session=session, project_id=project.id)
825+
fleet_spec = get_fleet_spec()
826+
fleet_spec.configuration.nodes = Range(min=0, max=1)
827+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet")
828+
# Need a second non-empty fleet to have two-stage processing
829+
fleet2 = await create_fleet(
830+
session=session, project=project, spec=fleet_spec, name="fleet2"
831+
)
832+
await create_instance(
833+
session=session,
834+
project=project,
835+
fleet=fleet2,
836+
instance_num=0,
837+
status=InstanceStatus.BUSY,
838+
)
839+
run_spec = get_run_spec(repo_id=repo.name)
840+
run_spec.configuration.fleets = [fleet.name, fleet2.name]
841+
run = await create_run(
842+
session=session,
843+
project=project,
844+
repo=repo,
845+
user=user,
846+
run_spec=run_spec,
847+
)
848+
job = await create_job(
849+
session=session,
850+
run=run,
851+
instance_assigned=False,
852+
)
853+
await process_submitted_jobs()
854+
await session.refresh(job)
855+
assert job.status == JobStatus.SUBMITTED
856+
assert job.instance_assigned
857+
assert job.instance_id is None
858+
assert job.fleet_id == fleet.id
810859

811860
@pytest.mark.asyncio
812861
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@@ -852,7 +901,6 @@ async def test_creates_new_instance_in_existing_empty_fleet(
852901
assert job.status == JobStatus.PROVISIONING
853902
assert job.instance is not None
854903
assert job.instance.instance_num == 0
855-
assert job.instance.fleet_id == fleet.id
856904

857905
@pytest.mark.asyncio
858906
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)

0 commit comments

Comments
 (0)