Skip to content

Commit 0ac00c5

Browse files
committed
Extract submitted-jobs instance materialization
1 parent 2dbd44d commit 0ac00c5

1 file changed

Lines changed: 72 additions & 30 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -686,13 +686,49 @@ async def _materialize_newly_provisioned_capacity(
686686
fleet_model: FleetModel,
687687
provision_new_capacity_result: _ProvisionNewCapacityResult,
688688
) -> _ProvisioningPhaseResult:
689-
provisioning_data = provision_new_capacity_result.provisioning_data
690-
offer = provision_new_capacity_result.offer
691-
effective_profile = provision_new_capacity_result.effective_profile
692-
compute_group_model = None
689+
(
690+
provisioned_jobs,
691+
job_provisioning_datas,
692+
compute_group_model,
693+
) = _resolve_provisioned_jobs_and_data(
694+
session=session,
695+
context=context,
696+
jobs_to_provision=jobs_to_provision,
697+
fleet_model=fleet_model,
698+
provisioning_data=provision_new_capacity_result.provisioning_data,
699+
)
700+
701+
instance_models = await _create_instance_models_for_provisioned_jobs(
702+
session=session,
703+
context=context,
704+
fleet_model=fleet_model,
705+
compute_group_model=compute_group_model,
706+
provisioned_jobs=provisioned_jobs,
707+
job_provisioning_datas=job_provisioning_datas,
708+
offer=provision_new_capacity_result.offer,
709+
effective_profile=provision_new_capacity_result.effective_profile,
710+
)
711+
712+
logger.info(
713+
"%s: provisioned %s new instance(s)",
714+
fmt(context.job_model),
715+
len(provisioned_jobs),
716+
)
717+
return _ProvisioningPhaseResult(
718+
jobs_to_provision=jobs_to_provision,
719+
instance_models=instance_models,
720+
compute_group_model=compute_group_model,
721+
)
722+
723+
724+
def _resolve_provisioned_jobs_and_data(
725+
session: AsyncSession,
726+
context: _SubmittedJobContext,
727+
jobs_to_provision: list[Job],
728+
fleet_model: FleetModel,
729+
provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData],
730+
) -> tuple[list[Job], list[JobProvisioningData], Optional[ComputeGroupModel]]:
693731
if isinstance(provisioning_data, ComputeGroupProvisioningData):
694-
provisioned_jobs = jobs_to_provision
695-
jpds = provisioning_data.job_provisioning_datas
696732
compute_group_model = ComputeGroupModel(
697733
id=uuid.uuid4(),
698734
project=context.project,
@@ -701,59 +737,65 @@ async def _materialize_newly_provisioned_capacity(
701737
provisioning_data=provisioning_data.json(),
702738
)
703739
session.add(compute_group_model)
704-
else:
705-
provisioned_jobs = [context.job]
706-
jpds = [provisioning_data]
740+
return (
741+
jobs_to_provision,
742+
provisioning_data.job_provisioning_datas,
743+
compute_group_model,
744+
)
745+
return [context.job], [provisioning_data], None
707746

708-
logger.info(
709-
"%s: provisioned %s new instance(s)",
710-
fmt(context.job_model),
711-
len(provisioned_jobs),
712-
)
747+
748+
async def _create_instance_models_for_provisioned_jobs(
749+
session: AsyncSession,
750+
context: _SubmittedJobContext,
751+
fleet_model: FleetModel,
752+
compute_group_model: Optional[ComputeGroupModel],
753+
provisioned_jobs: list[Job],
754+
job_provisioning_datas: list[JobProvisioningData],
755+
offer: InstanceOfferWithAvailability,
756+
effective_profile: Profile,
757+
) -> list[InstanceModel]:
713758
provisioned_job_models = _get_job_models_for_jobs(context.run_model.jobs, provisioned_jobs)
714-
instances: list[InstanceModel] = []
759+
instance_models: list[InstanceModel] = []
715760
# FIXME: Fleet is not locked which may lead to duplicate instance_num.
716761
# This is currently hard to fix without locking the fleet for entire provisioning duration.
717762
# Processing should be done in multiple steps so that
718763
# InstanceModel is created before provisioning.
719764
taken_instance_nums = await _get_taken_instance_nums(session, fleet_model)
720-
for provisioned_job_model, jpd in zip(provisioned_job_models, jpds):
721-
provisioned_job_model.job_provisioning_data = jpd.json()
765+
for provisioned_job_model, job_provisioning_data in zip(
766+
provisioned_job_models, job_provisioning_datas
767+
):
768+
provisioned_job_model.job_provisioning_data = job_provisioning_data.json()
722769
switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING)
723770
instance_num = get_next_instance_num(taken_instance_nums)
724-
instance = _create_instance_model_for_job(
771+
instance_model = _create_instance_model_for_job(
725772
project=context.project,
726773
fleet_model=fleet_model,
727774
compute_group_model=compute_group_model,
728775
job_model=provisioned_job_model,
729-
job_provisioning_data=jpd,
776+
job_provisioning_data=job_provisioning_data,
730777
offer=offer,
731778
instance_num=instance_num,
732779
profile=effective_profile,
733780
)
734-
instances.append(instance)
781+
instance_models.append(instance_model)
735782
taken_instance_nums.add(instance_num)
736783
provisioned_job_model.job_runtime_data = _prepare_job_runtime_data(
737784
offer, context.multinode
738785
).json()
739-
session.add(instance)
786+
session.add(instance_model)
740787
events.emit(
741788
session,
742-
f"Instance created for job. Instance status: {instance.status.upper()}",
789+
f"Instance created for job. Instance status: {instance_model.status.upper()}",
743790
actor=events.SystemActor(),
744791
targets=[
745-
events.Target.from_model(instance),
792+
events.Target.from_model(instance_model),
746793
events.Target.from_model(provisioned_job_model),
747794
],
748795
)
749-
provisioned_job_model.used_instance_id = instance.id
796+
provisioned_job_model.used_instance_id = instance_model.id
750797
provisioned_job_model.last_processed_at = common_utils.get_current_datetime()
751-
752-
return _ProvisioningPhaseResult(
753-
jobs_to_provision=jobs_to_provision,
754-
instance_models=instances,
755-
compute_group_model=compute_group_model,
756-
)
798+
return instance_models
757799

758800

759801
async def _finalize_submitted_job_processing(

0 commit comments

Comments
 (0)