Skip to content

Commit e28715b

Browse files
committed
Fix submitted jobs cluster fleet locking
1 parent 3043970 commit e28715b

1 file changed

Lines changed: 36 additions & 17 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,16 +1082,17 @@ async def _process_new_capacity_provisioning(
10821082
) -> _ProvisioningResult:
10831083
fleet_model = context.fleet_model
10841084
locked_fleet_id = None
1085-
if _should_lock_related_cluster_master_fleet(context=context):
1085+
if _should_refresh_related_cluster_master_fleet(context=context):
10861086
assert fleet_model is not None
1087-
fleet_model = await _lock_related_cluster_master_fleet(
1087+
related_cluster_master_fleet = await _resolve_related_cluster_master_fleet(
10881088
item=item,
10891089
fleet_id=fleet_model.id,
10901090
)
1091-
if fleet_model is None:
1091+
if related_cluster_master_fleet is None:
10921092
logger.debug("%s: cluster fleet is locked for provisioning", fmt(context.job_model))
10931093
return _RetrySubmittedJobResult()
1094-
locked_fleet_id = fleet_model.id
1094+
fleet_model = related_cluster_master_fleet.fleet_model
1095+
locked_fleet_id = related_cluster_master_fleet.locked_fleet_id
10951096

10961097
master_provisioning_data = (
10971098
preconditions.master_job_provisioning_data
@@ -1650,51 +1651,69 @@ def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]:
16501651
return fleet_spec
16511652

16521653

1653-
def _should_lock_related_cluster_master_fleet(context: _SubmittedJobContext) -> bool:
1654+
def _should_refresh_related_cluster_master_fleet(context: _SubmittedJobContext) -> bool:
16541655
return (
16551656
is_master_job(context.job)
16561657
and context.fleet_model is not None
16571658
and _get_cluster_fleet_spec(context.fleet_model) is not None
1658-
and len(context.fleet_model.instances) == 0
16591659
)
16601660

16611661

1662-
async def _lock_related_cluster_master_fleet(
1662+
@dataclass
1663+
class _ResolvedRelatedClusterMasterFleet:
1664+
fleet_model: FleetModel
1665+
locked_fleet_id: Optional[uuid.UUID]
1666+
1667+
1668+
async def _resolve_related_cluster_master_fleet(
16631669
item: JobSubmittedPipelineItem,
16641670
fleet_id: uuid.UUID,
1665-
) -> Optional[FleetModel]:
1671+
) -> Optional[_ResolvedRelatedClusterMasterFleet]:
16661672
now = get_current_datetime()
16671673
related_fleet_lock_owner = _get_related_fleet_lock_owner(item.id)
16681674
fleet_lock, _ = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__)
16691675
async with fleet_lock:
16701676
async with get_session_ctx() as session:
1677+
# To avoid violating cluster placement during master provisioning,
1678+
# lock empty fleets and respect existing instances in non-empty fleets.
1679+
# Refetch the fleet under lock before deciding which case we are in.
16711680
res = await session.execute(
16721681
select(FleetModel)
16731682
.where(
16741683
FleetModel.id == fleet_id,
1675-
or_(
1676-
FleetModel.lock_expires_at.is_(None),
1677-
and_(
1678-
FleetModel.lock_owner == related_fleet_lock_owner,
1679-
FleetModel.lock_expires_at < now,
1680-
),
1681-
),
16821684
)
16831685
.options(
16841686
joinedload(FleetModel.project).load_only(ProjectModel.id, ProjectModel.name)
16851687
)
16861688
.options(selectinload(FleetModel.instances.and_(InstanceModel.deleted == False)))
1687-
.with_for_update(skip_locked=True, key_share=True, of=FleetModel)
1689+
.execution_options(populate_existing=True)
1690+
.with_for_update(skip_locked=True, of=FleetModel)
16881691
)
16891692
fleet_model = res.unique().scalar_one_or_none()
16901693
if fleet_model is None:
16911694
return None
1695+
if len(fleet_model.instances) != 0:
1696+
return _ResolvedRelatedClusterMasterFleet(
1697+
fleet_model=fleet_model,
1698+
locked_fleet_id=None,
1699+
)
1700+
if not (
1701+
fleet_model.lock_expires_at is None
1702+
or (
1703+
fleet_model.lock_owner == related_fleet_lock_owner
1704+
and fleet_model.lock_expires_at < now
1705+
)
1706+
):
1707+
return None
16921708

16931709
fleet_model.lock_expires_at = item.lock_expires_at
16941710
fleet_model.lock_token = item.lock_token
16951711
fleet_model.lock_owner = related_fleet_lock_owner
16961712
await session.commit()
1697-
return fleet_model
1713+
return _ResolvedRelatedClusterMasterFleet(
1714+
fleet_model=fleet_model,
1715+
locked_fleet_id=fleet_model.id,
1716+
)
16981717

16991718

17001719
async def _unlock_related_fleet(

0 commit comments

Comments
 (0)