Skip to content

Commit 38d192a

Browse files
committed
Fix submitted jobs placement group cleanup
1 parent af2b18f commit 38d192a

3 files changed

Lines changed: 282 additions & 26 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 142 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@
118118
)
119119
from dstack._internal.server.services.placement import (
120120
find_or_create_suitable_placement_group,
121-
get_fleet_placement_group_models,
122121
get_placement_group_model_for_job,
123122
placement_group_model_to_placement_group_optional,
123+
schedule_fleet_placement_groups_deletion,
124124
)
125125
from dstack._internal.server.services.runs import run_model_to_run
126126
from dstack._internal.server.services.runs.plan import (
@@ -357,11 +357,19 @@ class _RetrySubmittedJobResult:
357357
pass
358358

359359

360+
@dataclass
361+
class _PlacementGroupCleanup:
362+
fleet_id: uuid.UUID
363+
selected_placement_group_id: Optional[uuid.UUID]
364+
new_placement_group_models: list[PlacementGroupModel]
365+
366+
360367
@dataclass
361368
class _TerminateSubmittedJobResult:
362369
reason: JobTerminationReason
363370
message: Optional[str] = None
364371
locked_fleet_id: Optional[uuid.UUID] = None
372+
placement_group_cleanup: Optional[_PlacementGroupCleanup] = None
365373

366374

367375
@dataclass
@@ -409,12 +417,17 @@ class _ExistingInstanceProvisioning:
409417
volume_attachment_result: _VolumeAttachmentResult
410418

411419

420+
@dataclass
421+
class _FailedNewCapacityProvisioning:
422+
placement_group_cleanup: Optional[_PlacementGroupCleanup]
423+
424+
412425
@dataclass
413426
class _ProvisionNewCapacityResult:
414427
provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData]
415428
offer: InstanceOfferWithAvailability
416429
effective_profile: Profile
417-
new_placement_group_models: list[PlacementGroupModel]
430+
placement_group_cleanup: Optional[_PlacementGroupCleanup]
418431

419432

420433
@dataclass
@@ -423,7 +436,7 @@ class _NewCapacityProvisioning:
423436
offer: InstanceOfferWithAvailability
424437
effective_profile: Profile
425438
created_fleet_model: Optional[FleetModel]
426-
new_placement_group_models: list[PlacementGroupModel]
439+
placement_group_cleanup: Optional[_PlacementGroupCleanup]
427440
volume_attachment_result: Optional[_VolumeAttachmentResult]
428441
locked_fleet_id: Optional[uuid.UUID]
429442

@@ -966,6 +979,17 @@ async def _apply_provisioning_result(
966979
return
967980

968981
if isinstance(provisioning, _TerminateSubmittedJobResult):
982+
if provisioning.placement_group_cleanup is not None:
983+
cleanup_fleet_model = await _load_placement_group_cleanup_fleet(
984+
session=session,
985+
fleet_id=provisioning.placement_group_cleanup.fleet_id,
986+
)
987+
await _persist_placement_group_cleanup(
988+
session=session,
989+
fleet_model=cleanup_fleet_model,
990+
project=cleanup_fleet_model.project,
991+
placement_group_cleanup=provisioning.placement_group_cleanup,
992+
)
969993
await _unlock_related_fleet(
970994
session=session,
971995
item=item,
@@ -1087,11 +1111,12 @@ async def _process_new_capacity_provisioning(
10871111
master_job_provisioning_data=master_provisioning_data,
10881112
volumes=preconditions.prepared_job_volumes.volumes,
10891113
)
1090-
if provision_new_capacity_result is None:
1114+
if isinstance(provision_new_capacity_result, _FailedNewCapacityProvisioning):
10911115
logger.debug("%s: provisioning failed", fmt(context.job_model))
10921116
return _TerminateSubmittedJobResult(
10931117
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
10941118
locked_fleet_id=locked_fleet_id,
1119+
placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup,
10951120
)
10961121

10971122
created_fleet_model = None
@@ -1119,7 +1144,7 @@ async def _process_new_capacity_provisioning(
11191144
offer=provision_new_capacity_result.offer,
11201145
effective_profile=provision_new_capacity_result.effective_profile,
11211146
created_fleet_model=created_fleet_model,
1122-
new_placement_group_models=provision_new_capacity_result.new_placement_group_models,
1147+
placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup,
11231148
volume_attachment_result=volume_attachment_result,
11241149
locked_fleet_id=locked_fleet_id,
11251150
)
@@ -1151,10 +1176,12 @@ async def _apply_new_capacity_provisioning(
11511176
)
11521177

11531178
assert fleet_model is not None
1154-
for placement_group_model in provisioning.new_placement_group_models:
1155-
placement_group_model.project = fresh_context.project
1156-
placement_group_model.fleet = fleet_model
1157-
session.add(placement_group_model)
1179+
await _persist_placement_group_cleanup(
1180+
session=session,
1181+
fleet_model=fleet_model,
1182+
project=fresh_context.project,
1183+
placement_group_cleanup=provisioning.placement_group_cleanup,
1184+
)
11581185

11591186
instance_models, _ = await _materialize_newly_provisioned_capacity(
11601187
session=session,
@@ -1737,7 +1764,7 @@ async def _provision_new_capacity(
17371764
master_job_provisioning_data: Optional[JobProvisioningData] = None,
17381765
volumes: Optional[list[list[Volume]]] = None,
17391766
fleet_model: Optional[FleetModel] = None,
1740-
) -> Optional[_ProvisionNewCapacityResult]:
1767+
) -> Union[_FailedNewCapacityProvisioning, _ProvisionNewCapacityResult]:
17411768
job = jobs[0]
17421769
if volumes is None:
17431770
volumes = []
@@ -1748,13 +1775,16 @@ async def _provision_new_capacity(
17481775
fleet_model=fleet_model,
17491776
)
17501777
if effective_profile_and_requirements is None:
1751-
return None
1778+
return _FailedNewCapacityProvisioning(placement_group_cleanup=None)
17521779
profile, requirements = effective_profile_and_requirements
17531780

17541781
placement_group_models = await _load_fleet_placement_group_models(
17551782
fleet_id=fleet_model.id if fleet_model else None,
17561783
)
17571784
new_placement_group_models: list[PlacementGroupModel] = []
1785+
known_placement_group_ids = {
1786+
placement_group_model.id for placement_group_model in placement_group_models
1787+
}
17581788
placement_group_model = get_placement_group_model_for_job(
17591789
placement_group_models=placement_group_models,
17601790
fleet_model=fleet_model,
@@ -1772,6 +1802,7 @@ async def _provision_new_capacity(
17721802
instance_mounts=check_run_spec_requires_instance_mounts(run.run_spec),
17731803
placement_group=placement_group_model_to_placement_group_optional(placement_group_model),
17741804
)
1805+
offers_tried = 0
17751806
for backend, offer in offers[: settings.MAX_OFFERS_TRIED]:
17761807
logger.debug(
17771808
"%s: trying %s in %s/%s for $%0.4f per hour",
@@ -1810,9 +1841,11 @@ async def _provision_new_capacity(
18101841
)
18111842
if placement_group_model is None:
18121843
continue
1813-
if placement_group_model.id not in {pg.id for pg in placement_group_models}:
1844+
if placement_group_model.id not in known_placement_group_ids:
18141845
new_placement_group_models.append(placement_group_model)
18151846
placement_group_models.append(placement_group_model)
1847+
known_placement_group_ids.add(placement_group_model.id)
1848+
offers_tried += 1
18161849
try:
18171850
if len(jobs) > 1 and offer.backend in BACKENDS_WITH_GROUP_PROVISIONING_SUPPORT:
18181851
assert isinstance(compute, ComputeWithGroupProvisioningSupport)
@@ -1829,7 +1862,14 @@ async def _provision_new_capacity(
18291862
provisioning_data=compute_group_provisioning_data,
18301863
offer=offer,
18311864
effective_profile=profile,
1832-
new_placement_group_models=new_placement_group_models,
1865+
placement_group_cleanup=_build_placement_group_cleanup(
1866+
fleet_model=fleet_model,
1867+
offers_tried=offers_tried,
1868+
selected_placement_group_id=(
1869+
None if placement_group_model is None else placement_group_model.id
1870+
),
1871+
new_placement_group_models=new_placement_group_models,
1872+
),
18331873
)
18341874
job_provisioning_data = await run_async(
18351875
compute.run_job,
@@ -1845,7 +1885,14 @@ async def _provision_new_capacity(
18451885
provisioning_data=job_provisioning_data,
18461886
offer=offer,
18471887
effective_profile=profile,
1848-
new_placement_group_models=new_placement_group_models,
1888+
placement_group_cleanup=_build_placement_group_cleanup(
1889+
fleet_model=fleet_model,
1890+
offers_tried=offers_tried,
1891+
selected_placement_group_id=(
1892+
None if placement_group_model is None else placement_group_model.id
1893+
),
1894+
new_placement_group_models=new_placement_group_models,
1895+
),
18491896
)
18501897
except BackendError as e:
18511898
logger.warning(
@@ -1866,25 +1913,94 @@ async def _provision_new_capacity(
18661913
offer.region,
18671914
)
18681915
continue
1869-
finally:
1870-
if fleet_model is not None and len(fleet_model.instances) == 0:
1871-
for placement_group in placement_group_models:
1872-
if (
1873-
placement_group_model is None
1874-
or placement_group.id != placement_group_model.id
1875-
):
1876-
placement_group.fleet_deleted = True
1877-
return None
1916+
return _FailedNewCapacityProvisioning(
1917+
placement_group_cleanup=_build_placement_group_cleanup(
1918+
fleet_model=fleet_model,
1919+
offers_tried=offers_tried,
1920+
selected_placement_group_id=None,
1921+
new_placement_group_models=new_placement_group_models,
1922+
)
1923+
)
18781924

18791925

18801926
async def _load_fleet_placement_group_models(
18811927
fleet_id: Optional[uuid.UUID],
18821928
) -> list["PlacementGroupModel"]:
1929+
if fleet_id is None:
1930+
return []
1931+
18831932
async with get_session_ctx() as session:
1884-
return await get_fleet_placement_group_models(
1885-
session=session,
1886-
fleet_id=fleet_id,
1933+
res = await session.execute(
1934+
select(PlacementGroupModel)
1935+
.where(
1936+
and_(
1937+
PlacementGroupModel.fleet_id == fleet_id,
1938+
PlacementGroupModel.deleted == False,
1939+
PlacementGroupModel.fleet_deleted == False,
1940+
)
1941+
)
1942+
.options(
1943+
joinedload(PlacementGroupModel.project).load_only(
1944+
ProjectModel.id,
1945+
ProjectModel.name,
1946+
)
1947+
)
1948+
)
1949+
return list(res.scalars().all())
1950+
1951+
1952+
def _build_placement_group_cleanup(
1953+
fleet_model: Optional[FleetModel],
1954+
offers_tried: int,
1955+
selected_placement_group_id: Optional[uuid.UUID],
1956+
new_placement_group_models: list[PlacementGroupModel],
1957+
) -> Optional[_PlacementGroupCleanup]:
1958+
if fleet_model is None or len(fleet_model.instances) != 0 or offers_tried == 0:
1959+
return None
1960+
return _PlacementGroupCleanup(
1961+
fleet_id=fleet_model.id,
1962+
selected_placement_group_id=selected_placement_group_id,
1963+
new_placement_group_models=new_placement_group_models,
1964+
)
1965+
1966+
1967+
async def _load_placement_group_cleanup_fleet(
1968+
session: AsyncSession,
1969+
fleet_id: uuid.UUID,
1970+
) -> FleetModel:
1971+
res = await session.execute(
1972+
select(FleetModel)
1973+
.where(FleetModel.id == fleet_id)
1974+
.options(joinedload(FleetModel.project).load_only(ProjectModel.id, ProjectModel.name))
1975+
)
1976+
return res.unique().scalar_one()
1977+
1978+
1979+
async def _persist_placement_group_cleanup(
1980+
session: AsyncSession,
1981+
fleet_model: FleetModel,
1982+
project: ProjectModel,
1983+
placement_group_cleanup: Optional[_PlacementGroupCleanup],
1984+
) -> None:
1985+
if placement_group_cleanup is None:
1986+
return
1987+
1988+
assert fleet_model.id == placement_group_cleanup.fleet_id
1989+
except_placement_group_ids = ()
1990+
if placement_group_cleanup.selected_placement_group_id is not None:
1991+
except_placement_group_ids = (placement_group_cleanup.selected_placement_group_id,)
1992+
await schedule_fleet_placement_groups_deletion(
1993+
session=session,
1994+
fleet_id=placement_group_cleanup.fleet_id,
1995+
except_placement_group_ids=except_placement_group_ids,
1996+
)
1997+
for placement_group_model in placement_group_cleanup.new_placement_group_models:
1998+
placement_group_model.project = project
1999+
placement_group_model.fleet = fleet_model
2000+
placement_group_model.fleet_deleted = (
2001+
placement_group_model.id != placement_group_cleanup.selected_placement_group_id
18872002
)
2003+
session.add(placement_group_model)
18882004

18892005

18902006
def _get_effective_profile_and_requirements(

src/dstack/_internal/server/services/placement.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import uuid
12
from collections.abc import Iterable
23
from typing import Optional
34
from uuid import UUID
@@ -174,6 +175,7 @@ async def create_placement_group(
174175
compute: ComputeWithPlacementGroupSupport,
175176
) -> Optional[PlacementGroupModel]:
176177
placement_group_model = PlacementGroupModel(
178+
id=uuid.uuid4(),
177179
# TODO: generate the name in Compute.create_placement_group to allow
178180
# backend-specific name length limits
179181
name=generate_unique_placement_group_name(

0 commit comments

Comments
 (0)