Skip to content

Commit 37a1c5a

Browse files
Bihan  RanaBihan  Rana
authored andcommitted
Resolve Comments
1 parent c5a6716 commit 37a1c5a

File tree

8 files changed

+110
-18
lines changed

8 files changed

+110
-18
lines changed

src/dstack/_internal/core/models/configurations.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,23 @@ def validate_at_most_one_router_replica_group(cls, values):
10521052
raise ValueError("For now replica group with `router` must have `count: 1`.")
10531053
return values
10541054

1055+
@root_validator()
1056+
def validate_replica_group_router_mutex(cls, values):
1057+
"""
1058+
When a replica group sets `router:`, service-level `router` must be omitted.
1059+
(Gateway-level SGLang is rejected at service registration when a gateway is selected.)
1060+
"""
1061+
replicas = values.get("replicas")
1062+
if not isinstance(replicas, list):
1063+
return values
1064+
if not any(g.router is not None for g in replicas):
1065+
return values
1066+
if values.get("router") is not None:
1067+
raise ValueError(
1068+
"Service-Level router configuration is not allowed together with replica-group `router`."
1069+
)
1070+
return values
1071+
10551072

10561073
class ServiceConfigurationConfig(
10571074
ProfileParamsConfig,

src/dstack/_internal/core/models/routers.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ class SGLangGatewayRouterConfig(CoreModel):
3131

3232
class SGLangServiceRouterConfig(CoreModel):
3333
type: Annotated[Literal["sglang"], Field(description="The router type")] = "sglang"
34+
managed_by: Annotated[
35+
Literal["gateway", "service"],
36+
Field(
37+
description=(
38+
"Where the router process is managed. "
39+
"`gateway`: the gateway runs and manages the router process. "
40+
"`service`: the router runs inside the service (replica-group `router:`)."
41+
)
42+
),
43+
] = "gateway"
3444
policy: Annotated[
3545
Literal["random", "round_robin", "cache_aware", "power_of_two"],
3646
Field(

src/dstack/_internal/proxy/gateway/services/nginx.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
110110
if conf.https:
111111
await run_async(self.run_certbot, conf.domain, acme)
112112

113-
if isinstance(conf, ServiceConfig) and conf.router:
113+
if (
114+
isinstance(conf, ServiceConfig)
115+
and conf.router
116+
and getattr(conf.router, "managed_by", "gateway") == "gateway"
117+
):
114118
if conf.router.type == RouterType.SGLANG:
115119
# Check if router already exists for this domain
116120
if conf.domain in self._domain_to_router:

src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,19 @@ async def process(self, item: ServiceRouterWorkerSyncPipelineItem) -> None:
214214
or run_model.status != RunStatus.RUNNING
215215
or not run_model_has_router_replica_group(run_model)
216216
):
217-
sync_row.deleted = True
218-
sync_row.lock_expires_at = None
219-
sync_row.lock_token = None
220-
sync_row.lock_owner = None
217+
early_cleanup_update_map: _SyncRowUpdateMap = {"deleted": True}
218+
set_processed_update_map_fields(early_cleanup_update_map)
219+
set_unlock_update_map_fields(early_cleanup_update_map)
220+
now = get_current_datetime()
221+
resolve_now_placeholders(early_cleanup_update_map, now=now)
222+
await session.execute(
223+
update(ServiceRouterWorkerSyncModel)
224+
.where(
225+
ServiceRouterWorkerSyncModel.id == item.id,
226+
ServiceRouterWorkerSyncModel.lock_token == item.lock_token,
227+
)
228+
.values(**early_cleanup_update_map)
229+
)
221230
await session.commit()
222231
return
223232

src/dstack/_internal/server/services/runs/router_worker_sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from typing_extensions import NotRequired
99

1010
from dstack._internal.core.errors import SSHError
11-
from dstack._internal.core.models.configurations import ServiceConfiguration
11+
from dstack._internal.core.models.configurations import ReplicaGroup, ServiceConfiguration
1212
from dstack._internal.core.models.runs import JobStatus, RunSpec, get_service_port
1313
from dstack._internal.server.models import JobModel, RunModel
1414
from dstack._internal.server.services.jobs import get_job_provisioning_data, get_job_spec
@@ -98,7 +98,7 @@ def run_model_has_router_replica_group(run_model: RunModel) -> bool:
9898
return run_spec_has_router_replica_group(run_spec)
9999

100100

101-
def _get_router_job(run_model: RunModel, router_group) -> Optional[JobModel]:
101+
def _get_router_job(run_model: RunModel, router_group: ReplicaGroup) -> Optional[JobModel]:
102102
group_name = router_group.name
103103
assert group_name is not None, "Replica group name is set by validation"
104104
router_jobs = [
@@ -280,7 +280,7 @@ async def _get_worker_payload(job_model: JobModel, worker_url: str) -> _WorkerPa
280280
async def _build_target_workers(
281281
run_model: RunModel,
282282
run_spec: RunSpec,
283-
replica_groups: List,
283+
replica_groups: list[ReplicaGroup],
284284
) -> List[_TargetWorker]:
285285
payloads: List[_TargetWorker] = []
286286
config = run_spec.configuration

src/dstack/_internal/server/services/runs/service_router_worker_sync.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import uuid
44

5-
from sqlalchemy import select
5+
from sqlalchemy import select, update
66
from sqlalchemy.ext.asyncio import AsyncSession
77

88
import dstack._internal.utils.common as common_utils
@@ -36,11 +36,21 @@ async def ensure_service_router_worker_sync_row(
3636
now = common_utils.get_current_datetime()
3737
if sync_row is not None:
3838
if sync_row.deleted:
39-
sync_row.deleted = False
40-
sync_row.lock_expires_at = None
41-
sync_row.lock_token = None
42-
sync_row.lock_owner = None
43-
sync_row.last_processed_at = now
39+
# If the router replica group is reintroduced (e.g. via re-apply), reactivate the
40+
# existing sync row so the background pipeline resumes syncing router workers.
41+
# Do not import pipeline_tasks.base here: loading that package runs
42+
# pipeline_tasks/__init__.py -> jobs_running -> runs and causes a circular import.
43+
await session.execute(
44+
update(ServiceRouterWorkerSyncModel)
45+
.where(ServiceRouterWorkerSyncModel.id == sync_row.id)
46+
.values(
47+
deleted=False,
48+
last_processed_at=now,
49+
lock_expires_at=None,
50+
lock_token=None,
51+
lock_owner=None,
52+
)
53+
)
4454
return
4555
session.add(
4656
ServiceRouterWorkerSyncModel(

src/dstack/_internal/server/services/services/__init__.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ async def _register_service_in_gateway(
102102

103103
gateway_configuration = get_gateway_configuration(gateway)
104104

105+
has_replica_group_router = any(
106+
g.router is not None for g in run_spec.configuration.replica_groups
107+
)
108+
if has_replica_group_router and _gateway_has_sglang_router(gateway_configuration):
109+
raise ServerClientError(
110+
"A replica-group `router:` cannot be used with a gateway that has router configuration."
111+
)
112+
105113
# Check: service specifies SGLang router but gateway does not have it
106114
service_router = run_spec.configuration.router
107115
service_wants_sglang = service_router is not None and isinstance(
@@ -266,12 +274,24 @@ def _build_service_router_config(
266274
service_configuration: ServiceConfiguration,
267275
) -> Optional[AnyServiceRouterConfig]:
268276
"""
269-
Build router config from gateway (type, policy) + service (pd_disaggregation, policy override).
270-
Service's policy overrides gateway's if present. Keeps backward compat: SGLang enabled
271-
automatically when gateway has it configured.
277+
Router metadata to store on the gateway proxy for this service (`service.router`).
278+
279+
A replica-group `router:` does **not** depend on the gateway having its own global SGLang
280+
router block—the router runs on service replicas. When the gateway has no global SGLang
281+
config but a replica group does declare `router:`, we still return a default
282+
`SGLangServiceRouterConfig` so nginx/proxy code can treat the service as SGLang and apply path
283+
rules. When the gateway *does* have global SGLang, we merge gateway policy with service-level
284+
`configuration.router` overrides as before.
272285
"""
286+
has_replica_group_router = any(
287+
g.router is not None for g in service_configuration.replica_groups
288+
)
273289
if not _gateway_has_sglang_router(gateway_configuration):
274-
return None
290+
if not has_replica_group_router:
291+
return None
292+
# In later releases we will deprecate service-level and gateway-level router
293+
# configuration and return `ReplicaGroupRouterConfig` here instead.
294+
return SGLangServiceRouterConfig(managed_by="service")
275295

276296
gateway_router = gateway_configuration.router
277297
assert gateway_router is not None # ensured by _gateway_has_sglang_router
@@ -287,6 +307,7 @@ def _build_service_router_config(
287307

288308
return SGLangServiceRouterConfig(
289309
type=router_type,
310+
managed_by="gateway",
290311
policy=policy,
291312
pd_disaggregation=pd_disaggregation,
292313
)

src/tests/_internal/core/models/test_configurations.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,27 @@ def test_replica_group_router(self):
120120
assert isinstance(router_g.router, ReplicaGroupRouterConfig)
121121
assert router_g.router.type == "sglang"
122122

123+
def test_replica_group_router_forbids_service_level_router(self):
124+
conf = {
125+
"type": "service",
126+
"port": 8000,
127+
"router": {"type": "sglang"},
128+
"replicas": [
129+
{
130+
"name": "router",
131+
"count": 1,
132+
"commands": ["sglang serve"],
133+
"router": {"type": "sglang"},
134+
},
135+
{"name": "worker", "count": 2, "commands": ["worker"]},
136+
],
137+
}
138+
with pytest.raises(
139+
ConfigurationError,
140+
match="Service-Level router configuration is not allowed together with replica-group",
141+
):
142+
parse_run_configuration(conf)
143+
123144
@pytest.mark.parametrize("shell", [None, "sh", "bash", "/usr/bin/zsh"])
124145
def test_shell_valid(self, shell: Optional[str]):
125146
conf = {

0 commit comments

Comments
 (0)