diff --git a/changes/11522.enhance.md b/changes/11522.enhance.md new file mode 100644 index 00000000000..21dc0d7e279 --- /dev/null +++ b/changes/11522.enhance.md @@ -0,0 +1 @@ +Project ``EndpointRow`` directly to ``ModelDeploymentData`` for the API path so deployment responses no longer pass through a fragile ``DeploymentInfo`` conversion, and split no-scope deployment reads into a dedicated admin repository/service/processor. diff --git a/src/ai/backend/common/metrics/metric.py b/src/ai/backend/common/metrics/metric.py index 68062786edf..1779e6cd586 100644 --- a/src/ai/backend/common/metrics/metric.py +++ b/src/ai/backend/common/metrics/metric.py @@ -420,6 +420,7 @@ class LayerType(enum.StrEnum): AUDIT_LOG_REPOSITORY = "audit_log_repository" CONTAINER_REGISTRY_REPOSITORY = "container_registry_repository" DEPLOYMENT_REPOSITORY = "deployment_repository" + DEPLOYMENT_ADMIN_REPOSITORY = "deployment_admin_repository" DOMAIN_REPOSITORY = "domain_repository" DOTFILE_REPOSITORY = "dotfile_repository" ETCD_CONFIG_REPOSITORY = "etcd_config_repository" diff --git a/src/ai/backend/manager/api/adapters/deployment/adapter.py b/src/ai/backend/manager/api/adapters/deployment/adapter.py index 9508752efc6..443534b1dbd 100644 --- a/src/ai/backend/manager/api/adapters/deployment/adapter.py +++ b/src/ai/backend/manager/api/adapters/deployment/adapter.py @@ -9,8 +9,6 @@ from typing import TYPE_CHECKING from uuid import UUID -import sqlalchemy as sa - if TYPE_CHECKING: from ai.backend.manager.services.processors import Processors from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator @@ -227,6 +225,10 @@ combine_conditions_or, negate_conditions, ) +from ai.backend.manager.repositories.deployment.types import ( + ProjectDeploymentSearchScope, + UserDeploymentSearchScope, +) from ai.backend.manager.repositories.deployment.updaters import ( DeploymentMetadataUpdaterSpec, DeploymentNetworkSpecUpdaterSpec, @@ -248,6 +250,9 @@ from 
ai.backend.manager.services.deployment.actions.access_token.search_access_tokens import ( SearchAccessTokensAction, ) +from ai.backend.manager.services.deployment.actions.admin_search_model_deployments import ( + AdminSearchModelDeploymentsAction, +) from ai.backend.manager.services.deployment.actions.auto_scaling_rule.bulk_delete_auto_scaling_rules import ( BulkDeleteAutoScalingRulesAction, ) @@ -310,10 +315,16 @@ from ai.backend.manager.services.deployment.actions.route.update_route_traffic_status import ( UpdateRouteTrafficStatusAction, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, +) +from ai.backend.manager.services.deployment.actions.search_project_model_deployments import ( + SearchProjectModelDeploymentsAction, ) from ai.backend.manager.services.deployment.actions.search_replicas import SearchReplicasAction +from ai.backend.manager.services.deployment.actions.search_user_model_deployments import ( + SearchUserModelDeploymentsAction, +) from ai.backend.manager.services.deployment.actions.sync_replicas import SyncReplicaAction from ai.backend.manager.services.deployment.actions.update_deployment import UpdateDeploymentAction from ai.backend.manager.types import OptionalState, TriState @@ -578,8 +589,8 @@ async def admin_search( ) -> AdminSearchDeploymentsPayload: """Search deployments (admin, no scope).""" querier = self._build_deployment_querier(input) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = await self._processors.deployment_admin.admin_search_model_deployments.wait_for_complete( + AdminSearchModelDeploymentsAction(querier=querier) ) return AdminSearchDeploymentsPayload( items=[self._deployment_data_to_dto(item) for item in action_result.data], @@ -596,16 +607,13 @@ 
async def my_search( user = current_user() if user is None: raise RuntimeError("No authenticated user in context") + scope = UserDeploymentSearchScope(user_id=user.user_id) conditions: list[QueryCondition] = [] if input.filter: conditions.extend(self._convert_deployment_filter(input.filter)) orders: list[QueryOrder] = ( self._convert_deployment_orders(input.order) if input.order else [] ) - - def _by_created_user() -> sa.sql.expression.ColumnElement[bool]: - return EndpointRow.created_user == user.user_id - querier = self._build_querier( conditions=conditions, orders=orders, @@ -616,10 +624,11 @@ def _by_created_user() -> sa.sql.expression.ColumnElement[bool]: before=input.before, limit=input.limit, offset=input.offset, - base_conditions=[_by_created_user], ) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = ( + await self._processors.deployment.search_user_model_deployments.wait_for_complete( + SearchUserModelDeploymentsAction(scope=scope, querier=querier) + ) ) return AdminSearchDeploymentsPayload( items=[self._deployment_data_to_dto(item) for item in action_result.data], @@ -634,16 +643,13 @@ async def project_search( input: AdminSearchDeploymentsInput, ) -> AdminSearchDeploymentsPayload: """Search deployments within a specific project.""" + scope = ProjectDeploymentSearchScope(project_id=project_id) conditions: list[QueryCondition] = [] if input.filter: conditions.extend(self._convert_deployment_filter(input.filter)) orders: list[QueryOrder] = ( self._convert_deployment_orders(input.order) if input.order else [] ) - - def _by_project_id() -> sa.sql.expression.ColumnElement[bool]: - return EndpointRow.project == project_id - querier = self._build_querier( conditions=conditions, orders=orders, @@ -654,10 +660,11 @@ def _by_project_id() -> sa.sql.expression.ColumnElement[bool]: before=input.before, limit=input.limit, offset=input.offset, - 
base_conditions=[_by_project_id], ) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = ( + await self._processors.deployment.search_project_model_deployments.wait_for_complete( + SearchProjectModelDeploymentsAction(scope=scope, querier=querier) + ) ) return AdminSearchDeploymentsPayload( items=[self._deployment_data_to_dto(item) for item in action_result.data], @@ -1284,7 +1291,15 @@ async def batch_load_by_ids( ) -> list[DeploymentNode | None]: """Batch load deployments by ID for DataLoader use. - Returns DeploymentNode DTOs in the same order as the input deployment_ids list. + Routed through the regular ``search_legacy_deployments`` processor + on purpose: the admin processor is reserved for admin-authorised + callers, but a DataLoader fires under whatever user triggered the + parent GraphQL query (admin or not). The ``by_ids`` filter is the + bound on the result set; the parent resolver has already authorised + access to whatever entity references these IDs. + + Output is aligned with the input ``deployment_ids`` order; missing + IDs come back as ``None``. 
""" if not deployment_ids: return [] @@ -1292,8 +1307,10 @@ async def batch_load_by_ids( pagination=OffsetPagination(limit=len(deployment_ids)), conditions=[DeploymentConditions.by_ids(deployment_ids)], ) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = ( + await self._processors.deployment.search_legacy_deployments.wait_for_complete( + SearchLegacyDeploymentsAction(querier=querier) + ) ) deployment_map = { data.id: self._deployment_data_to_dto(data) for data in action_result.data diff --git a/src/ai/backend/manager/api/rest/deployment/handler.py b/src/ai/backend/manager/api/rest/deployment/handler.py index ec4e998008f..060bec8aca0 100644 --- a/src/ai/backend/manager/api/rest/deployment/handler.py +++ b/src/ai/backend/manager/api/rest/deployment/handler.py @@ -88,8 +88,8 @@ SearchRoutesAction, UpdateRouteTrafficStatusAction, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, ) from ai.backend.manager.services.deployment.actions.update_deployment import ( UpdateDeploymentAction, @@ -200,8 +200,8 @@ async def search_deployments( querier = self._deployment_adapter.build_querier(body.parsed) # Call service action - action_result = await self._deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = await self._deployment.search_legacy_deployments.wait_for_complete( + SearchLegacyDeploymentsAction(querier=querier) ) # Build response diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index ac072072032..db3398e2288 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -1150,6 +1150,20 @@ class DeploymentInfoSearchResult: 
has_previous_page: bool +@dataclass +class ModelDeploymentDataSearchResult: + """Search result with pagination for the API-shaped ``ModelDeploymentData``. + + Returned by repository methods that project ``EndpointRow`` straight to + ``ModelDeploymentData`` without going through ``DeploymentInfo``. + """ + + items: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @dataclass class AutoScalingRuleSearchResult: """Search result with pagination for auto-scaling rules.""" diff --git a/src/ai/backend/manager/models/endpoint/row.py b/src/ai/backend/manager/models/endpoint/row.py index 53422f308bc..a6cb7b70908 100644 --- a/src/ai/backend/manager/models/endpoint/row.py +++ b/src/ai/backend/manager/models/endpoint/row.py @@ -108,6 +108,7 @@ from ai.backend.manager.models.routing import RoutingRow from ai.backend.manager.models.user import UserRow + __all__ = ( "EndpointAutoScalingRuleRow", "EndpointLifecycle", diff --git a/src/ai/backend/manager/repositories/deployment/__init__.py b/src/ai/backend/manager/repositories/deployment/__init__.py index aba78cccffc..bc7240657d5 100644 --- a/src/ai/backend/manager/repositories/deployment/__init__.py +++ b/src/ai/backend/manager/repositories/deployment/__init__.py @@ -3,9 +3,11 @@ from ai.backend.manager.models.endpoint.conditions import DeploymentConditions from ai.backend.manager.models.routing.conditions import RouteConditions +from .admin_repository import DeploymentAdminRepository from .repository import DeploymentRepository __all__ = [ + "DeploymentAdminRepository", "DeploymentRepository", "DeploymentConditions", "RouteConditions", diff --git a/src/ai/backend/manager/repositories/deployment/admin_repository.py b/src/ai/backend/manager/repositories/deployment/admin_repository.py new file mode 100644 index 00000000000..5f3ee9590a2 --- /dev/null +++ b/src/ai/backend/manager/repositories/deployment/admin_repository.py @@ -0,0 +1,75 @@ +"""Admin-only repository for deployment queries 
that span every scope. + +Holds reads that are not bounded by a project / user / domain scope. The +regular ``DeploymentRepository`` keeps the scoped variants; admin-style +"see everything" queries live here so the layering is explicit and the +GraphQL/REST ``admin_*`` paths have a clearly-named home. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ai.backend.common.exception import BackendAIError +from ai.backend.common.metrics.metric import DomainType, LayerType +from ai.backend.common.resilience.policies.metrics import MetricArgs, MetricPolicy +from ai.backend.common.resilience.policies.retry import BackoffStrategy, RetryArgs, RetryPolicy +from ai.backend.common.resilience.resilience import Resilience +from ai.backend.manager.data.deployment.types import ModelDeploymentDataSearchResult +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.repositories.deployment.db_source.db_source import DeploymentDBSource + +if TYPE_CHECKING: + from ai.backend.manager.models.utils import ExtendedAsyncSAEngine + +__all__ = ("DeploymentAdminRepository",) + + +deployment_admin_repository_resilience = Resilience( + policies=[ + MetricPolicy( + MetricArgs( + domain=DomainType.REPOSITORY, + layer=LayerType.DEPLOYMENT_ADMIN_REPOSITORY, + ) + ), + RetryPolicy( + RetryArgs( + max_retries=3, + retry_delay=0.1, + backoff_strategy=BackoffStrategy.FIXED, + non_retryable_exceptions=(BackendAIError,), + ) + ), + ] +) + + +class DeploymentAdminRepository: + """Admin (no-scope) reads against the ``endpoints`` table. + + Shares the underlying ``DeploymentDBSource`` with the regular + repository — the SQL-level query stays in one place; the split exists + so the calling layer makes its admin intent explicit. 
+ """ + + _db_source: DeploymentDBSource + + def __init__( + self, + db: ExtendedAsyncSAEngine, + ) -> None: + self._db_source = DeploymentDBSource(db) + + @deployment_admin_repository_resilience.apply() + async def search_model_deployments( + self, + querier: BatchQuerier, + ) -> ModelDeploymentDataSearchResult: + """Search every deployment without a scope filter. + + Callers may still pass arbitrary ``conditions`` through the + querier — the absence of a scope filter on the repository method + itself is what makes this admin-only. + """ + return await self._db_source.search_model_deployments(querier) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index f1b608bfe0b..2ded3aa72f6 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -20,6 +20,10 @@ from ai.backend.common.config import ModelHealthCheck from ai.backend.common.data.endpoint.types import EndpointLifecycle +from ai.backend.common.data.model_deployment.types import ( + DeploymentStrategy, + ModelDeploymentStatus, +) from ai.backend.common.data.permission.types import RBACElementType from ai.backend.common.dto.manager.v2.runtime_variant_preset.types import ( PresetTarget, @@ -59,6 +63,7 @@ DeploymentInfoWithAutoScalingRules, DeploymentLastHistory, DeploymentLifecycleSubStep, + DeploymentNetworkSpec, DeploymentOptions, DeploymentPolicyData, DeploymentPolicySearchResult, @@ -69,8 +74,12 @@ LegacyRevisionCreateReadBundle, ModelDeploymentAccessTokenData, ModelDeploymentAutoScalingRuleData, + ModelDeploymentData, + ModelDeploymentDataSearchResult, + ModelDeploymentMetadataInfo, ModelRevisionData, ModelRevisionSpec, + ReplicaStateData, RevisionSearchResult, RouteHealthStatus, RouteInfo, @@ -138,7 +147,6 @@ RouteHistoryRow, ) from ai.backend.manager.models.session import SessionRow -from 
ai.backend.manager.models.storage import StorageSessionManager from ai.backend.manager.models.user import UserRow from ai.backend.manager.models.utils import ExtendedAsyncSAEngine from ai.backend.manager.models.vfolder import VFolderRow @@ -178,6 +186,7 @@ ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo, + UserDeploymentSearchScope, ) from ai.backend.manager.repositories.scheduler.types.session_creation import ( ContainerUserContext, @@ -203,6 +212,96 @@ class EndpointWithRoutesRawData: log = BraceStyleAdapter(logging.getLogger(__name__)) +def _lifecycle_to_status(lifecycle: EndpointLifecycle) -> ModelDeploymentStatus: + """Map the persisted ``EndpointLifecycle`` to the v2 ``ModelDeploymentStatus``. + + Private to this module — used by ``_endpoint_row_to_model_deployment_data``. + The legacy projection (``DeploymentInfo`` -> ``ModelDeploymentData``) + keeps its own copy in ``services/deployment/service.py``. + """ + match lifecycle: + case EndpointLifecycle.PENDING | EndpointLifecycle.CREATED: + return ModelDeploymentStatus.PENDING + case EndpointLifecycle.READY | EndpointLifecycle.SCALING: + return ModelDeploymentStatus.READY + case EndpointLifecycle.DEPLOYING: + return ModelDeploymentStatus.DEPLOYING + case EndpointLifecycle.DESTROYING: + return ModelDeploymentStatus.STOPPING + case EndpointLifecycle.DESTROYED: + return ModelDeploymentStatus.STOPPED + + +def _endpoint_row_to_model_deployment_data(row: EndpointRow) -> ModelDeploymentData: + """Project an ``EndpointRow`` straight to the API-shaped ``ModelDeploymentData``. + + Lives at the repository boundary because the conversion is purely an + adapter between the ORM row and the API data type — neither the model + layer (which must stay free of API-shaped projections) nor the service + layer (which must not touch ORM rows) is the right home for it. + + Eager-load requirements: ``EndpointRow.revisions`` (with + ``DeploymentRevisionRow.image_row``) and ``EndpointRow.deployment_policy``. 
+ + ``current_revision_id`` and ``deploying_revision_id`` are read from the + row columns directly. The ``revision`` spec is resolved by explicit match + against ``current_revision``; if no matching row is loaded (dangling + reference, or caller forgot to eager-load ``revisions``), ``revision`` + is ``None`` while ``current_revision_id`` still surfaces the column + value — callers can act on the ID even when the joined spec is missing. + """ + revision: ModelRevisionData | None = None + if row.current_revision is not None: + match = next((r for r in row.revisions if r.id == row.current_revision), None) + if match is None: + log.error( + "Deployment {} has current_revision_id {} but no matching " + "DeploymentRevisionRow was found among loaded revisions; " + "the revision spec will be reported as null. This usually " + "means EndpointRow.revisions was not eagerly loaded by the " + "caller.", + row.id, + row.current_revision, + ) + else: + revision = match.to_data() + + desired_count = row.desired_replicas if row.desired_replicas is not None else row.replicas + policy_data = row.deployment_policy.to_data() if row.deployment_policy is not None else None + + return ModelDeploymentData( + id=row.id, + metadata=ModelDeploymentMetadataInfo( + name=row.name, + status=_lifecycle_to_status(row.lifecycle_stage), + tags=[row.tag] if row.tag else [], + project_id=row.project, + domain_name=row.domain, + created_at=row.created_at, + updated_at=row.created_at, + ), + network_access=DeploymentNetworkSpec( + open_to_public=row.open_to_public if row.open_to_public is not None else False, + url=row.url, + ), + revision_history_ids=[row.current_revision] if row.current_revision else [], + revision=revision, + current_revision_id=row.current_revision, + deploying_revision_id=row.deploying_revision, + scaling_rule_ids=[], + replica_state=ReplicaStateData( + desired_replica_count=desired_count, + replica_ids=[], + ), + default_deployment_strategy=DeploymentStrategy.ROLLING, + 
created_user_id=row.created_user, + options=row.options, + scaling_state=row.scaling_state, + policy=policy_data, + sub_step=row.sub_step, + ) + + def _project_preset_slots( preset_row: DeploymentRevisionPresetRow | None, slot_entries: list[tuple[str, Decimal]], @@ -218,15 +317,12 @@ class DeploymentDBSource: """Database source for deployment-related operations.""" _db: ExtendedAsyncSAEngine - _storage_manager: StorageSessionManager def __init__( self, db: ExtendedAsyncSAEngine, - storage_manager: StorageSessionManager, ) -> None: self._db = db - self._storage_manager = storage_manager @actxmgr async def _begin_readonly_read_committed(self) -> AsyncIterator[SAConnection]: @@ -406,6 +502,37 @@ async def get_endpoint( return row.to_deployment_info() + async def get_model_deployment_data( + self, + endpoint_id: uuid.UUID, + ) -> ModelDeploymentData: + """Fetch a deployment as the API-shaped ``ModelDeploymentData``. + + Bypasses the ``DeploymentInfo`` intermediate so the API path's + revision-id columns flow through unchanged from the DB row. + + Raises: + EndpointNotFound: If the endpoint does not exist. 
+ """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = ( + sa.select(EndpointRow) + .where(EndpointRow.id == endpoint_id) + .options( + selectinload(EndpointRow.revisions).selectinload( + DeploymentRevisionRow.image_row + ), + selectinload(EndpointRow.deployment_policy), + ) + ) + result = await db_sess.execute(query) + row: EndpointRow | None = result.scalar_one_or_none() + + if not row: + raise EndpointNotFound(f"Endpoint {endpoint_id} not found") + + return _endpoint_row_to_model_deployment_data(row) + async def get_deployments_by_ids( self, deployment_ids: set[DeploymentID], @@ -1154,6 +1281,102 @@ async def search_endpoints( has_previous_page=result.has_previous_page, ) + async def search_model_deployments( + self, + querier: BatchQuerier, + ) -> ModelDeploymentDataSearchResult: + """Search endpoints and project each row directly to ``ModelDeploymentData``. + + Mirrors ``search_endpoints`` but skips the ``DeploymentInfo`` step + so the API path consumes the row's authoritative projection. + """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = sa.select(EndpointRow).options( + selectinload(EndpointRow.revisions).selectinload(DeploymentRevisionRow.image_row), + selectinload(EndpointRow.deployment_policy), + ) + + result = await execute_batch_querier( + db_sess, + query, + querier, + ) + + items = [_endpoint_row_to_model_deployment_data(row.EndpointRow) for row in result.rows] + + return ModelDeploymentDataSearchResult( + items=items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + + async def search_user_model_deployments( + self, + querier: BatchQuerier, + scope: UserDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search deployments owned by a specific user, returning ``ModelDeploymentData``. + + Backs the v2 adapter's ``my_search`` path. 
Scope filter + (``EndpointRow.created_user == user_id``) is injected via + ``execute_batch_querier``'s ``scope`` argument. + """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = sa.select(EndpointRow).options( + selectinload(EndpointRow.revisions).selectinload(DeploymentRevisionRow.image_row), + selectinload(EndpointRow.deployment_policy), + ) + + result = await execute_batch_querier( + db_sess, + query, + querier, + scope=scope, + ) + + items = [_endpoint_row_to_model_deployment_data(row.EndpointRow) for row in result.rows] + + return ModelDeploymentDataSearchResult( + items=items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + + async def search_project_model_deployments( + self, + querier: BatchQuerier, + scope: ProjectDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search deployments in a project, returning ``ModelDeploymentData``. + + Distinct from :meth:`search_deployments_in_project`, which returns + the lighter-weight ``DeploymentSummaryData`` for project admin + list pages. Backs the v2 adapter's ``project_search`` path. 
+ """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = sa.select(EndpointRow).options( + selectinload(EndpointRow.revisions).selectinload(DeploymentRevisionRow.image_row), + selectinload(EndpointRow.deployment_policy), + ) + + result = await execute_batch_querier( + db_sess, + query, + querier, + scope=scope, + ) + + items = [_endpoint_row_to_model_deployment_data(row.EndpointRow) for row in result.rows] + + return ModelDeploymentDataSearchResult( + items=items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + async def search_deployments_in_project( self, querier: BatchQuerier, diff --git a/src/ai/backend/manager/repositories/deployment/repositories.py b/src/ai/backend/manager/repositories/deployment/repositories.py index 49ccf2b4a41..b4076730977 100644 --- a/src/ai/backend/manager/repositories/deployment/repositories.py +++ b/src/ai/backend/manager/repositories/deployment/repositories.py @@ -5,6 +5,7 @@ from ai.backend.manager.repositories.types import RepositoryArgs +from .admin_repository import DeploymentAdminRepository from .repository import DeploymentRepository @@ -13,6 +14,7 @@ class DeploymentRepositories: """Container for deployment-related repositories.""" repository: DeploymentRepository + admin_repository: DeploymentAdminRepository @classmethod def create(cls, args: RepositoryArgs) -> Self: @@ -24,4 +26,5 @@ def create(cls, args: RepositoryArgs) -> Self: args.valkey_live_client, args.valkey_schedule_client, ) - return cls(repository=repository) + admin_repository = DeploymentAdminRepository(args.db) + return cls(repository=repository, admin_repository=admin_repository) diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index ab0689207fd..81b371ad1d4 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ 
b/src/ai/backend/manager/repositories/deployment/repository.py @@ -66,6 +66,8 @@ LegacyRevisionCreateReadBundle, ModelDeploymentAccessTokenData, ModelDeploymentAutoScalingRuleData, + ModelDeploymentData, + ModelDeploymentDataSearchResult, ModelRevisionData, ModelRevisionSpec, RevisionSearchResult, @@ -105,7 +107,12 @@ from .db_source import DeploymentDBSource from .storage_source import DeploymentStorageSource -from .types import ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo +from .types import ( + ProjectDeploymentSearchScope, + RouteData, + RouteServiceDiscoveryInfo, + UserDeploymentSearchScope, +) log = BraceStyleAdapter(logging.getLogger(__name__)) @@ -160,7 +167,7 @@ def __init__( valkey_live: ValkeyLiveClient, valkey_schedule: ValkeyScheduleClient, ) -> None: - self._db_source = DeploymentDBSource(db, storage_manager) + self._db_source = DeploymentDBSource(db) self._storage_source = DeploymentStorageSource(storage_manager) self._valkey_stat = valkey_stat self._valkey_live = valkey_live @@ -345,6 +352,25 @@ async def get_endpoint_info( """ return await self._db_source.get_endpoint(endpoint_id) + @deployment_repository_resilience.apply() + async def get_model_deployment_data( + self, + endpoint_id: uuid.UUID, + ) -> ModelDeploymentData: + """Fetch a deployment as the API-shaped ``ModelDeploymentData``. + + Bypasses ``DeploymentInfo`` so the API path's revision-id columns + flow through unchanged from the DB row. Prefer this over + ``get_endpoint_info`` + a service-side conversion when the caller + produces an API response — there is no ordering ambiguity to + resolve and dangling references surface as ``revision=None`` with + the column ID still populated. + + Raises: + EndpointNotFound: If the endpoint does not exist. 
+ """ + return await self._db_source.get_model_deployment_data(endpoint_id) + @deployment_repository_resilience.apply() async def destroy_endpoint( self, @@ -1505,6 +1531,24 @@ async def search_endpoints( """ return await self._db_source.search_endpoints(querier) + @deployment_repository_resilience.apply() + async def search_user_model_deployments( + self, + querier: BatchQuerier, + scope: UserDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search a user's own deployments — backs the v2 ``my_search`` path.""" + return await self._db_source.search_user_model_deployments(querier, scope) + + @deployment_repository_resilience.apply() + async def search_project_model_deployments( + self, + querier: BatchQuerier, + scope: ProjectDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search a project's deployments returning full ``ModelDeploymentData``.""" + return await self._db_source.search_project_model_deployments(querier, scope) + @deployment_repository_resilience.apply() async def search_deployments_in_project( self, diff --git a/src/ai/backend/manager/repositories/deployment/types/__init__.py b/src/ai/backend/manager/repositories/deployment/types/__init__.py index 741b2a06b16..ccea70bcaf8 100644 --- a/src/ai/backend/manager/repositories/deployment/types/__init__.py +++ b/src/ai/backend/manager/repositories/deployment/types/__init__.py @@ -7,6 +7,7 @@ ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo, + UserDeploymentSearchScope, ) __all__ = [ @@ -16,4 +17,5 @@ "ProjectDeploymentSearchScope", "RouteData", "RouteServiceDiscoveryInfo", + "UserDeploymentSearchScope", ] diff --git a/src/ai/backend/manager/repositories/deployment/types/endpoint.py b/src/ai/backend/manager/repositories/deployment/types/endpoint.py index 5b29435c725..62491f15f50 100644 --- a/src/ai/backend/manager/repositories/deployment/types/endpoint.py +++ b/src/ai/backend/manager/repositories/deployment/types/endpoint.py @@ -116,3 +116,26 @@ def 
existence_checks(self) -> Sequence[ExistenceCheck[UUID]]: error=ProjectNotFound(str(self.project_id)), ), ] + + +@dataclass(frozen=True) +class UserDeploymentSearchScope(SearchScope): + """Required scope for searching deployments created by a given user. + + Backs the v2 adapter's ``my_search`` path; the adapter resolves the + current user and constructs the scope before invoking the action. + """ + + user_id: UUID + + def to_condition(self) -> QueryCondition: + user_id = self.user_id + + def inner() -> sa.sql.expression.ColumnElement[bool]: + return EndpointRow.created_user == user_id + + return inner + + @property + def existence_checks(self) -> Sequence[ExistenceCheck[UUID]]: + return [] diff --git a/src/ai/backend/manager/services/deployment/actions/search_deployments.py b/src/ai/backend/manager/services/deployment/actions/admin_search_model_deployments.py similarity index 67% rename from src/ai/backend/manager/services/deployment/actions/search_deployments.py rename to src/ai/backend/manager/services/deployment/actions/admin_search_model_deployments.py index 3160f1c939c..21b31879f83 100644 --- a/src/ai/backend/manager/services/deployment/actions/search_deployments.py +++ b/src/ai/backend/manager/services/deployment/actions/admin_search_model_deployments.py @@ -9,7 +9,14 @@ @dataclass -class SearchDeploymentsAction(DeploymentBaseAction): +class AdminSearchModelDeploymentsAction(DeploymentBaseAction): + """Search every deployment with no scope (superadmin path). + + Routed through ``DeploymentAdminProcessors`` so callers make the + admin intent explicit; scope-restricted variants live on the regular + ``DeploymentService`` and ``DeploymentRepository``. 
+ """ + querier: BatchQuerier @override @@ -23,7 +30,7 @@ def operation_type(cls) -> ActionOperationType: @dataclass -class SearchDeploymentsActionResult(BaseActionResult): +class AdminSearchModelDeploymentsActionResult(BaseActionResult): data: list[ModelDeploymentData] total_count: int has_next_page: bool diff --git a/src/ai/backend/manager/services/deployment/actions/search_legacy_deployments.py b/src/ai/backend/manager/services/deployment/actions/search_legacy_deployments.py new file mode 100644 index 00000000000..4d23fcf9186 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/search_legacy_deployments.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass +from typing import override + +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.services.deployment.actions.base import DeploymentBaseAction + + +@dataclass +class SearchLegacyDeploymentsAction(DeploymentBaseAction): + """Legacy no-scope deployment search. + + Backs the v1 REST handler's ``GET /v3/services`` and the v2 adapter's + ``batch_load_by_ids`` DataLoader path, which bounds its result set with a + ``by_ids`` condition on a no-scope querier. New admin call sites should use + :class:`AdminSearchModelDeploymentsAction` (routed through + ``DeploymentAdminProcessors``) instead — this action is preserved so the + legacy contract stays intact.
+ """ + + querier: BatchQuerier + + @override + def entity_id(self) -> str | None: + return None + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.SEARCH + + +@dataclass +class SearchLegacyDeploymentsActionResult(BaseActionResult): + data: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/deployment/actions/search_project_model_deployments.py b/src/ai/backend/manager/services/deployment/actions/search_project_model_deployments.py new file mode 100644 index 00000000000..01c54c22134 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/search_project_model_deployments.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import override + +from ai.backend.common.data.permission.types import RBACElementType, ScopeType +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData +from ai.backend.manager.data.permission.types import RBACElementRef +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.repositories.deployment.types import ProjectDeploymentSearchScope +from ai.backend.manager.services.deployment.actions.base import DeploymentScopeAction + + +@dataclass +class SearchProjectModelDeploymentsAction(DeploymentScopeAction): + """Search deployments within a project, returning ``ModelDeploymentData``. + + Distinct from :class:`SearchDeploymentsInProjectAction`, which returns + the lighter-weight ``DeploymentSummaryData`` for project admin list + pages. Backs the v2 adapter's ``project_search`` path. 
+ """ + + scope: ProjectDeploymentSearchScope + querier: BatchQuerier + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.SEARCH + + @override + def scope_type(self) -> ScopeType: + return ScopeType.PROJECT + + @override + def scope_id(self) -> str: + return str(self.scope.project_id) + + @override + def target_element(self) -> RBACElementRef: + return RBACElementRef(RBACElementType.PROJECT, str(self.scope.project_id)) + + +@dataclass +class SearchProjectModelDeploymentsActionResult(BaseActionResult): + data: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/deployment/actions/search_user_model_deployments.py b/src/ai/backend/manager/services/deployment/actions/search_user_model_deployments.py new file mode 100644 index 00000000000..d146e5da3b7 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/search_user_model_deployments.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import override + +from ai.backend.common.data.permission.types import RBACElementType, ScopeType +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData +from ai.backend.manager.data.permission.types import RBACElementRef +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.repositories.deployment.types import UserDeploymentSearchScope +from ai.backend.manager.services.deployment.actions.base import DeploymentScopeAction + + +@dataclass +class SearchUserModelDeploymentsAction(DeploymentScopeAction): + """Search deployments created by a specific user. 
+ + Internal name uses the ``User`` scope semantics; the v2 adapter exposes + this as the user-facing ``my_search`` operation, resolving the current + user before constructing the scope. + """ + + scope: UserDeploymentSearchScope + querier: BatchQuerier + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.SEARCH + + @override + def scope_type(self) -> ScopeType: + return ScopeType.USER + + @override + def scope_id(self) -> str: + return str(self.scope.user_id) + + @override + def target_element(self) -> RBACElementRef: + return RBACElementRef( + element_type=RBACElementType.USER, + element_id=str(self.scope.user_id), + ) + + +@dataclass +class SearchUserModelDeploymentsActionResult(BaseActionResult): + data: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/deployment/admin_service.py b/src/ai/backend/manager/services/deployment/admin_service.py new file mode 100644 index 00000000000..9bda1d8e855 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/admin_service.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from ai.backend.manager.repositories.deployment.admin_repository import ( + DeploymentAdminRepository, +) +from ai.backend.manager.services.deployment.actions.admin_search_model_deployments import ( + AdminSearchModelDeploymentsAction, + AdminSearchModelDeploymentsActionResult, +) + +__all__ = ("DeploymentAdminService",) + + +class DeploymentAdminService: + """Admin (no-scope) service operations for deployments. + + Holds the call sites that are not bounded by a project / user / domain + scope (admin search, DataLoader batch lookups). Scoped operations live + on :class:`DeploymentService`. 
+ """ + + _admin_repository: DeploymentAdminRepository + + def __init__(self, admin_repository: DeploymentAdminRepository) -> None: + self._admin_repository = admin_repository + + async def admin_search_model_deployments( + self, action: AdminSearchModelDeploymentsAction + ) -> AdminSearchModelDeploymentsActionResult: + """Search every deployment without a scope filter.""" + result = await self._admin_repository.search_model_deployments(action.querier) + return AdminSearchModelDeploymentsActionResult( + data=result.items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) diff --git a/src/ai/backend/manager/services/deployment/processors.py b/src/ai/backend/manager/services/deployment/processors.py index 98f4ab80736..5f81c692e56 100644 --- a/src/ai/backend/manager/services/deployment/processors.py +++ b/src/ai/backend/manager/services/deployment/processors.py @@ -30,6 +30,10 @@ SearchAccessTokensAction, SearchAccessTokensActionResult, ) +from ai.backend.manager.services.deployment.actions.admin_search_model_deployments import ( + AdminSearchModelDeploymentsAction, + AdminSearchModelDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.auto_scaling_rule.bulk_delete_auto_scaling_rules import ( BulkDeleteAutoScalingRulesAction, BulkDeleteAutoScalingRulesActionResult, @@ -116,18 +120,26 @@ UpdateRouteTrafficStatusAction, UpdateRouteTrafficStatusActionResult, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, - SearchDeploymentsActionResult, -) from ai.backend.manager.services.deployment.actions.search_deployments_in_project import ( SearchDeploymentsInProjectAction, SearchDeploymentsInProjectActionResult, ) +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, + SearchLegacyDeploymentsActionResult, +) +from 
ai.backend.manager.services.deployment.actions.search_project_model_deployments import ( + SearchProjectModelDeploymentsAction, + SearchProjectModelDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.search_replicas import ( SearchReplicasAction, SearchReplicasActionResult, ) +from ai.backend.manager.services.deployment.actions.search_user_model_deployments import ( + SearchUserModelDeploymentsAction, + SearchUserModelDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.sync_replicas import ( SyncReplicaAction, SyncReplicaActionResult, @@ -138,6 +150,7 @@ ) if TYPE_CHECKING: + from ai.backend.manager.services.deployment.admin_service import DeploymentAdminService from ai.backend.manager.services.deployment.service import DeploymentService @@ -158,7 +171,15 @@ class DeploymentProcessors(AbstractProcessorPackage): destroy_deployment: SingleEntityActionProcessor[ DestroyDeploymentAction, DestroyDeploymentActionResult ] - search_deployments: ActionProcessor[SearchDeploymentsAction, SearchDeploymentsActionResult] + search_legacy_deployments: ActionProcessor[ + SearchLegacyDeploymentsAction, SearchLegacyDeploymentsActionResult + ] + search_user_model_deployments: ActionProcessor[ + SearchUserModelDeploymentsAction, SearchUserModelDeploymentsActionResult + ] + search_project_model_deployments: ActionProcessor[ + SearchProjectModelDeploymentsAction, SearchProjectModelDeploymentsActionResult + ] search_deployments_in_project: ActionProcessor[ SearchDeploymentsInProjectAction, SearchDeploymentsInProjectActionResult ] @@ -248,7 +269,19 @@ def __init__( self.destroy_deployment = SingleEntityActionProcessor( service.destroy_deployment, action_monitors, validators=rbac_single_entity_validators ) - self.search_deployments = ActionProcessor(service.search_deployments, action_monitors) + self.search_legacy_deployments = ActionProcessor( + service.search_legacy_deployments, action_monitors + ) + 
self.search_user_model_deployments = ActionProcessor( + service.search_user_model_deployments, + action_monitors, + validators=[cast(ActionValidator, validators.rbac.scope)], + ) + self.search_project_model_deployments = ActionProcessor( + service.search_project_model_deployments, + action_monitors, + validators=[cast(ActionValidator, validators.rbac.scope)], + ) self.search_deployments_in_project = ActionProcessor( service.search_deployments_in_project, action_monitors, @@ -324,7 +357,9 @@ def supported_actions(self) -> list[ActionSpec]: UpdateDeploymentAction.spec(), ReplaceDeploymentOptionsAction.spec(), DestroyDeploymentAction.spec(), - SearchDeploymentsAction.spec(), + SearchLegacyDeploymentsAction.spec(), + SearchUserModelDeploymentsAction.spec(), + SearchProjectModelDeploymentsAction.spec(), SearchDeploymentsInProjectAction.spec(), GetDeploymentByIdAction.spec(), GetDeploymentPolicyAction.spec(), @@ -358,3 +393,31 @@ def supported_actions(self) -> list[ActionSpec]: BulkDeleteAccessTokensAction.spec(), SearchAccessTokensAction.spec(), ] + + +class DeploymentAdminProcessors(AbstractProcessorPackage): + """Admin (no-scope) processors for deployments. + + Counterpart of :class:`DeploymentProcessors` that owns the no-scope + queries — admin search and the DataLoader batch path. Scoped reads + stay on :class:`DeploymentProcessors`. 
+ """ + + admin_search_model_deployments: ActionProcessor[ + AdminSearchModelDeploymentsAction, AdminSearchModelDeploymentsActionResult + ] + + def __init__( + self, + service: DeploymentAdminService, + action_monitors: list[ActionMonitor], + ) -> None: + self.admin_search_model_deployments = ActionProcessor( + service.admin_search_model_deployments, action_monitors + ) + + @override + def supported_actions(self) -> list[ActionSpec]: + return [ + AdminSearchModelDeploymentsAction.spec(), + ] diff --git a/src/ai/backend/manager/services/deployment/service.py b/src/ai/backend/manager/services/deployment/service.py index 77ae25b2cdb..7bbb4710ab0 100644 --- a/src/ai/backend/manager/services/deployment/service.py +++ b/src/ai/backend/manager/services/deployment/service.py @@ -18,9 +18,7 @@ MintEndpointTokenRequest, ) from ai.backend.common.identifier.deployment import DeploymentID -from ai.backend.common.types import ( - ResourceSlot, -) +from ai.backend.common.types import ResourceSlot from ai.backend.logging.utils import BraceStyleAdapter from ai.backend.manager.clients.appproxy.client import AppProxyClientPool from ai.backend.manager.data.deployment.creator import ( @@ -178,18 +176,26 @@ UpdateRouteTrafficStatusAction, UpdateRouteTrafficStatusActionResult, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, - SearchDeploymentsActionResult, -) from ai.backend.manager.services.deployment.actions.search_deployments_in_project import ( SearchDeploymentsInProjectAction, SearchDeploymentsInProjectActionResult, ) +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, + SearchLegacyDeploymentsActionResult, +) +from ai.backend.manager.services.deployment.actions.search_project_model_deployments import ( + SearchProjectModelDeploymentsAction, + SearchProjectModelDeploymentsActionResult, +) from 
ai.backend.manager.services.deployment.actions.search_replicas import ( SearchReplicasAction, SearchReplicasActionResult, ) +from ai.backend.manager.services.deployment.actions.search_user_model_deployments import ( + SearchUserModelDeploymentsAction, + SearchUserModelDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.sync_replicas import ( SyncReplicaAction, SyncReplicaActionResult, @@ -207,13 +213,10 @@ def _map_lifecycle_to_status(lifecycle: EndpointLifecycle) -> ModelDeploymentStatus: """Map EndpointLifecycle to ModelDeploymentStatus for the v2 status surface. - The lifecycle axis is monotonic (PENDING → DEPLOYING → READY → DESTROYING - → DESTROYED); v2 exposes replica reconciliation as the orthogonal - ``scaling_state`` field on the deployment node. ``SCALING`` is therefore - no longer surfaced through ``ModelDeploymentStatus`` — a legacy - ``lifecycle=SCALING`` row folds into ``READY`` so clients only have to - consult ``scaling_state`` to decide whether a replica reconcile is in - flight. Legacy ``CREATED`` (never-deployed) folds into ``PENDING``. + Kept private to this module — only ``_convert_deployment_info_to_data`` (the + legacy ``DeploymentInfo``-based projection used by ``search_legacy_deployments``) + relies on it. The v2 path projects ``EndpointRow`` directly via + ``_lifecycle_to_status`` defined in ``models/endpoint/row.py``. """ match lifecycle: case EndpointLifecycle.PENDING | EndpointLifecycle.CREATED: @@ -229,9 +232,15 @@ def _map_lifecycle_to_status(lifecycle: EndpointLifecycle) -> ModelDeploymentSta def _convert_deployment_info_to_data(info: DeploymentInfo) -> ModelDeploymentData: - """Convert DeploymentInfo to ModelDeploymentData. + """Project a ``DeploymentInfo`` to ``ModelDeploymentData`` for legacy callers. + + Used only by ``DeploymentService.search_legacy_deployments`` (v1 REST and the + v2 adapter's scoped paths that piggyback their scope filter onto a no-scope + querier). 
The v2 service paths now project from ``EndpointRow`` directly via + ``EndpointRow.to_model_deployment_data`` and do not pass through this helper. - Note: Some fields are set to defaults as DeploymentInfo doesn't have all the data. + Note: Some fields are set to placeholder defaults because ``DeploymentInfo`` + does not carry them. """ revision: ModelRevisionData | None = None rev: ModelRevisionSpec | None = None @@ -439,12 +448,10 @@ async def create_deployment( revision=action.creator.model_revision, auto_activate=action.auto_activate, ) - updated_deployment_info = await self._deployment_repository.get_endpoint_info( + deployment_data = await self._deployment_repository.get_model_deployment_data( deployment_info.id ) - return CreateDeploymentActionResult( - data=_convert_deployment_info_to_data(updated_deployment_info) - ) + return CreateDeploymentActionResult(data=deployment_data) async def create_legacy_deployment( self, action: CreateLegacyDeploymentAction @@ -484,8 +491,9 @@ async def update_deployment( log.info("Updating deployment with ID: {}", action.updater.pk_value) endpoint_id = cast(UUID, action.updater.pk_value) spec = cast(DeploymentUpdaterSpec, action.updater.spec) - deployment_info = await self._deployment_controller.update_deployment(endpoint_id, spec) - return UpdateDeploymentActionResult(data=_convert_deployment_info_to_data(deployment_info)) + await self._deployment_controller.update_deployment(endpoint_id, spec) + deployment_data = await self._deployment_repository.get_model_deployment_data(endpoint_id) + return UpdateDeploymentActionResult(data=deployment_data) async def replace_deployment_options( self, action: ReplaceDeploymentOptionsAction @@ -526,26 +534,57 @@ async def destroy_deployment( await self._deployment_controller.mark_lifecycle_needed(DeploymentLifecycleType.DESTROYING) return DestroyDeploymentActionResult(success=success) - async def search_deployments( - self, action: SearchDeploymentsAction - ) -> 
SearchDeploymentsActionResult: - """Search deployments with filtering and pagination. - - Args: - action: Action containing BatchQuerier for filtering and pagination - - Returns: - SearchDeploymentsActionResult: Result containing list of deployments and pagination info + async def search_legacy_deployments( + self, action: SearchLegacyDeploymentsAction + ) -> SearchLegacyDeploymentsActionResult: + """Legacy no-scope deployment search. + + Preserves the pre-refactor projection path — + ``DeploymentRepository.search_endpoints`` returns ``DeploymentInfo`` + items, which are then mapped to ``ModelDeploymentData`` via + ``_convert_deployment_info_to_data``. Backs the v1 REST handler's + ``GET /v3/services`` and the v2 adapter's scoped (``my_search`` / + ``project_search``) paths that piggyback their scope filter onto a + no-scope querier. New admin call sites should go through + :class:`DeploymentAdminService` instead. """ result = await self._deployment_repository.search_endpoints(action.querier) deployments = [_convert_deployment_info_to_data(info) for info in result.items] - return SearchDeploymentsActionResult( + return SearchLegacyDeploymentsActionResult( data=deployments, total_count=result.total_count, has_next_page=result.has_next_page, has_previous_page=result.has_previous_page, ) + async def search_user_model_deployments( + self, action: SearchUserModelDeploymentsAction + ) -> SearchUserModelDeploymentsActionResult: + """Search the current user's deployments — backs ``my_search``.""" + result = await self._deployment_repository.search_user_model_deployments( + action.querier, action.scope + ) + return SearchUserModelDeploymentsActionResult( + data=result.items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + + async def search_project_model_deployments( + self, action: SearchProjectModelDeploymentsAction + ) -> SearchProjectModelDeploymentsActionResult: + """Search deployments inside a 
project, returning full ``ModelDeploymentData``.""" + result = await self._deployment_repository.search_project_model_deployments( + action.querier, action.scope + ) + return SearchProjectModelDeploymentsActionResult( + data=result.items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + async def search_deployments_in_project( self, action: SearchDeploymentsInProjectAction ) -> SearchDeploymentsInProjectActionResult: @@ -574,8 +613,10 @@ async def get_deployment_by_id( Raises: EndpointNotFound: If the deployment does not exist """ - deployment_info = await self._deployment_repository.get_endpoint_info(action.deployment_id) - return GetDeploymentByIdActionResult(data=_convert_deployment_info_to_data(deployment_info)) + deployment_data = await self._deployment_repository.get_model_deployment_data( + action.deployment_id + ) + return GetDeploymentByIdActionResult(data=deployment_data) async def get_deployment_policy( self, action: GetDeploymentPolicyAction @@ -674,8 +715,11 @@ async def activate_revision( result = await self._deployment_controller.activate_revision( action.deployment_id, action.revision_id ) + deployment_data = await self._deployment_repository.get_model_deployment_data( + action.deployment_id + ) return ActivateRevisionActionResult( - deployment=_convert_deployment_info_to_data(result.deployment_info), + deployment=deployment_data, previous_revision_id=result.previous_revision_id, activated_revision_id=result.activated_revision_id, deployment_policy=result.deployment_policy, diff --git a/src/ai/backend/manager/services/factory.py b/src/ai/backend/manager/services/factory.py index d9ba5d7e15e..0828f4d289d 100644 --- a/src/ai/backend/manager/services/factory.py +++ b/src/ai/backend/manager/services/factory.py @@ -20,7 +20,11 @@ from ai.backend.manager.services.auth.service import AuthService from ai.backend.manager.services.container_registry.processors import 
ContainerRegistryProcessors from ai.backend.manager.services.container_registry.service import ContainerRegistryService -from ai.backend.manager.services.deployment.processors import DeploymentProcessors +from ai.backend.manager.services.deployment.admin_service import DeploymentAdminService +from ai.backend.manager.services.deployment.processors import ( + DeploymentAdminProcessors, + DeploymentProcessors, +) from ai.backend.manager.services.deployment.service import DeploymentService from ai.backend.manager.services.deployment_revision_preset.processors import ( DeploymentRevisionPresetProcessors, @@ -389,6 +393,7 @@ def create_services(args: ServiceArgs) -> Services: runtime_variant_preset_repository=repositories.runtime_variant_preset.repository, appproxy_client_pool=args.appproxy_client_pool, ), + deployment_admin=DeploymentAdminService(repositories.deployment.admin_repository), storage_namespace=StorageNamespaceService(repositories.storage_namespace.repository), audit_log=AuditLogService(repositories.audit_log.repository), scheduling_history=SchedulingHistoryService(repositories.scheduling_history.repository), @@ -508,6 +513,7 @@ def create_processors( services.artifact_revision, action_monitors, validators ), deployment=DeploymentProcessors(services.deployment, action_monitors, validators), + deployment_admin=DeploymentAdminProcessors(services.deployment_admin, action_monitors), storage_namespace=StorageNamespaceProcessors( services.storage_namespace, action_monitors, validators ), diff --git a/src/ai/backend/manager/services/processors.py b/src/ai/backend/manager/services/processors.py index 14a725c2081..3147e2960fe 100644 --- a/src/ai/backend/manager/services/processors.py +++ b/src/ai/backend/manager/services/processors.py @@ -78,7 +78,11 @@ from ai.backend.manager.services.container_registry.service import ( ContainerRegistryService, ) + from ai.backend.manager.services.deployment.admin_service import ( + DeploymentAdminService, + ) from 
ai.backend.manager.services.deployment.processors import ( + DeploymentAdminProcessors, DeploymentProcessors, ) from ai.backend.manager.services.deployment.service import ( @@ -404,6 +408,7 @@ class Services: artifact_revision: ArtifactRevisionService artifact_registry: ArtifactRegistryService deployment: DeploymentService + deployment_admin: DeploymentAdminService storage_namespace: StorageNamespaceService audit_log: AuditLogService scheduling_history: SchedulingHistoryService @@ -469,6 +474,7 @@ class Processors(AbstractProcessorPackage): artifact_registry: ArtifactRegistryProcessors artifact_revision: ArtifactRevisionProcessors deployment: DeploymentProcessors + deployment_admin: DeploymentAdminProcessors storage_namespace: StorageNamespaceProcessors audit_log: AuditLogProcessors scheduling_history: SchedulingHistoryProcessors @@ -527,6 +533,7 @@ def supported_actions(self) -> list[ActionSpec]: *self.artifact_revision.supported_actions(), *self.artifact.supported_actions(), *self.deployment.supported_actions(), + *self.deployment_admin.supported_actions(), *self.storage_namespace.supported_actions(), *self.audit_log.supported_actions(), *self.scheduling_history.supported_actions(), diff --git a/tests/unit/manager/repositories/deployment/test_endpoint_projection.py b/tests/unit/manager/repositories/deployment/test_endpoint_projection.py new file mode 100644 index 00000000000..3e1ea2b803c --- /dev/null +++ b/tests/unit/manager/repositories/deployment/test_endpoint_projection.py @@ -0,0 +1,154 @@ +"""Tests for the API-facing ``EndpointRow`` projection at the repository boundary.""" + +from __future__ import annotations + +import uuid +from datetime import UTC, datetime +from typing import Any +from unittest.mock import MagicMock + +from ai.backend.common.data.endpoint.types import EndpointLifecycle, ScalingState +from ai.backend.common.data.model_deployment.types import ModelDeploymentStatus +from ai.backend.common.identifier.deployment import DeploymentID +from 
ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.types import ClusterMode, ResourceSlot +from ai.backend.manager.data.deployment.types import ( + ClusterConfigData, + DeploymentOptions, + ModelMountConfigData, + ModelRevisionData, + ModelRuntimeConfigData, + ResourceConfigData, +) +from ai.backend.manager.repositories.deployment.db_source.db_source import ( + _endpoint_row_to_model_deployment_data, +) + + +def _stub_endpoint( + *, + current_revision: DeploymentRevisionID | None, + deploying_revision: DeploymentRevisionID | None, + revisions: list[Any], +) -> Any: + """Build a non-DB stub that satisfies the projection helper's reads. + + The helper is a pure projection — instantiating a real ``EndpointRow`` + against the DB schema would force a session and cascade test setup that + has nothing to do with the projection logic, so a ``MagicMock`` carrying + the columns the helper touches is sufficient. + """ + stub = MagicMock() + stub.id = DeploymentID(uuid.uuid4()) + stub.name = "test-deployment" + stub.lifecycle_stage = EndpointLifecycle.DEPLOYING + stub.tag = None + stub.project = uuid.uuid4() + stub.domain = "default" + stub.created_at = datetime(2024, 1, 1, tzinfo=UTC) + stub.open_to_public = False + stub.url = None + stub.current_revision = current_revision + stub.deploying_revision = deploying_revision + stub.revisions = revisions + stub.replicas = 1 + stub.desired_replicas = None + stub.deployment_policy = None + stub.created_user = uuid.uuid4() + stub.options = DeploymentOptions() + stub.scaling_state = ScalingState.STABLE + stub.sub_step = None + return stub + + +def _stub_revision(rev_id: DeploymentRevisionID) -> Any: + """Build a ``DeploymentRevisionRow`` stub whose ``to_data()`` returns a + minimal, well-formed ``ModelRevisionData``. 
+ """ + rev = MagicMock() + rev.id = rev_id + rev.to_data.return_value = ModelRevisionData( + id=rev_id, + cluster_config=ClusterConfigData(mode=ClusterMode.SINGLE_NODE, size=1), + resource_config=ResourceConfigData( + resource_group_name="default", + resource_slot=ResourceSlot({}), + ), + model_runtime_config=ModelRuntimeConfigData( + runtime_variant_id=uuid.uuid4(), # type: ignore[arg-type] + ), + model_mount_config=ModelMountConfigData( + vfolder_id=None, + mount_destination="/models", + definition_path="model-definition.yml", + ), + created_at=datetime(2024, 1, 1, tzinfo=UTC), + image_id=None, + ) + return rev + + +class TestEndpointRowToModelDeploymentData: + """Pin the API-facing projection to direct DB columns (BA-5979). + + The previous code path matched the current revision via + ``info.model_revisions[0]``, which non-deterministically swapped + ``current_revision_id`` with ``deploying_revision_id`` when Postgres + returned the relationship rows in a different physical order. The new + helper must source IDs from ``EndpointRow`` columns directly so the + revision-row order is irrelevant. + """ + + def test_current_revision_resolved_by_id_match_not_list_order(self) -> None: + current_id = DeploymentRevisionID(uuid.uuid4()) + deploying_id = DeploymentRevisionID(uuid.uuid4()) + current_rev = _stub_revision(current_id) + deploying_rev = _stub_revision(deploying_id) + + # Deploying revision listed first — the natural-failure case for the + # old ``[0]``-blind lookup. 
+ endpoint = _stub_endpoint( + current_revision=current_id, + deploying_revision=deploying_id, + revisions=[deploying_rev, current_rev], + ) + + data = _endpoint_row_to_model_deployment_data(endpoint) + + assert data.current_revision_id == current_id + assert data.deploying_revision_id == deploying_id + assert data.current_revision_id != data.deploying_revision_id + assert data.revision is not None + assert data.revision.id == current_id + + def test_current_revision_id_survives_dangling_reference(self) -> None: + """If the revision row was deleted but the column still points to it, + surface the column ID and report the spec as ``None`` — do not collapse + ``current_revision_id`` to ``None`` together with the missing spec. + """ + dangling_id = DeploymentRevisionID(uuid.uuid4()) + + endpoint = _stub_endpoint( + current_revision=dangling_id, + deploying_revision=None, + revisions=[], + ) + + data = _endpoint_row_to_model_deployment_data(endpoint) + + assert data.current_revision_id == dangling_id + assert data.deploying_revision_id is None + assert data.revision is None + + def test_lifecycle_status_mapping(self) -> None: + """Status surface follows the lifecycle column directly.""" + endpoint = _stub_endpoint( + current_revision=None, + deploying_revision=None, + revisions=[], + ) + endpoint.lifecycle_stage = EndpointLifecycle.READY + + data = _endpoint_row_to_model_deployment_data(endpoint) + + assert data.metadata.status == ModelDeploymentStatus.READY diff --git a/tests/unit/manager/services/deployment/test_deployment_service.py b/tests/unit/manager/services/deployment/test_deployment_service.py index 6fc9d4c58a2..aacc3574167 100644 --- a/tests/unit/manager/services/deployment/test_deployment_service.py +++ b/tests/unit/manager/services/deployment/test_deployment_service.py @@ -7,7 +7,6 @@ from __future__ import annotations import uuid -from collections.abc import Callable from datetime import UTC, datetime from typing import cast from unittest.mock import 
AsyncMock, MagicMock @@ -22,7 +21,6 @@ ) from ai.backend.common.dto.manager.v2.deployment.types import IntOrPercent from ai.backend.common.identifier.deployment import DeploymentID -from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID from ai.backend.common.identifier.image import ImageID from ai.backend.common.identifier.runtime_variant import RuntimeVariantID from ai.backend.common.identifier.vfolder import VFolderUUID @@ -52,9 +50,7 @@ ExecutionSpec, ModelMountConfigData, ModelRevisionData, - ModelRevisionSpec, ModelRuntimeConfigData, - MountMetadata, ReplicaSpec, ResourceConfigData, ResourceSpec, @@ -82,7 +78,6 @@ from ai.backend.manager.services.deployment.processors import DeploymentProcessors from ai.backend.manager.services.deployment.service import ( DeploymentService, - _convert_deployment_info_to_data, ) from ai.backend.manager.sokovan.deployment import DeploymentController @@ -641,72 +636,3 @@ async def test_persists_coordinator_jwt_instead_of_random( creator = cast(RBACEntityCreator[object], repo_call.args[0]) spec = cast(EndpointTokenCreatorSpec, creator.spec) assert spec.token == sample_coordinator_jwt - - -class TestConvertDeploymentInfoToData: - """Regression test for ``_convert_deployment_info_to_data`` (BA-5963).""" - - @pytest.fixture - def make_revision_spec(self) -> Callable[[], ModelRevisionSpec]: - def make() -> ModelRevisionSpec: - return ModelRevisionSpec( - revision_id=DeploymentRevisionID(uuid.uuid4()), - image_id=ImageID(uuid.uuid4()), - resource_spec=ResourceSpec( - cluster_mode=ClusterMode.SINGLE_NODE, - cluster_size=1, - resource_slots={"cpu": "1"}, - ), - mounts=MountMetadata( - model_vfolder_id=VFolderUUID(uuid.uuid4()), - model_definition_path="model-definition.yaml", - model_mount_destination="/models", - extra_mounts=[], - ), - execution=ExecutionSpec( - runtime_variant_id=RuntimeVariantID(uuid.uuid4()), - ), - ) - - return make - - def test_current_revision_resolved_by_id_match_not_list_order( - 
self, - make_revision_spec: Callable[[], ModelRevisionSpec], - ) -> None: - """Pin: revision lookup must use explicit ``current_revision_id``, not list[0].""" - deploying_spec = make_revision_spec() - current_spec = make_revision_spec() - - deployment_info = DeploymentInfo( - id=DeploymentID(uuid.uuid4()), - metadata=DeploymentMetadata( - name="ba5963-test", - domain="default", - project=uuid.uuid4(), - resource_group="default", - created_user=uuid.uuid4(), - session_owner=uuid.uuid4(), - created_at=datetime(2024, 1, 1, tzinfo=UTC), - revision_history_limit=10, - ), - state=DeploymentState( - lifecycle=EndpointLifecycle.DEPLOYING, - scaling_state=ScalingState.STABLE, - retry_count=0, - ), - replica_spec=ReplicaSpec(replica_count=1), - network=DeploymentNetworkSpec(open_to_public=False), - model_revisions=[deploying_spec, current_spec], - options=DeploymentOptions(), - current_revision_id=current_spec.revision_id, - deploying_revision_id=deploying_spec.revision_id, - ) - - deployment_data = _convert_deployment_info_to_data(deployment_info) - - assert deployment_data.current_revision_id == current_spec.revision_id - assert deployment_data.deploying_revision_id == deploying_spec.revision_id - assert deployment_data.current_revision_id != deployment_data.deploying_revision_id - assert deployment_data.revision is not None - assert deployment_data.revision.id == current_spec.revision_id