Skip to content

Commit b5d32b9

Browse files
committed
Extract submitted-jobs master fleet locking
1 parent 0ac00c5 commit b5d32b9

1 file changed

Lines changed: 61 additions & 47 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -921,56 +921,70 @@ async def _lock_fleet_and_get_master_provisioning_data(
921921
# cluster master from loaded fleet instances here. Resolve the current master via
922922
# FleetModel.current_master_instance_id so jobs follow the same master election
923923
# as FleetPipeline/InstancePipeline.
924-
master_instance_provisioning_data = None
925-
if is_master_job(job) and fleet_model is not None:
926-
fleet_spec = get_fleet_spec(fleet_model)
927-
if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
928-
# To avoid violating fleet placement cluster during master provisioning,
929-
# we must lock empty fleets and respect existing instances in non-empty fleets.
930-
# On SQLite always take the lock during master provisioning for simplicity.
931-
# It's fine to lock fleets currently locked by pipelines (with lock_* fields set)
932-
# since we won't update fleets – we only need to ensure there is no parallel provisioning.
933-
await exit_stack.enter_async_context(
934-
get_locker(get_db().dialect_name).lock_ctx(
935-
FleetModel.__tablename__, [fleet_model.id]
936-
)
937-
)
938-
await sqlite_commit(session)
939-
res = await session.execute(
940-
select(FleetModel)
941-
.where(
942-
FleetModel.id == fleet_model.id,
943-
~exists().where(
944-
InstanceModel.fleet_id == fleet_model.id,
945-
InstanceModel.deleted == False,
946-
),
947-
)
948-
.with_for_update(key_share=True, of=FleetModel)
949-
.execution_options(populate_existing=True)
950-
.options(noload(FleetModel.instances))
951-
)
952-
empty_fleet_model = res.unique().scalar()
953-
if empty_fleet_model is not None:
954-
fleet_model = empty_fleet_model
955-
else:
956-
res = await session.execute(
957-
select(FleetModel)
958-
.join(FleetModel.instances)
959-
.where(
960-
FleetModel.id == fleet_model.id,
961-
InstanceModel.deleted == False,
962-
)
963-
.options(contains_eager(FleetModel.instances))
964-
.execution_options(populate_existing=True)
965-
)
966-
fleet_model = res.unique().scalar_one()
967-
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
968-
fleet_model=fleet_model,
969-
fleet_spec=fleet_spec,
970-
)
924+
if not is_master_job(job) or fleet_model is None:
925+
return fleet_model, None
926+
927+
fleet_spec = _get_cluster_fleet_spec(fleet_model)
928+
if fleet_spec is None:
929+
return fleet_model, None
930+
931+
# To avoid violating fleet placement cluster during master provisioning,
932+
# we must lock empty fleets and respect existing instances in non-empty fleets.
933+
# On SQLite always take the lock during master provisioning for simplicity.
934+
# It's fine to lock fleets currently locked by pipelines (with lock_* fields set)
935+
# since we won't update fleets – we only need to ensure there is no parallel provisioning.
936+
await exit_stack.enter_async_context(
937+
get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, [fleet_model.id])
938+
)
939+
await sqlite_commit(session)
940+
fleet_model = await _refetch_cluster_master_fleet(session=session, fleet_model=fleet_model)
941+
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
942+
fleet_model=fleet_model,
943+
fleet_spec=fleet_spec,
944+
)
971945
return fleet_model, master_instance_provisioning_data
972946

973947

948+
def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]:
    """Return the fleet's spec only when the fleet uses cluster placement.

    Args:
        fleet_model: The fleet whose spec is resolved via ``get_fleet_spec``.

    Returns:
        The resolved spec if its ``configuration.placement`` equals
        ``InstanceGroupPlacement.CLUSTER``; otherwise ``None``.
    """
    spec = get_fleet_spec(fleet_model)
    uses_cluster_placement = spec.configuration.placement == InstanceGroupPlacement.CLUSTER
    return spec if uses_cluster_placement else None
953+
954+
955+
async def _refetch_cluster_master_fleet(
    session: AsyncSession, fleet_model: FleetModel
) -> FleetModel:
    """Reload the fleet, row-locking it when it has no non-deleted instances.

    Two queries are issued in order:

    1. Select the fleet with ``with_for_update(key_share=True)`` restricted to
       the case where no non-deleted instance references it. Instances are not
       loaded (``noload``). If a row comes back, it is returned as is.
    2. Otherwise select the fleet joined with its non-deleted instances and
       populate ``FleetModel.instances`` from the joined rows
       (``contains_eager``). No ``FOR UPDATE`` clause is applied here.

    Both queries use ``populate_existing=True`` so that objects already present
    in the session are refreshed with the fetched state.

    Args:
        session: The async DB session used to execute the queries.
        fleet_model: The fleet to reload; only its ``id`` is read.

    Returns:
        The refreshed fleet model.

    Raises:
        Whatever ``scalar_one()`` raises when the second query does not yield
        exactly one fleet (e.g. no row matches anymore).
        NOTE(review): presumably this cannot happen while the caller holds the
        fleet lock; confirm.
    """
    fleet_id = fleet_model.id

    # `== False` is intentional: inside `.where()` it builds a SQL comparison.
    has_active_instances = exists().where(
        InstanceModel.fleet_id == fleet_id,
        InstanceModel.deleted == False,
    )

    # Step 1: only an empty fleet matches, and only then is its row locked.
    lock_empty_fleet_stmt = (
        select(FleetModel)
        .where(
            FleetModel.id == fleet_id,
            ~has_active_instances,
        )
        .with_for_update(key_share=True, of=FleetModel)
        .execution_options(populate_existing=True)
        .options(noload(FleetModel.instances))
    )
    empty_fleet_result = await session.execute(lock_empty_fleet_stmt)
    locked_empty_fleet = empty_fleet_result.unique().scalar()
    if locked_empty_fleet is not None:
        return locked_empty_fleet

    # Step 2: the fleet is not empty; reload it with its non-deleted instances.
    fleet_with_instances_stmt = (
        select(FleetModel)
        .join(FleetModel.instances)
        .where(
            FleetModel.id == fleet_id,
            InstanceModel.deleted == False,
        )
        .options(contains_eager(FleetModel.instances))
        .execution_options(populate_existing=True)
    )
    populated_fleet_result = await session.execute(fleet_with_instances_stmt)
    return populated_fleet_result.unique().scalar_one()
986+
987+
974988
async def _assign_existing_instance_to_job(
975989
session: AsyncSession,
976990
fleet_model: Optional[FleetModel],

0 commit comments

Comments
 (0)