Skip to content

Commit 7b268cb

Browse files
Bihan Rana
authored and committed
Optimize ServiceRouterWorkerSyncWorkerProcess select query
1 parent f5a4c37 commit 7b268cb

5 files changed

Lines changed: 40 additions & 16 deletions

File tree

src/dstack/_internal/core/models/configurations.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,10 @@ def validate_at_most_one_router_replica_group(cls, values):
10461046
router_groups = [g for g in replicas if g.router is not None]
10471047
if len(router_groups) > 1:
10481048
raise ValueError("At most one replica group may specify `router`.")
1049+
if router_groups:
1050+
router_group = router_groups[0]
1051+
if router_group.count.min != 1 or router_group.count.max != 1:
1052+
raise ValueError("For now replica group with `router` must have `count: 1`.")
10491053
return values
10501054

10511055

src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
from datetime import timedelta
55
from typing import Sequence
66

7-
from sqlalchemy import delete, or_, select, update
8-
from sqlalchemy.orm import load_only, selectinload
7+
from sqlalchemy import delete, or_, select, true, update
8+
from sqlalchemy.orm import joinedload, load_only, selectinload
99

10-
from dstack._internal.core.models.runs import RunStatus
10+
from dstack._internal.core.models.runs import JobStatus, RunStatus
1111
from dstack._internal.server.background.pipeline_tasks.base import (
1212
Fetcher,
1313
Heartbeater,
@@ -25,6 +25,7 @@
2525
from dstack._internal.server.models import (
2626
InstanceModel,
2727
JobModel,
28+
ProjectModel,
2829
RunModel,
2930
ServiceRouterWorkerSyncModel,
3031
)
@@ -105,7 +106,7 @@ def _workers(self) -> Sequence["ServiceRouterWorkerSyncWorker"]:
105106

106107

107108
class ServiceRouterWorkerSyncFetcher(Fetcher[ServiceRouterWorkerSyncPipelineItem]):
108-
@sentry_utils.instrument_named_task("pipeline_tasks.ServiceRouterWorkerSyncFetcher.fetch")
109+
@sentry_utils.instrument_pipeline_task("ServiceRouterWorkerSyncFetcher.fetch")
109110
async def fetch(self, limit: int) -> list[ServiceRouterWorkerSyncPipelineItem]:
110111
sync_lock, _ = get_locker(get_db().dialect_name).get_lockset(
111112
ServiceRouterWorkerSyncModel.__tablename__
@@ -183,7 +184,7 @@ def __init__(
183184
pipeline_hinter=pipeline_hinter,
184185
)
185186

186-
@sentry_utils.instrument_named_task("pipeline_tasks.ServiceRouterWorkerSyncWorker.process")
187+
@sentry_utils.instrument_pipeline_task("ServiceRouterWorkerSyncWorker.process")
187188
async def process(self, item: ServiceRouterWorkerSyncPipelineItem) -> None:
188189
async with get_session_ctx() as session:
189190
res = await session.execute(
@@ -199,10 +200,6 @@ async def process(self, item: ServiceRouterWorkerSyncPipelineItem) -> None:
199200
log_lock_token_mismatch(logger, item)
200201
return
201202
run_model = sync_row.run
202-
if run_model is None:
203-
await session.delete(sync_row)
204-
await session.commit()
205-
return
206203
if (
207204
run_model.deleted
208205
or run_model.status.is_finished()
@@ -218,11 +215,30 @@ async def process(self, item: ServiceRouterWorkerSyncPipelineItem) -> None:
218215
select(RunModel)
219216
.where(RunModel.id == item.run_id)
220217
.options(
221-
selectinload(RunModel.project),
222-
selectinload(RunModel.jobs).selectinload(JobModel.project),
223-
selectinload(RunModel.jobs)
224-
.selectinload(JobModel.instance)
225-
.selectinload(InstanceModel.project),
218+
load_only(RunModel.id, RunModel.run_spec),
219+
selectinload(
220+
RunModel.jobs.and_(
221+
JobModel.status == JobStatus.RUNNING,
222+
JobModel.registered == true(),
223+
)
224+
)
225+
.load_only(
226+
JobModel.id,
227+
JobModel.status,
228+
JobModel.registered,
229+
JobModel.job_spec_data,
230+
JobModel.job_provisioning_data,
231+
JobModel.job_runtime_data,
232+
)
233+
.options(
234+
joinedload(JobModel.project).load_only(
235+
ProjectModel.id, ProjectModel.ssh_private_key
236+
),
237+
joinedload(JobModel.instance)
238+
.load_only(InstanceModel.id, InstanceModel.remote_connection_info)
239+
.joinedload(InstanceModel.project)
240+
.load_only(ProjectModel.id, ProjectModel.ssh_private_key),
241+
),
226242
)
227243
)
228244
run_for_sync = res.unique().scalar_one_or_none()

src/dstack/_internal/server/migrations/versions/2026/03_29_1200_e7f4a91b2c3d_add_service_router_worker_sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Add service_router_worker_sync for router-worker reconcile pipeline.
22
33
Revision ID: e7f4a91b2c3d
4-
Revises: e9d81c97c042
4+
Revises: ad8c50120507
55
Create Date: 2026-03-29 12:00:00.000000+00:00
66
77
"""
@@ -13,7 +13,7 @@
1313
import dstack._internal.server.models
1414

1515
revision = "e7f4a91b2c3d"
16-
down_revision = "e9d81c97c042"
16+
down_revision = "ad8c50120507"
1717
branch_labels = None
1818
depends_on = None
1919

src/dstack/_internal/server/services/router_worker_sync.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ def _get_router_job(run_model: RunModel, router_group) -> Optional[JobModel]:
109109
]
110110
if not router_jobs or not is_replica_registered(router_jobs):
111111
return None
112+
# Router replica group is currently validated to have count=1, so we assume a single active
113+
# router job here. When we support multiple router replicas for HA, this should be updated
114+
# to handle syncing across all active router jobs.
112115
return router_jobs[0]
113116

114117

src/dstack/_internal/server/services/runs/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ async def apply_plan(
521521
deployment_num=new_deployment_num,
522522
)
523523
)
524+
await ensure_service_router_worker_sync_row(session, current_resource_model, run_spec)
524525
events.emit(
525526
session,
526527
(

0 commit comments

Comments (0)