Skip to content

Commit a62c7af

Browse files
committed
Inline submitted-jobs offer loop
1 parent b5d32b9 commit a62c7af

1 file changed

Lines changed: 52 additions & 32 deletions

File tree

src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
JobRuntimeData,
5252
JobStatus,
5353
JobTerminationReason,
54+
Requirements,
5455
Run,
5556
)
5657
from dstack._internal.core.models.volumes import Volume
@@ -1074,24 +1075,18 @@ async def _provision_new_capacity(
10741075
and run only the master job in case there are no offers supporting cluster groups.
10751076
Other jobs should be provisioned one-by-one later.
10761077
"""
1078+
job = jobs[0]
10771079
if volumes is None:
10781080
volumes = []
1079-
job = jobs[0]
1080-
profile = run.run_spec.merged_profile
1081-
requirements = job.job_spec.requirements
1082-
if fleet_model is not None:
1083-
fleet_spec = get_fleet_spec(fleet_model)
1084-
try:
1085-
check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
1086-
profile, requirements = get_run_profile_and_requirements_in_fleet(
1087-
job=job,
1088-
run_spec=run.run_spec,
1089-
fleet_spec=fleet_spec,
1090-
)
1091-
except ValueError as e:
1092-
logger.debug("%s: %s", fmt(job_model), e.args[0])
1093-
return None
1094-
# TODO: Respect fleet provisioning properties such as tags
1081+
effective_profile_and_requirements = _get_effective_profile_and_requirements(
1082+
job_model=job_model,
1083+
run=run,
1084+
job=job,
1085+
fleet_model=fleet_model,
1086+
)
1087+
if effective_profile_and_requirements is None:
1088+
return None
1089+
profile, requirements = effective_profile_and_requirements
10951090

10961091
# The placement group is determined when provisioning the master instance
10971092
# and used for all other instances in the fleet.
@@ -1175,22 +1170,21 @@ async def _provision_new_capacity(
11751170
offer=offer,
11761171
effective_profile=profile,
11771172
)
1178-
else:
1179-
jpd = await common_utils.run_async(
1180-
compute.run_job,
1181-
run,
1182-
job,
1183-
offer,
1184-
project_ssh_public_key,
1185-
project_ssh_private_key,
1186-
offer_volumes,
1187-
placement_group_model_to_placement_group_optional(placement_group_model),
1188-
)
1189-
return _ProvisionNewCapacityResult(
1190-
provisioning_data=jpd,
1191-
offer=offer,
1192-
effective_profile=profile,
1193-
)
1173+
jpd = await common_utils.run_async(
1174+
compute.run_job,
1175+
run,
1176+
job,
1177+
offer,
1178+
project_ssh_public_key,
1179+
project_ssh_private_key,
1180+
offer_volumes,
1181+
placement_group_model_to_placement_group_optional(placement_group_model),
1182+
)
1183+
return _ProvisionNewCapacityResult(
1184+
provisioning_data=jpd,
1185+
offer=offer,
1186+
effective_profile=profile,
1187+
)
11941188
except BackendError as e:
11951189
logger.warning(
11961190
"%s: %s launch in %s/%s failed: %s",
@@ -1219,6 +1213,32 @@ async def _provision_new_capacity(
12191213
return None
12201214

12211215

1216+
def _get_effective_profile_and_requirements(
1217+
job_model: JobModel,
1218+
run: Run,
1219+
job: Job,
1220+
fleet_model: Optional[FleetModel],
1221+
) -> Optional[tuple[Profile, Requirements]]:
1222+
effective_profile = run.run_spec.merged_profile
1223+
requirements = job.job_spec.requirements
1224+
if fleet_model is None:
1225+
return effective_profile, requirements
1226+
1227+
fleet_spec = get_fleet_spec(fleet_model)
1228+
try:
1229+
check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec)
1230+
effective_profile, requirements = get_run_profile_and_requirements_in_fleet(
1231+
job=job,
1232+
run_spec=run.run_spec,
1233+
fleet_spec=fleet_spec,
1234+
)
1235+
except ValueError as e:
1236+
logger.debug("%s: %s", fmt(job_model), e.args[0])
1237+
return None
1238+
# TODO: Respect fleet provisioning properties such as tags
1239+
return effective_profile, requirements
1240+
1241+
12221242
async def _create_fleet_model_for_job(
12231243
exit_stack: AsyncExitStack,
12241244
session: AsyncSession,

0 commit comments

Comments
 (0)