Skip to content

Commit b29118f

Browse files
committed
Reorder submitted-jobs helpers
1 parent a62c7af commit b29118f

1 file changed

Lines changed: 90 additions & 90 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 90 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,18 @@ async def _load_submitted_job_context(
370370
)
371371

372372

373+
def _get_job_models_for_jobs(
    job_models: list[JobModel],
    jobs: list[Job],
) -> list[JobModel]:
    """
    Returns job models of latest submissions for a list of jobs.
    Preserves jobs order.
    """
    # Index the models by id once so each per-job lookup is O(1).
    models_by_id = {model.id: model for model in job_models}
    ordered: list[JobModel] = []
    for job in jobs:
        # A job model's id is the id of the submission it belongs to,
        # so the latest submission's id selects the latest job model.
        latest_submission_id = job.job_submissions[-1].id
        ordered.append(models_by_id[latest_submission_id])
    return ordered
383+
384+
373385
async def _resolve_master_job_dependency(
374386
session: AsyncSession,
375387
job_model: JobModel,
@@ -799,6 +811,84 @@ async def _create_instance_models_for_provisioned_jobs(
799811
return instance_models
800812

801813

814+
async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]:
    """
    Returns the instance_num values occupied by non-deleted instances of the fleet.
    """
    stmt = (
        select(InstanceModel.instance_num)
        .where(InstanceModel.fleet_id == fleet_model.id)
        .where(InstanceModel.deleted.is_(False))
    )
    res = await session.execute(stmt)
    return {num for num in res.scalars().all()}
822+
823+
824+
def _create_instance_model_for_job(
    project: ProjectModel,
    fleet_model: FleetModel,
    compute_group_model: Optional[ComputeGroupModel],
    job_model: JobModel,
    job_provisioning_data: JobProvisioningData,
    offer: InstanceOfferWithAvailability,
    instance_num: int,
    profile: Profile,
) -> InstanceModel:
    """
    Builds a provisioning InstanceModel for a job placed on the given offer.

    The instance is named after the fleet and instance number, attached to the
    job, and created fully busy (1 of 1 blocks).
    """
    if job_provisioning_data.dockerized:
        termination_policy, termination_idle_time = get_termination(
            profile, DEFAULT_RUN_TERMINATION_IDLE_TIME
        )
    else:
        # Non-dockerized instances (vastai/k8s) are terminated immediately on idle.
        termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE
        termination_idle_time = 0
    return InstanceModel(
        id=uuid.uuid4(),
        name=f"{fleet_model.name}-{instance_num}",
        instance_num=instance_num,
        project=project,
        fleet=fleet_model,
        compute_group=compute_group_model,
        created_at=common_utils.get_current_datetime(),
        started_at=common_utils.get_current_datetime(),
        status=InstanceStatus.PROVISIONING,
        unreachable=False,
        job_provisioning_data=job_provisioning_data.json(),
        offer=offer.json(),
        termination_policy=termination_policy,
        termination_idle_time=termination_idle_time,
        jobs=[job_model],
        backend=offer.backend,
        price=offer.price,
        region=offer.region,
        volume_attachments=[],
        total_blocks=1,
        busy_blocks=1,
    )
866+
867+
868+
def _prepare_job_runtime_data(
    offer: InstanceOfferWithAvailability, multinode: bool
) -> JobRuntimeData:
    """
    Builds JobRuntimeData for a job scheduled on the given offer.

    A shared (blocked) offer is always bridged and carries explicit
    cpu/gpu/memory limits; a whole-instance offer gets no resource limits and a
    network mode derived from settings.JOB_NETWORK_MODE.
    """
    if offer.blocks != offer.total_blocks:
        # Shared instance: isolate the job in bridge mode and cap its resources.
        resources = offer.instance.resources
        return JobRuntimeData(
            network_mode=NetworkMode.BRIDGE,
            offer=offer,
            cpu=resources.cpus,
            gpu=len(resources.gpus),
            memory=Memory(resources.memory_mib / 1024),
        )
    # Whole instance: the network mode is a server-wide policy decision.
    if settings.JOB_NETWORK_MODE == settings.JobNetworkMode.FORCED_BRIDGE:
        network_mode = NetworkMode.BRIDGE
    elif settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_WHEN_POSSIBLE:
        network_mode = NetworkMode.HOST
    else:
        assert settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_FOR_MULTINODE_ONLY
        network_mode = NetworkMode.HOST if multinode else NetworkMode.BRIDGE
    return JobRuntimeData(
        network_mode=network_mode,
        offer=offer,
    )
890+
891+
802892
async def _finalize_submitted_job_processing(
803893
exit_stack: AsyncExitStack,
804894
session: AsyncSession,
@@ -1286,84 +1376,6 @@ async def _create_fleet_model_for_job(
12861376
return fleet_model
12871377

12881378

1289-
async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]:
    """
    Returns instance_num values already in use by live (non-deleted)
    instances belonging to the fleet.
    """
    query = select(InstanceModel.instance_num).where(
        InstanceModel.fleet_id == fleet_model.id,
        InstanceModel.deleted.is_(False),
    )
    result = await session.execute(query)
    return set(result.scalars().all())
1297-
1298-
1299-
def _create_instance_model_for_job(
    project: ProjectModel,
    fleet_model: FleetModel,
    compute_group_model: Optional[ComputeGroupModel],
    job_model: JobModel,
    job_provisioning_data: JobProvisioningData,
    offer: InstanceOfferWithAvailability,
    instance_num: int,
    profile: Profile,
) -> InstanceModel:
    """
    Creates an InstanceModel in PROVISIONING state for a job placed on the
    given offer, attached to the fleet under the name "<fleet>-<num>".
    """
    if not job_provisioning_data.dockerized:
        # terminate vastai/k8s instances immediately
        termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE
        termination_idle_time = 0
    else:
        termination_policy, termination_idle_time = get_termination(
            profile, DEFAULT_RUN_TERMINATION_IDLE_TIME
        )
    instance_model = InstanceModel(
        id=uuid.uuid4(),
        name=f"{fleet_model.name}-{instance_num}",
        instance_num=instance_num,
        project=project,
        fleet=fleet_model,
        compute_group=compute_group_model,
        created_at=common_utils.get_current_datetime(),
        started_at=common_utils.get_current_datetime(),
        status=InstanceStatus.PROVISIONING,
        unreachable=False,
        job_provisioning_data=job_provisioning_data.json(),
        offer=offer.json(),
        termination_policy=termination_policy,
        termination_idle_time=termination_idle_time,
        jobs=[job_model],
        backend=offer.backend,
        price=offer.price,
        region=offer.region,
        volume_attachments=[],
        # The instance is dedicated to this job: one block, already busy.
        total_blocks=1,
        busy_blocks=1,
    )
    return instance_model
1341-
1342-
1343-
def _prepare_job_runtime_data(
    offer: InstanceOfferWithAvailability, multinode: bool
) -> JobRuntimeData:
    """
    Returns the runtime configuration (network mode and optional resource
    limits) for a job running on the given offer.
    """
    whole_instance = offer.blocks == offer.total_blocks
    if whole_instance:
        # Network mode for a dedicated instance is governed by server settings.
        mode_setting = settings.JOB_NETWORK_MODE
        if mode_setting == settings.JobNetworkMode.FORCED_BRIDGE:
            network_mode = NetworkMode.BRIDGE
        elif mode_setting == settings.JobNetworkMode.HOST_WHEN_POSSIBLE:
            network_mode = NetworkMode.HOST
        else:
            assert mode_setting == settings.JobNetworkMode.HOST_FOR_MULTINODE_ONLY
            network_mode = NetworkMode.HOST if multinode else NetworkMode.BRIDGE
        return JobRuntimeData(
            network_mode=network_mode,
            offer=offer,
        )
    # Shared instance: force bridge networking and set explicit resource limits.
    return JobRuntimeData(
        network_mode=NetworkMode.BRIDGE,
        offer=offer,
        cpu=offer.instance.resources.cpus,
        gpu=len(offer.instance.resources.gpus),
        memory=Memory(offer.instance.resources.memory_mib / 1024),
    )
1365-
1366-
13671379
def _get_offer_volumes(
13681380
volumes: List[List[Volume]],
13691381
offer: InstanceOfferWithAvailability,
@@ -1480,15 +1492,3 @@ async def _attach_volume(
14801492
instance.volume_attachments.append(volume_attachment_model)
14811493

14821494
volume_model.last_job_processed_at = common_utils.get_current_datetime()
1483-
1484-
1485-
def _get_job_models_for_jobs(
    job_models: list[JobModel],
    jobs: list[Job],
) -> list[JobModel]:
    """
    Returns job models of latest submissions for a list of jobs.
    Preserves jobs order.
    """
    # Job model ids coincide with submission ids, so mapping by id lets us
    # pick each job's latest-submission model directly.
    by_id = {job_model.id: job_model for job_model in job_models}
    return [by_id[job.job_submissions[-1].id] for job in jobs]

0 commit comments

Comments
 (0)