Skip to content

Commit 8e15164

Browse files
committed
Clean up submitted-jobs minor issues
1 parent 1505c08 commit 8e15164

1 file changed

Lines changed: 11 additions & 13 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from contextlib import AsyncExitStack
55
from dataclasses import dataclass
66
from datetime import datetime, timedelta
7-
from typing import List, Optional, Union
7+
from typing import Optional, Union
88

99
from sqlalchemy import exists, func, select
1010
from sqlalchemy.ext.asyncio import AsyncSession
@@ -696,11 +696,12 @@ async def _materialize_newly_provisioned_capacity(
696696
job_provisioning_datas,
697697
compute_group_model,
698698
) = _resolve_provisioned_jobs_and_data(
699-
session=session,
700699
context=context,
701700
fleet_model=fleet_model,
702701
provisioning_data=provision_new_capacity_result.provisioning_data,
703702
)
703+
if compute_group_model is not None:
704+
session.add(compute_group_model)
704705

705706
instance_models = await _create_instance_models_for_provisioned_jobs(
706707
session=session,
@@ -725,7 +726,6 @@ async def _materialize_newly_provisioned_capacity(
725726

726727

727728
def _resolve_provisioned_jobs_and_data(
728-
session: AsyncSession,
729729
context: _SubmittedJobContext,
730730
fleet_model: FleetModel,
731731
provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData],
@@ -738,7 +738,6 @@ def _resolve_provisioned_jobs_and_data(
738738
status=ComputeGroupStatus.RUNNING,
739739
provisioning_data=provisioning_data.json(),
740740
)
741-
session.add(compute_group_model)
742741
return (
743742
context.jobs_to_provision,
744743
provisioning_data.job_provisioning_datas,
@@ -914,6 +913,8 @@ async def _attach_job_volumes_if_needed(
914913
return
915914

916915
volume_models = prepared_job_volumes.volume_models
916+
if len(volume_models) == 0:
917+
return
917918
volumes_ids = sorted([v.id for vs in volume_models for v in vs])
918919
# Take lock to prevent attaching volumes that are to be deleted.
919920
# If the volume was deleted before the lock, the volume will fail to attach and the job will fail.
@@ -928,8 +929,6 @@ async def _attach_job_volumes_if_needed(
928929
await exit_stack.enter_async_context(
929930
get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids)
930931
)
931-
if len(volume_models) == 0:
932-
return
933932
assert len(provisioning_phase_result.instance_models) == 1
934933
await _attach_volumes(
935934
session=session,
@@ -1071,11 +1070,11 @@ async def _assign_existing_instance_to_job(
10711070
job_model: JobModel,
10721071
instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]],
10731072
multinode: bool,
1074-
) -> Optional[InstanceModel]:
1073+
) -> None:
10751074
job_model.fleet = fleet_model
10761075
job_model.instance_assigned = True
10771076
if len(instances_with_offers) == 0:
1078-
return None
1077+
return
10791078

10801079
instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0)
10811080
instance, offer = instances_with_offers[0]
@@ -1105,7 +1104,6 @@ async def _assign_existing_instance_to_job(
11051104
events.Target.from_model(instance),
11061105
],
11071106
)
1108-
return instance
11091107

11101108

11111109
def _select_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]:
@@ -1366,9 +1364,9 @@ async def _create_fleet_model_for_job(
13661364

13671365

13681366
def _get_offer_volumes(
1369-
volumes: List[List[Volume]],
1367+
volumes: list[list[Volume]],
13701368
offer: InstanceOfferWithAvailability,
1371-
) -> List[Volume]:
1369+
) -> list[Volume]:
13721370
"""
13731371
Returns volumes suitable for the offer for each mount point.
13741372
"""
@@ -1379,7 +1377,7 @@ def _get_offer_volumes(
13791377

13801378

13811379
def _get_offer_mount_point_volume(
1382-
volumes: List[Volume],
1380+
volumes: list[Volume],
13831381
offer: InstanceOfferWithAvailability,
13841382
) -> Volume:
13851383
"""
@@ -1400,7 +1398,7 @@ async def _attach_volumes(
14001398
project: ProjectModel,
14011399
job_model: JobModel,
14021400
instance: InstanceModel,
1403-
volume_models: List[List[VolumeModel]],
1401+
volume_models: list[list[VolumeModel]],
14041402
):
14051403
job_provisioning_data = common_utils.get_or_error(get_instance_provisioning_data(instance))
14061404
backend = await get_project_backend_by_type_or_error(

0 commit comments

Comments
 (0)