Skip to content
Draft
1 change: 1 addition & 0 deletions changes/11522.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Project ``EndpointRow`` directly to ``ModelDeploymentData`` for the API path so deployment responses no longer pass through a fragile ``DeploymentInfo`` conversion, and split no-scope deployment reads into a dedicated admin repository/service/processor.
1 change: 1 addition & 0 deletions src/ai/backend/common/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ class LayerType(enum.StrEnum):
AUDIT_LOG_REPOSITORY = "audit_log_repository"
CONTAINER_REGISTRY_REPOSITORY = "container_registry_repository"
DEPLOYMENT_REPOSITORY = "deployment_repository"
DEPLOYMENT_ADMIN_REPOSITORY = "deployment_admin_repository"
DOMAIN_REPOSITORY = "domain_repository"
DOTFILE_REPOSITORY = "dotfile_repository"
ETCD_CONFIG_REPOSITORY = "etcd_config_repository"
Expand Down
63 changes: 40 additions & 23 deletions src/ai/backend/manager/api/adapters/deployment/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from typing import TYPE_CHECKING
from uuid import UUID

import sqlalchemy as sa

if TYPE_CHECKING:
from ai.backend.manager.services.processors import Processors
from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator
Expand Down Expand Up @@ -227,6 +225,10 @@
combine_conditions_or,
negate_conditions,
)
from ai.backend.manager.repositories.deployment.types import (
ProjectDeploymentSearchScope,
UserDeploymentSearchScope,
)
from ai.backend.manager.repositories.deployment.updaters import (
DeploymentMetadataUpdaterSpec,
DeploymentNetworkSpecUpdaterSpec,
Expand All @@ -248,6 +250,9 @@
from ai.backend.manager.services.deployment.actions.access_token.search_access_tokens import (
SearchAccessTokensAction,
)
from ai.backend.manager.services.deployment.actions.admin_search_model_deployments import (
AdminSearchModelDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.auto_scaling_rule.bulk_delete_auto_scaling_rules import (
BulkDeleteAutoScalingRulesAction,
)
Expand Down Expand Up @@ -310,10 +315,16 @@
from ai.backend.manager.services.deployment.actions.route.update_route_traffic_status import (
UpdateRouteTrafficStatusAction,
)
from ai.backend.manager.services.deployment.actions.search_deployments import (
SearchDeploymentsAction,
from ai.backend.manager.services.deployment.actions.search_legacy_deployments import (
SearchLegacyDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.search_project_model_deployments import (
SearchProjectModelDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.search_replicas import SearchReplicasAction
from ai.backend.manager.services.deployment.actions.search_user_model_deployments import (
SearchUserModelDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.sync_replicas import SyncReplicaAction
from ai.backend.manager.services.deployment.actions.update_deployment import UpdateDeploymentAction
from ai.backend.manager.types import OptionalState, TriState
Expand Down Expand Up @@ -578,8 +589,8 @@ async def admin_search(
) -> AdminSearchDeploymentsPayload:
"""Search deployments (admin, no scope)."""
querier = self._build_deployment_querier(input)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = await self._processors.deployment_admin.admin_search_model_deployments.wait_for_complete(
AdminSearchModelDeploymentsAction(querier=querier)
)
return AdminSearchDeploymentsPayload(
items=[self._deployment_data_to_dto(item) for item in action_result.data],
Expand All @@ -596,16 +607,13 @@ async def my_search(
user = current_user()
if user is None:
raise RuntimeError("No authenticated user in context")
scope = UserDeploymentSearchScope(user_id=user.user_id)
conditions: list[QueryCondition] = []
if input.filter:
conditions.extend(self._convert_deployment_filter(input.filter))
orders: list[QueryOrder] = (
self._convert_deployment_orders(input.order) if input.order else []
)

def _by_created_user() -> sa.sql.expression.ColumnElement[bool]:
return EndpointRow.created_user == user.user_id

querier = self._build_querier(
conditions=conditions,
orders=orders,
Expand All @@ -616,10 +624,11 @@ def _by_created_user() -> sa.sql.expression.ColumnElement[bool]:
before=input.before,
limit=input.limit,
offset=input.offset,
base_conditions=[_by_created_user],
)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = (
await self._processors.deployment.search_user_model_deployments.wait_for_complete(
SearchUserModelDeploymentsAction(scope=scope, querier=querier)
)
)
return AdminSearchDeploymentsPayload(
items=[self._deployment_data_to_dto(item) for item in action_result.data],
Expand All @@ -634,16 +643,13 @@ async def project_search(
input: AdminSearchDeploymentsInput,
) -> AdminSearchDeploymentsPayload:
"""Search deployments within a specific project."""
scope = ProjectDeploymentSearchScope(project_id=project_id)
conditions: list[QueryCondition] = []
if input.filter:
conditions.extend(self._convert_deployment_filter(input.filter))
orders: list[QueryOrder] = (
self._convert_deployment_orders(input.order) if input.order else []
)

def _by_project_id() -> sa.sql.expression.ColumnElement[bool]:
return EndpointRow.project == project_id

querier = self._build_querier(
conditions=conditions,
orders=orders,
Expand All @@ -654,10 +660,11 @@ def _by_project_id() -> sa.sql.expression.ColumnElement[bool]:
before=input.before,
limit=input.limit,
offset=input.offset,
base_conditions=[_by_project_id],
)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = (
await self._processors.deployment.search_project_model_deployments.wait_for_complete(
SearchProjectModelDeploymentsAction(scope=scope, querier=querier)
)
)
return AdminSearchDeploymentsPayload(
items=[self._deployment_data_to_dto(item) for item in action_result.data],
Expand Down Expand Up @@ -1284,16 +1291,26 @@ async def batch_load_by_ids(
) -> list[DeploymentNode | None]:
"""Batch load deployments by ID for DataLoader use.

Returns DeploymentNode DTOs in the same order as the input deployment_ids list.
Routed through the regular ``search_legacy_deployments`` processor
on purpose: the admin processor is reserved for admin-authorised
callers, but a DataLoader fires under whatever user triggered the
parent GraphQL query (admin or not). The ``by_ids`` filter is the
bound on the result set; the parent resolver has already authorised
access to whatever entity references these IDs.

Output is aligned with the input ``deployment_ids`` order; missing
IDs come back as ``None``.
"""
if not deployment_ids:
return []
querier = BatchQuerier(
pagination=OffsetPagination(limit=len(deployment_ids)),
conditions=[DeploymentConditions.by_ids(deployment_ids)],
)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = (
await self._processors.deployment.search_legacy_deployments.wait_for_complete(
SearchLegacyDeploymentsAction(querier=querier)
)
)
deployment_map = {
data.id: self._deployment_data_to_dto(data) for data in action_result.data
Expand Down
8 changes: 4 additions & 4 deletions src/ai/backend/manager/api/rest/deployment/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@
SearchRoutesAction,
UpdateRouteTrafficStatusAction,
)
from ai.backend.manager.services.deployment.actions.search_deployments import (
SearchDeploymentsAction,
from ai.backend.manager.services.deployment.actions.search_legacy_deployments import (
SearchLegacyDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.update_deployment import (
UpdateDeploymentAction,
Expand Down Expand Up @@ -200,8 +200,8 @@ async def search_deployments(
querier = self._deployment_adapter.build_querier(body.parsed)

# Call service action
action_result = await self._deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = await self._deployment.search_legacy_deployments.wait_for_complete(
SearchLegacyDeploymentsAction(querier=querier)
)

# Build response
Expand Down
14 changes: 14 additions & 0 deletions src/ai/backend/manager/data/deployment/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,20 @@ class DeploymentInfoSearchResult:
has_previous_page: bool


@dataclass
class ModelDeploymentDataSearchResult:
"""Search result with pagination for the API-shaped ``ModelDeploymentData``.

Returned by repository methods that project ``EndpointRow`` straight to
``ModelDeploymentData`` without going through ``DeploymentInfo``.
"""

items: list[ModelDeploymentData]
total_count: int
has_next_page: bool
has_previous_page: bool


@dataclass
class AutoScalingRuleSearchResult:
"""Search result with pagination for auto-scaling rules."""
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/manager/models/endpoint/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
from ai.backend.manager.models.routing import RoutingRow
from ai.backend.manager.models.user import UserRow


__all__ = (
"EndpointAutoScalingRuleRow",
"EndpointLifecycle",
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/manager/repositories/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from ai.backend.manager.models.endpoint.conditions import DeploymentConditions
from ai.backend.manager.models.routing.conditions import RouteConditions

from .admin_repository import DeploymentAdminRepository
from .repository import DeploymentRepository

__all__ = [
"DeploymentAdminRepository",
"DeploymentRepository",
"DeploymentConditions",
"RouteConditions",
Expand Down
75 changes: 75 additions & 0 deletions src/ai/backend/manager/repositories/deployment/admin_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Admin-only repository for deployment queries that span every scope.

Holds reads that are not bounded by a project / user / domain scope. The
regular ``DeploymentRepository`` keeps the scoped variants; admin-style
"see everything" queries live here so the layering is explicit and the
GraphQL/REST ``admin_*`` paths have a clearly-named home.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from ai.backend.common.exception import BackendAIError
from ai.backend.common.metrics.metric import DomainType, LayerType
from ai.backend.common.resilience.policies.metrics import MetricArgs, MetricPolicy
from ai.backend.common.resilience.policies.retry import BackoffStrategy, RetryArgs, RetryPolicy
from ai.backend.common.resilience.resilience import Resilience
from ai.backend.manager.data.deployment.types import ModelDeploymentDataSearchResult
from ai.backend.manager.repositories.base import BatchQuerier
from ai.backend.manager.repositories.deployment.db_source.db_source import DeploymentDBSource

if TYPE_CHECKING:
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine

__all__ = ("DeploymentAdminRepository",)


deployment_admin_repository_resilience = Resilience(
policies=[
MetricPolicy(
MetricArgs(
domain=DomainType.REPOSITORY,
layer=LayerType.DEPLOYMENT_ADMIN_REPOSITORY,
)
),
RetryPolicy(
RetryArgs(
max_retries=3,
retry_delay=0.1,
backoff_strategy=BackoffStrategy.FIXED,
non_retryable_exceptions=(BackendAIError,),
)
),
]
)


class DeploymentAdminRepository:
"""Admin (no-scope) reads against the ``endpoints`` table.

Shares the underlying ``DeploymentDBSource`` with the regular
repository — the SQL-level query stays in one place; the split exists
so the calling layer makes its admin intent explicit.
"""

_db_source: DeploymentDBSource

def __init__(
self,
db: ExtendedAsyncSAEngine,
) -> None:
self._db_source = DeploymentDBSource(db)

@deployment_admin_repository_resilience.apply()
async def search_model_deployments(
self,
querier: BatchQuerier,
) -> ModelDeploymentDataSearchResult:
"""Search every deployment without a scope filter.

Callers may still pass arbitrary ``conditions`` through the
querier — the absence of a scope filter on the repository method
itself is what makes this admin-only.
"""
return await self._db_source.search_model_deployments(querier)
Loading
Loading