Skip to content

Commit 3043970

Browse files
committed
Do not lock non-empty fleets
1 parent 38d192a commit 3043970

2 files changed

Lines changed: 82 additions & 3 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,6 +1655,7 @@ def _should_lock_related_cluster_master_fleet(context: _SubmittedJobContext) ->
16551655
is_master_job(context.job)
16561656
and context.fleet_model is not None
16571657
and _get_cluster_fleet_spec(context.fleet_model) is not None
1658+
and len(context.fleet_model.instances) == 0
16581659
)
16591660

16601661

src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,12 @@ async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_f
438438
fleet=fleet,
439439
)
440440
job = await create_job(session=session, run=run, instance_assigned=True)
441+
fleet_lock_expires_at = get_current_datetime() + timedelta(minutes=1)
442+
fleet_lock_token = uuid.uuid4()
443+
fleet.lock_expires_at = fleet_lock_expires_at
444+
fleet.lock_token = fleet_lock_token
445+
fleet.lock_owner = "OtherPipeline:cluster-master"
446+
await session.commit()
441447

442448
with patch("dstack._internal.server.services.backends.get_project_backends") as m:
443449
backend_mock = Mock()
@@ -461,9 +467,81 @@ async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_f
461467
await session.refresh(job)
462468
await session.refresh(fleet)
463469
assert job.status == JobStatus.PROVISIONING
464-
assert fleet.lock_owner is None
465-
assert fleet.lock_token is None
466-
assert fleet.lock_expires_at is None
470+
assert fleet.lock_owner == "OtherPipeline:cluster-master"
471+
assert fleet.lock_token == fleet_lock_token
472+
assert fleet.lock_expires_at == fleet_lock_expires_at
473+
backend_mock.compute.return_value.run_job.assert_called_once()
474+
selected_offer = backend_mock.compute.return_value.run_job.call_args[0][2]
475+
assert selected_offer.region == "eu-west-1"
476+
477+
async def test_provisioning_non_master_job_ignores_cluster_master_fleet_lock(
478+
self, test_db, session: AsyncSession, worker: JobSubmittedWorker
479+
):
480+
project = await create_project(session=session)
481+
user = await create_user(session=session)
482+
repo = await create_repo(session=session, project_id=project.id)
483+
fleet_spec = get_fleet_spec()
484+
fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
485+
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
486+
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
487+
configuration = TaskConfiguration(image="debian", nodes=2)
488+
run_spec = get_run_spec(run_name="run", repo_id=repo.name, configuration=configuration)
489+
run = await create_run(
490+
session=session,
491+
run_name="run",
492+
project=project,
493+
repo=repo,
494+
user=user,
495+
run_spec=run_spec,
496+
fleet=fleet,
497+
)
498+
await create_job(
499+
session=session,
500+
run=run,
501+
job_num=0,
502+
instance_assigned=True,
503+
job_provisioning_data=get_job_provisioning_data(region="eu-west-1"),
504+
waiting_master_job=False,
505+
)
506+
job = await create_job(
507+
session=session,
508+
run=run,
509+
job_num=1,
510+
instance_assigned=True,
511+
waiting_master_job=False,
512+
)
513+
fleet_lock_expires_at = get_current_datetime() + timedelta(minutes=1)
514+
fleet_lock_token = uuid.uuid4()
515+
fleet.lock_expires_at = fleet_lock_expires_at
516+
fleet.lock_token = fleet_lock_token
517+
fleet.lock_owner = "OtherPipeline:cluster-master"
518+
await session.commit()
519+
520+
with patch("dstack._internal.server.services.backends.get_project_backends") as m:
521+
backend_mock = Mock()
522+
m.return_value = [backend_mock]
523+
backend_mock.TYPE = BackendType.AWS
524+
offer_1 = get_instance_offer_with_availability(
525+
backend=BackendType.AWS,
526+
region="eu-west-2",
527+
)
528+
offer_2 = get_instance_offer_with_availability(
529+
backend=BackendType.AWS,
530+
region="eu-west-1",
531+
)
532+
backend_mock.compute.return_value.get_offers.return_value = [offer_1, offer_2]
533+
backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data(
534+
backend=BackendType.AWS,
535+
)
536+
537+
await _process_job(session=session, worker=worker, job_model=job)
538+
539+
await session.refresh(job)
540+
await session.refresh(fleet)
541+
assert job.status == JobStatus.PROVISIONING
542+
assert fleet.lock_owner == "OtherPipeline:cluster-master"
543+
assert fleet.lock_token == fleet_lock_token
544+
assert fleet.lock_expires_at == fleet_lock_expires_at
467545
backend_mock.compute.return_value.run_job.assert_called_once()
468546
selected_offer = backend_mock.compute.return_value.run_job.call_args[0][2]
469547
assert selected_offer.region == "eu-west-1"

0 commit comments

Comments
 (0)