Skip to content

Commit a79a045

Browse files
authored
feat(BA-5985): scan routes via BatchQuerier with traffic-status filter (#11534)
1 parent 60d9baf commit a79a045

6 files changed

Lines changed: 56 additions & 35 deletions

File tree

changes/11534.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Route coordinator now scans lifecycle routes via `BatchQuerier`, and `RouteTargetStatuses` gains an explicit traffic-status filter axis so handlers can target only routes whose `traffic_status` is in a given list.

src/ai/backend/manager/data/deployment/types.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,15 @@ class DeploymentTargetStatuses:
251251

252252
@dataclass(frozen=True)
253253
class RouteTargetStatuses:
254-
"""Target statuses for route handler filtering (lifecycle x health)."""
254+
"""Target statuses for route handler filtering (lifecycle x health x traffic).
255+
256+
``traffic=None`` skips the traffic-status predicate. Pass a non-empty list
257+
to restrict to specific ``RouteTrafficStatus`` values.
258+
"""
255259

256260
lifecycle: list[RouteStatus]
257261
health: list[RouteHealthStatus]
262+
traffic: list[RouteTrafficStatus] | None = None
258263

259264

260265
@dataclass(frozen=True)

src/ai/backend/manager/repositories/deployment/db_source/db_source.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1584,23 +1584,24 @@ async def scale_routes(
15841584

15851585
# Route operations
15861586

1587-
async def get_routes_by_statuses(
1587+
async def search_route_datas(
15881588
self,
1589-
statuses: list[RouteStatus],
1590-
health_statuses: list[RouteHealthStatus],
1589+
*,
1590+
querier: BatchQuerier,
15911591
) -> list[RouteData]:
1592-
"""Get routes by lifecycle and health statuses."""
1593-
async with self._begin_readonly_session_read_committed() as db_sess:
1594-
query = sa.select(RoutingRow).where(
1595-
RoutingRow.status.in_(statuses),
1596-
RoutingRow.health_status.in_(health_statuses),
1597-
)
1598-
result = await db_sess.execute(query)
1599-
rows: Sequence[RoutingRow] = result.scalars().all()
1592+
"""Search routes via :class:`BatchQuerier`.
16001593
1601-
route_data_list: list[RouteData] = []
1602-
for row in rows:
1603-
route_data = RouteData(
1594+
The caller composes ``querier`` with every filter that applies
1595+
(lifecycle / health / traffic_status / endpoint id set, etc.).
1596+
Pagination is part of the querier — pass ``NoPagination`` for
1597+
unbounded scans.
1598+
"""
1599+
async with self._begin_readonly_session_read_committed() as db_sess:
1600+
query = sa.select(RoutingRow)
1601+
query_result = await execute_batch_querier(db_sess, query, querier)
1602+
route_rows: list[RoutingRow] = [row.RoutingRow for row in query_result.rows]
1603+
return [
1604+
RouteData(
16041605
route_id=row.id,
16051606
deployment_id=row.endpoint,
16061607
session_id=SessionId(row.session) if row.session else None,
@@ -1613,9 +1614,8 @@ async def get_routes_by_statuses(
16131614
replica_port=row.replica_port,
16141615
error_data=row.error_data or {},
16151616
)
1616-
route_data_list.append(route_data)
1617-
1618-
return route_data_list
1617+
for row in route_rows
1618+
]
16191619

16201620
async def update_route_status_bulk(
16211621
self,

src/ai/backend/manager/repositories/deployment/repository.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
ModelDeploymentAutoScalingRuleData,
6868
ModelRevisionData,
6969
RevisionSearchResult,
70-
RouteHealthStatus,
7170
RouteInfo,
7271
RouteSearchResult,
7372
RouteStatus,
@@ -629,13 +628,18 @@ async def scale_routes(
629628
# Route operations
630629

631630
@deployment_repository_resilience.apply()
632-
async def get_routes_by_statuses(
631+
async def search_route_datas(
633632
self,
634-
statuses: list[RouteStatus],
635-
health_statuses: list[RouteHealthStatus],
633+
*,
634+
querier: BatchQuerier,
636635
) -> list[RouteData]:
637-
"""Get routes by lifecycle and health statuses."""
638-
return await self._db_source.get_routes_by_statuses(statuses, health_statuses)
636+
"""Search routes via :class:`BatchQuerier`.
637+
638+
The caller composes ``querier`` with every filter that applies;
639+
pagination is part of the querier (use ``NoPagination`` for
640+
unbounded scans).
641+
"""
642+
return await self._db_source.search_route_datas(querier=querier)
639643

640644
@deployment_repository_resilience.apply()
641645
async def update_route_status_bulk(

src/ai/backend/manager/sokovan/deployment/route/coordinator.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from ai.backend.manager.data.session.types import SchedulingResult
2525
from ai.backend.manager.models.routing import RoutingRow
2626
from ai.backend.manager.models.routing.conditions import RouteConditions
27+
from ai.backend.manager.repositories.base import BatchQuerier, NoPagination, QueryCondition
2728
from ai.backend.manager.repositories.base.creator import BulkCreator
2829
from ai.backend.manager.repositories.base.updater import BatchUpdater
2930
from ai.backend.manager.repositories.deployment import DeploymentRepository
@@ -197,11 +198,16 @@ async def process_route_lifecycle(
197198
lock_lifetime = self._config_provider.config.manager.session_schedule_lock_lifetime
198199
await stack.enter_async_context(self._lock_factory(handler.lock_id, lock_lifetime))
199200

200-
# Get routes by target lifecycle + health statuses
201+
# Get routes by target lifecycle + health (+ optional traffic) statuses
201202
target = handler.target_statuses()
202-
routes = await self._deployment_repository.get_routes_by_statuses(
203-
target.lifecycle,
204-
target.health,
203+
conditions: list[QueryCondition] = [
204+
RouteConditions.by_lifecycle_statuses(target.lifecycle),
205+
RouteConditions.by_health_statuses(target.health),
206+
]
207+
if target.traffic is not None:
208+
conditions.append(RouteConditions.by_traffic_statuses(target.traffic))
209+
routes = await self._deployment_repository.search_route_datas(
210+
querier=BatchQuerier(pagination=NoPagination(), conditions=conditions),
205211
)
206212
if not routes:
207213
log.trace("No routes to process for handler: {}", handler.name())
@@ -230,9 +236,14 @@ async def _process_observer(self, observer: RouteObserver) -> None:
230236
changing route status in DB.
231237
"""
232238
try:
233-
routes = await self._deployment_repository.get_routes_by_statuses(
234-
[RouteStatus.RUNNING],
235-
list(RouteHealthStatus),
239+
routes = await self._deployment_repository.search_route_datas(
240+
querier=BatchQuerier(
241+
pagination=NoPagination(),
242+
conditions=[
243+
RouteConditions.by_lifecycle_statuses([RouteStatus.RUNNING]),
244+
RouteConditions.by_health_statuses(list(RouteHealthStatus)),
245+
],
246+
),
236247
)
237248
if not routes:
238249
return

tests/unit/manager/sokovan/deployment/route/test_coordinator_history.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def sample_route_execution_error(
7575
def mock_deployment_repository() -> AsyncMock:
7676
"""Mock DeploymentRepository with minimal implementation."""
7777
mock = AsyncMock(spec=DeploymentRepository)
78-
mock.get_routes_by_statuses = AsyncMock(return_value=[])
78+
mock.search_route_datas = AsyncMock(return_value=[])
7979
mock.update_route_status_bulk_with_history = AsyncMock(return_value=0)
8080
return mock
8181

@@ -285,7 +285,7 @@ def coordinator_with_provisioning_routes(
285285
sample_route_data: RouteData,
286286
) -> Generator[RouteCoordinator, None, None]:
287287
"""Coordinator with PROVISIONING routes available."""
288-
mock_deployment_repository.get_routes_by_statuses = AsyncMock(return_value=[sample_route_data])
288+
mock_deployment_repository.search_route_datas = AsyncMock(return_value=[sample_route_data])
289289

290290
coordinator = RouteCoordinator(
291291
valkey_schedule=mock_valkey_schedule,
@@ -313,7 +313,7 @@ def coordinator_without_routes(
313313
mock_service_discovery: MagicMock,
314314
) -> Generator[RouteCoordinator, None, None]:
315315
"""Coordinator with no routes available."""
316-
mock_deployment_repository.get_routes_by_statuses = AsyncMock(return_value=[])
316+
mock_deployment_repository.search_route_datas = AsyncMock(return_value=[])
317317

318318
coordinator = RouteCoordinator(
319319
valkey_schedule=mock_valkey_schedule,
@@ -382,7 +382,7 @@ async def test_records_history_on_stale(
382382
RouteLifecycleType.HEALTH_CHECK: mock_handler_with_stale
383383
}
384384
# Update repository mock to return routes for HEALTHY status
385-
mock_deployment_repository.get_routes_by_statuses = AsyncMock(
385+
mock_deployment_repository.search_route_datas = AsyncMock(
386386
return_value=[
387387
RouteData(
388388
route_id=uuid4(),

0 commit comments

Comments
 (0)