Skip to content

Commit a46600f

Browse files
authored
Remove dangling services from gateway (#3586)
If registering a new service in the gateway fails because a dangling service with the same name is still registered there, automatically unregister the dangling service and retry the registration of the new service.
1 parent e165684 commit a46600f

File tree

4 files changed

+81
-4
lines changed

4 files changed

+81
-4
lines changed

src/dstack/_internal/proxy/gateway/const.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
DSTACK_DIR_ON_GATEWAY = Path("/home/ubuntu/dstack")
66
SERVER_CONNECTIONS_DIR_ON_GATEWAY = DSTACK_DIR_ON_GATEWAY / "server-connections"
77
PROXY_PORT_ON_GATEWAY = 8000
8+
SERVICE_ALREADY_REGISTERED_ERROR_TEMPLATE = "Service {ref} is already registered"

src/dstack/_internal/proxy/gateway/services/registry.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from dstack._internal.core.models.instances import SSHConnectionParams
99
from dstack._internal.core.models.routers import AnyServiceRouterConfig, RouterType
1010
from dstack._internal.proxy.gateway import models as gateway_models
11+
from dstack._internal.proxy.gateway.const import SERVICE_ALREADY_REGISTERED_ERROR_TEMPLATE
1112
from dstack._internal.proxy.gateway.repo.repo import GatewayProxyRepo
1213
from dstack._internal.proxy.gateway.services.nginx import (
1314
LimitReqConfig,
@@ -63,7 +64,7 @@ async def register_service(
6364

6465
async with lock:
6566
if await repo.get_service(project_name, run_name) is not None:
66-
raise ProxyError(f"Service {service.fmt()} is already registered")
67+
raise ProxyError(SERVICE_ALREADY_REGISTERED_ERROR_TEMPLATE.format(ref=service.fmt()))
6768

6869
old_project = await repo.get_project(project_name)
6970
new_project = models.Project(name=project_name, ssh_private_key=ssh_private_key)

src/dstack/_internal/server/services/services/__init__.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import uuid
77
from datetime import datetime
8+
from functools import partial
89
from typing import Optional
910

1011
import httpx
@@ -33,6 +34,7 @@
3334
)
3435
from dstack._internal.core.models.runs import JobSpec, Run, RunSpec, ServiceModelSpec, ServiceSpec
3536
from dstack._internal.core.models.services import OpenAIChatModel
37+
from dstack._internal.proxy.gateway.const import SERVICE_ALREADY_REGISTERED_ERROR_TEMPLATE
3638
from dstack._internal.server import settings
3739
from dstack._internal.server.models import GatewayModel, JobModel, ProjectModel, RunModel
3840
from dstack._internal.server.services import events
@@ -177,7 +179,8 @@ async def _register_service_in_gateway(
177179
try:
178180
logger.debug("%s: registering service as %s", fmt(run_model), service_spec.url)
179181
async with conn.client() as client:
180-
await client.register_service(
182+
do_register = partial(
183+
client.register_service,
181184
project=run_model.project.name,
182185
run_name=run_model.run_name,
183186
domain=domain,
@@ -190,6 +193,26 @@ async def _register_service_in_gateway(
190193
ssh_private_key=run_model.project.ssh_private_key,
191194
router=router,
192195
)
196+
try:
197+
await do_register()
198+
except GatewayError as e:
199+
if e.msg == SERVICE_ALREADY_REGISTERED_ERROR_TEMPLATE.format(
200+
ref=f"{run_model.project.name}/{run_model.run_name}"
201+
):
202+
# Happens if there was a communication issue with the gateway when last unregistering
203+
logger.warning(
204+
"Service %s/%s is dangling on gateway %s, unregistering and re-registering",
205+
run_model.project.name,
206+
run_model.run_name,
207+
gateway.name,
208+
)
209+
await client.unregister_service(
210+
project=run_model.project.name,
211+
run_name=run_model.run_name,
212+
)
213+
await do_register()
214+
else:
215+
raise
193216
except SSHError:
194217
raise ServerClientError("Gateway tunnel is not working")
195218
except httpx.RequestError as e:

src/tests/_internal/server/routers/test_runs.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from sqlalchemy.ext.asyncio import AsyncSession
1414

1515
from dstack._internal import settings
16+
from dstack._internal.core.errors import GatewayError
1617
from dstack._internal.core.models.backends.base import BackendType
1718
from dstack._internal.core.models.common import ApplyAction
1819
from dstack._internal.core.models.configurations import (
@@ -2299,13 +2300,13 @@ async def test_returns_400_if_runs_active(
22992300
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
23002301
class TestSubmitService:
23012302
@pytest.fixture(autouse=True)
2302-
def mock_gateway_connections(self) -> Generator[None, None, None]:
2303+
def mock_gateway_connection(self) -> Generator[AsyncMock, None, None]:
23032304
with patch(
23042305
"dstack._internal.server.services.gateways.gateway_connections_pool.get_or_add"
23052306
) as get_conn_mock:
23062307
get_conn_mock.return_value.client = Mock()
23072308
get_conn_mock.return_value.client.return_value = AsyncMock()
2308-
yield
2309+
yield get_conn_mock
23092310

23102311
@pytest.mark.asyncio
23112312
@pytest.mark.parametrize(
@@ -2481,3 +2482,54 @@ async def test_return_error_if_specified_gateway_is_true_and_no_gateway_exists(
24812482
}
24822483
]
24832484
}
2485+
2486+
@pytest.mark.asyncio
2487+
async def test_unregister_dangling_service(
2488+
self,
2489+
test_db,
2490+
session: AsyncSession,
2491+
client: AsyncClient,
2492+
mock_gateway_connection: AsyncMock,
2493+
) -> None:
2494+
user = await create_user(session=session, global_role=GlobalRole.USER)
2495+
project = await create_project(session=session, owner=user, name="test-project")
2496+
await add_project_member(
2497+
session=session, project=project, user=user, project_role=ProjectRole.USER
2498+
)
2499+
repo = await create_repo(session=session, project_id=project.id)
2500+
backend = await create_backend(session=session, project_id=project.id)
2501+
gateway_compute = await create_gateway_compute(session=session, backend_id=backend.id)
2502+
gateway = await create_gateway(
2503+
session=session,
2504+
project_id=project.id,
2505+
backend_id=backend.id,
2506+
gateway_compute_id=gateway_compute.id,
2507+
status=GatewayStatus.RUNNING,
2508+
wildcard_domain="example.com",
2509+
)
2510+
project.default_gateway_id = gateway.id
2511+
await session.commit()
2512+
2513+
client_mock = (
2514+
mock_gateway_connection.return_value.client.return_value.__aenter__.return_value
2515+
)
2516+
client_mock.register_service.side_effect = [
2517+
GatewayError("Service test-project/test-service is already registered"),
2518+
None, # Second call succeeds
2519+
]
2520+
2521+
response = await client.post(
2522+
"/api/project/test-project/runs/submit",
2523+
headers=get_auth_headers(user.token),
2524+
json={"run_spec": get_service_run_spec(repo_id=repo.name, run_name="test-service")},
2525+
)
2526+
2527+
assert response.status_code == 200
2528+
assert response.json()["service"]["url"] == "https://test-service.example.com"
2529+
# Verify that unregister_service was called to clean up the dangling service
2530+
client_mock.unregister_service.assert_called_once_with(
2531+
project=project.name,
2532+
run_name="test-service",
2533+
)
2534+
# Verify that register_service was called twice (first failed, then succeeded)
2535+
assert client_mock.register_service.call_count == 2

0 commit comments

Comments
 (0)