Skip to content

Commit 57167af

Browse files
authored
Consider elastic busy fleets for provisioning (#3024)
* Get fleets from run_spec.merged_profile.fleets * Consider elastic busy fleets for provisioning
1 parent 54c4a9f commit 57167af

File tree

2 files changed

+62
-21
lines changed

2 files changed

+62
-21
lines changed

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
from datetime import datetime, timedelta
66
from typing import List, Optional, Tuple
77

8-
from sqlalchemy import and_, or_, select
8+
from sqlalchemy import and_, not_, or_, select
99
from sqlalchemy.ext.asyncio import AsyncSession
10-
from sqlalchemy.orm import contains_eager, joinedload, load_only, selectinload
10+
from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload
1111

1212
from dstack._internal.core.backends.base.backend import Backend
1313
from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport
@@ -250,8 +250,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
250250
]
251251
if run_model.fleet is not None:
252252
fleet_filters.append(FleetModel.id == run_model.fleet_id)
253-
if run_spec.configuration.fleets is not None:
254-
fleet_filters.append(FleetModel.name.in_(run_spec.configuration.fleets))
253+
if run_spec.merged_profile.fleets is not None:
254+
fleet_filters.append(FleetModel.name.in_(run_spec.merged_profile.fleets))
255255

256256
instance_filters = [
257257
InstanceModel.deleted == False,
@@ -269,9 +269,6 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
269269
[i.id for i in f.instances] for f in fleet_models_with_instances
270270
)
271271
)
272-
fleet_models = fleet_models_with_instances + fleet_models_without_instances
273-
fleets_ids = [f.id for f in fleet_models]
274-
275272
if get_db().dialect_name == "sqlite":
276273
# Start new transaction to see committed changes after lock
277274
await session.commit()
@@ -280,13 +277,15 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
280277
InstanceModel.__tablename__, instances_ids
281278
):
282279
if get_db().dialect_name == "sqlite":
283-
fleet_models = await _refetch_fleet_models(
280+
fleets_with_instances_ids = [f.id for f in fleet_models_with_instances]
281+
fleet_models_with_instances = await _refetch_fleet_models_with_instances(
284282
session=session,
285-
fleets_ids=fleets_ids,
283+
fleets_ids=fleets_with_instances_ids,
286284
instances_ids=instances_ids,
287285
fleet_filters=fleet_filters,
288286
instance_filters=instance_filters,
289287
)
288+
fleet_models = fleet_models_with_instances + fleet_models_without_instances
290289
fleet_model, fleet_instances_with_offers = _find_optimal_fleet_with_offers(
291290
fleet_models=fleet_models,
292291
run_model=run_model,
@@ -295,7 +294,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
295294
master_job_provisioning_data=master_job_provisioning_data,
296295
volumes=volumes,
297296
)
298-
if fleet_model is None and run_spec.configuration.fleets is not None:
297+
if fleet_model is None and run_spec.merged_profile.fleets is not None:
299298
# Run cannot create new fleets when fleets are specified
300299
logger.debug("%s: failed to use specified fleets", fmt(job_model))
301300
job_model.status = JobStatus.TERMINATING
@@ -443,14 +442,21 @@ async def _select_fleet_models(
443442
*fleet_filters,
444443
FleetModel.id.not_in(fleet_models_with_instances_ids),
445444
)
446-
.where(InstanceModel.id.is_(None))
447-
.options(contains_eager(FleetModel.instances)) # loading empty relation
445+
.where(
446+
or_(
447+
InstanceModel.id.is_(None),
448+
not_(and_(*instance_filters)),
449+
)
450+
)
451+
# Load empty list of instances so that downstream code
452+
# knows this fleet has no instances eligible for offers.
453+
.options(noload(FleetModel.instances))
448454
)
449455
fleet_models_without_instances = list(res.unique().scalars().all())
450456
return fleet_models_with_instances, fleet_models_without_instances
451457

452458

453-
async def _refetch_fleet_models(
459+
async def _refetch_fleet_models_with_instances(
454460
session: AsyncSession,
455461
fleets_ids: list[uuid.UUID],
456462
instances_ids: list[uuid.UUID],
@@ -465,13 +471,8 @@ async def _refetch_fleet_models(
465471
*fleet_filters,
466472
)
467473
.where(
468-
or_(
469-
InstanceModel.id.is_(None),
470-
and_(
471-
InstanceModel.id.in_(instances_ids),
472-
*instance_filters,
473-
),
474-
)
474+
InstanceModel.id.in_(instances_ids),
475+
*instance_filters,
475476
)
476477
.options(contains_eager(FleetModel.instances))
477478
.execution_options(populate_existing=True)
@@ -538,7 +539,7 @@ def _find_optimal_fleet_with_offers(
538539
fleet_priority,
539540
)
540541
)
541-
if run_spec.configuration.fleets is None and all(
542+
if run_spec.merged_profile.fleets is None and all(
542543
t[2] == 0 for t in candidate_fleets_with_offers
543544
):
544545
# If fleets are not specified and no fleets have available offers, create a new fleet.

src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,46 @@ async def test_assigns_job_to_elastic_empty_fleet_if_fleets_specified(
808808
assert job.instance_id is None
809809
assert job.fleet_id == fleet.id
810810

811+
@pytest.mark.asyncio
812+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
813+
async def test_assigns_job_to_elastic_non_empty_busy_fleet_if_fleets_specified(
814+
self, test_db, session: AsyncSession
815+
):
816+
project = await create_project(session)
817+
user = await create_user(session)
818+
repo = await create_repo(session=session, project_id=project.id)
819+
fleet_spec = get_fleet_spec()
820+
fleet_spec.configuration.nodes = Range(min=1, max=2)
821+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet")
822+
await create_instance(
823+
session=session,
824+
project=project,
825+
fleet=fleet,
826+
instance_num=0,
827+
status=InstanceStatus.BUSY,
828+
total_blocks=1,
829+
busy_blocks=1,
830+
)
831+
run_spec = get_run_spec(repo_id=repo.name)
832+
run_spec.configuration.fleets = [fleet.name]
833+
run = await create_run(
834+
session=session,
835+
project=project,
836+
repo=repo,
837+
user=user,
838+
run_spec=run_spec,
839+
)
840+
job = await create_job(
841+
session=session,
842+
run=run,
843+
instance_assigned=False,
844+
)
845+
await process_submitted_jobs()
846+
await session.refresh(job)
847+
assert job.instance_assigned
848+
assert job.instance_id is None
849+
assert job.fleet_id == fleet.id
850+
811851
@pytest.mark.asyncio
812852
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
813853
async def test_creates_new_instance_in_existing_empty_fleet(

0 commit comments

Comments
 (0)