diff --git a/plugins/nemo-deployments/pyproject.toml b/plugins/nemo-deployments/pyproject.toml index e9d1f932bc..22698169db 100644 --- a/plugins/nemo-deployments/pyproject.toml +++ b/plugins/nemo-deployments/pyproject.toml @@ -6,11 +6,15 @@ readme = "README.md" requires-python = ">=3.11,<3.14" dependencies = [ "fastapi>=0.115", + "httpx>=0.27", "nemo-platform", "nemo-platform-plugin", "pydantic>=2.10.6", ] +[project.optional-dependencies] +docker = ["docker>=7.0"] + [project.entry-points."nemo.services"] deployments = "nemo_deployments_plugin.service:DeploymentsService" @@ -29,11 +33,11 @@ nemo-platform = { workspace = true } nemo-platform-plugin = { workspace = true } [dependency-groups] -dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115"] +dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115", "docker>=7.0"] [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" -pythonpath = ["src", "tests/unit"] +pythonpath = ["src", "tests/unit", "tests/integration"] [tool.nemo.openapi] diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py index 45e876a36b..277a13ed96 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py @@ -126,7 +126,7 @@ async def list_deployments( if statuses: filter_operation = ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=statuses, ) result = await entity_client.list( diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py new file mode 100644 index 0000000000..0ed1a59842 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py @@ -0,0 +1,515 @@ +# 
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Docker substrate backend for the deployments plugin."""

from __future__ import annotations

import asyncio
import logging
import os
from typing import Any

from nemo_deployments_plugin.backends.base import (
    BackendStatusUpdate,
    DeploymentBackend,
    LogResult,
    VolumeStatusUpdate,
)
from nemo_deployments_plugin.backends.docker import volumes as volume_ops
from nemo_deployments_plugin.backends.docker.config import DockerExecutorConfig
from nemo_deployments_plugin.backends.docker.containers import (
    DeploymentConfigError,
    build_port_bindings,
    build_volume_bindings,
    device_requests_for_gpus,
    env_dict,
    gpu_count_from_container,
    merged_volume_mounts,
    parse_docker_backend_config,
    restart_policy_kwargs,
    validate_config_for_docker,
)
from nemo_deployments_plugin.backends.docker.gpu import GPUAllocationError, get_shared_gpu_pool
from nemo_deployments_plugin.backends.docker.labels import (
    BACKOFF_LIMIT_LABEL,
    CONFIG_NAME_LABEL,
    DEPLOYMENT_NAME_LABEL,
    DEPLOYMENT_WORKSPACE_LABEL,
    MANAGED_BY_KEY,
    RESTART_POLICY_LABEL,
    container_name,
    deployment_identity_labels,
    deployment_key,
    managed_by_filter,
)
from nemo_deployments_plugin.backends.docker.ports import find_available_port
from nemo_deployments_plugin.backends.docker.probes import check_readiness_probe, host_url_for_port
from nemo_deployments_plugin.backends.docker.status import (
    LOG_MAX_CHARS,
    map_docker_state_to_starting,
    map_exited_status,
    missing_container_status,
)
from nemo_deployments_plugin.constants import MANAGED_BY_LABEL
from nemo_deployments_plugin.entities import Deployment, DeploymentConfig
from nemo_deployments_plugin.types import Endpoint, RestartPolicy
from nemo_platform.resources.entities import AsyncEntitiesResource
from nemo_platform_plugin.config import LOOPBACK_ADDRESSES
from nemo_platform_plugin.entity_client import NemoEntitiesClient, NemoEntityNotFoundError

logger = logging.getLogger(__name__)


class DockerDeploymentBackend(DeploymentBackend):
    """Manage deployments and volumes as Docker containers and volumes."""

    def init(self) -> None:
        """Import the Docker SDK, validate executor config and build the clients.

        Raises:
            RuntimeError: If the optional ``docker`` package is not installed.
        """
        try:
            import docker
            from docker import errors as docker_errors
        except ImportError as exc:
            raise RuntimeError(
                "docker package is required for DockerDeploymentBackend. "
                "Install with: uv sync --package nemo-deployments-plugin --extra docker"
            ) from exc

        self._docker = docker
        self._docker_errors = docker_errors
        self._executor_config = DockerExecutorConfig.model_validate(self._config)
        self._entities = NemoEntitiesClient(AsyncEntitiesResource(self._sdk))
        self._gpu_pool = get_shared_gpu_pool()
        self._client = self._create_client()

    def _create_client(self) -> Any:
        """Create a Docker client, honouring the executor's timeout and host override."""
        kwargs: dict[str, Any] = {"timeout": self._executor_config.docker_timeout}
        if self._executor_config.docker_host:
            kwargs["base_url"] = self._executor_config.docker_host
        return self._docker.from_env(**kwargs)

    def shutdown(self) -> None:
        """Close the Docker client if ``init`` got far enough to create one."""
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()

    def _release_gpus(self, dep_key: str) -> None:
        """Return every GPU held by *dep_key* to the pool; no-op when there is no pool."""
        if self._gpu_pool is not None:
            self._gpu_pool.release_gpu(dep_key)

    async def _load_deployment_config(self, workspace: str, config_name: str) -> DeploymentConfig:
        """Fetch the named ``DeploymentConfig`` entity from *workspace*."""
        return await self._entities.get(DeploymentConfig, config_name, workspace=workspace)

    async def create_deployment(
        self,
        *,
        workspace: str,
        name: str,
        config_name: str,
        labels: dict[str, str],
        backend_config: dict[str, Any],
    ) -> BackendStatusUpdate:
        """Start the container for a deployment and report its initial status.

        An existing container carrying this deployment's identity labels makes
        the call idempotent (its current status is returned). A container with
        the same name but different labels is reported as a collision.

        Expected failures (bad config, no GPUs, no ports, pull/run errors) are
        returned as a ``FAILED`` update rather than raised. GPUs allocated for
        the deployment are released on every failure path, including
        unexpected exceptions raised while preparing the container.
        """
        c_name = container_name(workspace, name)
        try:
            existing = await asyncio.to_thread(self._client.containers.get, c_name)
            if self._container_matches_deployment(existing, workspace, name, config_name):
                return await self.read_status(workspace=workspace, name=name)
            return BackendStatusUpdate(
                status="FAILED",
                status_message=f"Container name collision: {c_name} exists with different labels",
            )
        except self._docker_errors.NotFound:
            pass

        try:
            config = await self._load_deployment_config(workspace, config_name)
            container_spec = validate_config_for_docker(config)
        except DeploymentConfigError as exc:
            return BackendStatusUpdate(status="FAILED", status_message=str(exc))
        except NemoEntityNotFoundError:
            return BackendStatusUpdate(
                status="FAILED",
                status_message=f"DeploymentConfig '{config_name}' not found in workspace '{workspace}'",
            )
        except Exception as exc:
            logger.exception("Failed to load deployment config %s/%s", workspace, config_name)
            return BackendStatusUpdate(status="FAILED", status_message=f"Failed to load deployment config: {exc}")

        try:
            docker_cfg = parse_docker_backend_config(backend_config)
        except ValueError as exc:
            # pydantic's ValidationError subclasses ValueError; report a
            # malformed "docker" section as a failed deployment, not a crash.
            return BackendStatusUpdate(status="FAILED", status_message=f"Invalid docker backend config: {exc}")
        # The docker section stored on the DeploymentConfig takes precedence.
        if config.backend_config.docker is not None:
            docker_cfg = config.backend_config.docker

        dep_key = deployment_key(workspace, name)
        gpu_ids: list[int] = []
        gpu_count = gpu_count_from_container(container_spec)
        if gpu_count > 0:
            if self._gpu_pool is None:
                return BackendStatusUpdate(
                    status="FAILED",
                    status_message="GPU requested but no GPUs detected on this host",
                )
            try:
                gpu_ids = self._gpu_pool.allocate_gpu(dep_key, num_requested=gpu_count)
            except GPUAllocationError as exc:
                return BackendStatusUpdate(status="FAILED", status_message=str(exc))

        try:
            host_ports: dict[int, int] = {}
            for port_spec in container_spec.ports:
                host_port = await find_available_port(
                    self._client,
                    docker_cfg,
                    exclude_ports=set(host_ports.values()),
                )
                if host_port is None:
                    if gpu_ids:
                        self._release_gpus(dep_key)
                    return BackendStatusUpdate(
                        status="FAILED", status_message="No host ports available in configured range"
                    )
                host_ports[port_spec.container_port] = host_port

            if self._executor_config.pull_images:
                try:
                    await asyncio.to_thread(self._client.images.pull, container_spec.image)
                except (self._docker_errors.APIError, self._docker_errors.ImageNotFound) as exc:
                    if gpu_ids:
                        self._release_gpus(dep_key)
                    return BackendStatusUpdate(status="FAILED", status_message=f"Failed to pull image: {exc}")

            # Identity labels come last so caller/config labels cannot override them.
            all_labels = {
                **labels,
                **config.labels,
                **deployment_identity_labels(
                    workspace,
                    name,
                    config.restart_policy,
                    config_name=config_name,
                    backoff_limit=config.backoff_limit,
                ),
            }
            run_kwargs: dict[str, Any] = {
                "image": container_spec.image,
                "name": c_name,
                "detach": True,
                "labels": all_labels,
                "environment": env_dict(container_spec),
                **restart_policy_kwargs(config.restart_policy, config.backoff_limit),
            }
            if container_spec.command:
                run_kwargs["command"] = container_spec.command
            if container_spec.args:
                # ``command`` may be unset when only args are given.
                run_kwargs["command"] = list(container_spec.command or []) + list(container_spec.args)

            volume_bindings = build_volume_bindings(workspace, merged_volume_mounts(config, container_spec))
            if volume_bindings:
                run_kwargs["volumes"] = volume_bindings

            if container_spec.ports:
                run_kwargs["ports"] = build_port_bindings(container_spec, host_ports)

            device_requests = device_requests_for_gpus(gpu_ids)
            if device_requests:
                run_kwargs["device_requests"] = device_requests

            if docker_cfg.network:
                run_kwargs["network"] = docker_cfg.network
        except Exception:
            # Anything unexpected before the container exists must not strand
            # the GPUs reserved above; release them and let the error surface.
            if gpu_ids:
                self._release_gpus(dep_key)
            raise

        try:
            await asyncio.to_thread(self._client.containers.run, **run_kwargs)
        except Exception as exc:
            if gpu_ids:
                self._release_gpus(dep_key)
            logger.exception("Failed to start container %s", c_name)
            return BackendStatusUpdate(status="FAILED", status_message=f"Failed to start container: {exc}")

        endpoints = self._build_endpoints(container_spec, host_ports)
        return BackendStatusUpdate(
            status="STARTING",
            status_message=f"Container {c_name} created",
            endpoints=endpoints,
        )

    async def read_status(self, *, workspace: str, name: str) -> BackendStatusUpdate:
        """Map the container's current Docker state onto a deployment status.

        The restart policy and backoff limit are read from the container's
        labels. GPUs are released once the container has reached a terminal
        exited state.
        """
        c_name = container_name(workspace, name)
        try:
            container = await asyncio.to_thread(self._client.containers.get, c_name)
            await asyncio.to_thread(container.reload)
        except self._docker_errors.NotFound:
            # No container means no labels; fall back to the stored entities.
            fallback_policy = await self._resolve_restart_policy(workspace, name)
            return missing_container_status(fallback_policy, container_name=c_name)
        except Exception as exc:
            return BackendStatusUpdate(status="FAILED", status_message=f"Docker API error: {exc}")

        labels = container.labels or {}
        restart_policy: RestartPolicy = labels.get(RESTART_POLICY_LABEL, "Always")
        state = container.status
        container_id = (container.id or "")[:12]
        host_ports = self._extract_host_ports(container)
        endpoints = self._endpoints_from_container_ports(container, host_ports)

        if state in ("created", "restarting"):
            return map_docker_state_to_starting(container_id, state)

        if state == "running":
            host_url = self._primary_host_url(host_ports)
            config = await self._load_config_from_labels(workspace, labels)
            probe = None
            if config is not None and config.containers:
                probe = config.containers[0].readiness_probe
            ready, reason = await check_readiness_probe(
                container=container,
                probe=probe,
                host_url=host_url,
                host_ports=host_ports,
            )
            if ready and restart_policy == "Always":
                return BackendStatusUpdate(
                    status="READY",
                    status_message=f"Container running and ready ({reason})",
                    endpoints=endpoints,
                )
            if ready and restart_policy in ("OnFailure", "Never"):
                # Only "Always" deployments are reported READY; the others
                # stay STARTING while running.
                return BackendStatusUpdate(
                    status="STARTING",
                    status_message=f"Container running ({reason})",
                    endpoints=endpoints,
                )
            return BackendStatusUpdate(
                status="STARTING",
                status_message=f"Container running but not ready ({reason})",
                endpoints=endpoints,
            )

        if state in ("exited", "dead"):
            # A missing exit code is treated as a failure (defaults to 1).
            exit_code = int(container.attrs.get("State", {}).get("ExitCode", 1))
            if exit_code == 0 and restart_policy in ("Never", "OnFailure"):
                self._release_gpus(deployment_key(workspace, name))
                return BackendStatusUpdate(
                    status="SUCCEEDED",
                    status_message="Container exited successfully (code 0)",
                    exit_code=exit_code,
                    endpoints=endpoints,
                )
            if restart_policy == "Always":
                return BackendStatusUpdate(
                    status="STARTING",
                    status_message=f"Container exited (code {exit_code}); restart policy will recreate it",
                    exit_code=exit_code,
                    endpoints=endpoints,
                )
            if restart_policy == "OnFailure":
                restart_count = int(container.attrs.get("RestartCount", 0))
                backoff_limit = int(labels.get(BACKOFF_LIMIT_LABEL, "6"))
                if restart_count < backoff_limit:
                    return BackendStatusUpdate(
                        status="STARTING",
                        status_message=f"Container exited (code {exit_code}); retry {restart_count}/{backoff_limit}",
                        exit_code=exit_code,
                        endpoints=endpoints,
                    )
            # Terminal exit: retries exhausted, or the policy never restarts.
            self._release_gpus(deployment_key(workspace, name))
            status = map_exited_status(exit_code, restart_policy)
            message = f"Container exited with code {exit_code}"
            return BackendStatusUpdate(
                status=status,
                status_message=message,
                exit_code=exit_code,
                endpoints=endpoints,
            )

        if state == "removing":
            return BackendStatusUpdate(status="DELETING", status_message=f"Container removing (ID: {container_id})")

        return BackendStatusUpdate(status="STARTING", status_message=f"Container state: {state}")

    async def delete_deployment(self, workspace: str, name: str) -> BackendStatusUpdate:
        """Stop and remove the deployment's container, then release its GPUs.

        A container that is already gone counts as a successful delete.
        """
        c_name = container_name(workspace, name)
        dep_key = deployment_key(workspace, name)

        def _delete() -> None:
            try:
                container = self._client.containers.get(c_name)
                container.stop(timeout=30)
                container.remove(force=True)
            except self._docker_errors.NotFound:
                return

        try:
            await asyncio.to_thread(_delete)
        except Exception as exc:
            return BackendStatusUpdate(status="FAILED", status_message=f"Failed to delete container: {exc}")
        finally:
            # NOTE(review): GPUs are released even when the delete failed, so a
            # container that is still running may keep using GPUs the pool now
            # considers free - confirm this best-effort release is intended.
            self._release_gpus(dep_key)

        return BackendStatusUpdate(status="SUCCEEDED", status_message=f"Container {c_name} deleted")

    async def list_managed_deployment_names(self) -> list[str]:
        """Return sorted ``workspace/name`` keys of all containers this plugin manages.

        Returns an empty list (and logs a warning) when Docker cannot be queried.
        """
        try:
            containers = await asyncio.to_thread(
                self._client.containers.list,
                all=True,
                filters=managed_by_filter(),
            )
        except Exception:
            logger.warning("Failed to list managed containers", exc_info=True)
            return []

        seen: set[str] = set()
        for container in containers:
            container_labels = container.labels or {}
            # Re-check the label locally rather than trusting the server-side filter.
            if container_labels.get(MANAGED_BY_KEY) != MANAGED_BY_LABEL:
                continue
            ws = container_labels.get(DEPLOYMENT_WORKSPACE_LABEL)
            dep_name = container_labels.get(DEPLOYMENT_NAME_LABEL)
            if ws and dep_name:
                seen.add(f"{ws}/{dep_name}")
        return sorted(seen)

    async def get_logs(self, *, workspace: str, name: str, tail: int = 100) -> LogResult:
        """Return the last *tail* timestamped log lines of the deployment's container.

        Output longer than ``LOG_MAX_CHARS`` characters is cut from the front
        (the newest output is kept) and flagged ``truncated``. Errors are
        reported as a single explanatory line instead of being raised.
        """
        c_name = container_name(workspace, name)

        def _logs() -> bytes:
            container = self._client.containers.get(c_name)
            return container.logs(tail=tail, timestamps=True)

        try:
            raw = await asyncio.to_thread(_logs)
            text = raw.decode("utf-8", errors="ignore")
            truncated = len(text) > LOG_MAX_CHARS
            if truncated:
                # Enforce the character budget so the flag reflects the payload.
                text = text[-LOG_MAX_CHARS:]
            lines = text.splitlines()
            if truncated:
                lines = lines[-tail:]
            return LogResult(lines=lines, truncated=truncated)
        except self._docker_errors.NotFound:
            return LogResult(lines=[f"Container {c_name} not found"])
        except Exception as exc:
            return LogResult(lines=[f"Failed to fetch logs: {exc}"])

    async def create_volume(
        self,
        *,
        workspace: str,
        name: str,
        size: str,
        access_modes: list[str],
        backend_config: dict[str, Any],
    ) -> VolumeStatusUpdate:
        """Create a Docker volume; *size* and *access_modes* are accepted but unused.

        The volume driver defaults to ``"local"`` and can be overridden with
        ``backend_config["docker"]["driver"]``.
        """
        del size, access_modes
        driver = "local"
        docker_section = backend_config.get("docker") or {}
        if isinstance(docker_section, dict) and docker_section.get("driver"):
            driver = str(docker_section["driver"])
        return await volume_ops.create_volume(
            self._client,
            workspace=workspace,
            name=name,
            driver=driver,
        )

    async def read_volume_status(self, *, workspace: str, name: str) -> VolumeStatusUpdate:
        """Delegate to the volume helpers to read the volume's status."""
        return await volume_ops.read_volume_status(self._client, workspace=workspace, name=name)

    async def delete_volume(self, workspace: str, name: str) -> VolumeStatusUpdate:
        """Delegate to the volume helpers to delete the volume."""
        return await volume_ops.delete_volume(self._client, workspace=workspace, name=name)

    def _container_matches_deployment(
        self,
        container: Any,
        workspace: str,
        name: str,
        config_name: str,
    ) -> bool:
        """Return True when *container* carries this deployment's identity labels."""
        labels = container.labels or {}
        return (
            labels.get(DEPLOYMENT_WORKSPACE_LABEL) == workspace
            and labels.get(DEPLOYMENT_NAME_LABEL) == name
            and labels.get(CONFIG_NAME_LABEL) == config_name
            and labels.get(MANAGED_BY_KEY) == MANAGED_BY_LABEL
        )

    async def _resolve_restart_policy(self, workspace: str, name: str) -> RestartPolicy:
        """Look up the restart policy via the stored entities; default to ``"Always"``."""
        config = await self._load_config_for_deployment_entity(workspace, name)
        if config is not None:
            return config.restart_policy
        return "Always"

    async def _load_config_from_labels(self, workspace: str, labels: dict[str, str]) -> DeploymentConfig | None:
        """Load the config named in the container labels, or None if it cannot be loaded.

        Falls back to resolving the config through the Deployment entity when
        the config-name label is absent.
        """
        config_name = labels.get(CONFIG_NAME_LABEL)
        if not config_name:
            return await self._load_config_for_deployment_entity(
                workspace,
                labels.get(DEPLOYMENT_NAME_LABEL, ""),
            )
        try:
            return await self._entities.get(DeploymentConfig, config_name, workspace=workspace)
        except Exception:
            # Best-effort lookup: callers treat None as "no config available".
            return None

    async def _load_config_for_deployment_entity(
        self,
        workspace: str,
        deployment_name: str,
    ) -> DeploymentConfig | None:
        """Resolve Deployment -> DeploymentConfig; None on any lookup failure."""
        if not deployment_name:
            return None
        try:
            deployment = await self._entities.get(Deployment, deployment_name, workspace=workspace)
            return await self._entities.get(
                DeploymentConfig,
                deployment.deployment_config,
                workspace=workspace,
            )
        except Exception:
            # Best-effort lookup: callers treat None as "no config available".
            return None

    def _extract_host_ports(self, container: Any) -> dict[int, int]:
        """Map container port -> first published host port from ``container.ports``."""
        result: dict[int, int] = {}
        ports = container.ports or {}
        for key, bindings in ports.items():
            if not bindings:
                continue
            # The port number is the part of the key before "/".
            container_port = int(str(key).split("/")[0])
            host_port = bindings[0].get("HostPort")
            if host_port:
                result[container_port] = int(host_port)
        return result

    def _primary_host_url(self, host_ports: dict[int, int]) -> str | None:
        """Return a URL for the first published host port, or None if none is published."""
        if not host_ports:
            return None
        host_port = next(iter(host_ports.values()))
        host = os.environ.get("NMP_LOOPBACK_ADDRESS", LOOPBACK_ADDRESSES[0])
        return host_url_for_port(host, host_port)

    def _build_endpoints(self, container_spec: Any, host_ports: dict[int, int]) -> list[Endpoint]:
        """Build endpoints for every declared port that received a host port."""
        endpoints: list[Endpoint] = []
        host = os.environ.get("NMP_LOOPBACK_ADDRESS", LOOPBACK_ADDRESSES[0])
        for port_spec in container_spec.ports:
            host_port = host_ports.get(port_spec.container_port)
            if host_port is None:
                continue
            endpoint_name = port_spec.name or f"port-{port_spec.container_port}"
            # NOTE(review): UDP ports are labelled "tcp" and every other port
            # "http" - confirm this mapping against the Endpoint protocol values.
            protocol = "tcp" if port_spec.protocol == "UDP" else "http"
            scheme = "http"
            endpoints.append(
                Endpoint(
                    name=endpoint_name,
                    url=host_url_for_port(host, host_port, scheme=scheme),
                    protocol=protocol,
                )
            )
        return endpoints

    def _endpoints_from_container_ports(self, container: Any, host_ports: dict[int, int]) -> list[Endpoint]:
        """Build ``port-<n>`` HTTP endpoints from published ports; *container* is unused."""
        if not host_ports:
            return []
        host = os.environ.get("NMP_LOOPBACK_ADDRESS", LOOPBACK_ADDRESSES[0])
        return [
            Endpoint(
                name=f"port-{container_port}",
                url=host_url_for_port(host, host_port),
                protocol="http",
            )
            for container_port, host_port in host_ports.items()
        ]
# Source file: plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/config.py
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Executor-level Docker backend configuration."""

from __future__ import annotations

from pydantic import BaseModel, Field


class DockerExecutorConfig(BaseModel):
    """Knobs for a named docker executor instance (not entity backend_config)."""

    docker_host: str | None = Field(default=None, description="Override DOCKER_HOST for this executor.")
    # Must be at least 1 (``ge=1``).
    docker_timeout: int = Field(default=60, ge=1, description="Timeout in seconds for Docker API calls.")
    pull_images: bool = Field(default=True, description="Pull container images before run when missing locally.")
# Source file: plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/containers.py
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Compile DeploymentConfig into docker.containers.run kwargs."""

from __future__ import annotations

from typing import Any

from nemo_deployments_plugin.backends.docker.labels import docker_volume_name
from nemo_deployments_plugin.entities import Container, DeploymentConfig, DockerDeploymentConfig, VolumeMount
from nemo_deployments_plugin.types import RestartPolicy


class DeploymentConfigError(ValueError):
    """Invalid deployment config for docker backend."""


def parse_docker_backend_config(backend_config: dict[str, Any]) -> DockerDeploymentConfig:
    """Validate the ``"docker"`` section of *backend_config* (empty when absent)."""
    docker_section = backend_config.get("docker") or {}
    return DockerDeploymentConfig.model_validate(docker_section)


def validate_config_for_docker(config: DeploymentConfig) -> Container:
    """Return the single container of *config*.

    Raises:
        DeploymentConfigError: If init containers are present or the config
            does not define exactly one container.
    """
    if config.init_containers:
        raise DeploymentConfigError("init_containers are not supported by the docker backend in v1")
    if len(config.containers) != 1:
        raise DeploymentConfigError(f"docker backend v1 supports exactly one container; got {len(config.containers)}")
    return config.containers[0]


def restart_policy_kwargs(restart_policy: RestartPolicy, backoff_limit: int) -> dict[str, Any]:
    """Translate the deployment restart policy into ``containers.run`` kwargs.

    ``"Always"`` maps to Docker's ``always``. ``"OnFailure"`` maps to
    ``on-failure`` capped at *backoff_limit* retries. Every other case,
    including ``"OnFailure"`` with a non-positive limit, yields no restart
    policy, so Docker never restarts the container.
    """
    if restart_policy == "Always":
        return {"restart_policy": {"Name": "always"}}
    # Docker reads MaximumRetryCount=0 as "no limit", so a zero backoff limit
    # must not be forwarded or the container would restart indefinitely.
    if restart_policy == "OnFailure" and backoff_limit > 0:
        return {"restart_policy": {"Name": "on-failure", "MaximumRetryCount": backoff_limit}}
    return {}


def env_dict(container: Container) -> dict[str, str]:
    """Return the container's env vars as a dict, skipping entries without a value."""
    return {item.name: item.value for item in container.env if item.value is not None}


def merged_volume_mounts(config: DeploymentConfig, container: Container) -> list[VolumeMount]:
    """Merge config-level and container-level mounts; the container wins on name clashes."""
    by_name: dict[str, VolumeMount] = {}
    for mount in config.volume_mounts:
        by_name[mount.name] = mount
    for mount in container.volume_mounts:
        by_name[mount.name] = mount
    return list(by_name.values())


def build_volume_bindings(
    workspace: str,
    mounts: list[VolumeMount],
) -> dict[str, dict[str, str]]:
    """Build the ``volumes`` mapping: Docker volume name -> bind path and ro/rw mode."""
    bindings: dict[str, dict[str, str]] = {}
    for mount in mounts:
        vol_name = docker_volume_name(workspace, mount.name)
        bindings[vol_name] = {
            "bind": mount.mount_path,
            "mode": "ro" if mount.read_only else "rw",
        }
    return bindings


def build_port_bindings(
    container: Container,
    host_ports: dict[int, int],
) -> dict[str, int | list[tuple[str, int]] | None]:
    """Build the ``ports`` mapping keyed by ``"<port>/<protocol>"``.

    A port without an entry in *host_ports* is published on the host port
    with the same number as the container port.
    """
    ports: dict[str, int | list[tuple[str, int]] | None] = {}
    for port_spec in container.ports:
        container_port = port_spec.container_port
        key = f"{container_port}/{port_spec.protocol.lower()}"
        host_port = host_ports.get(container_port)
        ports[key] = host_port if host_port is not None else container_port
    return ports


def gpu_count_from_container(container: Container) -> int:
    """Return the requested ``nvidia.com/gpu`` limit as a non-negative int.

    Missing, unparseable or negative limits count as zero GPUs.
    """
    limit = container.resources.limits.get("nvidia.com/gpu")
    if not limit:
        return 0
    try:
        count = int(limit)
    except (TypeError, ValueError):
        return 0
    return max(count, 0)


def device_requests_for_gpus(gpu_ids: list[int]) -> list[dict[str, Any]]:
    """Build one NVIDIA device request pinning the container to *gpu_ids*.

    Returns an empty list when no GPUs were allocated.
    """
    if not gpu_ids:
        return []
    return [
        {
            "Driver": "nvidia",
            # Devices are selected explicitly through DeviceIDs, so Count stays 0.
            "Count": 0,
            "DeviceIDs": [str(gpu_id) for gpu_id in gpu_ids],
            "Capabilities": [["gpu"]],
        }
    ]
# Source file: plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Thread-safe GPU pool for Docker deployments (plugin-local; not shared with models).

During the 759 cutover both pools may coexist briefly — consolidate into
nemo_platform_plugin when models docker backend is removed.
"""

from __future__ import annotations

import logging
import subprocess
import threading
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


class GPUAllocationError(Exception):
    """Raised when GPU allocation fails due to insufficient resources."""


@dataclass
class GPUPoolStatus:
    """Snapshot-style record of pool counts and per-workload GPU assignments."""

    total: int
    available: int
    allocated: int
    allocations: dict[str, list[int]] = field(default_factory=dict)
    gpu_state: dict[int, str | None] = field(default_factory=dict)


class DockerGPUPool:
    """Thread-safe pool of GPU device IDs for Docker device_requests."""

    def __init__(self, reserved_gpu_device_ids: list[int]) -> None:
        """Create a pool in which every reserved GPU starts unassigned."""
        self.num_reserved_gpus = len(reserved_gpu_device_ids)
        # GPU id -> owning workload id, or None while the GPU is free.
        self.gpu_to_workload_id: dict[int, str | None] = {gpu_id: None for gpu_id in reserved_gpu_device_ids}
        self._mutex = threading.Lock()

    def allocate_gpu(self, workload_id: str, num_requested: int = 1) -> list[int]:
        """Reserve *num_requested* free GPUs for *workload_id*.

        The lowest-numbered free GPUs are chosen, so the assignment is
        deterministic for a given pool state.

        Returns:
            The allocated GPU ids in ascending order.

        Raises:
            GPUAllocationError: If *num_requested* is not positive or exceeds
                the number of free GPUs.
        """
        with self._mutex:
            if num_requested <= 0:
                raise GPUAllocationError(f"Invalid GPU request: {num_requested}. Must be a positive integer.")
            available_gpus = sorted(gpu for gpu, workload in self.gpu_to_workload_id.items() if workload is None)
            if len(available_gpus) < num_requested:
                raise GPUAllocationError(
                    f"Not enough GPUs available. Requested {num_requested}, "
                    f"available {len(available_gpus)} out of {self.num_reserved_gpus} total."
                )
            gpu_ids = available_gpus[:num_requested]
            for gpu_id in gpu_ids:
                self.gpu_to_workload_id[gpu_id] = workload_id
            logger.info("DockerGPUPool: allocated gpu_ids %s to workload %s", gpu_ids, workload_id)
            return gpu_ids

    def release_gpu(self, workload_id: str) -> list[int]:
        """Free every GPU held by *workload_id* and return their ids (possibly empty)."""
        with self._mutex:
            gpu_ids = [gpu for gpu, workload in self.gpu_to_workload_id.items() if workload == workload_id]
            if gpu_ids:
                logger.info("DockerGPUPool: releasing gpu_ids %s from workload %s", gpu_ids, workload_id)
            for gpu_id in gpu_ids:
                self.gpu_to_workload_id[gpu_id] = None
            return gpu_ids

    def restore_allocations(self, allocations: dict[str, list[int]]) -> None:
        """Mark GPUs as allocated for workloads discovered from running containers."""
        with self._mutex:
            for workload_id, gpu_ids in allocations.items():
                for gpu_id in gpu_ids:
                    if gpu_id not in self.gpu_to_workload_id:
                        logger.warning(
                            "Skipping GPU %s for workload %s during pool recovery (not in reserved pool)",
                            gpu_id,
                            workload_id,
                        )
                        continue
                    existing = self.gpu_to_workload_id[gpu_id]
                    if existing is not None and existing != workload_id:
                        # First claim wins; never steal a GPU from another workload.
                        logger.warning(
                            "GPU %s already allocated to %s; skipping recovery claim for %s",
                            gpu_id,
                            existing,
                            workload_id,
                        )
                        continue
                    self.gpu_to_workload_id[gpu_id] = workload_id


_pool: DockerGPUPool | None = None
_pool_lock = threading.Lock()


def detect_gpu_device_ids() -> list[int]:
    """Return GPU indices from nvidia-smi when available.

    Returns an empty list when nvidia-smi is missing, fails, or times out.
    """
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=index", "--format=csv,noheader"],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError already covers FileNotFoundError (nvidia-smi not installed).
        return []
    # Lines that are not plain non-negative integers are ignored.
    return [int(stripped) for line in result.stdout.splitlines() if (stripped := line.strip()).isdigit()]


def parse_gpu_device_ids(device_requests: list[Any] | None) -> list[int]:
    """Extract NVIDIA GPU device IDs from Docker HostConfig DeviceRequests.

    Accepts requests given as dicts or as objects with ``Driver`` and
    ``DeviceIDs`` attributes. Non-NVIDIA requests and non-integer ids are skipped.
    """
    if not device_requests:
        return []
    gpu_ids: list[int] = []
    for request in device_requests:
        if isinstance(request, dict):
            driver = request.get("Driver")
            raw_ids = request.get("DeviceIDs")
        else:
            driver = getattr(request, "Driver", None)
            raw_ids = getattr(request, "DeviceIDs", None)
        if driver != "nvidia" or not raw_ids:
            continue
        for raw_id in raw_ids:
            try:
                gpu_ids.append(int(raw_id))
            except (TypeError, ValueError):
                continue
    return gpu_ids


def discover_managed_gpu_allocations(client: Any) -> dict[str, list[int]]:
    """Return workload_id -> GPU IDs for running deployment-managed containers.

    Raises:
        Exception: Whatever ``client.containers.list`` raised, after logging it.
    """
    # Imported inside the function; the module itself has only stdlib imports.
    from nemo_deployments_plugin.backends.docker.labels import (
        DEPLOYMENT_NAME_LABEL,
        DEPLOYMENT_WORKSPACE_LABEL,
        MANAGED_BY_KEY,
        deployment_key,
        managed_by_filter,
    )
    from nemo_deployments_plugin.constants import MANAGED_BY_LABEL

    try:
        containers = client.containers.list(all=True, filters=managed_by_filter())
    except Exception:
        logger.warning("Failed to list managed containers for GPU pool recovery", exc_info=True)
        raise

    allocations: dict[str, list[int]] = {}
    for container in containers:
        labels = container.labels or {}
        if labels.get(MANAGED_BY_KEY) != MANAGED_BY_LABEL:
            continue
        workspace = labels.get(DEPLOYMENT_WORKSPACE_LABEL)
        name = labels.get(DEPLOYMENT_NAME_LABEL)
        if not workspace or not name:
            continue
        # Only running containers are counted as holding GPUs.
        if container.status != "running":
            continue
        try:
            container.reload()
        except Exception:
            logger.debug("Skipping container %s during GPU pool recovery", container.name, exc_info=True)
            continue
        host_config = container.attrs.get("HostConfig") or {}
        gpu_ids = parse_gpu_device_ids(host_config.get("DeviceRequests"))
        if not gpu_ids:
            continue
        workload_id = deployment_key(workspace, name)
        allocations.setdefault(workload_id, []).extend(gpu_ids)
    return allocations


def _recover_pool_allocations(pool: DockerGPUPool) -> bool:
    """Restore in-use GPUs from running containers. Returns False on transient failure.

    Returns True without doing anything when the Docker SDK is not installed.
    """
    try:
        import docker
    except ImportError:
        logger.debug("Docker SDK unavailable for GPU pool recovery")
        return True

    try:
        client = docker.from_env()
    except Exception:
        # Building the client can fail too; report it like any other
        # recovery failure instead of letting it escape.
        logger.warning("Failed to create Docker client for GPU pool recovery", exc_info=True)
        return False

    try:
        allocations = discover_managed_gpu_allocations(client)
        pool.restore_allocations(allocations)
        if allocations:
            logger.info(
                "DockerGPUPool: recovered GPU allocations from %d managed container(s): %s",
                len(allocations),
                allocations,
            )
        return True
    except Exception:
        logger.warning("Failed to recover GPU allocations from managed containers", exc_info=True)
        return False
    finally:
        client.close()


def get_shared_gpu_pool() -> DockerGPUPool | None:
    """Lazy singleton GPU pool shared across docker executor instances in this process.

    Returns None when no GPUs are detected or allocation recovery fails. In
    both cases nothing is cached, so the next call tries again.
    """
    global _pool
    with _pool_lock:
        if _pool is None:
            device_ids = detect_gpu_device_ids()
            if not device_ids:
                return None
            pool = DockerGPUPool(reserved_gpu_device_ids=device_ids)
            # Publish the pool only after recovery succeeded, so an
            # unrecovered pool is never handed out.
            if not _recover_pool_allocations(pool):
                return None
            _pool = pool
        return _pool
+# SPDX-License-Identifier: Apache-2.0 + +"""Docker resource naming and identity labels for orphan cleanup.""" + +from __future__ import annotations + +import hashlib +import re + +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL + +MANAGED_BY_KEY = "managed-by" +DEPLOYMENT_WORKSPACE_LABEL = "nmp.nvidia.com/deployment-workspace" +DEPLOYMENT_NAME_LABEL = "nmp.nvidia.com/deployment-name" +RESTART_POLICY_LABEL = "nmp.nvidia.com/restart-policy" +CONFIG_NAME_LABEL = "nmp.nvidia.com/deployment-config" +VOLUME_WORKSPACE_LABEL = "nmp.nvidia.com/volume-workspace" +VOLUME_NAME_LABEL = "nmp.nvidia.com/volume-name" + + +def k8s_safe_name(base_name: str, *, max_length: int = 63, suffix: str = "") -> str: + """Generate a DNS-label-safe name (RFC 1035) from arbitrary input.""" + hash_suffix = hashlib.sha256(base_name.encode()).hexdigest()[:8] + normalized = re.sub(r"[^a-z0-9-]", "-", base_name.lower()) + normalized = re.sub(r"[-]+", "-", normalized) + if normalized and not normalized[0].isalpha(): + normalized = f"x{normalized}" + normalized = normalized.rstrip("-") + + reserved = len(suffix) + len(hash_suffix) + 1 + if len(normalized) + reserved > max_length: + trim = max_length - reserved + normalized = normalized[:trim].rstrip("-") + normalized = f"{normalized}-{hash_suffix}{suffix}" + elif suffix: + normalized = f"{normalized}{suffix}" + return normalized + + +def container_name(workspace: str, deployment_name: str) -> str: + return k8s_safe_name(f"dep-{workspace}-{deployment_name}") + + +def docker_volume_name(workspace: str, volume_name: str) -> str: + return k8s_safe_name(f"dep-vol-{workspace}-{volume_name}") + + +def deployment_key(workspace: str, name: str) -> str: + return f"{workspace}/{name}" + + +BACKOFF_LIMIT_LABEL = "nmp.nvidia.com/backoff-limit" + + +def deployment_identity_labels( + workspace: str, + name: str, + restart_policy: str, + *, + config_name: str, + backoff_limit: int = 6, +) -> dict[str, str]: + return { + MANAGED_BY_KEY: 
MANAGED_BY_LABEL, + DEPLOYMENT_WORKSPACE_LABEL: workspace, + DEPLOYMENT_NAME_LABEL: name, + RESTART_POLICY_LABEL: restart_policy, + CONFIG_NAME_LABEL: config_name, + BACKOFF_LIMIT_LABEL: str(backoff_limit), + } + + +def volume_identity_labels(workspace: str, name: str) -> dict[str, str]: + return { + MANAGED_BY_KEY: MANAGED_BY_LABEL, + VOLUME_WORKSPACE_LABEL: workspace, + VOLUME_NAME_LABEL: name, + } + + +def managed_by_filter() -> dict[str, str | bool]: + return {"label": f"{MANAGED_BY_KEY}={MANAGED_BY_LABEL}"} diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py new file mode 100644 index 0000000000..1ab238ea4c --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Host port allocation for published container ports.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import socket +from typing import Any + +from nemo_deployments_plugin.backends.docker.labels import managed_by_filter +from nemo_deployments_plugin.entities import DockerDeploymentConfig + +import docker + +logger = logging.getLogger(__name__) + + +def is_remote_docker_host() -> bool: + docker_host = os.environ.get("DOCKER_HOST", "") + return docker_host.startswith("tcp://") + + +def is_port_free(port: int) -> bool: + if is_remote_docker_host(): + return True + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("127.0.0.1", port)) + return True + except OSError: + return False + + +def collect_used_host_ports(containers: list[Any]) -> set[int]: + used: set[int] = set() + for container in containers: + try: + ports = container.ports or {} + for bindings 
in ports.values(): + if not bindings: + continue + for binding in bindings: + if binding and "HostPort" in binding: + used.add(int(binding["HostPort"])) + except Exception as exc: + logger.warning("Failed to read ports for container %s: %s", getattr(container, "name", "?"), exc) + return used + + +async def find_available_port( + client: docker.DockerClient, + docker_cfg: DockerDeploymentConfig, + *, + exclude_ports: set[int] | None = None, +) -> int | None: + try: + containers = await asyncio.to_thread( + client.containers.list, + all=True, + filters=managed_by_filter(), + ) + except Exception: + logger.exception("Failed to list containers for port allocation") + return None + + used_ports = collect_used_host_ports(containers) + if exclude_ports: + used_ports = used_ports | exclude_ports + for port in range(docker_cfg.port_range_start, docker_cfg.port_range_end + 1): + if port not in used_ports and is_port_free(port): + return port + + logger.error( + "No available ports in range %s-%s", + docker_cfg.port_range_start, + docker_cfg.port_range_end, + ) + return None diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py new file mode 100644 index 0000000000..3c19a011be --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py @@ -0,0 +1,151 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Readiness probe evaluation for running containers.""" + +from __future__ import annotations + +import asyncio +import logging +import socket +from urllib.parse import urljoin, urlparse + +import httpx +from docker.models.containers import Container as DockerContainer +from nemo_deployments_plugin.entities import Probe + +logger = logging.getLogger(__name__) + + +async def check_readiness_probe( + *, + container: DockerContainer, + probe: Probe | None, + host_url: str | None, + host_ports: dict[int, int] | None = None, + named_ports: dict[str, int] | None = None, +) -> tuple[bool, str]: + """Return (ready, reason). When no probe is configured, running implies ready.""" + if probe is None: + return True, "no readiness probe configured" + + if probe.exec_action is not None and probe.exec_action.command: + return await _check_exec_probe(container, probe) + + if probe.http_get is not None and host_url is not None: + return await _check_http_probe(host_url, probe, host_ports=host_ports, named_ports=named_ports) + + if probe.tcp_socket is not None and host_url is not None: + return await _check_tcp_probe(host_url, probe, host_ports=host_ports, named_ports=named_ports) + + return True, "probe type not implemented; treating as ready" + + +async def _check_exec_probe(container: DockerContainer, probe: Probe) -> tuple[bool, str]: + assert probe.exec_action is not None + command = probe.exec_action.command + timeout = probe.timeout_seconds + + def _run() -> tuple[int, str]: + result = container.exec_run(command, demux=True) + exit_code = result.exit_code if result.exit_code is not None else 1 + output = "" + if result.output: + stdout, stderr = result.output + chunks = [] + if stdout: + chunks.append(stdout.decode("utf-8", errors="ignore")) + if stderr: + chunks.append(stderr.decode("utf-8", errors="ignore")) + output = "".join(chunks) + return exit_code, output + + try: + exit_code, output = await 
asyncio.wait_for(asyncio.to_thread(_run), timeout=timeout) + except TimeoutError: + return False, f"exec probe timed out after {timeout}s" + except Exception as exc: + return False, f"exec probe failed: {exc}" + + if exit_code == 0: + return True, "exec probe succeeded" + return False, f"exec probe exit {exit_code}: {output[:200]}" + + +def _probe_host(host_url: str) -> str: + return urlparse(host_url).hostname or "127.0.0.1" + + +def _resolve_probe_port( + port: int | str, + *, + host_ports: dict[int, int] | None, + named_ports: dict[str, int] | None, +) -> int | None: + if isinstance(port, int): + return host_ports.get(port, port) if host_ports else port + if named_ports and port in named_ports: + return named_ports[port] + if host_ports: + for container_port, host_port in host_ports.items(): + if str(container_port) == port: + return host_port + return None + + +async def _check_http_probe( + host_url: str, + probe: Probe, + *, + host_ports: dict[int, int] | None = None, + named_ports: dict[str, int] | None = None, +) -> tuple[bool, str]: + assert probe.http_get is not None + port = probe.http_get.port + path = probe.http_get.path + scheme = probe.http_get.scheme.lower() + host = _probe_host(host_url) + resolved_port = _resolve_probe_port(port, host_ports=host_ports, named_ports=named_ports) + if resolved_port is None: + return False, f"http probe port not mapped: {port}" + base = f"{scheme}://{host}:{resolved_port}" + url = urljoin(f"{base.rstrip('/')}/", path.lstrip("/")) + timeout = probe.timeout_seconds + try: + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get(url) + if 200 <= response.status_code < 400: + return True, f"http probe {response.status_code}" + return False, f"http probe status {response.status_code}" + except Exception as exc: + return False, f"http probe failed: {exc}" + + +async def _check_tcp_probe( + host_url: str, + probe: Probe, + *, + host_ports: dict[int, int] | None = None, + named_ports: dict[str, 
int] | None = None, +) -> tuple[bool, str]: + assert probe.tcp_socket is not None + port_value = probe.tcp_socket.port + host = _probe_host(host_url) + target_port = _resolve_probe_port(port_value, host_ports=host_ports, named_ports=named_ports) + if target_port is None: + return False, f"tcp probe port not mapped: {port_value}" + timeout = probe.timeout_seconds + + def _connect() -> None: + with socket.create_connection((host, target_port), timeout=timeout): + return + + try: + await asyncio.wait_for(asyncio.to_thread(_connect), timeout=timeout) + return True, "tcp probe connected" + except Exception as exc: + return False, f"tcp probe failed: {exc}" + + +def host_url_for_port(host: str, host_port: int, *, scheme: str = "http") -> str: + return f"{scheme}://{host}:{host_port}" diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py new file mode 100644 index 0000000000..77b13695fd --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py @@ -0,0 +1,58 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Map Docker container state to plugin DeploymentStatus values.""" + +from __future__ import annotations + +from nemo_deployments_plugin.backends.base import BackendStatusUpdate +from nemo_deployments_plugin.types import DeploymentStatus, RestartPolicy + +LOG_TAIL_LINES = 80 +LOG_MAX_CHARS = 8000 +SUCCESSFUL_EXIT_CODES = (0,) + + +def format_duration(seconds: float) -> str: + hours, rem = divmod(int(seconds), 3600) + minutes, secs = divmod(rem, 60) + parts: list[str] = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + parts.append(f"{secs}s") + return " ".join(parts) + + +def map_exited_status(exit_code: int, restart_policy: RestartPolicy) -> DeploymentStatus: + """Map a stopped container's exit code to a terminal deployment status. + + Restart-policy-specific retry handling happens in the backend before this + helper is called for non-zero exits. + """ + del restart_policy + if exit_code in SUCCESSFUL_EXIT_CODES: + return "SUCCEEDED" + return "FAILED" + + +def missing_container_status(restart_policy: RestartPolicy, *, container_name: str) -> BackendStatusUpdate: + if restart_policy == "Always": + return BackendStatusUpdate( + status="LOST", + status_message=(f"Container not found — may have been manually deleted. Expected name: {container_name}"), + error_details={"expected_container_name": container_name}, + ) + return BackendStatusUpdate( + status="FAILED", + status_message=f"Container not found. 
Expected name: {container_name}", + error_details={"expected_container_name": container_name}, + ) + + +def map_docker_state_to_starting(container_id: str, state: str) -> BackendStatusUpdate: + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container is {state} (ID: {container_id[:12]})", + ) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py new file mode 100644 index 0000000000..35047e012c --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Docker volume lifecycle helpers.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from nemo_deployments_plugin.backends.base import VolumeStatusUpdate +from nemo_deployments_plugin.backends.docker.labels import docker_volume_name, volume_identity_labels + +import docker + +logger = logging.getLogger(__name__) + + +async def create_volume( + client: docker.DockerClient, + *, + workspace: str, + name: str, + driver: str = "local", +) -> VolumeStatusUpdate: + vol_name = docker_volume_name(workspace, name) + labels = volume_identity_labels(workspace, name) + + def _create() -> Any: + from docker.errors import NotFound + + try: + existing = client.volumes.get(vol_name) + existing.reload() + return existing + except NotFound: + return client.volumes.create(name=vol_name, driver=driver, labels=labels) + + try: + await asyncio.to_thread(_create) + return VolumeStatusUpdate(status="BOUND", status_message=f"Volume {vol_name} is bound") + except Exception as exc: + logger.exception("Failed to create volume %s", vol_name) + return VolumeStatusUpdate(status="FAILED", status_message=f"Failed to create volume: {exc}") + + +async def 
read_volume_status(client: docker.DockerClient, *, workspace: str, name: str) -> VolumeStatusUpdate: + vol_name = docker_volume_name(workspace, name) + + def _get() -> Any: + from docker.errors import NotFound + + try: + return client.volumes.get(vol_name) + except NotFound: + raise + + try: + volume = await asyncio.to_thread(_get) + volume.reload() + return VolumeStatusUpdate(status="BOUND", status_message=f"Volume {volume.name} is bound") + except Exception as exc: + from docker.errors import NotFound + + if isinstance(exc, NotFound): + return VolumeStatusUpdate(status="FAILED", status_message=f"Volume {vol_name} not found") + return VolumeStatusUpdate(status="FAILED", status_message=f"Failed to read volume: {exc}") + + +async def delete_volume(client: docker.DockerClient, *, workspace: str, name: str) -> VolumeStatusUpdate: + vol_name = docker_volume_name(workspace, name) + + def _remove() -> None: + from docker.errors import NotFound + + try: + volume = client.volumes.get(vol_name) + volume.remove(force=True) + except NotFound: + return + + try: + await asyncio.to_thread(_remove) + return VolumeStatusUpdate(status="RELEASED", status_message=f"Volume {vol_name} released") + except Exception as exc: + return VolumeStatusUpdate(status="FAILED", status_message=f"Failed to delete volume: {exc}") diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py index 05aacd509b..b7d0a6d34e 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py @@ -10,12 +10,14 @@ from typing import Any, Self from nemo_deployments_plugin.backends.base import DeploymentBackend +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_platform import AsyncNeMoPlatform logger = logging.getLogger(__name__) -BACKEND_CLASSES: dict[str, 
type[DeploymentBackend]] = {} -"""Backend type → class map. Populated when docker/k8s backends land (756/757).""" +BACKEND_CLASSES: dict[str, type[DeploymentBackend]] = { + "docker": DockerDeploymentBackend, +} @dataclass(frozen=True) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py index 51cf4d830a..2ea41c6620 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py @@ -179,7 +179,7 @@ async def _list_deployments(self) -> list[Deployment]: Deployment, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=list(NON_TERMINAL_DEPLOYMENT_STATUSES), ), ) @@ -197,7 +197,7 @@ async def _list_volumes(self) -> list[Volume]: Volume, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=list(NON_TERMINAL_VOLUME_STATUSES), ), ) @@ -218,7 +218,7 @@ async def _list_terminal_deployments_for_orphan_grace(self) -> list[Deployment]: Deployment, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=list(_TERMINAL_ORPHAN_STATUSES), ), ) diff --git a/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py new file mode 100644 index 0000000000..f7be34295f --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py @@ -0,0 +1,155 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Integration tests for DockerDeploymentBackend against a real daemon.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from docker_availability import skip_without_docker +from integration_helpers import force_remove_container +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend +from nemo_deployments_plugin.backends.docker.labels import container_name +from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL +from nemo_deployments_plugin.entities import ( + Container, + ContainerPort, + Deployment, + DeploymentBackendConfig, + DeploymentConfig, + DockerDeploymentConfig, +) + +import docker + +pytestmark = [ + pytest.mark.skipif("docker" not in BACKEND_CLASSES, reason="Docker backend not registered"), + skip_without_docker, +] + + +@pytest.fixture +def docker_backend() -> DockerDeploymentBackend: + mock_entities = AsyncMock() + mock_sdk = MagicMock() + with ( + patch("nemo_deployments_plugin.backends.docker.backend.AsyncEntitiesResource"), + patch("nemo_deployments_plugin.backends.docker.backend.NemoEntitiesClient", return_value=mock_entities), + patch("nemo_deployments_plugin.backends.docker.backend.get_shared_gpu_pool", return_value=None), + ): + backend = DockerDeploymentBackend(mock_sdk, {"pull_images": True}) + backend._entities = mock_entities + return backend + + +def _never_config() -> DeploymentConfig: + return DeploymentConfig( + name="echo-cfg", + workspace="itest", + restart_policy="Never", + containers=[Container(name="main", image="alpine:3.20", command=["echo"], args=["hello"])], + ) + + +def _always_http_config() -> DeploymentConfig: + return DeploymentConfig( + name="http-cfg", + workspace="itest", + restart_policy="Always", + containers=[ + Container( + name="main", + image="nginx:alpine", + 
ports=[ContainerPort(containerPort=80, protocol="TCP", name="http")], + ) + ], + backend_config=DeploymentBackendConfig( + docker=DockerDeploymentConfig(port_range_start=9050, port_range_end=9060) + ), + ) + + +@pytest.mark.asyncio +async def test_volume_lifecycle(docker_backend: DockerDeploymentBackend) -> None: + create = await docker_backend.create_volume( + workspace="itest", + name="data", + size="1Gi", + access_modes=["ReadWriteOnce"], + backend_config={}, + ) + assert create.status == "BOUND" + + read = await docker_backend.read_volume_status(workspace="itest", name="data") + assert read.status == "BOUND" + + deleted = await docker_backend.delete_volume("itest", "data") + assert deleted.status == "RELEASED" + + +@pytest.mark.asyncio +async def test_never_deployment_succeeds(docker_backend: DockerDeploymentBackend) -> None: + config = _never_config() + docker_backend._entities.get.return_value = config # type: ignore[attr-defined] + c_name = container_name("itest", "echo-job") + client = docker.from_env() + + try: + created = await docker_backend.create_deployment( + workspace="itest", + name="echo-job", + config_name="echo-cfg", + labels={"managed-by": MANAGED_BY_LABEL}, + backend_config={}, + ) + assert created.status == "STARTING" + + for _ in range(30): + status = await docker_backend.read_status(workspace="itest", name="echo-job") + if status.status in ("SUCCEEDED", "FAILED"): + break + await asyncio.sleep(0.5) + + assert status.status == "SUCCEEDED" + assert status.exit_code == 0 + finally: + await docker_backend.delete_deployment("itest", "echo-job") + force_remove_container(client, c_name) + + +@pytest.mark.asyncio +async def test_lost_detection_for_always(docker_backend: DockerDeploymentBackend) -> None: + deployment = Deployment(name="lost-srv", workspace="itest", deployment_config="http-cfg") + config = _always_http_config() + + async def get_side_effect(entity_type, name, workspace=None): + if entity_type is Deployment: + return deployment + 
return config + + docker_backend._entities.get.side_effect = get_side_effect # type: ignore[attr-defined] + c_name = container_name("itest", "lost-srv") + client = docker.from_env() + + try: + created = await docker_backend.create_deployment( + workspace="itest", + name="lost-srv", + config_name="http-cfg", + labels={"managed-by": MANAGED_BY_LABEL}, + backend_config=config.backend_config.model_dump(by_alias=True), + ) + assert created.status == "STARTING" + + container = client.containers.get(c_name) + container.remove(force=True) + + status = await docker_backend.read_status(workspace="itest", name="lost-srv") + assert status.status == "LOST" + finally: + await docker_backend.delete_deployment("itest", "lost-srv") + force_remove_container(client, c_name) diff --git a/plugins/nemo-deployments/tests/integration/conftest.py b/plugins/nemo-deployments/tests/integration/conftest.py new file mode 100644 index 0000000000..33f8b3eae0 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/conftest.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Integration test fixtures for docker-backed deployments.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +from docker_availability import DOCKER_AVAILABLE, skip_without_docker + +__all__ = ["DOCKER_AVAILABLE", "skip_without_docker"] + +# All tests in this package share one Docker daemon and the same managed-by label +# namespace. Run them on a single xdist worker to avoid container/volume races. 
+_DOCKER_INTEGRATION_XDIST_GROUP = "nemo_deployments_docker_integration" + + +def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: + integration_dir = Path(__file__).parent.resolve() + for item in items: + if integration_dir in Path(str(item.fspath)).resolve().parents: + item.add_marker(pytest.mark.xdist_group(_DOCKER_INTEGRATION_XDIST_GROUP)) diff --git a/plugins/nemo-deployments/tests/integration/docker_availability.py b/plugins/nemo-deployments/tests/integration/docker_availability.py new file mode 100644 index 0000000000..baf7f173c1 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/docker_availability.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared Docker daemon availability check for integration tests.""" + +from __future__ import annotations + +import pytest + +try: + import docker + + docker.from_env().ping() + DOCKER_AVAILABLE: bool = True +except Exception: + DOCKER_AVAILABLE = False + +skip_without_docker: pytest.MarkDecorator = pytest.mark.skipif( + not DOCKER_AVAILABLE, reason="Docker daemon not available" +) diff --git a/plugins/nemo-deployments/tests/integration/integration_helpers.py b/plugins/nemo-deployments/tests/integration/integration_helpers.py new file mode 100644 index 0000000000..e3e4dc9171 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/integration_helpers.py @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Shared helpers for docker-backed integration tests.""" + +from __future__ import annotations + +import time +from typing import Any + + +def force_remove_container(client: Any, name: str, *, retries: int = 3) -> None: + """Remove a container, tolerating concurrent cleanup from other tests.""" + from docker.errors import APIError, NotFound + + for attempt in range(retries): + try: + client.containers.get(name).remove(force=True) + return + except NotFound: + return + except APIError as exc: + if "already in progress" in str(exc).lower() and attempt + 1 < retries: + time.sleep(0.5) + continue + if "already in progress" in str(exc).lower(): + return + raise diff --git a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py index 0b67c55178..be6c88eae5 100644 --- a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py +++ b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py @@ -4,21 +4,189 @@ """Integration tests for deployment reconciliation. Requires AIRCORE-756 DockerDeploymentBackend to be registered in BACKEND_CLASSES. -Until then, these tests are skipped — unit tests with MockDeploymentBackend provide coverage. 
""" from __future__ import annotations -import pytest -from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch -pytestmark = pytest.mark.skipif( - "docker" not in BACKEND_CLASSES, - reason="Requires DockerDeploymentBackend (AIRCORE-756)", +import pytest +from docker.errors import NotFound +from docker_availability import skip_without_docker +from integration_helpers import force_remove_container +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend +from nemo_deployments_plugin.backends.docker.labels import container_name, docker_volume_name +from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES, ExecutorRegistry +from nemo_deployments_plugin.config import ControllerConfig +from nemo_deployments_plugin.entities import ( + Container, + Deployment, + DeploymentBackendConfig, + DeploymentConfig, + DockerDeploymentConfig, + Prerequisite, + Volume, + VolumeMount, ) +from nemo_deployments_plugin.reconciler.deployment_reconciler import DeploymentReconciler +from nemo_deployments_plugin.reconciler.volume_reconciler import VolumeReconciler + +import docker + +pytestmark = [ + pytest.mark.skipif( + "docker" not in BACKEND_CLASSES, + reason="Requires DockerDeploymentBackend (AIRCORE-756)", + ), + skip_without_docker, +] + + +@pytest.fixture +def docker_registry() -> ExecutorRegistry: + mock_sdk = MagicMock() + with ( + patch("nemo_deployments_plugin.backends.docker.backend.AsyncEntitiesResource"), + patch("nemo_deployments_plugin.backends.docker.backend.NemoEntitiesClient"), + patch("nemo_deployments_plugin.backends.docker.backend.get_shared_gpu_pool", return_value=None), + ): + backend = DockerDeploymentBackend(mock_sdk, {"pull_images": True}) + return ExecutorRegistry({"docker": backend}, default_executor="docker") + + +def _backend(docker_registry: ExecutorRegistry) -> DockerDeploymentBackend: + backend = docker_registry.resolve("docker") + 
assert isinstance(backend, DockerDeploymentBackend) + return backend @pytest.mark.asyncio -async def test_puller_server_prerequisite_chain() -> None: +async def test_puller_server_prerequisite_chain(docker_registry: ExecutorRegistry) -> None: """Volume → puller (OnFailure) → server (Always + prerequisite) end-to-end.""" - raise NotImplementedError("Enable when AIRCORE-756 lands") + entities = AsyncMock() + entities.update = AsyncMock(side_effect=lambda entity: entity) + + volume = Volume(name="weights", workspace="itest", size="1Gi", status="PENDING") + puller_dep = Deployment( + name="puller", + workspace="itest", + deployment_config="puller-cfg", + status="PENDING", + ) + server_dep = Deployment( + name="server", + workspace="itest", + deployment_config="server-cfg", + status="PENDING", + prerequisites=[Prerequisite(deployment_name="puller", condition="succeeded")], + ) + + puller_cfg = DeploymentConfig( + name="puller-cfg", + workspace="itest", + restart_policy="OnFailure", + containers=[ + Container( + name="puller", + image="alpine:3.20", + command=["sh", "-c"], + args=["echo pulled > /data/ready && sleep 1"], + volumeMounts=[VolumeMount(name="weights", mountPath="/data")], + ) + ], + volumeMounts=[VolumeMount(name="weights", mountPath="/data")], + ) + server_cfg = DeploymentConfig( + name="server-cfg", + workspace="itest", + restart_policy="Always", + containers=[ + Container( + name="server", + image="alpine:3.20", + command=["sleep"], + args=["3600"], + volumeMounts=[VolumeMount(name="weights", mountPath="/data", read_only=True)], + ) + ], + volumeMounts=[VolumeMount(name="weights", mountPath="/data")], + backend_config=DeploymentBackendConfig( + docker=DockerDeploymentConfig(port_range_start=9070, port_range_end=9080) + ), + ) + + config_cache = { + ("itest", "puller-cfg"): puller_cfg, + ("itest", "server-cfg"): server_cfg, + } + + async def get_side_effect( + entity_type: type, + name: str, + workspace: str | None = None, + ) -> Volume | Deployment | 
DeploymentConfig: + ws = workspace or "itest" + if entity_type is Volume: + return volume + if entity_type is Deployment: + if name == "puller": + return puller_dep + return server_dep + if entity_type is DeploymentConfig: + return config_cache[(ws, name)] + raise KeyError(name) + + entities.get.side_effect = get_side_effect + + backend = _backend(docker_registry) + backend._entities = entities + + volume_reconciler = VolumeReconciler(entities, docker_registry) + deployment_reconciler = DeploymentReconciler( + entities, + docker_registry, + ControllerConfig(interval_seconds=1), + ) + deployment_reconciler.set_config_cache(config_cache) + + volumes_by_name = {("itest", "weights"): volume} + by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} + + try: + await volume_reconciler.reconcile_one(volume) + assert volume.status == "BOUND" + volumes_by_name[("itest", "weights")] = volume + + for _ in range(40): + await deployment_reconciler.reconcile_one( + puller_dep, + deployments_by_name=by_name, + volumes_by_name=volumes_by_name, + ) + if puller_dep.status == "SUCCEEDED": + break + await asyncio.sleep(0.5) + assert puller_dep.status == "SUCCEEDED" + + for _ in range(40): + await deployment_reconciler.reconcile_one( + server_dep, + deployments_by_name=by_name, + volumes_by_name=volumes_by_name, + ) + if server_dep.status in ("READY", "STARTING"): + break + await asyncio.sleep(0.5) + assert server_dep.status in ("READY", "STARTING") + finally: + client = docker.from_env() + await backend.delete_deployment("itest", "puller") + await backend.delete_deployment("itest", "server") + await backend.delete_volume("itest", "weights") + for c_name in (container_name("itest", "puller"), container_name("itest", "server")): + force_remove_container(client, c_name) + try: + client.volumes.get(docker_volume_name("itest", "weights")).remove(force=True) + except NotFound: + pass diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/conftest.py 
b/plugins/nemo-deployments/tests/unit/backends/docker/conftest.py new file mode 100644 index 0000000000..466eec4fc2 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/conftest.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared fixtures for docker backend unit tests.""" + +from __future__ import annotations + +from collections.abc import Iterator +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend + + +@pytest.fixture +def mock_sdk() -> MagicMock: + return MagicMock() + + +@pytest.fixture +def mock_entities() -> AsyncMock: + client = AsyncMock() + return client + + +@pytest.fixture +def mock_docker_client() -> MagicMock: + client = MagicMock() + client.containers = MagicMock() + client.volumes = MagicMock() + client.images = MagicMock() + client.close = MagicMock() + return client + + +@pytest.fixture +def docker_backend( + mock_sdk: MagicMock, mock_entities: AsyncMock, mock_docker_client: MagicMock +) -> Iterator[DockerDeploymentBackend]: + with ( + patch("nemo_deployments_plugin.backends.docker.backend.AsyncEntitiesResource"), + patch("nemo_deployments_plugin.backends.docker.backend.NemoEntitiesClient", return_value=mock_entities), + patch("nemo_deployments_plugin.backends.docker.backend.get_shared_gpu_pool", return_value=None), + patch("docker.from_env", return_value=mock_docker_client), + ): + backend = DockerDeploymentBackend(mock_sdk, {"docker_timeout": 60, "pull_images": False}) + backend._client = mock_docker_client + yield backend diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/docker_helpers.py b/plugins/nemo-deployments/tests/unit/backends/docker/docker_helpers.py new file mode 100644 index 0000000000..0e308633f8 --- /dev/null +++ 
def sample_config(*, restart_policy: RestartPolicy = "Always") -> DeploymentConfig:
    """Build the single-container ``cfg1`` deployment config used across the tests.

    Args:
        restart_policy: Restart policy stored on the returned config.

    Returns:
        A ``DeploymentConfig`` in workspace ``default`` with one ``alpine``
        container that runs ``echo hello``.
    """
    return DeploymentConfig(
        name="cfg1",
        workspace="default",
        containers=[
            Container(
                name="main",
                image="alpine:latest",
                command=["echo"],
                args=["hello"],
            )
        ],
        restart_policy=restart_policy,
    )


def container_attrs(*, status: str = "running", exit_code: int = 0) -> dict[str, Any]:
    """Return a minimal docker-inspect style ``attrs`` payload for a mocked container.

    Args:
        status: Container state string, stored under ``State.Status``.
            Previously this argument was accepted and then discarded, so a
            caller asking for e.g. ``"exited"`` got the same payload as for
            ``"running"``.
        exit_code: Value stored under ``State.ExitCode``.

    Returns:
        A fresh dict on every call, so tests may mutate it freely.
    """
    return {
        "State": {
            "Status": status,
            "ExitCode": exit_code,
            "StartedAt": "2026-01-01T00:00:00Z",
        },
        "RestartCount": 0,
    }
@pytest.mark.asyncio
async def test_create_deployment_starts_container(
    docker_backend: DockerDeploymentBackend,
    mock_entities: AsyncMock,
    mock_docker_client: MagicMock,
) -> None:
    """With no existing container, create_deployment runs one and reports STARTING."""
    mock_docker_client.containers.run.return_value = MagicMock(id="abc123")
    mock_docker_client.containers.get.side_effect = NotFound("missing")
    mock_entities.get.return_value = sample_config()

    port_window = {"port_range_start": 9000, "port_range_end": 9100}
    result = await docker_backend.create_deployment(
        workspace="default",
        name="srv",
        config_name="cfg1",
        labels={"managed-by": MANAGED_BY_LABEL},
        backend_config={"docker": port_window},
    )

    mock_entities.get.assert_awaited()
    mock_docker_client.containers.run.assert_called_once()
    assert result.status == "STARTING"


@pytest.mark.asyncio
async def test_read_status_ready_when_running_without_probe(
    docker_backend: DockerDeploymentBackend,
    mock_entities: AsyncMock,
    mock_docker_client: MagicMock,
) -> None:
    """A running container whose config declares no probe is reported READY."""
    running = MagicMock(
        id="abc123def456",
        status="running",
        labels={
            "managed-by": MANAGED_BY_LABEL,
            "nmp.nvidia.com/restart-policy": "Always",
            "nmp.nvidia.com/deployment-config": "cfg1",
        },
        ports={},
        attrs=container_attrs(),
    )
    mock_entities.get.return_value = sample_config()
    mock_docker_client.containers.get.return_value = running

    result = await docker_backend.read_status(workspace="default", name="srv")

    assert result.status == "READY"


@pytest.mark.asyncio
async def test_read_status_lost_when_missing_always(
    docker_backend: DockerDeploymentBackend,
    mock_entities: AsyncMock,
    mock_docker_client: MagicMock,
) -> None:
    """A missing container under an ``Always`` restart policy is reported LOST."""
    stored_deployment = MagicMock(deployment_config="cfg1")

    async def fake_get(entity_type, name, workspace=None):
        # Deployment lookups return the stored entity; anything else gets the config.
        return stored_deployment if entity_type is Deployment else sample_config(restart_policy="Always")

    mock_entities.get.side_effect = fake_get
    mock_docker_client.containers.get.side_effect = NotFound("missing")

    result = await docker_backend.read_status(workspace="default", name="srv")

    assert result.status == "LOST"


@pytest.mark.asyncio
async def test_delete_deployment_idempotent(
    docker_backend: DockerDeploymentBackend,
    mock_docker_client: MagicMock,
) -> None:
    """Deleting a deployment whose container is already gone still succeeds."""
    mock_docker_client.containers.get.side_effect = NotFound("missing")

    result = await docker_backend.delete_deployment("default", "srv")

    assert result.status == "SUCCEEDED"


@pytest.mark.asyncio
async def test_list_managed_deployment_names(
    docker_backend: DockerDeploymentBackend,
    mock_docker_client: MagicMock,
) -> None:
    """Managed containers are listed as ``<workspace>/<name>`` keys."""
    managed = MagicMock(
        labels={
            "managed-by": MANAGED_BY_LABEL,
            "nmp.nvidia.com/deployment-workspace": "default",
            "nmp.nvidia.com/deployment-name": "srv",
        }
    )
    mock_docker_client.containers.list.return_value = [managed]

    listed = await docker_backend.list_managed_deployment_names()

    assert listed == ["default/srv"]


@pytest.mark.asyncio
async def test_create_volume_bound(
    docker_backend: DockerDeploymentBackend,
    mock_docker_client: MagicMock,
) -> None:
    """Creating a volume that does not exist yet reports BOUND."""
    created = MagicMock()
    # ``name`` must be assigned after construction: MagicMock(name=...) would
    # name the mock itself instead of setting the attribute.
    created.name = "dep-vol-default-data"
    mock_docker_client.volumes.create.return_value = created
    mock_docker_client.volumes.get.side_effect = NotFound("missing")

    result = await docker_backend.create_volume(
        workspace="default",
        name="data",
        size="1Gi",
        access_modes=["ReadWriteOnce"],
        backend_config={},
    )

    assert result.status == "BOUND"
def test_parse_gpu_device_ids_extracts_nvidia_ids() -> None:
    """NVIDIA device requests yield their device IDs as integers."""
    requests = [
        {
            "Driver": "nvidia",
            "Count": 0,
            "DeviceIDs": ["0", "1"],
            "Capabilities": [["gpu"]],
        }
    ]
    assert parse_gpu_device_ids(requests) == [0, 1]


def test_parse_gpu_device_ids_ignores_non_nvidia() -> None:
    """Device requests for other drivers contribute no GPU IDs."""
    requests = [{"Driver": "amd", "DeviceIDs": ["0"]}]
    assert parse_gpu_device_ids(requests) == []


def test_restore_allocations_marks_gpus_in_use() -> None:
    """Restored allocations occupy their GPUs, so an oversized request fails."""
    pool = DockerGPUPool(reserved_gpu_device_ids=[0, 1, 2])
    pool.restore_allocations({"smoke/srv": [0, 1]})
    assert pool.gpu_to_workload_id[0] == "smoke/srv"
    assert pool.gpu_to_workload_id[1] == "smoke/srv"
    assert pool.gpu_to_workload_id[2] is None
    # Only GPU 2 is free, so asking for two must be rejected.
    with pytest.raises(GPUAllocationError):
        pool.allocate_gpu("other", num_requested=2)


def test_discover_managed_gpu_allocations_from_running_container() -> None:
    """A running managed container reports the GPU IDs from its device requests."""
    container = MagicMock()
    container.status = "running"
    container.name = "dep-smoke-srv"
    container.labels = {
        "managed-by": MANAGED_BY_LABEL,
        "nmp.nvidia.com/deployment-workspace": "smoke",
        "nmp.nvidia.com/deployment-name": "srv",
    }
    container.attrs = {
        "HostConfig": {
            "DeviceRequests": [
                {
                    "Driver": "nvidia",
                    "DeviceIDs": ["1"],
                    "Capabilities": [["gpu"]],
                }
            ]
        }
    }

    client = MagicMock()
    client.containers.list.return_value = [container]

    assert discover_managed_gpu_allocations(client) == {"smoke/srv": [1]}


def test_discover_managed_gpu_allocations_skips_exited_container() -> None:
    """Exited containers are not counted as holding GPUs."""
    container = MagicMock()
    container.status = "exited"
    container.labels = {
        "managed-by": MANAGED_BY_LABEL,
        "nmp.nvidia.com/deployment-workspace": "smoke",
        "nmp.nvidia.com/deployment-name": "srv",
    }

    client = MagicMock()
    client.containers.list.return_value = [container]

    assert discover_managed_gpu_allocations(client) == {}


def test_get_shared_gpu_pool_recovers_running_allocations() -> None:
    """The shared pool is seeded with the GPUs held by running managed containers."""
    container = MagicMock()
    container.status = "running"
    container.name = "dep-smoke-srv"
    container.labels = {
        "managed-by": MANAGED_BY_LABEL,
        "nmp.nvidia.com/deployment-workspace": "smoke",
        "nmp.nvidia.com/deployment-name": "srv",
    }
    container.attrs = {
        "HostConfig": {
            "DeviceRequests": [
                {
                    "Driver": "nvidia",
                    "DeviceIDs": ["0"],
                    "Capabilities": [["gpu"]],
                }
            ]
        }
    }
    mock_client = MagicMock()
    mock_client.containers.list.return_value = [container]

    gpu_module._pool = None
    try:
        with (
            patch.object(gpu_module, "detect_gpu_device_ids", return_value=[0, 1]),
            patch("docker.from_env", return_value=mock_client),
        ):
            pool = get_shared_gpu_pool()

        assert pool is not None
        assert pool.gpu_to_workload_id[0] == "smoke/srv"
        assert pool.gpu_to_workload_id[1] is None
    finally:
        # Reset the module-global pool even when an assertion fails, so a
        # populated pool never leaks into later tests.
        gpu_module._pool = None


def test_get_shared_gpu_pool_retries_after_recovery_failure() -> None:
    """A failed recovery leaves no cached pool, so the next call can succeed."""
    mock_client = MagicMock()
    mock_client.containers.list.side_effect = RuntimeError("docker unavailable")

    gpu_module._pool = None
    try:
        with (
            patch.object(gpu_module, "detect_gpu_device_ids", return_value=[0, 1]),
            patch("docker.from_env", return_value=mock_client),
        ):
            assert get_shared_gpu_pool() is None
            assert gpu_module._pool is None

            # Docker becomes reachable again: listing now succeeds with no containers.
            mock_client.containers.list.side_effect = None
            mock_client.containers.list.return_value = []

            pool = get_shared_gpu_pool()

        assert pool is not None
        assert pool.gpu_to_workload_id == {0: None, 1: None}
    finally:
        # Reset the module-global pool even when an assertion fails.
        gpu_module._pool = None


@pytest.mark.asyncio
async def test_create_existing_matching_container_returns_read_status(
    docker_backend: DockerDeploymentBackend,
    mock_entities: AsyncMock,
    mock_docker_client: MagicMock,
) -> None:
    """Re-creating a deployment whose container already exists does not start another."""
    existing = MagicMock()
    existing.labels = {
        "managed-by": MANAGED_BY_LABEL,
        DEPLOYMENT_WORKSPACE_LABEL: "default",
        DEPLOYMENT_NAME_LABEL: "srv",
        "nmp.nvidia.com/restart-policy": "Always",
        "nmp.nvidia.com/deployment-config": "cfg1",
    }
    existing.id = "abc"
    existing.status = "running"
    existing.ports = {}
    existing.attrs = container_attrs()
    mock_docker_client.containers.get.return_value = existing
    mock_entities.get.return_value = sample_config()

    update = await docker_backend.create_deployment(
        workspace="default",
        name="srv",
        config_name="cfg1",
        labels={"managed-by": MANAGED_BY_LABEL},
        backend_config={},
    )

    assert update.status == "READY"
    mock_docker_client.containers.run.assert_not_called()
def test_container_name_is_dns_safe() -> None:
    """Container names carry the ``dep-`` prefix and stay within 63 characters."""
    generated = container_name("my-workspace", "my.deployment")
    assert generated.startswith("dep-")
    assert len(generated) <= 63


def test_docker_volume_name_prefix() -> None:
    """Volume names carry the ``dep-vol-`` prefix."""
    generated = docker_volume_name("ws", "vol")
    assert generated.startswith("dep-vol-")


def test_deployment_identity_labels() -> None:
    """Identity labels record the manager, workspace, deployment name and config name."""
    identity = deployment_identity_labels("default", "srv", "Always", config_name="cfg1")
    expected = {
        MANAGED_BY_KEY: MANAGED_BY_LABEL,
        DEPLOYMENT_WORKSPACE_LABEL: "default",
        DEPLOYMENT_NAME_LABEL: "srv",
        CONFIG_NAME_LABEL: "cfg1",
    }
    for label_key, label_value in expected.items():
        assert identity[label_key] == label_value
def test_collect_used_host_ports() -> None:
    """Host ports published by a container are collected as integers."""
    container = MagicMock()
    container.ports = {"8080/tcp": [{"HostPort": "9001"}]}
    assert collect_used_host_ports([container]) == {9001}


def test_is_port_free_skips_check_for_remote_host(monkeypatch: pytest.MonkeyPatch) -> None:
    """is_port_free returns True when the docker host is remote."""
    from nemo_deployments_plugin.backends.docker import ports as ports_mod

    monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: True)
    assert is_port_free(1) is True


def test_is_port_free_returns_false_when_bind_fails(monkeypatch: pytest.MonkeyPatch) -> None:
    """A bind that raises ``OSError`` marks the port as taken."""
    from nemo_deployments_plugin.backends.docker import ports as ports_mod

    class FakeSock:
        """Socket stand-in whose ``bind`` always fails."""

        def __enter__(self) -> FakeSock:
            return self

        def __exit__(self, *args: object) -> None:
            return None

        def setsockopt(self, *args: object, **kwargs: object) -> None:
            return None

        def bind(self, addr: tuple[str, int]) -> None:
            raise OSError("Address already in use")

    monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: False)
    monkeypatch.setattr(ports_mod.socket, "socket", lambda *args, **kwargs: FakeSock())
    assert is_port_free(9000) is False


def test_is_port_free_returns_true_when_bind_succeeds(monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful bind on the loopback address marks the port as free."""
    from nemo_deployments_plugin.backends.docker import ports as ports_mod

    class FakeSock:
        """Socket stand-in whose ``bind`` succeeds and checks the address."""

        def __enter__(self) -> FakeSock:
            return self

        def __exit__(self, *args: object) -> None:
            return None

        def setsockopt(self, *args: object, **kwargs: object) -> None:
            return None

        def bind(self, addr: tuple[str, int]) -> None:
            assert addr == ("127.0.0.1", 9000)
            return None

    monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: False)
    monkeypatch.setattr(ports_mod.socket, "socket", lambda *args, **kwargs: FakeSock())
    assert is_port_free(9000) is True


@pytest.mark.asyncio
async def test_find_available_port_skips_used(mock_docker_client: MagicMock, monkeypatch: pytest.MonkeyPatch) -> None:
    """Ports published by containers or reported busy by the host are skipped."""
    from nemo_deployments_plugin.backends.docker import ports as ports_mod
    from nemo_deployments_plugin.backends.docker.ports import find_available_port

    used = MagicMock()
    used.ports = {"80/tcp": [{"HostPort": "9000"}]}
    mock_docker_client.containers.list.return_value = [used]
    # 9000 is published by a container and 9001 is reported busy, leaving 9002.
    monkeypatch.setattr(ports_mod, "is_port_free", lambda port: port != 9001)

    cfg = DockerDeploymentConfig(port_range_start=9000, port_range_end=9002)
    port = await find_available_port(mock_docker_client, cfg)
    assert port == 9002


@pytest.mark.asyncio
async def test_find_available_port_excludes_pending_assignments(
    mock_docker_client: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Ports passed in ``exclude_ports`` are not handed out a second time."""
    from nemo_deployments_plugin.backends.docker import ports as ports_mod
    from nemo_deployments_plugin.backends.docker.ports import find_available_port

    mock_docker_client.containers.list.return_value = []
    # Stub the host probe so the result does not depend on which real ports
    # happen to be free on the machine running the tests.
    monkeypatch.setattr(ports_mod, "is_port_free", lambda port: True)
    cfg = DockerDeploymentConfig(port_range_start=9000, port_range_end=9002)

    first = await find_available_port(mock_docker_client, cfg)
    second = await find_available_port(mock_docker_client, cfg, exclude_ports={first})

    assert first == 9000
    assert second == 9001
# (policy, backoff limit, expected ``docker run`` keyword arguments)
_RESTART_POLICY_CASES = [
    ("Always", 6, {"restart_policy": {"Name": "always"}}),
    ("OnFailure", 3, {"restart_policy": {"Name": "on-failure", "MaximumRetryCount": 3}}),
    ("Never", 6, {}),
]


@pytest.mark.parametrize(("policy", "backoff", "expected"), _RESTART_POLICY_CASES)
def test_restart_policy_kwargs(policy: str, backoff: int, expected: dict) -> None:
    """Each restart policy maps to the expected ``docker run`` keyword arguments."""
    produced = restart_policy_kwargs(policy, backoff)  # type: ignore[arg-type]
    assert produced == expected
# (exit code, restart policy, expected deployment status)
_EXITED_STATUS_CASES = [
    (0, "Never", "SUCCEEDED"),
    (0, "Always", "SUCCEEDED"),
    (1, "Never", "FAILED"),
    (1, "Always", "FAILED"),
]


@pytest.mark.parametrize(("exit_code", "restart_policy", "expected"), _EXITED_STATUS_CASES)
def test_map_exited_status(exit_code: int, restart_policy: str, expected: str) -> None:
    """Exit code 0 maps to SUCCEEDED and exit code 1 to FAILED for both policies."""
    mapped = map_exited_status(exit_code, restart_policy)  # type: ignore[arg-type]
    assert mapped == expected


def test_missing_container_lost_for_always() -> None:
    """A missing container under ``Always`` is reported LOST."""
    outcome = missing_container_status("Always", container_name="dep-default-srv")
    assert outcome.status == "LOST"


def test_missing_container_failed_for_never() -> None:
    """A missing container under ``Never`` is reported FAILED."""
    outcome = missing_container_status("Never", container_name="dep-default-job")
    assert outcome.status == "FAILED"
call_kwargs = mock_entity_client.list.await_args.kwargs assert call_kwargs["filter_operation"].operator.value == "$in" - assert call_kwargs["filter_operation"].field == "status" + assert call_kwargs["filter_operation"].field == "data.status" def test_list_deployments_invalid_status_in_400(client: TestClient) -> None: diff --git a/pytest.ini b/pytest.ini index fe374a65fb..1e1ea99b74 100644 --- a/pytest.ini +++ b/pytest.ini @@ -11,6 +11,7 @@ pythonpath = plugins/example-plugin/src plugins/nemo-deployments/src plugins/nemo-deployments/tests/unit + plugins/nemo-deployments/tests/integration plugins/nemo-safe-synthesizer/src # Test discovery paths - packages and stable services diff --git a/uv.lock b/uv.lock index 17fb89bea0..deaf05813c 100644 --- a/uv.lock +++ b/uv.lock @@ -4333,13 +4333,20 @@ version = "0.1.0" source = { editable = "plugins/nemo-deployments" } dependencies = [ { name = "fastapi", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, + { name = "httpx", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "nemo-platform", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "nemo-platform-plugin", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "pydantic", extra = ["email"], marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform 
== 'linux')" }, ] +[package.optional-dependencies] +docker = [ + { name = "docker", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, +] + [package.dev-dependencies] dev = [ + { name = "docker", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "fastapi", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "httpx", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "pytest", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, @@ -4348,14 +4355,18 @@ dev = [ [package.metadata] requires-dist = [ + { name = "docker", marker = "extra == 'docker'", specifier = ">=7.0" }, { name = "fastapi", specifier = ">=0.115" }, + { name = "httpx", specifier = ">=0.27" }, { name = "nemo-platform", editable = "packages/nemo_platform" }, { name = "nemo-platform-plugin", editable = "packages/nemo_platform_plugin" }, { name = "pydantic", specifier = ">=2.10.6" }, ] +provides-extras = ["docker"] [package.metadata.requires-dev] dev = [ + { name = "docker", specifier = ">=7.0" }, { name = "fastapi", specifier = ">=0.115" }, { name = "httpx", specifier = ">=0.27" }, { name = "pytest", specifier = ">=8.3.4" },