Skip to content

Commit 6d9b016

Browse files
authored
Consider backend offers when choosing optimal fleet (#3101)
* Consider backend offers when choosing optimal fleet * Test
1 parent f0f273c commit 6d9b016

File tree

2 files changed

+119
-23
lines changed

2 files changed

+119
-23
lines changed

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
289289
instance_filters=instance_filters,
290290
)
291291
fleet_models = fleet_models_with_instances + fleet_models_without_instances
292-
fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers(
292+
fleet_model, fleet_instances_with_offers = await _find_optimal_fleet_with_offers(
293+
project=project,
293294
fleet_models=fleet_models,
294295
run_model=run_model,
295296
run_spec=run.run_spec,
@@ -492,7 +493,8 @@ async def _refetch_fleet_models_with_instances(
492493
return fleet_models
493494

494495

495-
def _find_optimal_fleet_with_offers(
496+
async def _find_optimal_fleet_with_offers(
497+
project: ProjectModel,
496498
fleet_models: list[FleetModel],
497499
run_model: RunModel,
498500
run_spec: RunSpec,
@@ -502,58 +504,99 @@ def _find_optimal_fleet_with_offers(
502504
) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]:
503505
if run_model.fleet is not None:
504506
# Using the fleet that was already chosen by the master job
505-
fleet_instances_with_offers = _get_fleet_instances_with_offers(
507+
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
506508
fleet_model=run_model.fleet,
507509
run_spec=run_spec,
508510
job=job,
509511
master_job_provisioning_data=master_job_provisioning_data,
510512
volumes=volumes,
511513
)
512-
return run_model.fleet, fleet_instances_with_offers
514+
return run_model.fleet, fleet_instances_with_pool_offers
513515

514516
if len(fleet_models) == 0:
515517
return None, []
516518

517519
nodes_required_num = _get_nodes_required_num_for_run(run_spec)
518-
# The current strategy is to first consider fleets that can accommodate
519-
# the run without additional provisioning and choose the one with the cheapest offer.
520-
# Fallback to fleet with the cheapest offer among all fleets with offers.
520+
# The current strategy is first to consider fleets that can accommodate
521+
# the run without additional provisioning and choose the one with the cheapest pool offer.
522+
# Then choose a fleet with the cheapest pool offer among all fleets with pool offers.
523+
# If there are no fleets with pool offers, choose a fleet with the cheapest backend offer.
524+
# Fallback to autocreated fleet if fleets have no pool or backend offers.
525+
# TODO: Consider trying all backend offers and then choosing a fleet.
521526
candidate_fleets_with_offers: list[
522527
tuple[
523528
Optional[FleetModel],
524529
list[tuple[InstanceModel, InstanceOfferWithAvailability]],
525530
int,
526-
tuple[int, float],
531+
int,
532+
tuple[int, float, float],
527533
]
528534
] = []
529535
for candidate_fleet_model in fleet_models:
530-
fleet_instances_with_offers = _get_fleet_instances_with_offers(
536+
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
531537
fleet_model=candidate_fleet_model,
532538
run_spec=run_spec,
533539
job=job,
534540
master_job_provisioning_data=master_job_provisioning_data,
535541
volumes=volumes,
536542
)
537-
fleet_available_offers = [
538-
o for _, o in fleet_instances_with_offers if o.availability.is_available()
539-
]
540-
fleet_has_available_capacity = nodes_required_num <= len(fleet_available_offers)
541-
fleet_cheapest_offer = math.inf
542-
if len(fleet_available_offers) > 0:
543-
fleet_cheapest_offer = fleet_available_offers[0].price
544-
fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer)
543+
fleet_has_available_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers)
544+
fleet_cheapest_pool_offer = math.inf
545+
if len(fleet_instances_with_pool_offers) > 0:
546+
fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price
547+
548+
candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
549+
profile = combine_fleet_and_run_profiles(
550+
candidate_fleet.spec.merged_profile, run_spec.merged_profile
551+
)
552+
fleet_requirements = get_fleet_requirements(candidate_fleet.spec)
553+
requirements = combine_fleet_and_run_requirements(
554+
fleet_requirements, job.job_spec.requirements
555+
)
556+
multinode = (
557+
candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
558+
or job.job_spec.jobs_per_replica > 1
559+
)
560+
fleet_backend_offers = []
561+
if (
562+
_check_can_create_new_instance_in_fleet(candidate_fleet)
563+
and profile is not None
564+
and requirements is not None
565+
):
566+
fleet_backend_offers = await get_offers_by_requirements(
567+
project=project,
568+
profile=profile,
569+
requirements=requirements,
570+
exclude_not_available=True,
571+
multinode=multinode,
572+
master_job_provisioning_data=master_job_provisioning_data,
573+
volumes=volumes,
574+
privileged=job.job_spec.privileged,
575+
instance_mounts=check_run_spec_requires_instance_mounts(run_spec),
576+
)
577+
578+
fleet_cheapest_backend_offer = math.inf
579+
if len(fleet_backend_offers) > 0:
580+
fleet_cheapest_backend_offer = fleet_backend_offers[0][1].price
581+
582+
fleet_priority = (
583+
not fleet_has_available_capacity,
584+
fleet_cheapest_pool_offer,
585+
fleet_cheapest_backend_offer,
586+
)
545587
candidate_fleets_with_offers.append(
546588
(
547589
candidate_fleet_model,
548-
fleet_instances_with_offers,
549-
len(fleet_available_offers),
590+
fleet_instances_with_pool_offers,
591+
len(fleet_instances_with_pool_offers),
592+
len(fleet_backend_offers),
550593
fleet_priority,
551594
)
552595
)
553596
if run_spec.merged_profile.fleets is None and all(
554-
t[2] == 0 for t in candidate_fleets_with_offers
597+
t[2] == 0 and t[3] == 0 for t in candidate_fleets_with_offers
555598
):
556-
# If fleets are not specified and no fleets have available offers, create a new fleet.
599+
# If fleets are not specified and no fleets have available pool or backend offers, create a new fleet.
557600
# This is for compatibility with non-fleet-first UX when runs created new fleets
558601
# if there are no instances to reuse.
559602
return None, []
@@ -573,7 +616,7 @@ def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int:
573616
return nodes_required_num
574617

575618

576-
def _get_fleet_instances_with_offers(
619+
def _get_fleet_instances_with_pool_offers(
577620
fleet_model: FleetModel,
578621
run_spec: RunSpec,
579622
job: Job,

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
InstanceStatus,
1717
)
1818
from dstack._internal.core.models.profiles import Profile
19+
from dstack._internal.core.models.resources import Range, ResourcesSpec
1920
from dstack._internal.core.models.runs import (
2021
JobStatus,
2122
JobTerminationReason,
@@ -744,7 +745,7 @@ async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session:
744745

745746
@pytest.mark.asyncio
746747
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
747-
async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified(
748+
async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified(
748749
self, test_db, session: AsyncSession
749750
):
750751
project = await create_project(session)
@@ -782,6 +783,58 @@ async def test_does_not_assign_job_to_elastic_empty_fleet_if_fleets_unspecified(
782783
assert job.instance_id is None
783784
assert job.fleet_id is None
784785

786+
@pytest.mark.asyncio
787+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
788+
async def test_assigns_job_to_elastic_empty_fleet_with_backend_offers_if_fleets_unspecified(
789+
self, test_db, session: AsyncSession
790+
):
791+
project = await create_project(session)
792+
user = await create_user(session)
793+
repo = await create_repo(session=session, project_id=project.id)
794+
fleet_spec1 = get_fleet_spec()
795+
fleet_spec1.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
796+
fleet1 = await create_fleet(
797+
session=session, project=project, spec=fleet_spec1, name="fleet"
798+
)
799+
# Need a second non-empty fleet to have two-stage processing
800+
fleet_spec2 = get_fleet_spec()
801+
# Empty resources intersection to return no backend offers
802+
fleet_spec2.configuration.resources = ResourcesSpec(cpu=Range(min=0, max=0))
803+
fleet2 = await create_fleet(
804+
session=session, project=project, spec=fleet_spec2, name="fleet2"
805+
)
806+
await create_instance(
807+
session=session,
808+
project=project,
809+
fleet=fleet2,
810+
instance_num=0,
811+
status=InstanceStatus.BUSY,
812+
)
813+
run = await create_run(
814+
session=session,
815+
project=project,
816+
repo=repo,
817+
user=user,
818+
)
819+
job = await create_job(
820+
session=session,
821+
run=run,
822+
instance_assigned=False,
823+
)
824+
aws_mock = Mock()
825+
aws_mock.TYPE = BackendType.AWS
826+
offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0)
827+
aws_mock.compute.return_value = Mock(spec=ComputeMockSpec)
828+
aws_mock.compute.return_value.get_offers.return_value = [offer]
829+
with patch("dstack._internal.server.services.backends.get_project_backends") as m:
830+
m.return_value = [aws_mock]
831+
await process_submitted_jobs()
832+
await session.refresh(job)
833+
assert job.status == JobStatus.SUBMITTED
834+
assert job.instance_assigned
835+
assert job.instance_id is None
836+
assert job.fleet_id == fleet1.id
837+
785838
@pytest.mark.asyncio
786839
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
787840
async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified(

0 commit comments

Comments
 (0)