Skip to content

Commit 8d3f9fa

Browse files
authored
Optimize db queries (part 2) (#2929)
* Process fleets using one session
* Select only ids in locking select
* Do not load InstanceModel.jobs by default
* Add indexes on status columns
* Do not load all projects attrs for projects filtering
* Fix comments
* Do not load ProjectModel.default_gateway by default
* Add TODO
1 parent 85d927e commit 8d3f9fa

File tree

17 files changed

+212
-147
lines changed

17 files changed

+212
-147
lines changed

src/dstack/_internal/server/background/__init__.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ def start_background_tasks() -> AsyncIOScheduler:
7979
process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
8080
)
8181
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
82+
_scheduler.add_job(
83+
process_fleets,
84+
IntervalTrigger(seconds=10, jitter=2),
85+
max_instances=1,
86+
)
8287
for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):
8388
# Add multiple copies of tasks if requested.
8489
# max_instances=1 for additional copies to avoid running too many tasks.
@@ -113,11 +118,5 @@ def start_background_tasks() -> AsyncIOScheduler:
113118
kwargs={"batch_size": 5},
114119
max_instances=2 if replica == 0 else 1,
115120
)
116-
_scheduler.add_job(
117-
process_fleets,
118-
IntervalTrigger(seconds=10, jitter=2),
119-
kwargs={"batch_size": 5},
120-
max_instances=2 if replica == 0 else 1,
121-
)
122121
_scheduler.start()
123122
return _scheduler
Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,35 @@
1-
import asyncio
21
from datetime import timedelta
2+
from typing import List
33

4-
from sqlalchemy import select
4+
from sqlalchemy import select, update
55
from sqlalchemy.ext.asyncio import AsyncSession
6-
from sqlalchemy.orm import joinedload
6+
from sqlalchemy.orm import joinedload, load_only
77

88
from dstack._internal.core.models.fleets import FleetStatus
99
from dstack._internal.server.db import get_db, get_session_ctx
10-
from dstack._internal.server.models import FleetModel, InstanceModel, JobModel, RunModel
10+
from dstack._internal.server.models import (
11+
FleetModel,
12+
InstanceModel,
13+
JobModel,
14+
PlacementGroupModel,
15+
RunModel,
16+
)
1117
from dstack._internal.server.services.fleets import (
1218
is_fleet_empty,
1319
is_fleet_in_use,
1420
)
1521
from dstack._internal.server.services.locking import get_locker
16-
from dstack._internal.server.services.placement import schedule_fleet_placement_groups_deletion
1722
from dstack._internal.utils.common import get_current_datetime
1823
from dstack._internal.utils.logging import get_logger
1924

2025
logger = get_logger(__name__)
2126

2227

28+
BATCH_SIZE = 10
2329
MIN_PROCESSING_INTERVAL = timedelta(seconds=30)
2430

2531

26-
async def process_fleets(batch_size: int = 1):
27-
tasks = []
28-
for _ in range(batch_size):
29-
tasks.append(_process_next_fleet())
30-
await asyncio.gather(*tasks)
31-
32-
33-
async def _process_next_fleet():
32+
async def process_fleets():
3433
lock, lockset = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__)
3534
async with get_session_ctx() as session:
3635
async with lock:
@@ -42,50 +41,62 @@ async def _process_next_fleet():
4241
FleetModel.last_processed_at
4342
< get_current_datetime() - MIN_PROCESSING_INTERVAL,
4443
)
44+
.options(load_only(FleetModel.id))
4545
.order_by(FleetModel.last_processed_at.asc())
46-
.limit(1)
46+
.limit(BATCH_SIZE)
4747
.with_for_update(skip_locked=True, key_share=True)
4848
)
49-
fleet_model = res.scalar()
50-
if fleet_model is None:
51-
return
52-
lockset.add(fleet_model.id)
49+
fleet_models = list(res.scalars().all())
50+
fleet_ids = [fm.id for fm in fleet_models]
51+
for fleet_id in fleet_ids:
52+
lockset.add(fleet_id)
5353
try:
54-
fleet_model_id = fleet_model.id
55-
await _process_fleet(session=session, fleet_model=fleet_model)
54+
await _process_fleets(session=session, fleet_models=fleet_models)
5655
finally:
57-
lockset.difference_update([fleet_model_id])
56+
lockset.difference_update(fleet_ids)
5857

5958

60-
async def _process_fleet(session: AsyncSession, fleet_model: FleetModel):
61-
logger.debug("Processing fleet %s", fleet_model.name)
59+
async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel]):
60+
fleet_ids = [fm.id for fm in fleet_models]
6261
# Refetch to load related attributes.
6362
res = await session.execute(
6463
select(FleetModel)
65-
.where(FleetModel.id == fleet_model.id)
64+
.where(FleetModel.id.in_(fleet_ids))
6665
.options(joinedload(FleetModel.instances).load_only(InstanceModel.deleted))
6766
.options(
6867
joinedload(FleetModel.instances).joinedload(InstanceModel.jobs).load_only(JobModel.id)
6968
)
7069
.options(joinedload(FleetModel.runs).load_only(RunModel.status))
7170
.execution_options(populate_existing=True)
7271
)
73-
fleet_model = res.unique().scalar_one()
74-
await _autodelete_fleet(session=session, fleet_model=fleet_model)
72+
fleet_models = list(res.unique().scalars().all())
7573

74+
deleted_fleets_ids = []
75+
now = get_current_datetime()
76+
for fleet_model in fleet_models:
77+
deleted = _autodelete_fleet(fleet_model)
78+
if deleted:
79+
deleted_fleets_ids.append(fleet_model.id)
80+
fleet_model.last_processed_at = now
7681

77-
async def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel):
82+
await session.execute(
83+
update(PlacementGroupModel)
84+
.where(
85+
PlacementGroupModel.fleet_id.in_(deleted_fleets_ids),
86+
)
87+
.values(fleet_deleted=True)
88+
)
89+
await session.commit()
90+
91+
92+
def _autodelete_fleet(fleet_model: FleetModel) -> bool:
7893
# Currently all empty fleets are autodeleted.
7994
# TODO: If fleets with `nodes: 0..` are supported, their deletion should be skipped.
8095
if is_fleet_in_use(fleet_model) or not is_fleet_empty(fleet_model):
81-
fleet_model.last_processed_at = get_current_datetime()
82-
await session.commit()
83-
return
96+
return False
8497

8598
logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)
8699
fleet_model.status = FleetStatus.TERMINATED
87100
fleet_model.deleted = True
88-
fleet_model.last_processed_at = get_current_datetime()
89-
await schedule_fleet_placement_groups_deletion(session=session, fleet_id=fleet_model.id)
90-
await session.commit()
91101
logger.info("Fleet %s deleted", fleet_model.name)
102+
return True

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from sqlalchemy import select
99
from sqlalchemy.ext.asyncio import AsyncSession
10-
from sqlalchemy.orm import joinedload
10+
from sqlalchemy.orm import joinedload, load_only
1111

1212
from dstack._internal import settings
1313
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT, DSTACK_SHIM_HTTP_PORT
@@ -110,6 +110,7 @@ async def _process_next_running_job():
110110
JobModel.last_processed_at
111111
< common_utils.get_current_datetime() - MIN_PROCESSING_INTERVAL,
112112
)
113+
.options(load_only(JobModel.id))
113114
.order_by(JobModel.last_processed_at.asc())
114115
.limit(1)
115116
.with_for_update(

src/dstack/_internal/server/background/tasks/process_runs.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from sqlalchemy import and_, or_, select
66
from sqlalchemy.ext.asyncio import AsyncSession
7-
from sqlalchemy.orm import joinedload, selectinload
7+
from sqlalchemy.orm import joinedload, load_only, selectinload
88

99
import dstack._internal.server.services.services.autoscalers as autoscalers
1010
from dstack._internal.core.errors import ServerError
@@ -102,6 +102,7 @@ async def _process_next_run():
102102
),
103103
)
104104
.options(joinedload(RunModel.jobs).load_only(JobModel.id))
105+
.options(load_only(RunModel.id))
105106
.order_by(RunModel.last_processed_at.asc())
106107
.limit(1)
107108
.with_for_update(skip_locked=True, key_share=True, of=RunModel)
@@ -134,7 +135,6 @@ async def _process_next_run():
134135

135136

136137
async def _process_run(session: AsyncSession, run_model: RunModel):
137-
logger.debug("%s: processing run", fmt(run_model))
138138
# Refetch to load related attributes.
139139
res = await session.execute(
140140
select(RunModel)
@@ -150,6 +150,7 @@ async def _process_run(session: AsyncSession, run_model: RunModel):
150150
.execution_options(populate_existing=True)
151151
)
152152
run_model = res.unique().scalar_one()
153+
logger.debug("%s: processing run", fmt(run_model))
153154
try:
154155
if run_model.status == RunStatus.PENDING:
155156
await _process_pending_run(session, run_model)

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from sqlalchemy import select
77
from sqlalchemy.ext.asyncio import AsyncSession
8-
from sqlalchemy.orm import joinedload, lazyload, selectinload
8+
from sqlalchemy.orm import joinedload, load_only, selectinload
99

1010
from dstack._internal.core.backends.base.backend import Backend
1111
from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport
@@ -120,6 +120,7 @@ async def _process_next_submitted_job():
120120
JobModel.status == JobStatus.SUBMITTED,
121121
JobModel.id.not_in(lockset),
122122
)
123+
.options(load_only(JobModel.id))
123124
# Jobs are process in FIFO sorted by priority globally,
124125
# thus runs from different projects can "overtake" each other by using higher priorities.
125126
# That's not a big problem as long as projects do not compete for the same compute resources.
@@ -152,7 +153,6 @@ async def _process_next_submitted_job():
152153

153154

154155
async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
155-
logger.debug("%s: provisioning has started", fmt(job_model))
156156
# Refetch to load related attributes.
157157
res = await session.execute(
158158
select(JobModel).where(JobModel.id == job_model.id).options(joinedload(JobModel.instance))
@@ -166,6 +166,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
166166
.options(joinedload(RunModel.fleet).joinedload(FleetModel.instances))
167167
)
168168
run_model = res.unique().scalar_one()
169+
logger.debug("%s: provisioning has started", fmt(job_model))
170+
169171
project = run_model.project
170172
run = run_model_to_run(run_model)
171173
run_spec = run.run_spec
@@ -227,7 +229,6 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
227229
InstanceModel.deleted == False,
228230
InstanceModel.total_blocks > InstanceModel.busy_blocks,
229231
)
230-
.options(lazyload(InstanceModel.jobs))
231232
.order_by(InstanceModel.id) # take locks in order
232233
.with_for_update(key_share=True)
233234
)

src/dstack/_internal/server/background/tasks/process_terminating_jobs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from sqlalchemy import or_, select
44
from sqlalchemy.ext.asyncio import AsyncSession
5-
from sqlalchemy.orm import joinedload, lazyload
5+
from sqlalchemy.orm import joinedload
66

77
from dstack._internal.core.models.runs import JobStatus
88
from dstack._internal.server.db import get_db, get_session_ctx
@@ -65,7 +65,6 @@ async def _process_next_terminating_job():
6565
InstanceModel.id == job_model.used_instance_id,
6666
InstanceModel.id.not_in(instance_lockset),
6767
)
68-
.options(lazyload(InstanceModel.jobs))
6968
.with_for_update(skip_locked=True, key_share=True)
7069
)
7170
instance_model = res.scalar()
@@ -94,6 +93,7 @@ async def _process_job(session: AsyncSession, job_model: JobModel):
9493
.options(
9594
joinedload(InstanceModel.project).joinedload(ProjectModel.backends),
9695
joinedload(InstanceModel.volume_attachments).joinedload(VolumeAttachmentModel.volume),
96+
joinedload(InstanceModel.jobs).load_only(JobModel.id),
9797
)
9898
)
9999
instance_model = res.unique().scalar()
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""Index status columns
2+
3+
Revision ID: 50dd7ea98639
4+
Revises: ec02a26a256c
5+
Create Date: 2025-07-25 10:36:25.127923
6+
7+
"""
8+
9+
from alembic import op
10+
11+
# revision identifiers, used by Alembic.
12+
revision = "50dd7ea98639"
13+
down_revision = "ec02a26a256c"
14+
branch_labels = None
15+
depends_on = None
16+
17+
18+
def upgrade() -> None:
19+
# ### commands auto generated by Alembic - please adjust! ###
20+
with op.batch_alter_table("runs", schema=None) as batch_op:
21+
batch_op.create_index(batch_op.f("ix_runs_status"), ["status"], unique=False)
22+
23+
with op.batch_alter_table("jobs", schema=None) as batch_op:
24+
batch_op.create_index(batch_op.f("ix_jobs_status"), ["status"], unique=False)
25+
26+
with op.batch_alter_table("fleets", schema=None) as batch_op:
27+
batch_op.create_index(batch_op.f("ix_fleets_status"), ["status"], unique=False)
28+
29+
with op.batch_alter_table("instances", schema=None) as batch_op:
30+
batch_op.create_index(batch_op.f("ix_instances_status"), ["status"], unique=False)
31+
32+
with op.batch_alter_table("volumes", schema=None) as batch_op:
33+
batch_op.create_index(batch_op.f("ix_volumes_status"), ["status"], unique=False)
34+
35+
# ### end Alembic commands ###
36+
37+
38+
def downgrade() -> None:
39+
# ### commands auto generated by Alembic - please adjust! ###
40+
with op.batch_alter_table("runs", schema=None) as batch_op:
41+
batch_op.drop_index(batch_op.f("ix_runs_status"))
42+
43+
with op.batch_alter_table("jobs", schema=None) as batch_op:
44+
batch_op.drop_index(batch_op.f("ix_jobs_status"))
45+
46+
with op.batch_alter_table("fleets", schema=None) as batch_op:
47+
batch_op.drop_index(batch_op.f("ix_fleets_status"))
48+
49+
with op.batch_alter_table("instances", schema=None) as batch_op:
50+
batch_op.drop_index(batch_op.f("ix_instances_status"))
51+
52+
with op.batch_alter_table("volumes", schema=None) as batch_op:
53+
batch_op.drop_index(batch_op.f("ix_volumes_status"))
54+
55+
# ### end Alembic commands ###

0 commit comments

Comments (0)