Skip to content

Commit 1505c08

Browse files
committed
Move submitted-jobs provisioning inputs to context
1 parent 8a10fc7 commit 1505c08

1 file changed

Lines changed: 5 additions & 17 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 5 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -225,6 +225,7 @@ class _SubmittedJobContext:
225225
project: ProjectModel
226226
run: Run
227227
job: Job
228+
jobs_to_provision: list[Job]
228229
replica_jobs: list[Job]
229230
replica_job_models: list[JobModel]
230231
fleet_model: Optional[FleetModel]
@@ -244,7 +245,6 @@ class _PreparedJobVolumes:
244245

245246
@dataclass
246247
class _ProvisioningPhaseResult:
247-
jobs_to_provision: list[Job]
248248
instance_models: list[InstanceModel]
249249
compute_group_model: Optional[ComputeGroupModel]
250250

@@ -360,6 +360,7 @@ async def _load_submitted_job_context(
360360
project=run_model.project,
361361
run=run,
362362
job=job,
363+
jobs_to_provision=_select_jobs_to_provision(job, replica_jobs, job_model),
363364
replica_jobs=replica_jobs,
364365
replica_job_models=_get_job_models_for_jobs(run_model.jobs, replica_jobs),
365366
# Master job chooses fleet for the run.
@@ -572,14 +573,10 @@ async def _process_provisioning_phase(
572573
master_job_provisioning_data: Optional[JobProvisioningData],
573574
volumes: list[list[Volume]],
574575
) -> Optional[_ProvisioningPhaseResult]:
575-
jobs_to_provision = _select_jobs_to_provision(
576-
context.job, context.replica_jobs, context.job_model
577-
)
578576
if context.job_model.instance is not None:
579577
return await _process_existing_instance_provisioning_path(
580578
session=session,
581579
job_model=context.job_model,
582-
jobs_to_provision=jobs_to_provision,
583580
)
584581

585582
if context.run.run_spec.merged_profile.creation_policy == CreationPolicy.REUSE:
@@ -596,7 +593,6 @@ async def _process_provisioning_phase(
596593
exit_stack=exit_stack,
597594
session=session,
598595
context=context,
599-
jobs_to_provision=jobs_to_provision,
600596
master_job_provisioning_data=master_job_provisioning_data,
601597
volumes=volumes,
602598
)
@@ -605,7 +601,6 @@ async def _process_provisioning_phase(
605601
async def _process_existing_instance_provisioning_path(
606602
session: AsyncSession,
607603
job_model: JobModel,
608-
jobs_to_provision: list[Job],
609604
) -> _ProvisioningPhaseResult:
610605
assert job_model.instance is not None
611606
res = await session.execute(
@@ -617,7 +612,6 @@ async def _process_existing_instance_provisioning_path(
617612
instance = res.unique().scalar_one()
618613
switch_job_status(session, job_model, JobStatus.PROVISIONING)
619614
return _ProvisioningPhaseResult(
620-
jobs_to_provision=jobs_to_provision,
621615
instance_models=[instance],
622616
compute_group_model=None,
623617
)
@@ -627,7 +621,6 @@ async def _process_new_capacity_provisioning_path(
627621
exit_stack: AsyncExitStack,
628622
session: AsyncSession,
629623
context: _SubmittedJobContext,
630-
jobs_to_provision: list[Job],
631624
master_job_provisioning_data: Optional[JobProvisioningData],
632625
volumes: list[list[Volume]],
633626
) -> Optional[_ProvisioningPhaseResult]:
@@ -650,7 +643,7 @@ async def _process_new_capacity_provisioning_path(
650643
fleet_model=fleet_model,
651644
job_model=context.job_model,
652645
run=context.run,
653-
jobs=jobs_to_provision,
646+
jobs=context.jobs_to_provision,
654647
project_ssh_public_key=context.project.ssh_public_key,
655648
project_ssh_private_key=context.project.ssh_private_key,
656649
master_job_provisioning_data=master_provisioning_data,
@@ -687,7 +680,6 @@ async def _process_new_capacity_provisioning_path(
687680
return await _materialize_newly_provisioned_capacity(
688681
session=session,
689682
context=context,
690-
jobs_to_provision=jobs_to_provision,
691683
fleet_model=fleet_model,
692684
provision_new_capacity_result=provision_new_capacity_result,
693685
)
@@ -696,7 +688,6 @@ async def _process_new_capacity_provisioning_path(
696688
async def _materialize_newly_provisioned_capacity(
697689
session: AsyncSession,
698690
context: _SubmittedJobContext,
699-
jobs_to_provision: list[Job],
700691
fleet_model: FleetModel,
701692
provision_new_capacity_result: _ProvisionNewCapacityResult,
702693
) -> _ProvisioningPhaseResult:
@@ -707,7 +698,6 @@ async def _materialize_newly_provisioned_capacity(
707698
) = _resolve_provisioned_jobs_and_data(
708699
session=session,
709700
context=context,
710-
jobs_to_provision=jobs_to_provision,
711701
fleet_model=fleet_model,
712702
provisioning_data=provision_new_capacity_result.provisioning_data,
713703
)
@@ -729,7 +719,6 @@ async def _materialize_newly_provisioned_capacity(
729719
len(provisioned_jobs),
730720
)
731721
return _ProvisioningPhaseResult(
732-
jobs_to_provision=jobs_to_provision,
733722
instance_models=instance_models,
734723
compute_group_model=compute_group_model,
735724
)
@@ -738,7 +727,6 @@ async def _materialize_newly_provisioned_capacity(
738727
def _resolve_provisioned_jobs_and_data(
739728
session: AsyncSession,
740729
context: _SubmittedJobContext,
741-
jobs_to_provision: list[Job],
742730
fleet_model: FleetModel,
743731
provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData],
744732
) -> tuple[list[Job], list[JobProvisioningData], Optional[ComputeGroupModel]]:
@@ -752,7 +740,7 @@ def _resolve_provisioned_jobs_and_data(
752740
)
753741
session.add(compute_group_model)
754742
return (
755-
jobs_to_provision,
743+
context.jobs_to_provision,
756744
provisioning_data.job_provisioning_datas,
757745
compute_group_model,
758746
)
@@ -900,7 +888,7 @@ async def _finalize_submitted_job_processing(
900888
_release_replica_jobs_from_master_wait(
901889
context.job_model,
902890
replica_job_models=context.replica_job_models,
903-
jobs_to_provision=provisioning_phase_result.jobs_to_provision,
891+
jobs_to_provision=context.jobs_to_provision,
904892
)
905893

906894
await _attach_job_volumes_if_needed(

0 commit comments

Comments
 (0)