Skip to content

Commit bfe3521

Browse files
committed
Prioritize fleet choice by capacity and offer price
1 parent 173eba0 commit bfe3521

2 files changed

Lines changed: 127 additions & 43 deletions

File tree

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 75 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
from dstack._internal.server.services.backends import get_project_backend_by_type_or_error
5454
from dstack._internal.server.services.fleets import (
5555
fleet_model_to_fleet,
56-
get_fleet_spec,
5756
)
5857
from dstack._internal.server.services.instances import (
5958
filter_pool_instances,
@@ -298,47 +297,14 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
298297
.execution_options(populate_existing=True)
299298
)
300299
fleet_models = list(res.unique().scalars().all())
301-
fleet_instances_with_offers = []
302-
for candidate_fleet_model in fleet_models:
303-
fleet_instances_with_offers = await _get_fleet_instances_with_offers(
304-
fleet_model=candidate_fleet_model,
305-
run_spec=run_spec,
306-
job=job,
307-
master_job_provisioning_data=master_job_provisioning_data,
308-
volumes=volumes,
309-
)
310-
if run_model.fleet_id is not None:
311-
# Using the first fleet that was already chosen by the master job.
312-
fleet_model = candidate_fleet_model
313-
break
314-
# Looking for an eligible fleet for the run.
315-
# TODO: Pick optimal fleet instead of the first eligible one.
316-
fleet_spec = get_fleet_spec(candidate_fleet_model)
317-
fleet_capacity = len(
318-
[o for o in fleet_instances_with_offers if o[1].availability.is_available()]
319-
)
320-
if fleet_spec.configuration.nodes is not None:
321-
if fleet_spec.configuration.nodes.max is None:
322-
fleet_capacity = math.inf
323-
else:
324-
# FIXME: Multiple service jobs can be provisioned on one instance with blocks.
325-
# Current capacity calculation does not take future provisioned blocks into account.
326-
# It may be impossible to do since we cannot be sure which instance will be provisioned.
327-
fleet_capacity += fleet_spec.configuration.nodes.max - len(
328-
candidate_fleet_model.instances
329-
)
330-
instances_required = 1
331-
if run_spec.configuration.type == "task":
332-
instances_required = run_spec.configuration.nodes
333-
elif (
334-
run_spec.configuration.type == "service"
335-
and run_spec.configuration.replicas.min is not None
336-
):
337-
instances_required = run_spec.configuration.replicas.min
338-
if fleet_capacity >= instances_required:
339-
# TODO: Ensure we use the chosen fleet when there are no instance assigned.
340-
fleet_model = candidate_fleet_model
341-
break
300+
fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers(
301+
fleet_models=fleet_models,
302+
run_model=run_model,
303+
run_spec=run.run_spec,
304+
job=job,
305+
master_job_provisioning_data=master_job_provisioning_data,
306+
volumes=volumes,
307+
)
342308
instance = await _assign_job_to_fleet_instance(
343309
session=session,
344310
instances_with_offers=fleet_instances_with_offers,
@@ -453,7 +419,73 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
453419
await session.commit()
454420

455421

456-
async def _get_fleet_instances_with_offers(
422+
def _find_optimal_fleet_with_offers(
    fleet_models: list[FleetModel],
    run_model: RunModel,
    run_spec: RunSpec,
    job: Job,
    master_job_provisioning_data: Optional[JobProvisioningData],
    volumes: Optional[list[list[Volume]]],
) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]:
    """
    Choose the fleet for the job and return it together with its instance offers.

    If the run already has a fleet (`run_model.fleet` is set), that fleet is
    returned unconditionally. Otherwise every fleet in `fleet_models` is ranked
    by the tuple `(lacks capacity, cheapest available offer price)` and the
    fleet with the smallest tuple wins:

    1. Fleets with at least as many available offers as the run requires
       nodes come first.
    2. Among those, the fleet with the lowest available offer price comes first.
       A fleet without any available offer is ranked with a price of `math.inf`.

    Ties are resolved in favour of the fleet that appears first in `fleet_models`.

    Args:
        fleet_models: Candidate fleets to choose from.
        run_model: The run the job belongs to; its `fleet` takes precedence.
        run_spec: Run spec used to compute offers and the required node count.
        job: The job being assigned.
        master_job_provisioning_data: Forwarded to the offers lookup.
        volumes: Forwarded to the offers lookup.

    Returns:
        `(fleet_model, instances_with_offers)` for the chosen fleet,
        or `(None, [])` if there is no fleet to choose from.
    """
    if run_model.fleet is not None:
        # Using the fleet that was already chosen by the master job
        return run_model.fleet, _get_fleet_instances_with_offers(
            fleet_model=run_model.fleet,
            run_spec=run_spec,
            job=job,
            master_job_provisioning_data=master_job_provisioning_data,
            volumes=volumes,
        )

    if not fleet_models:
        return None, []

    nodes_required_num = _get_nodes_required_num_for_run(run_spec)

    best_fleet_model: Optional[FleetModel] = None
    best_instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = []
    best_priority: Optional[tuple[bool, float]] = None
    for candidate_fleet_model in fleet_models:
        fleet_instances_with_offers = _get_fleet_instances_with_offers(
            fleet_model=candidate_fleet_model,
            run_spec=run_spec,
            job=job,
            master_job_provisioning_data=master_job_provisioning_data,
            volumes=volumes,
        )
        available_prices = [
            offer.price
            for _, offer in fleet_instances_with_offers
            if offer.availability.is_available()
        ]
        fleet_has_available_capacity = nodes_required_num <= len(available_prices)
        # Take the real minimum instead of the first element so the result does not
        # depend on how the offers happen to be ordered by the lookup helper.
        fleet_cheapest_offer = min(available_prices, default=math.inf)
        fleet_priority = (not fleet_has_available_capacity, fleet_cheapest_offer)
        # Strict comparison keeps the earliest fleet on ties.
        if best_priority is None or fleet_priority < best_priority:
            best_fleet_model = candidate_fleet_model
            best_instances_with_offers = fleet_instances_with_offers
            best_priority = fleet_priority
    return best_fleet_model, best_instances_with_offers
474+
475+
476+
def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int:
    """
    Return how many nodes the run needs.

    * `task` configurations need `configuration.nodes`.
    * `service` configurations need `configuration.replicas.min` when it is set.
    * Everything else (including services without a minimum) needs one node.
    """
    configuration = run_spec.configuration
    if configuration.type == "task":
        return configuration.nodes
    if configuration.type == "service":
        min_replicas = configuration.replicas.min
        if min_replicas is not None:
            return min_replicas
    return 1
486+
487+
488+
def _get_fleet_instances_with_offers(
457489
fleet_model: FleetModel,
458490
run_spec: RunSpec,
459491
job: Job,

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,58 @@ async def test_creates_new_instance_in_existing_empty_fleet(
785785
assert job.instance.instance_num == 0
786786
assert job.instance.fleet_id == fleet.id
787787

788+
@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_job_to_optimal_fleet(self, test_db, session: AsyncSession):
    """
    With three single-instance fleets (busy at price 1, idle at price 2,
    idle at price 3), the submitted job is expected to land in the second fleet.
    """
    project = await create_project(session)
    user = await create_user(session)
    repo = await create_repo(session=session, project_id=project.id)
    fleet1 = await create_fleet(session=session, project=project)
    fleet2 = await create_fleet(session=session, project=project)
    fleet3 = await create_fleet(session=session, project=project)
    # One instance per fleet: (fleet, instance status, price).
    instance_setups = (
        (fleet1, InstanceStatus.BUSY, 1),
        (fleet2, InstanceStatus.IDLE, 2),
        (fleet3, InstanceStatus.IDLE, 3),
    )
    for instance_fleet, instance_status, instance_price in instance_setups:
        await create_instance(
            session=session,
            project=project,
            fleet=instance_fleet,
            instance_num=0,
            status=instance_status,
            price=instance_price,
        )
    run = await create_run(session=session, project=project, repo=repo, user=user)
    job = await create_job(session=session, run=run, instance_assigned=False)

    await process_submitted_jobs()

    await session.refresh(job)
    query_result = await session.execute(select(JobModel))
    job = query_result.unique().scalar_one()
    assert job.status == JobStatus.SUBMITTED
    assert job.instance_assigned
    assert job.fleet_id == fleet2.id
839+
788840
@pytest.mark.asyncio
789841
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
790842
async def test_picks_high_priority_jobs_first(self, test_db, session: AsyncSession):

0 commit comments

Comments
 (0)