Skip to content

Commit 900b904

Browse files
committed
Select only ids in locking select
1 parent dff9ef5 commit 900b904

5 files changed

Lines changed: 12 additions & 8 deletions

File tree

src/dstack/_internal/server/background/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ def start_background_tasks() -> AsyncIOScheduler:
8282
_scheduler.add_job(
8383
process_fleets,
8484
IntervalTrigger(seconds=10, jitter=2),
85-
kwargs={"batch_size": 10},
8685
max_instances=1,
8786
)
8887
for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@
2525
logger = get_logger(__name__)
2626

2727

28+
BATCH_SIZE = 10
2829
MIN_PROCESSING_INTERVAL = timedelta(seconds=30)
2930

3031

31-
async def process_fleets(batch_size: int = 1):
32+
async def process_fleets():
3233
lock, lockset = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__)
3334
async with get_session_ctx() as session:
3435
async with lock:
@@ -42,7 +43,7 @@ async def process_fleets(batch_size: int = 1):
4243
)
4344
.options(load_only(FleetModel.id))
4445
.order_by(FleetModel.last_processed_at.asc())
45-
.limit(batch_size)
46+
.limit(BATCH_SIZE)
4647
.with_for_update(skip_locked=True, key_share=True)
4748
)
4849
fleet_models = list(res.scalars().all())

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from sqlalchemy import select
99
from sqlalchemy.ext.asyncio import AsyncSession
10-
from sqlalchemy.orm import joinedload
10+
from sqlalchemy.orm import joinedload, load_only
1111

1212
from dstack._internal import settings
1313
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT, DSTACK_SHIM_HTTP_PORT
@@ -110,6 +110,7 @@ async def _process_next_running_job():
110110
JobModel.last_processed_at
111111
< common_utils.get_current_datetime() - MIN_PROCESSING_INTERVAL,
112112
)
113+
.options(load_only(JobModel.id))
113114
.order_by(JobModel.last_processed_at.asc())
114115
.limit(1)
115116
.with_for_update(

src/dstack/_internal/server/background/tasks/process_runs.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from sqlalchemy import and_, or_, select
66
from sqlalchemy.ext.asyncio import AsyncSession
7-
from sqlalchemy.orm import joinedload, selectinload
7+
from sqlalchemy.orm import joinedload, load_only, selectinload
88

99
import dstack._internal.server.services.services.autoscalers as autoscalers
1010
from dstack._internal.core.errors import ServerError
@@ -102,6 +102,7 @@ async def _process_next_run():
102102
),
103103
)
104104
.options(joinedload(RunModel.jobs).load_only(JobModel.id))
105+
.options(load_only(RunModel.id))
105106
.order_by(RunModel.last_processed_at.asc())
106107
.limit(1)
107108
.with_for_update(skip_locked=True, key_share=True, of=RunModel)
@@ -134,7 +135,6 @@ async def _process_next_run():
134135

135136

136137
async def _process_run(session: AsyncSession, run_model: RunModel):
137-
logger.debug("%s: processing run", fmt(run_model))
138138
# Refetch to load related attributes.
139139
res = await session.execute(
140140
select(RunModel)
@@ -150,6 +150,7 @@ async def _process_run(session: AsyncSession, run_model: RunModel):
150150
.execution_options(populate_existing=True)
151151
)
152152
run_model = res.unique().scalar_one()
153+
logger.debug("%s: processing run", fmt(run_model))
153154
try:
154155
if run_model.status == RunStatus.PENDING:
155156
await _process_pending_run(session, run_model)

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from sqlalchemy import select
77
from sqlalchemy.ext.asyncio import AsyncSession
8-
from sqlalchemy.orm import joinedload, lazyload, selectinload
8+
from sqlalchemy.orm import joinedload, lazyload, load_only, selectinload
99

1010
from dstack._internal.core.backends.base.backend import Backend
1111
from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport
@@ -120,6 +120,7 @@ async def _process_next_submitted_job():
120120
JobModel.status == JobStatus.SUBMITTED,
121121
JobModel.id.not_in(lockset),
122122
)
123+
.options(load_only(JobModel.id))
123124
# Jobs are processed in FIFO order, sorted by priority globally,
124125
# thus runs from different projects can "overtake" each other by using higher priorities.
125126
# That's not a big problem as long as projects do not compete for the same compute resources.
@@ -152,7 +153,6 @@ async def _process_next_submitted_job():
152153

153154

154155
async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
155-
logger.debug("%s: provisioning has started", fmt(job_model))
156156
# Refetch to load related attributes.
157157
res = await session.execute(
158158
select(JobModel).where(JobModel.id == job_model.id).options(joinedload(JobModel.instance))
@@ -166,6 +166,8 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
166166
.options(joinedload(RunModel.fleet).joinedload(FleetModel.instances))
167167
)
168168
run_model = res.unique().scalar_one()
169+
logger.debug("%s: provisioning has started", fmt(job_model))
170+
169171
project = run_model.project
170172
run = run_model_to_run(run_model)
171173
run_spec = run.run_spec

0 commit comments

Comments (0)