Skip to content

Commit 6a2ccce

Browse files
committed
Make JobSubmittedPipeline to wait for master election
1 parent 3cd11ac commit 6a2ccce

File tree

7 files changed

+189
-8
lines changed

7 files changed

+189
-8
lines changed

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,10 @@ async def process(self, item: JobSubmittedPipelineItem):
319319
if context.job_model.instance_assigned:
320320
logger.debug("%s: provisioning has started", fmt(context.job_model))
321321
provisioning = await _process_provisioning(item=item, context=context)
322+
_hint_pipelines_fetch(
323+
pipeline_hinter=self._pipeline_hinter,
324+
result=provisioning,
325+
)
322326
await _apply_provisioning_result(
323327
item=item,
324328
provisioning=provisioning,
@@ -327,6 +331,10 @@ async def process(self, item: JobSubmittedPipelineItem):
327331

328332
logger.debug("%s: assignment has started", fmt(context.job_model))
329333
assignment = await _process_assignment(context=context)
334+
_hint_pipelines_fetch(
335+
pipeline_hinter=self._pipeline_hinter,
336+
result=assignment,
337+
)
330338
await _apply_assignment_result(
331339
item=item,
332340
context=context,
@@ -365,6 +373,7 @@ class _DeferSubmittedJobResult:
365373
"""The job is not ready yet, so apply should just mark it processed and unlock it."""
366374

367375
log_message: str
376+
hint_fleet_pipeline: bool = False
368377

369378

370379
@dataclass
@@ -1179,6 +1188,17 @@ async def _process_new_capacity_provisioning(
11791188
job=context.job,
11801189
)
11811190
)
1191+
if (
1192+
is_master_job(context.job)
1193+
and fleet_model is not None
1194+
and _get_cluster_fleet_spec(fleet_model) is not None
1195+
and any(not instance.deleted for instance in fleet_model.instances)
1196+
and master_provisioning_data is None
1197+
):
1198+
return _DeferSubmittedJobResult(
1199+
log_message="waiting for fleet master instance election",
1200+
hint_fleet_pipeline=True,
1201+
)
11821202
provision_new_capacity_result = await _provision_new_capacity(
11831203
project=context.project,
11841204
fleet_model=fleet_model,
@@ -1834,6 +1854,17 @@ def _get_fleet_master_provisioning_data(
18341854
)
18351855

18361856

1857+
def _hint_pipelines_fetch(
1858+
pipeline_hinter: PipelineHinterProtocol,
1859+
result: Union[_AssignmentResult, _ProvisioningResult],
1860+
) -> None:
1861+
if not isinstance(result, _DeferSubmittedJobResult):
1862+
return
1863+
1864+
if result.hint_fleet_pipeline:
1865+
pipeline_hinter.hint_fetch(FleetModel.__name__)
1866+
1867+
18371868
def _select_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]:
18381869
jobs_to_provision = [job]
18391870
if is_multinode_job(job) and is_master_job(job) and job_model.waiting_master_job is not None:

src/dstack/_internal/server/services/fleets.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -961,12 +961,6 @@ def get_fleet_master_instance_provisioning_data(
961961
instance_model.job_provisioning_data
962962
)
963963

964-
# TODO: Drop the legacy instance-list fallback after scheduled tasks stop
965-
# inferring cluster masters from loaded fleet instances.
966-
for instance_model in fleet_model.instances:
967-
if not instance_model.deleted and instance_model.job_provisioning_data is not None:
968-
return JobProvisioningData.__response__.parse_raw(instance_model.job_provisioning_data)
969-
970964
return None
971965

972966

src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
)
3232
from dstack._internal.server.models import (
3333
ComputeGroupModel,
34+
FleetModel,
3435
InstanceModel,
3536
JobModel,
3637
PlacementGroupModel,
@@ -431,14 +432,15 @@ async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_f
431432
fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
432433
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
433434
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
434-
await create_instance(
435+
instance = await create_instance(
435436
session=session,
436437
project=project,
437438
fleet=fleet,
438439
status=InstanceStatus.BUSY,
439440
backend=BackendType.AWS,
440441
job_provisioning_data=get_job_provisioning_data(region="eu-west-1"),
441442
)
443+
fleet.current_master_instance_id = instance.id
442444
configuration = TaskConfiguration(image="debian", nodes=2)
443445
run_spec = get_run_spec(run_name="run", repo_id=repo.name, configuration=configuration)
444446
run = await create_run(
@@ -487,6 +489,57 @@ async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_f
487489
selected_offer = backend_mock.compute.return_value.run_job.call_args[0][2]
488490
assert selected_offer.region == "eu-west-1"
489491

492+
async def test_defers_new_capacity_provisioning_until_fleet_master_is_elected(
493+
self, test_db, session: AsyncSession, worker: JobSubmittedWorker
494+
):
495+
project = await create_project(session=session)
496+
user = await create_user(session=session)
497+
repo = await create_repo(session=session, project_id=project.id)
498+
fleet_spec = get_fleet_spec()
499+
fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
500+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
501+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
502+
await create_instance(
503+
session=session,
504+
project=project,
505+
fleet=fleet,
506+
status=InstanceStatus.BUSY,
507+
backend=BackendType.AWS,
508+
job_provisioning_data=get_job_provisioning_data(region="eu-west-1"),
509+
)
510+
configuration = TaskConfiguration(image="debian", nodes=2)
511+
run_spec = get_run_spec(run_name="run", repo_id=repo.name, configuration=configuration)
512+
run = await create_run(
513+
session=session,
514+
run_name="run",
515+
project=project,
516+
repo=repo,
517+
user=user,
518+
run_spec=run_spec,
519+
fleet=fleet,
520+
)
521+
job = await create_job(
522+
session=session,
523+
run=run,
524+
instance_assigned=True,
525+
waiting_master_job=False,
526+
)
527+
previous_last_processed_at = job.last_processed_at
528+
529+
with patch("dstack._internal.server.services.backends.get_project_backends") as m:
530+
await _process_job(session=session, worker=worker, job_model=job)
531+
m.assert_not_called()
532+
533+
await session.refresh(job)
534+
assert job.status == JobStatus.SUBMITTED
535+
assert job.instance_assigned
536+
assert job.instance is None
537+
assert job.last_processed_at > previous_last_processed_at
538+
assert job.lock_owner is None
539+
assert job.lock_token is None
540+
assert job.lock_expires_at is None
541+
worker._pipeline_hinter.hint_fetch.assert_called_once_with(FleetModel.__name__)
542+
490543
async def test_provisioning_non_master_job_ignores_cluster_master_fleet_lock(
491544
self, test_db, session: AsyncSession, worker: JobSubmittedWorker
492545
):

src/tests/_internal/server/services/runs/__init__.py

Whitespace-only changes.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from unittest.mock import AsyncMock
2+
3+
import pytest
4+
from sqlalchemy.ext.asyncio import AsyncSession
5+
6+
from dstack._internal.core.models.configurations import TaskConfiguration
7+
from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement
8+
from dstack._internal.server.services.jobs import get_jobs_from_run_spec
9+
from dstack._internal.server.services.runs.plan import _get_backend_offers_in_fleet
10+
from dstack._internal.server.testing.common import (
11+
create_fleet,
12+
create_instance,
13+
create_project,
14+
create_repo,
15+
create_user,
16+
get_fleet_spec,
17+
get_instance_offer_with_availability,
18+
get_job_provisioning_data,
19+
get_run_spec,
20+
)
21+
22+
pytestmark = pytest.mark.usefixtures("image_config_mock")
23+
24+
25+
class TestGetBackendOffersInFleet:
26+
@pytest.mark.asyncio
27+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
28+
async def test_keeps_unconstrained_offers_for_non_empty_cluster_fleet_without_elected_master(
29+
self, test_db, session: AsyncSession, monkeypatch: pytest.MonkeyPatch
30+
) -> None:
31+
user = await create_user(session=session)
32+
project = await create_project(session=session, owner=user)
33+
repo = await create_repo(session=session, project_id=project.id)
34+
fleet_spec = get_fleet_spec()
35+
fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
36+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=1, max=2)
37+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
38+
await create_instance(
39+
session=session,
40+
project=project,
41+
fleet=fleet,
42+
job_provisioning_data=get_job_provisioning_data(region="eu-west-1"),
43+
)
44+
run_spec = get_run_spec(
45+
repo_id=repo.name,
46+
configuration=TaskConfiguration(image="debian", nodes=2),
47+
)
48+
jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0)
49+
get_offers_by_requirements_mock = AsyncMock()
50+
monkeypatch.setattr(
51+
"dstack._internal.server.services.runs.plan.get_offers_by_requirements",
52+
get_offers_by_requirements_mock,
53+
)
54+
offer = get_instance_offer_with_availability()
55+
backend = AsyncMock()
56+
get_offers_by_requirements_mock.return_value = [(backend, offer)]
57+
58+
offers = await _get_backend_offers_in_fleet(
59+
project=project,
60+
fleet_model=fleet,
61+
run_spec=run_spec,
62+
job=jobs[0],
63+
volumes=None,
64+
)
65+
66+
assert offers == [(backend, offer)]
67+
get_offers_by_requirements_mock.assert_awaited_once()
68+
assert (
69+
get_offers_by_requirements_mock.await_args.kwargs["master_job_provisioning_data"]
70+
is None
71+
)
File renamed without changes.

src/tests/_internal/server/services/test_fleets.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,26 @@
99
from dstack._internal.core.models.backends.base import BackendType
1010
from dstack._internal.core.models.fleets import (
1111
FleetConfiguration,
12+
FleetNodesSpec,
1213
FleetSpec,
14+
InstanceGroupPlacement,
1315
SSHHostParams,
1416
SSHParams,
1517
)
1618
from dstack._internal.core.models.instances import RemoteConnectionInfo
1719
from dstack._internal.server.models import FleetModel, ProjectModel
1820
from dstack._internal.server.services.backends import get_project_backends
19-
from dstack._internal.server.services.fleets import get_plan
21+
from dstack._internal.server.services.fleets import (
22+
get_fleet_master_instance_provisioning_data,
23+
get_plan,
24+
)
2025
from dstack._internal.server.testing.common import (
2126
create_fleet,
2227
create_instance,
2328
create_project,
2429
create_user,
2530
get_fleet_spec,
31+
get_job_provisioning_data,
2632
get_ssh_key,
2733
)
2834

@@ -171,3 +177,29 @@ async def test_error_fleet_spec_without_name(self, session: AsyncSession):
171177
await get_plan(
172178
session=session, project=project, user=user, spec=fleet_spec_without_name
173179
)
180+
181+
182+
class TestGetFleetMasterInstanceProvisioningData:
183+
@pytest.mark.asyncio
184+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
185+
async def test_returns_none_without_current_master_instance(
186+
self, test_db, session: AsyncSession
187+
) -> None:
188+
project = await create_project(session=session)
189+
fleet_spec = get_fleet_spec()
190+
fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
191+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=1, max=2)
192+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
193+
await create_instance(
194+
session=session,
195+
project=project,
196+
fleet=fleet,
197+
job_provisioning_data=get_job_provisioning_data(region="eu-west-1"),
198+
)
199+
200+
master_provisioning_data = get_fleet_master_instance_provisioning_data(
201+
fleet_model=fleet,
202+
fleet_spec=fleet_spec,
203+
)
204+
205+
assert master_provisioning_data is None

0 commit comments

Comments
 (0)