Skip to content

Commit 115212c

Browse files
committed
Consider backend offers when choosing optimal fleet
1 parent 98aee00 commit 115212c

1 file changed

Lines changed: 65 additions & 22 deletions

File tree

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
289289
instance_filters=instance_filters,
290290
)
291291
fleet_models = fleet_models_with_instances + fleet_models_without_instances
292-
fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers(
292+
fleet_model, fleet_instances_with_offers = await _find_optimal_fleet_with_offers(
293+
project=project,
293294
fleet_models=fleet_models,
294295
run_model=run_model,
295296
run_spec=run.run_spec,
@@ -492,7 +493,8 @@ async def _refetch_fleet_models_with_instances(
492493
return fleet_models
493494

494495

495-
def _find_optimal_fleet_with_offers(
496+
async def _find_optimal_fleet_with_offers(
497+
project: ProjectModel,
496498
fleet_models: list[FleetModel],
497499
run_model: RunModel,
498500
run_spec: RunSpec,
@@ -502,58 +504,99 @@ def _find_optimal_fleet_with_offers(
502504
) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]:
503505
if run_model.fleet is not None:
504506
# Using the fleet that was already chosen by the master job
505-
fleet_instances_with_offers = _get_fleet_instances_with_offers(
507+
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
506508
fleet_model=run_model.fleet,
507509
run_spec=run_spec,
508510
job=job,
509511
master_job_provisioning_data=master_job_provisioning_data,
510512
volumes=volumes,
511513
)
512-
return run_model.fleet, fleet_instances_with_offers
514+
return run_model.fleet, fleet_instances_with_pool_offers
513515

514516
if len(fleet_models) == 0:
515517
return None, []
516518

517519
nodes_required_num = _get_nodes_required_num_for_run(run_spec)
518-
# The current strategy is to first consider fleets that can accommodate
519-
# the run without additional provisioning and choose the one with the cheapest offer.
520-
# Fallback to fleet with the cheapest offer among all fleets with offers.
520+
# The current strategy is first to consider fleets that can accommodate
521+
# the run without additional provisioning and choose the one with the cheapest pool offer.
522+
# Then choose a fleet with the cheapest pool offer among all fleets with pool offers.
523+
# If there are no fleets with pool offers, choose a fleet with a cheapest backend offer.
524+
# Fallback to autocreated fleet if fleets have no pool or backend offers.
525+
# TODO: Consider trying all backend offers and then choosing a fleet.
521526
candidate_fleets_with_offers: list[
522527
tuple[
523528
Optional[FleetModel],
524529
list[tuple[InstanceModel, InstanceOfferWithAvailability]],
525530
int,
526-
tuple[int, float],
531+
int,
532+
tuple[int, float, float],
527533
]
528534
] = []
529535
for candidate_fleet_model in fleet_models:
530-
fleet_instances_with_offers = _get_fleet_instances_with_offers(
536+
fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers(
531537
fleet_model=candidate_fleet_model,
532538
run_spec=run_spec,
533539
job=job,
534540
master_job_provisioning_data=master_job_provisioning_data,
535541
volumes=volumes,
536542
)
537-
fleet_available_offers = [
538-
o for _, o in fleet_instances_with_offers if o.availability.is_available()
539-
]
540-
fleet_has_available_capacity = nodes_required_num <= len(fleet_available_offers)
541-
fleet_cheapest_offer = math.inf
542-
if len(fleet_available_offers) > 0:
543-
fleet_cheapest_offer = fleet_available_offers[0].price
544-
fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer)
543+
fleet_has_available_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers)
544+
fleet_cheapest_pool_offer = math.inf
545+
if len(fleet_instances_with_pool_offers) > 0:
546+
fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price
547+
548+
candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
549+
profile = combine_fleet_and_run_profiles(
550+
candidate_fleet.spec.merged_profile, run_spec.merged_profile
551+
)
552+
fleet_requirements = get_fleet_requirements(candidate_fleet.spec)
553+
requirements = combine_fleet_and_run_requirements(
554+
fleet_requirements, job.job_spec.requirements
555+
)
556+
multinode = (
557+
candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
558+
or job.job_spec.jobs_per_replica > 1
559+
)
560+
fleet_backend_offers = []
561+
if (
562+
_check_can_create_new_instance_in_fleet(candidate_fleet)
563+
and profile is not None
564+
and requirements is not None
565+
):
566+
fleet_backend_offers = await get_offers_by_requirements(
567+
project=project,
568+
profile=profile,
569+
requirements=requirements,
570+
exclude_not_available=True,
571+
multinode=multinode,
572+
master_job_provisioning_data=master_job_provisioning_data,
573+
volumes=volumes,
574+
privileged=job.job_spec.privileged,
575+
instance_mounts=check_run_spec_requires_instance_mounts(run_spec),
576+
)
577+
578+
fleet_cheapest_backend_offer = math.inf
579+
if len(fleet_backend_offers) > 0:
580+
fleet_cheapest_backend_offer = fleet_backend_offers[0][1].price
581+
582+
fleet_priority = (
583+
not fleet_has_available_capacity,
584+
fleet_cheapest_pool_offer,
585+
fleet_cheapest_backend_offer,
586+
)
545587
candidate_fleets_with_offers.append(
546588
(
547589
candidate_fleet_model,
548-
fleet_instances_with_offers,
549-
len(fleet_available_offers),
590+
fleet_instances_with_pool_offers,
591+
len(fleet_instances_with_pool_offers),
592+
len(fleet_backend_offers),
550593
fleet_priority,
551594
)
552595
)
553596
if run_spec.merged_profile.fleets is None and all(
554-
t[2] == 0 for t in candidate_fleets_with_offers
597+
t[2] == 0 and t[3] == 0 for t in candidate_fleets_with_offers
555598
):
556-
# If fleets are not specified and no fleets have available offers, create a new fleet.
599+
# If fleets are not specified and no fleets have available pool or backend offers, create a new fleet.
557600
# This is for compatibility with non-fleet-first UX when runs created new fleets
558601
# if there are no instances to reuse.
559602
return None, []
@@ -573,7 +616,7 @@ def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int:
573616
return nodes_required_num
574617

575618

576-
def _get_fleet_instances_with_offers(
619+
def _get_fleet_instances_with_pool_offers(
577620
fleet_model: FleetModel,
578621
run_spec: RunSpec,
579622
job: Job,

0 commit comments

Comments
 (0)