Skip to content

Commit dff9ef5

Browse files
committed
Process fleets using one session
1 parent 85d927e commit dff9ef5

2 files changed

Lines changed: 48 additions & 38 deletions

File tree

src/dstack/_internal/server/background/__init__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ def start_background_tasks() -> AsyncIOScheduler:
7979
process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
8080
)
8181
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
82+
_scheduler.add_job(
83+
process_fleets,
84+
IntervalTrigger(seconds=10, jitter=2),
85+
kwargs={"batch_size": 10},
86+
max_instances=1,
87+
)
8288
for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):
8389
# Add multiple copies of tasks if requested.
8490
# max_instances=1 for additional copies to avoid running too many tasks.
@@ -113,11 +119,5 @@ def start_background_tasks() -> AsyncIOScheduler:
113119
kwargs={"batch_size": 5},
114120
max_instances=2 if replica == 0 else 1,
115121
)
116-
_scheduler.add_job(
117-
process_fleets,
118-
IntervalTrigger(seconds=10, jitter=2),
119-
kwargs={"batch_size": 5},
120-
max_instances=2 if replica == 0 else 1,
121-
)
122122
_scheduler.start()
123123
return _scheduler
Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
1-
import asyncio
21
from datetime import timedelta
2+
from typing import List
33

4-
from sqlalchemy import select
4+
from sqlalchemy import select, update
55
from sqlalchemy.ext.asyncio import AsyncSession
6-
from sqlalchemy.orm import joinedload
6+
from sqlalchemy.orm import joinedload, load_only
77

88
from dstack._internal.core.models.fleets import FleetStatus
99
from dstack._internal.server.db import get_db, get_session_ctx
10-
from dstack._internal.server.models import FleetModel, InstanceModel, JobModel, RunModel
10+
from dstack._internal.server.models import (
11+
FleetModel,
12+
InstanceModel,
13+
JobModel,
14+
PlacementGroupModel,
15+
RunModel,
16+
)
1117
from dstack._internal.server.services.fleets import (
1218
is_fleet_empty,
1319
is_fleet_in_use,
1420
)
1521
from dstack._internal.server.services.locking import get_locker
16-
from dstack._internal.server.services.placement import schedule_fleet_placement_groups_deletion
1722
from dstack._internal.utils.common import get_current_datetime
1823
from dstack._internal.utils.logging import get_logger
1924

@@ -24,13 +29,6 @@
2429

2530

2631
async def process_fleets(batch_size: int = 1):
27-
tasks = []
28-
for _ in range(batch_size):
29-
tasks.append(_process_next_fleet())
30-
await asyncio.gather(*tasks)
31-
32-
33-
async def _process_next_fleet():
3432
lock, lockset = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__)
3533
async with get_session_ctx() as session:
3634
async with lock:
@@ -42,50 +40,62 @@ async def _process_next_fleet():
4240
FleetModel.last_processed_at
4341
< get_current_datetime() - MIN_PROCESSING_INTERVAL,
4442
)
43+
.options(load_only(FleetModel.id))
4544
.order_by(FleetModel.last_processed_at.asc())
46-
.limit(1)
45+
.limit(batch_size)
4746
.with_for_update(skip_locked=True, key_share=True)
4847
)
49-
fleet_model = res.scalar()
50-
if fleet_model is None:
51-
return
52-
lockset.add(fleet_model.id)
48+
fleet_models = list(res.scalars().all())
49+
fleet_ids = [fm.id for fm in fleet_models]
50+
for fleet_id in fleet_ids:
51+
lockset.add(fleet_id)
5352
try:
54-
fleet_model_id = fleet_model.id
55-
await _process_fleet(session=session, fleet_model=fleet_model)
53+
await _process_fleets(session=session, fleet_models=fleet_models)
5654
finally:
57-
lockset.difference_update([fleet_model_id])
55+
lockset.difference_update(fleet_ids)
5856

5957

60-
async def _process_fleet(session: AsyncSession, fleet_model: FleetModel):
61-
logger.debug("Processing fleet %s", fleet_model.name)
58+
async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel]):
59+
fleet_ids = [fm.id for fm in fleet_models]
6260
# Refetch to load related attributes.
6361
res = await session.execute(
6462
select(FleetModel)
65-
.where(FleetModel.id == fleet_model.id)
63+
.where(FleetModel.id.in_(fleet_ids))
6664
.options(joinedload(FleetModel.instances).load_only(InstanceModel.deleted))
6765
.options(
6866
joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id)
6967
)
7068
.options(joinedload(FleetModel.runs).load_only(RunModel.status))
7169
.execution_options(populate_existing=True)
7270
)
73-
fleet_model = res.unique().scalar_one()
74-
await _autodelete_fleet(session=session, fleet_model=fleet_model)
71+
fleet_models = list(res.unique().scalars().all())
7572

73+
deleted_fleets_ids = []
74+
now = get_current_datetime()
75+
for fleet_model in fleet_models:
76+
deleted = _autodelete_fleet(fleet_model)
77+
if deleted:
78+
deleted_fleets_ids.append(fleet_model.id)
79+
fleet_model.last_processed_at = now
7680

77-
async def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel):
81+
await session.execute(
82+
update(PlacementGroupModel)
83+
.where(
84+
PlacementGroupModel.fleet_id.in_(deleted_fleets_ids),
85+
)
86+
.values(fleet_deleted=True)
87+
)
88+
await session.commit()
89+
90+
91+
def _autodelete_fleet(fleet_model: FleetModel) -> bool:
7892
# Currently all empty fleets are autodeleted.
7993
# TODO: If fleets with `nodes: 0..` are supported, their deletion should be skipped.
8094
if is_fleet_in_use(fleet_model) or not is_fleet_empty(fleet_model):
81-
fleet_model.last_processed_at = get_current_datetime()
82-
await session.commit()
83-
return
95+
return False
8496

8597
logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)
8698
fleet_model.status = FleetStatus.TERMINATED
8799
fleet_model.deleted = True
88-
fleet_model.last_processed_at = get_current_datetime()
89-
await schedule_fleet_placement_groups_deletion(session=session, fleet_id=fleet_model.id)
90-
await session.commit()
91100
logger.info("Fleet %s deleted", fleet_model.name)
101+
return True

0 commit comments

Comments (0)