@@ -637,6 +637,14 @@ async def _process_new_capacity_provisioning_path(
637637 job = context .job ,
638638 )
639639
640+ if context .fleet_model is not None and fleet_model is None :
641+ await _defer_submitted_job (
642+ session = session ,
643+ job_model = context .job_model ,
644+ log_message = "cluster fleet is locked" ,
645+ )
646+ return None
647+
640648 # master_job_provisioning_data is present if there is a master job.
641649 # master_instance_provisioning_data is present if there is a master instance (non empty cluster fleet).
642650 master_provisioning_data = master_job_provisioning_data or master_instance_provisioning_data
@@ -1021,6 +1029,8 @@ async def _lock_fleet_and_get_master_provisioning_data(
10211029 )
10221030 await sqlite_commit (session )
10231031 fleet_model = await _refetch_cluster_master_fleet (session = session , fleet_model = fleet_model )
1032+ if fleet_model is None :
1033+ return None , None
10241034 master_instance_provisioning_data = get_fleet_master_instance_provisioning_data (
10251035 fleet_model = fleet_model ,
10261036 fleet_spec = fleet_spec ,
@@ -1037,7 +1047,7 @@ def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]:
10371047
10381048async def _refetch_cluster_master_fleet (
10391049 session : AsyncSession , fleet_model : FleetModel
1040- ) -> FleetModel :
1050+ ) -> Optional [ FleetModel ] :
10411051 res = await session .execute (
10421052 select (FleetModel )
10431053 .where (
@@ -1053,6 +1063,9 @@ async def _refetch_cluster_master_fleet(
10531063 )
10541064 empty_fleet_model = res .unique ().scalar ()
10551065 if empty_fleet_model is not None :
1066+ if empty_fleet_model .lock_expires_at is not None :
1067+ # Defer while a pipeline owns the empty cluster fleet.
1068+ return None
10561069 return empty_fleet_model
10571070
10581071 res = await session .execute (
0 commit comments