Skip to content

Commit cdf0717

Browse files
committed
More fixes
1 parent ce97b81 commit cdf0717

3 files changed

Lines changed: 54 additions & 46 deletions

File tree

src/dstack/_internal/core/backends/base/compute.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ def run_job(
178178
)
179179
instance_offer = instance_offer.copy()
180180
self._restrict_instance_offer_az_to_volumes_az(instance_offer, volumes)
181-
return self.create_instance(instance_offer, instance_config)
181+
return self.create_instance(instance_offer, instance_config, placement_group=None)
182182

183183
def _restrict_instance_offer_az_to_volumes_az(
184184
self,

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@
8383
from dstack._internal.server.services.fleets import (
8484
fleet_model_to_fleet,
8585
get_create_instance_offers,
86-
get_fleet_spec,
8786
)
8887
from dstack._internal.server.services.instances import (
8988
get_instance_configuration,
@@ -574,10 +573,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
574573
_is_fleet_master_instance(instance)
575574
and instance_offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT
576575
and instance.fleet
577-
and (
578-
get_fleet_spec(instance.fleet).configuration.placement
579-
== InstanceGroupPlacement.CLUSTER
580-
)
576+
and _is_cloud_cluster(instance.fleet)
581577
):
582578
assert isinstance(compute, ComputeWithPlacementGroupSupport)
583579
placement_group_model = _find_suitable_placement_group(
@@ -666,37 +662,32 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
666662
instance.last_retry_at = get_current_datetime()
667663

668664
if not should_retry:
669-
instance.status = InstanceStatus.TERMINATED
670-
instance.termination_reason = "All offers failed" if offers else "No offers found"
671-
logger.info(
672-
"Terminated instance %s: %s",
673-
instance.name,
674-
instance.termination_reason,
675-
extra={
676-
"instance_name": instance.name,
677-
"instance_status": InstanceStatus.TERMINATED.value,
678-
},
679-
)
680-
if instance.fleet and _is_fleet_master_instance(instance):
665+
_mark_terminated(instance, "All offers failed" if offers else "No offers found")
666+
if (
667+
instance.fleet
668+
and _is_fleet_master_instance(instance)
669+
and _is_cloud_cluster(instance.fleet)
670+
):
681671
# Do not attempt to deploy other instances, as they won't determine the correct cluster
682672
# backend, region, and placement group without a successfully deployed master instance
683-
# FIXME(critical): this should only apply to placement: cluster
684673
for sibling_instance in instance.fleet.instances:
685674
if sibling_instance.id == instance.id:
686675
continue
687-
if sibling_instance.status == InstanceStatus.PENDING:
688-
sibling_instance.status = InstanceStatus.TERMINATED
689-
else:
690-
logger.error(
691-
"Instance %s has unexpected status %s."
692-
" Should have been %s, as master instance %s has not been provisioned",
693-
sibling_instance.name,
694-
sibling_instance.status.value,
695-
InstanceStatus.PENDING.value,
696-
instance.name,
697-
)
698-
sibling_instance.status = InstanceStatus.TERMINATING
699-
sibling_instance.termination_reason = "Master instance failed to start"
676+
_mark_terminated(sibling_instance, "Master instance failed to start")
677+
678+
679+
def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None:
680+
instance.status = InstanceStatus.TERMINATED
681+
instance.termination_reason = termination_reason
682+
logger.info(
683+
"Terminated instance %s: %s",
684+
instance.name,
685+
instance.termination_reason,
686+
extra={
687+
"instance_name": instance.name,
688+
"instance_status": InstanceStatus.TERMINATED.value,
689+
},
690+
)
700691

701692

702693
async def _check_instance(instance: InstanceModel) -> None:
@@ -980,17 +971,21 @@ def _need_to_wait_fleet_provisioning(instance: InstanceModel) -> bool:
980971
or instance.fleet.instances[0].status == InstanceStatus.TERMINATED
981972
):
982973
return False
983-
fleet = fleet_model_to_fleet(instance.fleet)
984-
return (
985-
fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
986-
and fleet.spec.configuration.ssh_config is None
987-
)
974+
return _is_cloud_cluster(instance.fleet)
988975

989976

990977
def _is_fleet_master_instance(instance: InstanceModel) -> bool:
991978
return instance.fleet is not None and instance.id == instance.fleet.instances[0].id
992979

993980

981+
def _is_cloud_cluster(fleet_model: FleetModel) -> bool:
982+
fleet = fleet_model_to_fleet(fleet_model)
983+
return (
984+
fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
985+
and fleet.spec.configuration.ssh_config is None
986+
)
987+
988+
994989
def _get_instance_offer_for_instance(
995990
instance_offer: InstanceOfferWithAvailability,
996991
instance: InstanceModel,

src/tests/_internal/server/background/tasks/test_process_instances.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -638,16 +638,32 @@ async def test_fails_if_no_offers(self, session: AsyncSession):
638638
assert instance.status == InstanceStatus.TERMINATED
639639
assert instance.termination_reason == "No offers found"
640640

641-
async def test_terminates_fleet_instances_if_master_instance_not_created(
642-
self, session: AsyncSession
641+
@pytest.mark.parametrize(
642+
("placement", "expected_termination_reasons"),
643+
[
644+
pytest.param(
645+
InstanceGroupPlacement.CLUSTER,
646+
{"No offers found": 1, "Master instance failed to start": 3},
647+
id="cluster",
648+
),
649+
pytest.param(
650+
None,
651+
{"No offers found": 4},
652+
id="non-cluster",
653+
),
654+
],
655+
)
656+
async def test_terminates_cluster_instances_if_master_not_created(
657+
self,
658+
session: AsyncSession,
659+
placement: Optional[InstanceGroupPlacement],
660+
expected_termination_reasons: dict[str, int],
643661
):
644662
project = await create_project(session=session)
645663
fleet = await create_fleet(
646664
session,
647665
project,
648-
spec=get_fleet_spec(
649-
conf=get_fleet_configuration(placement=InstanceGroupPlacement.CLUSTER, nodes=4)
650-
),
666+
spec=get_fleet_spec(conf=get_fleet_configuration(placement=placement, nodes=4)),
651667
)
652668
instances = [
653669
await create_instance(
@@ -672,10 +688,7 @@ async def test_terminates_fleet_instances_if_master_instance_not_created(
672688
await session.refresh(instance)
673689
assert instance.status == InstanceStatus.TERMINATED
674690
termination_reasons[instance.termination_reason] += 1
675-
assert termination_reasons == {
676-
"No offers found": 1,
677-
"Master instance failed to start": 3,
678-
}
691+
assert termination_reasons == expected_termination_reasons
679692

680693

681694
@pytest.mark.asyncio

0 commit comments

Comments
 (0)