Skip to content

Commit 5ce60f7

Browse files
fregataaclaude
andcommitted
refactor: move hc-required filter into check_route_health
``HealthCheckRouteHandler`` shrinks to a passthrough and drops its ``DeploymentRepository`` dependency. The (``hc is not None``) filter that keeps the probe loop from misclassifying opt-out routes as stale now lives at the head of ``RouteExecutor.check_route_health``, alongside the Valkey record lookup it gates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5ea86aa commit 5ce60f7

5 files changed

Lines changed: 38 additions & 74 deletions

File tree

src/ai/backend/manager/sokovan/deployment/route/coordinator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ def _init_handlers(self, executor: RouteExecutor) -> Mapping[RouteLifecycleType,
158158
RouteLifecycleType.HEALTH_CHECK: HealthCheckRouteHandler(
159159
route_executor=executor,
160160
event_producer=self._event_producer,
161-
deployment_repository=self._deployment_repository,
162161
),
163162
RouteLifecycleType.ROUTE_EVICTION: RouteEvictionHandler(
164163
route_executor=executor,

src/ai/backend/manager/sokovan/deployment/route/executor.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,19 @@ async def check_route_health(self, routes: Sequence[RouteData]) -> RouteExecutio
388388
Returns:
389389
Result with successes (healthy), errors (unhealthy), stale (degraded)
390390
"""
391+
if not routes:
392+
return RouteExecutionResult(successes=[], errors=[], stale=[])
393+
394+
# Revisions that opted out of ``service.health_check`` have no
395+
# RouteHealthRecord in Valkey; including them would classify
396+
# them as stale. Drop them before the probe loop.
397+
hc_configs = await self._deployment_repo.fetch_health_check_configs({
398+
r.revision_id for r in routes
399+
})
400+
routes = [r for r in routes if hc_configs.get(r.revision_id) is not None]
401+
if not routes:
402+
return RouteExecutionResult(successes=[], errors=[], stale=[])
403+
391404
# Phase 1: Load RouteHealthRecords
392405
with RouteRecorderContext.shared_phase("load_health_status"):
393406
with RouteRecorderContext.shared_step("query_health_check_results"):

src/ai/backend/manager/sokovan/deployment/route/handlers/health_check.py

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
RouteTransitionTarget,
1515
)
1616
from ai.backend.manager.defs import LockID
17-
from ai.backend.manager.repositories.deployment import DeploymentRepository
1817
from ai.backend.manager.repositories.deployment.types import RouteData
1918
from ai.backend.manager.sokovan.deployment.route.executor import RouteExecutor
2019
from ai.backend.manager.sokovan.deployment.route.types import RouteExecutionResult
@@ -31,11 +30,9 @@ def __init__(
3130
self,
3231
route_executor: RouteExecutor,
3332
event_producer: EventProducer,
34-
deployment_repository: DeploymentRepository,
3533
) -> None:
3634
self._route_executor = route_executor
3735
self._event_producer = event_producer
38-
self._deployment_repository = deployment_repository
3936

4037
@classmethod
4138
def name(cls) -> str:
@@ -73,23 +70,9 @@ def status_transitions(cls) -> RouteStatusTransitions:
7370
)
7471

7572
async def execute(self, routes: Sequence[RouteData]) -> RouteExecutionResult:
76-
"""Execute health check for routes.
77-
78-
Revisions that opted out of ``service.health_check`` have no
79-
``RouteHealthRecord`` in Valkey — including them would let the
80-
executor classify them as stale. Filter on the per-revision
81-
config fetched on entry so the probe loop only sees routes that
82-
should be probed.
83-
"""
73+
"""Execute health check for routes."""
8474
log.debug("Checking health for {} routes", len(routes))
85-
if not routes:
86-
return RouteExecutionResult(successes=[], errors=[])
87-
revision_ids = {r.revision_id for r in routes}
88-
hc_configs = await self._deployment_repository.fetch_health_check_configs(revision_ids)
89-
eligible = [r for r in routes if hc_configs.get(r.revision_id) is not None]
90-
if not eligible:
91-
return RouteExecutionResult(successes=[], errors=[])
92-
return await self._route_executor.check_route_health(eligible)
75+
return await self._route_executor.check_route_health(routes)
9376

9477
async def post_process(self, result: RouteExecutionResult) -> None:
9578
"""Log health-check results.

tests/unit/manager/sokovan/deployment/route/executor/conftest.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import pytest
1010
from dateutil.tz import tzutc
1111

12+
from ai.backend.common.config import ModelHealthCheck
1213
from ai.backend.common.data.endpoint.types import EndpointLifecycle, ScalingState
1314
from ai.backend.common.identifier.deployment import DeploymentID
1415
from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID
@@ -38,7 +39,16 @@ def mock_deployment_repo() -> AsyncMock:
3839
repo.get_deployments_by_ids = AsyncMock(return_value=[])
3940
repo.update_route_sessions = AsyncMock(return_value=None)
4041
repo.fetch_session_statuses_by_route_ids = AsyncMock(return_value={})
41-
repo.fetch_health_check_configs = AsyncMock(return_value={})
42+
43+
# Default to "every revision declared a probe" so tests for paths
44+
# that filter on hc_configs (``check_route_health``, ``sync_appproxy``)
45+
# see the input route set unchanged unless they override this.
46+
async def _default_health_check_configs(
47+
revision_ids: set[DeploymentRevisionID],
48+
) -> dict[DeploymentRevisionID, ModelHealthCheck]:
49+
return {rid: ModelHealthCheck(path="/health", initial_delay=720.0) for rid in revision_ids}
50+
51+
repo.fetch_health_check_configs = AsyncMock(side_effect=_default_health_check_configs)
4252
repo.fetch_route_service_discovery_info = AsyncMock(return_value=[])
4353
repo.get_scaling_group_cleanup_configs = AsyncMock(return_value={})
4454
repo.fetch_deployment_context = AsyncMock(return_value=MagicMock())

tests/unit/manager/sokovan/deployment/route/handlers/test_health_check_handler.py

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
"""Unit tests for HealthCheckRouteHandler.
22
3-
The handler resolves the per-revision ``ModelHealthCheck`` itself and
4-
drops routes whose revision opted out of ``service.health_check`` before
5-
forwarding the remainder to ``RouteExecutor.check_route_health``. The
6-
register push for first-time HEALTHY transitions is exercised in the
7-
executor tests; here we only verify the handler stays a thin shim plus
8-
the hc-null skip.
3+
The handler is a thin pass-through to ``RouteExecutor.check_route_health``;
4+
the per-revision hc filtering and the register push for first-time
5+
HEALTHY transitions are exercised in the executor tests.
96
"""
107

118
from __future__ import annotations
@@ -16,7 +13,6 @@
1613

1714
from dateutil.tz import tzutc
1815

19-
from ai.backend.common.config import ModelHealthCheck
2016
from ai.backend.common.identifier.deployment import DeploymentID
2117
from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID
2218
from ai.backend.common.types import SessionId
@@ -28,78 +24,41 @@
2824
from ai.backend.manager.sokovan.deployment.route.types import RouteExecutionResult
2925

3026

31-
def _route(
32-
health_status: RouteHealthStatus,
33-
revision_id: DeploymentRevisionID | None = None,
34-
) -> RouteData:
27+
def _route(health_status: RouteHealthStatus) -> RouteData:
3528
return RouteData(
3629
route_id=uuid4(),
3730
deployment_id=DeploymentID(uuid4()),
3831
session_id=SessionId(uuid4()),
3932
status=RouteStatus.RUNNING,
4033
health_status=health_status,
4134
traffic_ratio=1.0,
42-
revision_id=revision_id or DeploymentRevisionID(uuid4()),
35+
revision_id=DeploymentRevisionID(uuid4()),
4336
replica_host="10.0.0.1",
4437
replica_port=8000,
4538
created_at=datetime.now(tzutc()),
4639
)
4740

4841

4942
class TestHealthCheckHandler:
50-
"""Tests for HealthCheckRouteHandler delegation and hc gating."""
43+
"""Tests for HealthCheckRouteHandler delegation."""
5144

52-
async def test_execute_forwards_routes_with_health_check(self) -> None:
53-
"""Routes whose revision declares a probe reach the executor."""
45+
async def test_execute_forwards_routes_to_executor(self) -> None:
5446
executor = AsyncMock()
5547
check_result = RouteExecutionResult(successes=[], errors=[], stale=[])
5648
executor.check_route_health = AsyncMock(return_value=check_result)
57-
repo = AsyncMock()
58-
route = _route(RouteHealthStatus.NOT_CHECKED)
59-
repo.fetch_health_check_configs = AsyncMock(
60-
return_value={route.revision_id: ModelHealthCheck(path="/health", initial_delay=720.0)}
61-
)
62-
handler = HealthCheckRouteHandler(executor, MagicMock(), repo)
49+
handler = HealthCheckRouteHandler(executor, MagicMock())
6350

64-
result = await handler.execute([route])
51+
routes = [_route(RouteHealthStatus.NOT_CHECKED)]
52+
result = await handler.execute(routes)
6553

66-
executor.check_route_health.assert_awaited_once_with([route])
67-
repo.fetch_health_check_configs.assert_awaited_once_with({route.revision_id})
54+
executor.check_route_health.assert_awaited_once_with(routes)
6855
assert result is check_result
6956

70-
async def test_execute_skips_routes_without_health_check(self) -> None:
71-
"""Routes whose revision opted out of probing never reach the executor."""
72-
executor = AsyncMock()
73-
executor.check_route_health = AsyncMock(
74-
return_value=RouteExecutionResult(successes=[], errors=[], stale=[])
75-
)
76-
repo = AsyncMock()
77-
route = _route(RouteHealthStatus.NOT_CHECKED)
78-
repo.fetch_health_check_configs = AsyncMock(return_value={route.revision_id: None})
79-
handler = HealthCheckRouteHandler(executor, MagicMock(), repo)
80-
81-
result = await handler.execute([route])
82-
83-
executor.check_route_health.assert_not_awaited()
84-
assert result.successes == []
85-
86-
async def test_execute_empty_routes_is_noop(self) -> None:
87-
executor = AsyncMock()
88-
repo = AsyncMock()
89-
handler = HealthCheckRouteHandler(executor, MagicMock(), repo)
90-
91-
result = await handler.execute([])
92-
93-
repo.fetch_health_check_configs.assert_not_awaited()
94-
executor.check_route_health.assert_not_awaited()
95-
assert result.successes == []
96-
9757
async def test_post_process_logs_only(self) -> None:
9858
"""post_process is a logging shim — no executor call here."""
9959
executor = AsyncMock()
100-
repo = AsyncMock()
10160
event_producer = MagicMock()
102-
handler = HealthCheckRouteHandler(executor, event_producer, repo)
61+
handler = HealthCheckRouteHandler(executor, event_producer)
10362
success_route = _route(RouteHealthStatus.NOT_CHECKED)
10463

10564
await handler.post_process(

0 commit comments

Comments
 (0)