From 4d7a76a17c78f2a6e8aab0f405bdb71f3b0f02e8 Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Mon, 22 Jun 2026 18:51:51 -0700 Subject: [PATCH 1/9] feat(deployments): implement DockerDeploymentBackend (AIRCORE-756) Add Docker backend for nemo-deployments-plugin with single-container v1 lifecycle, entity-store config fetch, port allocation, exec/http probes, plugin-local GPU pool, and unit plus integration test coverage. Stacks on #315 (AIRCORE-758 reconciler prerequisite DAG). Signed-off-by: Tyler Bray --- plugins/nemo-deployments/pyproject.toml | 6 +- .../backends/docker/backend.py | 509 ++++++++++++++++++ .../backends/docker/config.py | 16 + .../backends/docker/containers.py | 108 ++++ .../backends/docker/gpu.py | 102 ++++ .../backends/docker/labels.py | 83 +++ .../backends/docker/ports.py | 77 +++ .../backends/docker/probes.py | 131 +++++ .../backends/docker/status.py | 54 ++ .../backends/docker/volumes.py | 88 +++ .../backends/registry.py | 6 +- .../backends/docker/test_docker_backend.py | 164 ++++++ .../tests/integration/conftest.py | 18 + .../integration/test_reconcile_docker.py | 197 ++++++- .../tests/unit/backends/docker/conftest.py | 48 ++ .../backends/docker/test_backend_mocked.py | 136 +++++ .../unit/backends/docker/test_idempotency.py | 50 ++ .../tests/unit/backends/docker/test_labels.py | 40 ++ .../tests/unit/backends/docker/test_ports.py | 36 ++ .../backends/docker/test_restart_policy.py | 21 + .../backends/docker/test_status_mapping.py | 35 ++ .../tests/unit/backends/docker_helpers.py | 34 ++ uv.lock | 11 + 23 files changed, 1959 insertions(+), 11 deletions(-) create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/config.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/containers.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py create mode 100644 plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py create mode 100644 plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py create mode 100644 plugins/nemo-deployments/tests/integration/conftest.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/conftest.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_labels.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_restart_policy.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_status_mapping.py create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker_helpers.py diff --git a/plugins/nemo-deployments/pyproject.toml b/plugins/nemo-deployments/pyproject.toml index e9d1f932bc..fe3ca7fea3 100644 --- a/plugins/nemo-deployments/pyproject.toml +++ b/plugins/nemo-deployments/pyproject.toml @@ -6,11 +6,15 @@ readme = "README.md" requires-python = ">=3.11,<3.14" dependencies = [ "fastapi>=0.115", + "httpx>=0.27", "nemo-platform", "nemo-platform-plugin", "pydantic>=2.10.6", ] +[project.optional-dependencies] +docker = ["docker>=7.0"] + [project.entry-points."nemo.services"] deployments = "nemo_deployments_plugin.service:DeploymentsService" @@ -29,7 +33,7 @@ nemo-platform = { workspace = true } nemo-platform-plugin = { workspace = true } [dependency-groups] -dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115"] +dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115", "docker>=7.0"] [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py new file mode 100644 index 0000000000..d575ea5f02 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py @@ -0,0 +1,509 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Docker substrate backend for the deployments plugin.""" + +from __future__ import annotations + +import asyncio +import logging +import os +from typing import Any + +from nemo_deployments_plugin.backends.base import ( + BackendStatusUpdate, + DeploymentBackend, + LogResult, + VolumeStatusUpdate, +) +from nemo_deployments_plugin.backends.docker import volumes as volume_ops +from nemo_deployments_plugin.backends.docker.config import DockerExecutorConfig +from nemo_deployments_plugin.backends.docker.containers import ( + DeploymentConfigError, + build_port_bindings, + build_volume_bindings, + device_requests_for_gpus, + env_dict, + gpu_count_from_container, + merged_volume_mounts, + parse_docker_backend_config, + restart_policy_kwargs, + validate_config_for_docker, +) +from nemo_deployments_plugin.backends.docker.gpu import GPUAllocationError, get_shared_gpu_pool +from nemo_deployments_plugin.backends.docker.labels import ( + CONFIG_NAME_LABEL, + DEPLOYMENT_NAME_LABEL, + DEPLOYMENT_WORKSPACE_LABEL, + RESTART_POLICY_LABEL, + container_name, + deployment_identity_labels, + deployment_key, + managed_by_filter, +) +from nemo_deployments_plugin.backends.docker.ports import find_available_port +from nemo_deployments_plugin.backends.docker.probes import check_readiness_probe, host_url_for_port +from nemo_deployments_plugin.backends.docker.status import ( + LOG_MAX_CHARS, + map_docker_state_to_starting, + map_exited_status, + missing_container_status, +) +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL +from nemo_deployments_plugin.entities import Deployment, DeploymentConfig +from nemo_deployments_plugin.types import Endpoint, RestartPolicy +from nemo_platform.resources.entities import AsyncEntitiesResource +from nemo_platform_plugin.config import LOOPBACK_ADDRESSES +from nemo_platform_plugin.entity_client import NemoEntitiesClient, NemoEntityNotFoundError + +logger = logging.getLogger(__name__) + + +class DockerDeploymentBackend(DeploymentBackend): + """Manage deployments and volumes as Docker containers and volumes.""" + + def init(self) -> None: + try: + import docker + from docker import errors as docker_errors + except ImportError as exc: + raise RuntimeError( + "docker package is required for DockerDeploymentBackend. " + "Install with: uv sync --package nemo-deployments-plugin --extra docker" + ) from exc + + self._docker = docker + self._docker_errors = docker_errors + self._executor_config = DockerExecutorConfig.model_validate(self._config) + self._entities = NemoEntitiesClient(AsyncEntitiesResource(self._sdk)) + self._gpu_pool = get_shared_gpu_pool() + self._client = self._create_client() + + def _create_client(self) -> Any: + kwargs: dict[str, Any] = {"timeout": self._executor_config.docker_timeout} + if self._executor_config.docker_host: + kwargs["base_url"] = self._executor_config.docker_host + return self._docker.from_env(**kwargs) + + def shutdown(self) -> None: + if hasattr(self, "_client") and self._client is not None: + self._client.close() + + async def _load_deployment_config(self, workspace: str, config_name: str) -> DeploymentConfig: + return await self._entities.get(DeploymentConfig, config_name, workspace=workspace) + + async def create_deployment( + self, + *, + workspace: str, + name: str, + config_name: str, + labels: dict[str, str], + backend_config: dict[str, Any], + ) -> BackendStatusUpdate: + c_name = container_name(workspace, name) + try: + existing = await asyncio.to_thread(self._client.containers.get, c_name) + if self._container_matches_deployment(existing, workspace, name, config_name): + return await self.read_status(workspace=workspace, name=name) + return BackendStatusUpdate( + status="FAILED", + status_message=f"Container name collision: {c_name} exists with different labels", + ) + except self._docker_errors.NotFound: + pass + + try: + config = await self._load_deployment_config(workspace, config_name) + container_spec = validate_config_for_docker(config) + except DeploymentConfigError as exc: + return BackendStatusUpdate(status="FAILED", status_message=str(exc)) + except NemoEntityNotFoundError: + return BackendStatusUpdate( + status="FAILED", + status_message=f"DeploymentConfig '{config_name}' not found in workspace '{workspace}'", + ) + except Exception as exc: + logger.exception("Failed to load deployment config %s/%s", workspace, config_name) + return BackendStatusUpdate(status="FAILED", status_message=f"Failed to load deployment config: {exc}") + + docker_cfg = parse_docker_backend_config(backend_config) + if config.backend_config.docker is not None: + docker_cfg = config.backend_config.docker + + dep_key = deployment_key(workspace, name) + gpu_ids: list[int] = [] + gpu_count = gpu_count_from_container(container_spec) + if gpu_count > 0: + if self._gpu_pool is None: + return BackendStatusUpdate( + status="FAILED", + status_message="GPU requested but no GPUs detected on this host", + ) + try: + gpu_ids = self._gpu_pool.allocate_gpu(dep_key, num_requested=gpu_count) + except GPUAllocationError as exc: + return BackendStatusUpdate(status="FAILED", status_message=str(exc)) + + host_ports: dict[int, int] = {} + for port_spec in container_spec.ports: + host_port = await find_available_port(self._client, docker_cfg) + if host_port is None: + if gpu_ids: + self._gpu_pool.release_gpu(dep_key) # type: ignore[union-attr] + return BackendStatusUpdate( + status="FAILED", status_message="No host ports available in configured range" + ) + host_ports[port_spec.container_port] = host_port + + if self._executor_config.pull_images: + try: + await asyncio.to_thread(self._client.images.pull, container_spec.image) + except (self._docker_errors.APIError, self._docker_errors.ImageNotFound) as exc: + if gpu_ids: + self._gpu_pool.release_gpu(dep_key) # type: ignore[union-attr] + return BackendStatusUpdate(status="FAILED", status_message=f"Failed to pull image: {exc}") + + all_labels = { + **labels, + **config.labels, + **deployment_identity_labels( + workspace, + name, + config.restart_policy, + config_name=config_name, + backoff_limit=config.backoff_limit, + ), + } + run_kwargs: dict[str, Any] = { + "image": container_spec.image, + "name": c_name, + "detach": True, + "labels": all_labels, + "environment": env_dict(container_spec), + **restart_policy_kwargs(config.restart_policy, config.backoff_limit), + } + if container_spec.command: + run_kwargs["command"] = container_spec.command + if container_spec.args: + run_kwargs["command"] = list(container_spec.command) + list(container_spec.args) + + volume_bindings = build_volume_bindings(workspace, merged_volume_mounts(config, container_spec)) + if volume_bindings: + run_kwargs["volumes"] = volume_bindings + + if container_spec.ports: + run_kwargs["ports"] = build_port_bindings(container_spec, host_ports) + + device_requests = device_requests_for_gpus(gpu_ids) + if device_requests: + run_kwargs["device_requests"] = device_requests + + if docker_cfg.network: + run_kwargs["network"] = docker_cfg.network + + try: + await asyncio.to_thread(self._client.containers.run, **run_kwargs) + except Exception as exc: + if gpu_ids: + self._gpu_pool.release_gpu(dep_key) # type: ignore[union-attr] + logger.exception("Failed to start container %s", c_name) + return BackendStatusUpdate(status="FAILED", status_message=f"Failed to start container: {exc}") + + endpoints = self._build_endpoints(container_spec, host_ports) + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container {c_name} created", + endpoints=endpoints, + ) + + async def read_status(self, *, workspace: str, name: str) -> BackendStatusUpdate: + c_name = container_name(workspace, name) + try: + container = await asyncio.to_thread(self._client.containers.get, c_name) + await asyncio.to_thread(container.reload) + except self._docker_errors.NotFound: + restart_policy = await self._resolve_restart_policy(workspace, name) + return missing_container_status(restart_policy, container_name=c_name) + except Exception as exc: + return BackendStatusUpdate(status="FAILED", status_message=f"Docker API error: {exc}") + + labels = container.labels or {} + restart_policy: RestartPolicy = labels.get(RESTART_POLICY_LABEL, "Always") + state = container.status + container_id = (container.id or "")[:12] + host_ports = self._extract_host_ports(container) + endpoints = self._endpoints_from_container_ports(container, host_ports) + + if state in ("created", "restarting"): + return map_docker_state_to_starting(container_id, state) + + if state == "running": + host_url = self._primary_host_url(host_ports) + config = await self._load_config_from_labels(workspace, labels) + probe = None + if config is not None and config.containers: + probe = config.containers[0].readiness_probe + ready, reason = await check_readiness_probe( + container=container, + probe=probe, + host_url=host_url, + host_ports=host_ports, + ) + if ready and restart_policy == "Always": + return BackendStatusUpdate( + status="READY", + status_message=f"Container running and ready ({reason})", + endpoints=endpoints, + ) + if ready and restart_policy in ("OnFailure", "Never"): + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container running ({reason})", + endpoints=endpoints, + ) + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container running but not ready ({reason})", + endpoints=endpoints, + ) + + if state in ("exited", "dead"): + exit_code = int(container.attrs.get("State", {}).get("ExitCode", 1)) + if exit_code == 0 and restart_policy in ("Never", "OnFailure"): + dep_key = deployment_key(workspace, name) + if self._gpu_pool is not None: + self._gpu_pool.release_gpu(dep_key) + return BackendStatusUpdate( + status="SUCCEEDED", + status_message="Container exited successfully (code 0)", + exit_code=exit_code, + endpoints=endpoints, + ) + if restart_policy == "Always": + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container exited (code {exit_code}); restart policy will recreate it", + exit_code=exit_code, + endpoints=endpoints, + ) + if restart_policy == "OnFailure": + restart_count = int(container.attrs.get("RestartCount", 0)) + backoff_limit = int(labels.get("nmp.nvidia.com/backoff-limit", "6")) + if restart_count < backoff_limit: + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container exited (code {exit_code}); retry {restart_count}/{backoff_limit}", + exit_code=exit_code, + endpoints=endpoints, + ) + dep_key = deployment_key(workspace, name) + if self._gpu_pool is not None: + self._gpu_pool.release_gpu(dep_key) + status = map_exited_status(exit_code, restart_policy) + message = f"Container exited with code {exit_code}" + return BackendStatusUpdate( + status=status, + status_message=message, + exit_code=exit_code, + endpoints=endpoints, + ) + + if state == "removing": + return BackendStatusUpdate(status="DELETING", status_message=f"Container removing (ID: {container_id})") + + return BackendStatusUpdate(status="STARTING", status_message=f"Container state: {state}") + + async def delete_deployment(self, workspace: str, name: str) -> BackendStatusUpdate: + c_name = container_name(workspace, name) + dep_key = deployment_key(workspace, name) + + def _delete() -> None: + try: + container = self._client.containers.get(c_name) + container.stop(timeout=30) + container.remove(force=True) + except self._docker_errors.NotFound: + return + + try: + await asyncio.to_thread(_delete) + except Exception as exc: + return BackendStatusUpdate(status="FAILED", status_message=f"Failed to delete container: {exc}") + finally: + if self._gpu_pool is not None: + self._gpu_pool.release_gpu(dep_key) + + return BackendStatusUpdate(status="SUCCEEDED", status_message=f"Container {c_name} deleted") + + async def list_managed_deployment_names(self) -> list[str]: + try: + containers = await asyncio.to_thread( + self._client.containers.list, + all=True, + filters=managed_by_filter(), + ) + except Exception: + logger.warning("Failed to list managed containers", exc_info=True) + return [] + + seen: set[str] = set() + for container in containers: + container_labels = container.labels or {} + if container_labels.get("managed-by") != MANAGED_BY_LABEL: + continue + ws = container_labels.get(DEPLOYMENT_WORKSPACE_LABEL) + dep_name = container_labels.get(DEPLOYMENT_NAME_LABEL) + if ws and dep_name: + seen.add(f"{ws}/{dep_name}") + return sorted(seen) + + async def get_logs(self, *, workspace: str, name: str, tail: int = 100) -> LogResult: + c_name = container_name(workspace, name) + + def _logs() -> bytes: + container = self._client.containers.get(c_name) + return container.logs(tail=tail, timestamps=True) + + try: + raw = await asyncio.to_thread(_logs) + text = raw.decode("utf-8", errors="ignore") + lines = text.splitlines() + truncated = len(text) > LOG_MAX_CHARS + if truncated: + lines = lines[-tail:] + return LogResult(lines=lines, truncated=truncated) + except self._docker_errors.NotFound: + return LogResult(lines=[f"Container {c_name} not found"]) + except Exception as exc: + return LogResult(lines=[f"Failed to fetch logs: {exc}"]) + + async def create_volume( + self, + *, + workspace: str, + name: str, + size: str, + access_modes: list[str], + backend_config: dict[str, Any], + ) -> VolumeStatusUpdate: + del size, access_modes + driver = "local" + docker_section = backend_config.get("docker") or {} + if isinstance(docker_section, dict) and docker_section.get("driver"): + driver = str(docker_section["driver"]) + return await volume_ops.create_volume( + self._client, + workspace=workspace, + name=name, + driver=driver, + ) + + async def read_volume_status(self, *, workspace: str, name: str) -> VolumeStatusUpdate: + return await volume_ops.read_volume_status(self._client, workspace=workspace, name=name) + + async def delete_volume(self, workspace: str, name: str) -> VolumeStatusUpdate: + return await volume_ops.delete_volume(self._client, workspace=workspace, name=name) + + def _container_matches_deployment( + self, + container: Any, + workspace: str, + name: str, + config_name: str, + ) -> bool: + labels = container.labels or {} + return ( + labels.get(DEPLOYMENT_WORKSPACE_LABEL) == workspace + and labels.get(DEPLOYMENT_NAME_LABEL) == name + and labels.get(CONFIG_NAME_LABEL) == config_name + and labels.get("managed-by") == MANAGED_BY_LABEL + ) + + async def _resolve_restart_policy(self, workspace: str, name: str) -> RestartPolicy: + config = await self._load_config_for_deployment_entity(workspace, name) + if config is not None: + return config.restart_policy + return "Always" + + async def _load_config_from_labels(self, workspace: str, labels: dict[str, str]) -> DeploymentConfig | None: + config_name = labels.get(CONFIG_NAME_LABEL) + if not config_name: + return await self._load_config_for_deployment_entity( + workspace, + labels.get(DEPLOYMENT_NAME_LABEL, ""), + ) + try: + return await self._entities.get(DeploymentConfig, config_name, workspace=workspace) + except Exception: + return None + + async def _load_config_for_deployment_entity( + self, + workspace: str, + deployment_name: str, + ) -> DeploymentConfig | None: + if not deployment_name: + return None + try: + deployment = await self._entities.get(Deployment, deployment_name, workspace=workspace) + return await self._entities.get( + DeploymentConfig, + deployment.deployment_config, + workspace=workspace, + ) + except Exception: + return None + + def _extract_host_ports(self, container: Any) -> dict[int, int]: + result: dict[int, int] = {} + ports = container.ports or {} + for key, bindings in ports.items(): + if not bindings: + continue + container_port = int(str(key).split("/")[0]) + host_port = bindings[0].get("HostPort") + if host_port: + result[container_port] = int(host_port) + return result + + def _primary_host_url(self, host_ports: dict[int, int]) -> str | None: + if not host_ports: + return None + host_port = next(iter(host_ports.values())) + host = os.environ.get("NMP_LOOPBACK_ADDRESS", LOOPBACK_ADDRESSES[0]) + return host_url_for_port(host, host_port) + + def _build_endpoints(self, container_spec: Any, host_ports: dict[int, int]) -> list[Endpoint]: + endpoints: list[Endpoint] = [] + host = os.environ.get("NMP_LOOPBACK_ADDRESS", LOOPBACK_ADDRESSES[0]) + for port_spec in container_spec.ports: + host_port = host_ports.get(port_spec.container_port) + if host_port is None: + continue + endpoint_name = port_spec.name or f"port-{port_spec.container_port}" + protocol = "tcp" if port_spec.protocol == "UDP" else "http" + scheme = "http" + endpoints.append( + Endpoint( + name=endpoint_name, + url=host_url_for_port(host, host_port, scheme=scheme), + protocol=protocol, + ) + ) + return endpoints + + def _endpoints_from_container_ports(self, container: Any, host_ports: dict[int, int]) -> list[Endpoint]: + if not host_ports: + return [] + host = os.environ.get("NMP_LOOPBACK_ADDRESS", LOOPBACK_ADDRESSES[0]) + endpoints: list[Endpoint] = [] + for container_port, host_port in host_ports.items(): + endpoints.append( + Endpoint( + name=f"port-{container_port}", + url=host_url_for_port(host, host_port), + protocol="http", + ) + ) + return endpoints diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/config.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/config.py new file mode 100644 index 0000000000..aca2600f3e --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/config.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Executor-level Docker backend configuration.""" + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class DockerExecutorConfig(BaseModel): + """Knobs for a named docker executor instance (not entity backend_config).""" + + docker_host: str | None = Field(default=None, description="Override DOCKER_HOST for this executor.") + docker_timeout: int = Field(default=60, ge=1) + pull_images: bool = Field(default=True, description="Pull container images before run when missing locally.") diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/containers.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/containers.py new file mode 100644 index 0000000000..55f30055f3 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/containers.py @@ -0,0 +1,108 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Compile DeploymentConfig into docker.containers.run kwargs.""" + +from __future__ import annotations + +from typing import Any + +from nemo_deployments_plugin.backends.docker.labels import docker_volume_name +from nemo_deployments_plugin.entities import Container, DeploymentConfig, DockerDeploymentConfig, VolumeMount +from nemo_deployments_plugin.types import RestartPolicy + + +class DeploymentConfigError(ValueError): + """Invalid deployment config for docker backend.""" + + +def parse_docker_backend_config(backend_config: dict[str, Any]) -> DockerDeploymentConfig: + docker_section = backend_config.get("docker") or {} + return DockerDeploymentConfig.model_validate(docker_section) + + +def validate_config_for_docker(config: DeploymentConfig) -> Container: + if config.init_containers: + raise DeploymentConfigError("init_containers are not supported by the docker backend in v1") + if len(config.containers) != 1: + raise DeploymentConfigError(f"docker backend v1 supports exactly one container; got {len(config.containers)}") + return config.containers[0] + + +def restart_policy_kwargs(restart_policy: RestartPolicy, backoff_limit: int) -> dict[str, Any]: + if restart_policy == "Always": + return {"restart_policy": {"Name": "always"}} + if restart_policy == "OnFailure": + return {"restart_policy": {"Name": "on-failure", "MaximumRetryCount": backoff_limit}} + return {} + + +def env_dict(container: Container) -> dict[str, str]: + result: dict[str, str] = {} + for item in container.env: + if item.value is not None: + result[item.name] = item.value + return result + + +def merged_volume_mounts(config: DeploymentConfig, container: Container) -> list[VolumeMount]: + by_name: dict[str, VolumeMount] = {} + for mount in config.volume_mounts: + by_name[mount.name] = mount + for mount in container.volume_mounts: + by_name[mount.name] = mount + return list(by_name.values()) + + +def build_volume_bindings( + workspace: str, + mounts: list[VolumeMount], +) -> dict[str, dict[str, str]]: + bindings: dict[str, dict[str, str]] = {} + for mount in mounts: + vol_name = docker_volume_name(workspace, mount.name) + bindings[vol_name] = { + "bind": mount.mount_path, + "mode": "ro" if mount.read_only else "rw", + } + return bindings + + +def build_port_bindings( + container: Container, + host_ports: dict[int, int], +) -> dict[str, int | list[tuple[str, int]] | None]: + ports: dict[str, int | list[tuple[str, int]] | None] = {} + for port_spec in container.ports: + container_port = port_spec.container_port + protocol = port_spec.protocol.lower() + key = f"{container_port}/{protocol}" + host_port = host_ports.get(container_port) + if host_port is not None: + ports[key] = host_port + else: + ports[key] = container_port + return ports + + +def gpu_count_from_container(container: Container) -> int: + limit = container.resources.limits.get("nvidia.com/gpu") + if not limit: + return 0 + try: + return int(limit) + except ValueError: + return 0 + + +def device_requests_for_gpus(gpu_ids: list[int]) -> list[dict[str, Any]]: + if not gpu_ids: + return [] + return [ + { + "Driver": "nvidia", + "Count": 0, + "DeviceIDs": [str(gpu_id) for gpu_id in gpu_ids], + "Capabilities": [["gpu"]], + } + ] diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py new file mode 100644 index 0000000000..f5b1b46aa4 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py @@ -0,0 +1,102 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Thread-safe GPU pool for Docker deployments (plugin-local; not shared with models). + +During the 759 cutover both pools may coexist briefly — consolidate into +nemo_platform_plugin when models docker backend is removed. +""" + +from __future__ import annotations + +import logging +import subprocess +import threading +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + + +class GPUAllocationError(Exception): + """Raised when GPU allocation fails due to insufficient resources.""" + + +@dataclass +class GPUPoolStatus: + total: int + available: int + allocated: int + allocations: dict[str, list[int]] = field(default_factory=dict) + gpu_state: dict[int, str | None] = field(default_factory=dict) + + +class DockerGPUPool: + """Thread-safe pool of GPU device IDs for Docker device_requests.""" + + def __init__(self, reserved_gpu_device_ids: list[int]) -> None: + self.num_reserved_gpus = len(reserved_gpu_device_ids) + self.gpu_to_workload_id: dict[int, str | None] = {gpu_id: None for gpu_id in reserved_gpu_device_ids} + self._mutex = threading.Lock() + + def allocate_gpu(self, workload_id: str, num_requested: int = 1) -> list[int]: + with self._mutex: + if num_requested <= 0: + raise GPUAllocationError(f"Invalid GPU request: {num_requested}. Must be a positive integer.") + available_gpus = {gpu for gpu, workload in self.gpu_to_workload_id.items() if workload is None} + if len(available_gpus) < num_requested: + raise GPUAllocationError( + f"Not enough GPUs available. Requested {num_requested}, " + f"available {len(available_gpus)} out of {self.num_reserved_gpus} total." + ) + gpu_ids: list[int] = [] + for _ in range(num_requested): + gpu_id = available_gpus.pop() + gpu_ids.append(gpu_id) + self.gpu_to_workload_id[gpu_id] = workload_id + logger.info("DockerGPUPool: allocated gpu_ids %s to workload %s", gpu_ids, workload_id) + return gpu_ids + + def release_gpu(self, workload_id: str) -> list[int]: + with self._mutex: + gpu_ids = [gpu for gpu, workload in self.gpu_to_workload_id.items() if workload == workload_id] + if gpu_ids: + logger.info("DockerGPUPool: releasing gpu_ids %s from workload %s", gpu_ids, workload_id) + for gpu_id in gpu_ids: + self.gpu_to_workload_id[gpu_id] = None + return gpu_ids + + +_pool: DockerGPUPool | None = None +_pool_lock = threading.Lock() + + +def detect_gpu_device_ids() -> list[int]: + """Return GPU indices from nvidia-smi when available.""" + try: + result = subprocess.run( + ["nvidia-smi", "--query-gpu=index", "--format=csv,noheader"], + capture_output=True, + text=True, + check=True, + timeout=10, + ) + except (FileNotFoundError, subprocess.SubprocessError, OSError): + return [] + ids: list[int] = [] + for line in result.stdout.splitlines(): + stripped = line.strip() + if stripped.isdigit(): + ids.append(int(stripped)) + return ids + + +def get_shared_gpu_pool() -> DockerGPUPool | None: + """Lazy singleton GPU pool shared across docker executor instances in this process.""" + global _pool + with _pool_lock: + if _pool is None: + device_ids = detect_gpu_device_ids() + if not device_ids: + return None + _pool = DockerGPUPool(reserved_gpu_device_ids=device_ids) + return _pool diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py new file mode 100644 index 0000000000..cd1b525d17 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Docker resource naming and identity labels for orphan cleanup.""" + +from __future__ import annotations + +import hashlib +import re + +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL + +MANAGED_BY_KEY = "managed-by" +DEPLOYMENT_WORKSPACE_LABEL = "nmp.nvidia.com/deployment-workspace" +DEPLOYMENT_NAME_LABEL = "nmp.nvidia.com/deployment-name" +RESTART_POLICY_LABEL = "nmp.nvidia.com/restart-policy" +CONFIG_NAME_LABEL = "nmp.nvidia.com/deployment-config" +VOLUME_WORKSPACE_LABEL = "nmp.nvidia.com/volume-workspace" +VOLUME_NAME_LABEL = "nmp.nvidia.com/volume-name" + + +def k8s_safe_name(base_name: str, *, max_length: int = 63, suffix: str = "") -> str: + """Generate a DNS-label-safe name (RFC 1035) from arbitrary input.""" + hash_suffix = hashlib.sha256(base_name.encode()).hexdigest()[:8] + normalized = re.sub(r"[^a-z0-9-]", "-", base_name.lower()) + normalized = re.sub(r"[-]+", "-", normalized) + if normalized and not normalized[0].isalpha(): + normalized = f"x{normalized}" + normalized = normalized.rstrip("-") + + reserved = len(suffix) + len(hash_suffix) + 1 + if len(normalized) + reserved > max_length: + trim = max_length - reserved + normalized = normalized[:trim].rstrip("-") + normalized = f"{normalized}-{hash_suffix}{suffix}" + elif suffix: + normalized = f"{normalized}{suffix}" + return normalized + + +def container_name(workspace: str, deployment_name: str) -> str: + return k8s_safe_name(f"dep-{workspace}-{deployment_name}") + + +def docker_volume_name(workspace: str, volume_name: str) -> str: + return k8s_safe_name(f"dep-vol-{workspace}-{volume_name}") + + +def deployment_key(workspace: str, name: str) -> str: + return f"{workspace}/{name}" + + +BACKOFF_LIMIT_LABEL = "nmp.nvidia.com/backoff-limit" + + +def deployment_identity_labels( + workspace: str, + name: str, + restart_policy: str, + *, + config_name: str, + backoff_limit: int = 6, +) -> dict[str, str]: + return { + MANAGED_BY_KEY: MANAGED_BY_LABEL, + DEPLOYMENT_WORKSPACE_LABEL: workspace, + DEPLOYMENT_NAME_LABEL: name, + RESTART_POLICY_LABEL: restart_policy, + CONFIG_NAME_LABEL: config_name, + BACKOFF_LIMIT_LABEL: str(backoff_limit), + } + + +def volume_identity_labels(workspace: str, name: str) -> dict[str, str]: + return { + MANAGED_BY_KEY: MANAGED_BY_LABEL, + VOLUME_WORKSPACE_LABEL: workspace, + VOLUME_NAME_LABEL: name, + } + + +def managed_by_filter() -> dict[str, str]: + return {"label": f"{MANAGED_BY_KEY}={MANAGED_BY_LABEL}"} diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py new file mode 100644 index 0000000000..90dbc59b3a --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py @@ -0,0 +1,77 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Host port allocation for published container ports.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import socket +from typing import TYPE_CHECKING, Any + +from nemo_deployments_plugin.backends.docker.labels import managed_by_filter +from nemo_deployments_plugin.entities import DockerDeploymentConfig + +if TYPE_CHECKING: + import docker + +logger = logging.getLogger(__name__) + + +def is_remote_docker_host() -> bool: + docker_host = os.environ.get("DOCKER_HOST", "") + return docker_host.startswith("tcp://") + + +def is_port_free(port: int) -> bool: + if is_remote_docker_host(): + return True + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("0.0.0.0", port)) # noqa: S104 # nosec B104 + return True + except OSError: + return False + + +def collect_used_host_ports(containers: list[Any]) -> set[int]: + used: set[int] = set() + for container in containers: + try: + ports = container.ports or {} + for bindings in ports.values(): + if not bindings: + continue + for binding in bindings: + if binding and "HostPort" in binding: + used.add(int(binding["HostPort"])) + except Exception as exc: + logger.warning("Failed to read ports for container %s: %s", getattr(container, "name", "?"), exc) + return used + + +async def find_available_port(client: docker.DockerClient, docker_cfg: DockerDeploymentConfig) -> int | None: + try: + containers = await asyncio.to_thread( + client.containers.list, + all=True, + filters=managed_by_filter(), + ) + except Exception: + logger.exception("Failed to list containers for port allocation") + return None + + used_ports = collect_used_host_ports(containers) + for port in range(docker_cfg.port_range_start, docker_cfg.port_range_end + 1): + if port not in used_ports and is_port_free(port): + return port + + logger.error( + "No available ports in range %s-%s", + docker_cfg.port_range_start, + docker_cfg.port_range_end, + ) + return None diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py new file mode 100644 index 0000000000..55e97ea0a2 --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py @@ -0,0 +1,131 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Readiness probe evaluation for running containers.""" + +from __future__ import annotations + +import asyncio +import logging +import socket +from typing import TYPE_CHECKING +from urllib.parse import urljoin + +import httpx +from nemo_deployments_plugin.entities import Probe + +if TYPE_CHECKING: + from docker.models.containers import Container as DockerContainer + +logger = logging.getLogger(__name__) + + +async def check_readiness_probe( + *, + container: DockerContainer, + probe: Probe | None, + host_url: str | None, + host_ports: dict[int, int] | None = None, + named_ports: dict[str, int] | None = None, +) -> tuple[bool, str]: + """Return (ready, reason). When no probe is configured, running implies ready.""" + if probe is None: + return True, "no readiness probe configured" + + if probe.exec_action is not None and probe.exec_action.command: + return await _check_exec_probe(container, probe) + + if probe.http_get is not None and host_url is not None: + return await _check_http_probe(host_url, probe, host_ports=host_ports, named_ports=named_ports) + + if probe.tcp_socket is not None and host_url is not None: + return await _check_tcp_probe(host_url, probe) + + return True, "probe type not implemented; treating as ready" + + +async def _check_exec_probe(container: DockerContainer, probe: Probe) -> tuple[bool, str]: + assert probe.exec_action is not None + command = probe.exec_action.command + timeout = probe.timeout_seconds + + def _run() -> tuple[int, str]: + result = container.exec_run(command, demux=True) + exit_code = result.exit_code if result.exit_code is not None else 1 + output = "" + if result.output: + stdout, stderr = result.output + chunks = [] + if stdout: + chunks.append(stdout.decode("utf-8", errors="ignore")) + if stderr: + chunks.append(stderr.decode("utf-8", errors="ignore")) + output = "".join(chunks) + return exit_code, output + + try: + exit_code, output = await asyncio.wait_for(asyncio.to_thread(_run), timeout=timeout) + except TimeoutError: + return False, f"exec probe timed out after {timeout}s" + except Exception as exc: + return False, f"exec probe failed: {exc}" + + if exit_code == 0: + return True, "exec probe succeeded" + return False, f"exec probe exit {exit_code}: {output[:200]}" + + +async def _check_http_probe( + host_url: str, + probe: Probe, + *, + host_ports: dict[int, int] | None = None, + named_ports: dict[str, int] | None = None, +) -> tuple[bool, str]: + assert probe.http_get is not None + port = probe.http_get.port + path = probe.http_get.path + scheme = probe.http_get.scheme.lower() + base = host_url + if isinstance(port, int): + base = f"{scheme}://127.0.0.1:{port}" + elif isinstance(port, str) and named_ports and port in named_ports: + base = f"{scheme}://127.0.0.1:{named_ports[port]}" + elif isinstance(port, str) and host_ports: + for container_port, host_port in host_ports.items(): + if str(container_port) == port: + base = f"{scheme}://127.0.0.1:{host_port}" + break + url = urljoin(f"{base.rstrip('/')}/", path.lstrip("/")) + timeout = probe.timeout_seconds + try: + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get(url) + if 200 <= response.status_code < 400: + return True, f"http probe {response.status_code}" + return False, f"http probe status {response.status_code}" + except Exception as exc: + return False, f"http probe failed: {exc}" + + +async def _check_tcp_probe(host_url: str, probe: Probe) -> tuple[bool, str]: + assert probe.tcp_socket is not None + port_value = probe.tcp_socket.port + if not isinstance(port_value, int): + return False, "tcp probe requires numeric port" + host = "127.0.0.1" + timeout = probe.timeout_seconds + + def _connect() -> None: + with socket.create_connection((host, port_value), timeout=timeout): + return + + try: + await asyncio.wait_for(asyncio.to_thread(_connect), timeout=timeout) + return True, "tcp probe connected" + except Exception as exc: + return False, f"tcp probe failed: {exc}" + + +def host_url_for_port(host: str, host_port: int, *, scheme: str = "http") -> str: + return f"{scheme}://{host}:{host_port}" diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py new file mode 100644 index 0000000000..74a705070f --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py @@ -0,0 +1,54 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Map Docker container state to plugin DeploymentStatus values.""" + +from __future__ import annotations + +from nemo_deployments_plugin.backends.base import BackendStatusUpdate +from nemo_deployments_plugin.types import DeploymentStatus, RestartPolicy + +LOG_TAIL_LINES = 80 +LOG_MAX_CHARS = 8000 +SUCCESSFUL_EXIT_CODES = (0,) + + +def format_duration(seconds: float) -> str: + hours, rem = divmod(int(seconds), 3600) + minutes, secs = divmod(rem, 60) + parts: list[str] = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + parts.append(f"{secs}s") + return " ".join(parts) + + +def map_exited_status(exit_code: int, restart_policy: RestartPolicy) -> DeploymentStatus: + if exit_code in SUCCESSFUL_EXIT_CODES: + return "SUCCEEDED" + if restart_policy == "Always": + return "FAILED" + return "FAILED" + + +def missing_container_status(restart_policy: RestartPolicy, *, container_name: str) -> BackendStatusUpdate: + if restart_policy == "Always": + return BackendStatusUpdate( + status="LOST", + status_message=(f"Container not found — may have been manually deleted. Expected name: {container_name}"), + error_details={"expected_container_name": container_name}, + ) + return BackendStatusUpdate( + status="FAILED", + status_message=f"Container not found. Expected name: {container_name}", + error_details={"expected_container_name": container_name}, + ) + + +def map_docker_state_to_starting(container_id: str, state: str) -> BackendStatusUpdate: + return BackendStatusUpdate( + status="STARTING", + status_message=f"Container is {state} (ID: {container_id[:12]})", + ) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py new file mode 100644 index 0000000000..f50d9690eb --- /dev/null +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Docker volume lifecycle helpers.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING, Any + +from nemo_deployments_plugin.backends.base import VolumeStatusUpdate +from nemo_deployments_plugin.backends.docker.labels import docker_volume_name, volume_identity_labels + +if TYPE_CHECKING: + import docker + +logger = logging.getLogger(__name__) + + +async def create_volume( + client: docker.DockerClient, + *, + workspace: str, + name: str, + driver: str = "local", +) -> VolumeStatusUpdate: + vol_name = docker_volume_name(workspace, name) + labels = volume_identity_labels(workspace, name) + + def _create() -> Any: + from docker.errors import NotFound + + try: + existing = client.volumes.get(vol_name) + existing.reload() + return existing + except NotFound: + return client.volumes.create(name=vol_name, driver=driver, labels=labels) + + try: + await asyncio.to_thread(_create) + return VolumeStatusUpdate(status="BOUND", status_message=f"Volume {vol_name} is bound") + except Exception as exc: + logger.exception("Failed to create volume %s", vol_name) + return VolumeStatusUpdate(status="FAILED", status_message=f"Failed to create volume: {exc}") + + +async def read_volume_status(client: docker.DockerClient, *, workspace: str, name: str) -> VolumeStatusUpdate: + vol_name = docker_volume_name(workspace, name) + + def _get() -> Any: + from docker.errors import NotFound + + try: + return client.volumes.get(vol_name) + except NotFound: + raise + + try: + volume = await asyncio.to_thread(_get) + volume.reload() + return VolumeStatusUpdate(status="BOUND", status_message=f"Volume {volume.name} is bound") + except Exception as exc: + from docker.errors import NotFound + + if isinstance(exc, NotFound): + return VolumeStatusUpdate(status="FAILED", status_message=f"Volume {vol_name} not found") + return VolumeStatusUpdate(status="FAILED", status_message=f"Failed to read volume: {exc}") + + +async def delete_volume(client: docker.DockerClient, *, workspace: str, name: str) -> VolumeStatusUpdate: + vol_name = docker_volume_name(workspace, name) + + def _remove() -> None: + from docker.errors import NotFound + + try: + volume = client.volumes.get(vol_name) + volume.remove(force=True) + except NotFound: + return + + try: + await asyncio.to_thread(_remove) + return VolumeStatusUpdate(status="RELEASED", status_message=f"Volume {vol_name} released") + except Exception as exc: + return VolumeStatusUpdate(status="FAILED", status_message=f"Failed to delete volume: {exc}") diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py index 05aacd509b..b7d0a6d34e 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/registry.py @@ -10,12 +10,14 @@ from typing import Any, Self from nemo_deployments_plugin.backends.base import DeploymentBackend +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_platform import AsyncNeMoPlatform logger = logging.getLogger(__name__) -BACKEND_CLASSES: dict[str, type[DeploymentBackend]] = {} -"""Backend type → class map. Populated when docker/k8s backends land (756/757).""" +BACKEND_CLASSES: dict[str, type[DeploymentBackend]] = { + "docker": DockerDeploymentBackend, +} @dataclass(frozen=True) diff --git a/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py new file mode 100644 index 0000000000..3fd32d0ce0 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py @@ -0,0 +1,164 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Integration tests for DockerDeploymentBackend against a real daemon.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from docker.errors import NotFound +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend +from nemo_deployments_plugin.backends.docker.labels import container_name, docker_volume_name +from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL +from nemo_deployments_plugin.entities import ( + Container, + ContainerPort, + Deployment, + DeploymentBackendConfig, + DeploymentConfig, + DockerDeploymentConfig, +) + +try: + import docker + + docker.from_env().ping() + _DOCKER_AVAILABLE = True +except Exception: + _DOCKER_AVAILABLE = False + +pytestmark = [ + pytest.mark.skipif("docker" not in BACKEND_CLASSES, reason="Docker backend not registered"), + pytest.mark.skipif(not _DOCKER_AVAILABLE, reason="Docker daemon not available"), +] + + +@pytest.fixture +def docker_backend() -> DockerDeploymentBackend: + mock_entities = AsyncMock() + mock_sdk = MagicMock() + with ( + patch("nemo_deployments_plugin.backends.docker.backend.AsyncEntitiesResource"), + patch("nemo_deployments_plugin.backends.docker.backend.NemoEntitiesClient", return_value=mock_entities), + patch("nemo_deployments_plugin.backends.docker.backend.get_shared_gpu_pool", return_value=None), + ): + backend = DockerDeploymentBackend(mock_sdk, {"pull_images": True}) + backend._entities = mock_entities + return backend + + +def _never_config() -> DeploymentConfig: + return DeploymentConfig( + name="echo-cfg", + workspace="itest", + restart_policy="Never", + containers=[Container(name="main", image="alpine:3.20", command=["echo"], args=["hello"])], + ) + + +def _always_http_config() -> DeploymentConfig: + return DeploymentConfig( + name="http-cfg", + workspace="itest", + restart_policy="Always", + containers=[ + Container( + name="main", + image="nginx:alpine", + ports=[ContainerPort(containerPort=80, protocol="TCP", name="http")], + ) + ], + backend_config=DeploymentBackendConfig( + docker=DockerDeploymentConfig(port_range_start=9050, port_range_end=9060) + ), + ) + + +@pytest.mark.asyncio +async def test_volume_lifecycle(docker_backend: DockerDeploymentBackend) -> None: + create = await docker_backend.create_volume( + workspace="itest", + name="data", + size="1Gi", + access_modes=["ReadWriteOnce"], + backend_config={}, + ) + assert create.status == "BOUND" + + read = await docker_backend.read_volume_status(workspace="itest", name="data") + assert read.status == "BOUND" + + deleted = await docker_backend.delete_volume("itest", "data") + assert deleted.status == "RELEASED" + + +@pytest.mark.asyncio +async def test_never_deployment_succeeds(docker_backend: DockerDeploymentBackend) -> None: + config = _never_config() + docker_backend._entities.get.return_value = config # type: ignore[attr-defined] + c_name = container_name("itest", "echo-job") + + try: + created = await docker_backend.create_deployment( + workspace="itest", + name="echo-job", + config_name="echo-cfg", + labels={"managed-by": MANAGED_BY_LABEL}, + backend_config={}, + ) + assert created.status == "STARTING" + + for _ in range(30): + status = await docker_backend.read_status(workspace="itest", name="echo-job") + if status.status in ("SUCCEEDED", "FAILED"): + break + await asyncio.sleep(0.5) + + assert status.status == "SUCCEEDED" + assert status.exit_code == 0 + finally: + await docker_backend.delete_deployment("itest", "echo-job") + try: + docker_backend._client.containers.get(c_name).remove(force=True) + except NotFound: + pass + + +@pytest.mark.asyncio +async def test_lost_detection_for_always(docker_backend: DockerDeploymentBackend) -> None: + deployment = Deployment(name="lost-srv", workspace="itest", deployment_config="http-cfg") + config = _always_http_config() + + async def get_side_effect(entity_type, name, workspace=None): + if entity_type is Deployment: + return deployment + return config + + docker_backend._entities.get.side_effect = get_side_effect # type: ignore[attr-defined] + c_name = container_name("itest", "lost-srv") + + try: + created = await docker_backend.create_deployment( + workspace="itest", + name="lost-srv", + config_name="http-cfg", + labels={"managed-by": MANAGED_BY_LABEL}, + backend_config=config.backend_config.model_dump(by_alias=True), + ) + assert created.status == "STARTING" + + container = docker_backend._client.containers.get(c_name) + container.remove(force=True) + + status = await docker_backend.read_status(workspace="itest", name="lost-srv") + assert status.status == "LOST" + finally: + await docker_backend.delete_deployment("itest", "lost-srv") + try: + docker_backend._client.volumes.remove(docker_volume_name("itest", "data")) + except Exception: + pass diff --git a/plugins/nemo-deployments/tests/integration/conftest.py b/plugins/nemo-deployments/tests/integration/conftest.py new file mode 100644 index 0000000000..f0f3586fe4 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/conftest.py @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Integration test fixtures for docker-backed deployments.""" + +from __future__ import annotations + +import pytest + +try: + import docker + + docker.from_env().ping() + DOCKER_AVAILABLE = True +except Exception: + DOCKER_AVAILABLE = False + +skip_without_docker = pytest.mark.skipif(not DOCKER_AVAILABLE, reason="Docker daemon not available") diff --git a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py index 0b67c55178..d35802c822 100644 --- a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py +++ b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py @@ -4,21 +4,202 @@ """Integration tests for deployment reconciliation. Requires AIRCORE-756 DockerDeploymentBackend to be registered in BACKEND_CLASSES. -Until then, these tests are skipped — unit tests with MockDeploymentBackend provide coverage. """ from __future__ import annotations -import pytest -from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch -pytestmark = pytest.mark.skipif( - "docker" not in BACKEND_CLASSES, - reason="Requires DockerDeploymentBackend (AIRCORE-756)", +import pytest +from docker.errors import NotFound +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend +from nemo_deployments_plugin.backends.docker.labels import container_name, docker_volume_name +from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES, ExecutorRegistry +from nemo_deployments_plugin.config import ControllerConfig +from nemo_deployments_plugin.entities import ( + Container, + Deployment, + DeploymentBackendConfig, + DeploymentConfig, + DockerDeploymentConfig, + Prerequisite, + Volume, + VolumeMount, ) +from nemo_deployments_plugin.reconciler.deployment_reconciler import DeploymentReconciler +from nemo_deployments_plugin.reconciler.volume_reconciler import VolumeReconciler + +try: + import docker + + docker.from_env().ping() + DOCKER_AVAILABLE = True +except Exception: + DOCKER_AVAILABLE = False + +pytestmark = [ + pytest.mark.skipif( + "docker" not in BACKEND_CLASSES, + reason="Requires DockerDeploymentBackend (AIRCORE-756)", + ), + pytest.mark.skipif(not DOCKER_AVAILABLE, reason="Docker daemon not available"), +] + + +@pytest.fixture +def docker_registry() -> ExecutorRegistry: + mock_sdk = MagicMock() + with ( + patch("nemo_deployments_plugin.backends.docker.backend.AsyncEntitiesResource"), + patch("nemo_deployments_plugin.backends.docker.backend.NemoEntitiesClient"), + patch("nemo_deployments_plugin.backends.docker.backend.get_shared_gpu_pool", return_value=None), + ): + backend = DockerDeploymentBackend(mock_sdk, {"pull_images": True}) + return ExecutorRegistry({"docker": backend}, default_executor="docker") + + +def _backend(docker_registry: ExecutorRegistry) -> DockerDeploymentBackend: + backend = docker_registry.resolve("docker") + assert isinstance(backend, DockerDeploymentBackend) + return backend @pytest.mark.asyncio -async def test_puller_server_prerequisite_chain() -> None: +async def test_puller_server_prerequisite_chain(docker_registry: ExecutorRegistry) -> None: """Volume → puller (OnFailure) → server (Always + prerequisite) end-to-end.""" - raise NotImplementedError("Enable when AIRCORE-756 lands") + entities = AsyncMock() + entities.update = AsyncMock(side_effect=lambda entity: entity) + + volume = Volume(name="weights", workspace="itest", size="1Gi", status="PENDING") + puller_dep = Deployment( + name="puller", + workspace="itest", + deployment_config="puller-cfg", + status="PENDING", + ) + server_dep = Deployment( + name="server", + workspace="itest", + deployment_config="server-cfg", + status="PENDING", + ) + + puller_cfg = DeploymentConfig( + name="puller-cfg", + workspace="itest", + restart_policy="OnFailure", + containers=[ + Container( + name="puller", + image="alpine:3.20", + command=["sh", "-c"], + args=["echo pulled > /data/ready && sleep 1"], + volumeMounts=[VolumeMount(name="weights", mountPath="/data")], + ) + ], + volumeMounts=[VolumeMount(name="weights", mountPath="/data")], + ) + server_cfg = DeploymentConfig( + name="server-cfg", + workspace="itest", + restart_policy="Always", + prerequisites=[Prerequisite(deployment_name="puller-cfg", condition="succeeded")], + containers=[ + Container( + name="server", + image="alpine:3.20", + command=["sleep"], + args=["3600"], + volumeMounts=[VolumeMount(name="weights", mountPath="/data", read_only=True)], + ) + ], + volumeMounts=[VolumeMount(name="weights", mountPath="/data")], + backend_config=DeploymentBackendConfig( + docker=DockerDeploymentConfig(port_range_start=9070, port_range_end=9080) + ), + ) + + config_cache = { + ("itest", "puller-cfg"): puller_cfg, + ("itest", "server-cfg"): server_cfg, + } + + async def get_side_effect(entity_type, name, workspace=None): + ws = workspace or "itest" + if entity_type is Volume: + return volume + if entity_type is Deployment: + if name == "puller": + return puller_dep + return server_dep + if entity_type is DeploymentConfig: + return config_cache[(ws, name)] + raise KeyError(name) + + entities.get.side_effect = get_side_effect + + backend = _backend(docker_registry) + backend._entities = entities + + volume_reconciler = VolumeReconciler(entities, docker_registry) + deployment_reconciler = DeploymentReconciler( + entities, + docker_registry, + ControllerConfig(interval_seconds=1), + ) + deployment_reconciler.set_config_cache(config_cache) + + volumes_by_name = {("itest", "weights"): volume} + + try: + await volume_reconciler.reconcile_one(volume) + assert volume.status == "BOUND" + volumes_by_name[("itest", "weights")] = volume + + for _ in range(40): + by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} + by_config = { + ("itest", "puller-cfg"): puller_dep, + ("itest", "server-cfg"): server_dep, + } + await deployment_reconciler.reconcile_one( + puller_dep, + deployments_by_config=by_config, + deployments_by_name=by_name, + volumes_by_name=volumes_by_name, + ) + if puller_dep.status == "SUCCEEDED": + break + await asyncio.sleep(0.5) + assert puller_dep.status == "SUCCEEDED" + + for _ in range(10): + by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} + by_config = { + ("itest", "puller-cfg"): puller_dep, + ("itest", "server-cfg"): server_dep, + } + await deployment_reconciler.reconcile_one( + server_dep, + deployments_by_config=by_config, + deployments_by_name=by_name, + volumes_by_name=volumes_by_name, + ) + if server_dep.status in ("READY", "STARTING"): + break + await asyncio.sleep(0.5) + assert server_dep.status in ("READY", "STARTING") + finally: + await backend.delete_deployment("itest", "puller") + await backend.delete_deployment("itest", "server") + await backend.delete_volume("itest", "weights") + for c_name in (container_name("itest", "puller"), container_name("itest", "server")): + try: + backend._client.containers.get(c_name).remove(force=True) + except NotFound: + pass + try: + backend._client.volumes.get(docker_volume_name("itest", "weights")).remove(force=True) + except NotFound: + pass diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/conftest.py b/plugins/nemo-deployments/tests/unit/backends/docker/conftest.py new file mode 100644 index 0000000000..466eec4fc2 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/conftest.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared fixtures for docker backend unit tests.""" + +from __future__ import annotations + +from collections.abc import Iterator +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend + + +@pytest.fixture +def mock_sdk() -> MagicMock: + return MagicMock() + + +@pytest.fixture +def mock_entities() -> AsyncMock: + client = AsyncMock() + return client + + +@pytest.fixture +def mock_docker_client() -> MagicMock: + client = MagicMock() + client.containers = MagicMock() + client.volumes = MagicMock() + client.images = MagicMock() + client.close = MagicMock() + return client + + +@pytest.fixture +def docker_backend( + mock_sdk: MagicMock, mock_entities: AsyncMock, mock_docker_client: MagicMock +) -> Iterator[DockerDeploymentBackend]: + with ( + patch("nemo_deployments_plugin.backends.docker.backend.AsyncEntitiesResource"), + patch("nemo_deployments_plugin.backends.docker.backend.NemoEntitiesClient", return_value=mock_entities), + patch("nemo_deployments_plugin.backends.docker.backend.get_shared_gpu_pool", return_value=None), + patch("docker.from_env", return_value=mock_docker_client), + ): + backend = DockerDeploymentBackend(mock_sdk, {"docker_timeout": 60, "pull_images": False}) + backend._client = mock_docker_client + yield backend diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py new file mode 100644 index 0000000000..0865b288d9 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py @@ -0,0 +1,136 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Mocked docker client tests for DockerDeploymentBackend.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest +from backends.docker_helpers import container_attrs, sample_config +from docker.errors import NotFound +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL +from nemo_deployments_plugin.entities import Deployment + + +@pytest.mark.asyncio +async def test_create_deployment_starts_container( + docker_backend: DockerDeploymentBackend, + mock_entities: AsyncMock, + mock_docker_client: MagicMock, +) -> None: + mock_entities.get.return_value = sample_config() + mock_docker_client.containers.get.side_effect = NotFound("missing") + mock_docker_client.containers.run.return_value = MagicMock(id="abc123") + + update = await docker_backend.create_deployment( + workspace="default", + name="srv", + config_name="cfg1", + labels={"managed-by": MANAGED_BY_LABEL}, + backend_config={"docker": {"port_range_start": 9000, "port_range_end": 9100}}, + ) + + assert update.status == "STARTING" + mock_docker_client.containers.run.assert_called_once() + mock_entities.get.assert_awaited() + + +@pytest.mark.asyncio +async def test_read_status_ready_when_running_without_probe( + docker_backend: DockerDeploymentBackend, + mock_entities: AsyncMock, + mock_docker_client: MagicMock, +) -> None: + container = MagicMock() + container.id = "abc123def456" + container.status = "running" + container.labels = { + "managed-by": MANAGED_BY_LABEL, + "nmp.nvidia.com/restart-policy": "Always", + "nmp.nvidia.com/deployment-config": "cfg1", + } + container.ports = {} + container.attrs = container_attrs() + mock_docker_client.containers.get.return_value = container + mock_entities.get.return_value = sample_config() + + update = await docker_backend.read_status(workspace="default", name="srv") + + assert update.status == "READY" + + +@pytest.mark.asyncio +async def test_read_status_lost_when_missing_always( + docker_backend: DockerDeploymentBackend, + mock_entities: AsyncMock, + mock_docker_client: MagicMock, +) -> None: + mock_docker_client.containers.get.side_effect = NotFound("missing") + + deployment_entity = MagicMock() + deployment_entity.deployment_config = "cfg1" + + async def get_side_effect(entity_type, name, workspace=None): + if entity_type is Deployment: + return deployment_entity + return sample_config(restart_policy="Always") + + mock_entities.get.side_effect = get_side_effect + + update = await docker_backend.read_status(workspace="default", name="srv") + + assert update.status == "LOST" + + +@pytest.mark.asyncio +async def test_delete_deployment_idempotent( + docker_backend: DockerDeploymentBackend, + mock_docker_client: MagicMock, +) -> None: + mock_docker_client.containers.get.side_effect = NotFound("missing") + + update = await docker_backend.delete_deployment("default", "srv") + + assert update.status == "SUCCEEDED" + + +@pytest.mark.asyncio +async def test_list_managed_deployment_names( + docker_backend: DockerDeploymentBackend, + mock_docker_client: MagicMock, +) -> None: + container = MagicMock() + container.labels = { + "managed-by": MANAGED_BY_LABEL, + "nmp.nvidia.com/deployment-workspace": "default", + "nmp.nvidia.com/deployment-name": "srv", + } + mock_docker_client.containers.list.return_value = [container] + + names = await docker_backend.list_managed_deployment_names() + + assert names == ["default/srv"] + + +@pytest.mark.asyncio +async def test_create_volume_bound( + docker_backend: DockerDeploymentBackend, + mock_docker_client: MagicMock, +) -> None: + volume = MagicMock() + volume.name = "dep-vol-default-data" + mock_docker_client.volumes.get.side_effect = NotFound("missing") + mock_docker_client.volumes.create.return_value = volume + + update = await docker_backend.create_volume( + workspace="default", + name="data", + size="1Gi", + access_modes=["ReadWriteOnce"], + backend_config={}, + ) + + assert update.status == "BOUND" diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py new file mode 100644 index 0000000000..8f23803575 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Idempotency tests for DockerDeploymentBackend.create_deployment.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest +from backends.docker_helpers import container_attrs, sample_config +from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend +from nemo_deployments_plugin.backends.docker.labels import ( + DEPLOYMENT_NAME_LABEL, + DEPLOYMENT_WORKSPACE_LABEL, +) +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL + + +@pytest.mark.asyncio +async def test_create_existing_matching_container_returns_read_status( + docker_backend: DockerDeploymentBackend, + mock_entities: AsyncMock, + mock_docker_client: MagicMock, +) -> None: + existing = MagicMock() + existing.labels = { + "managed-by": MANAGED_BY_LABEL, + DEPLOYMENT_WORKSPACE_LABEL: "default", + DEPLOYMENT_NAME_LABEL: "srv", + "nmp.nvidia.com/restart-policy": "Always", + "nmp.nvidia.com/deployment-config": "cfg1", + } + existing.id = "abc" + existing.status = "running" + existing.ports = {} + existing.attrs = container_attrs() + mock_docker_client.containers.get.return_value = existing + mock_entities.get.return_value = sample_config() + + update = await docker_backend.create_deployment( + workspace="default", + name="srv", + config_name="cfg1", + labels={"managed-by": MANAGED_BY_LABEL}, + backend_config={}, + ) + + assert update.status == "READY" + mock_docker_client.containers.run.assert_not_called() diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_labels.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_labels.py new file mode 100644 index 0000000000..0a8e91a4f4 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_labels.py @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for deployment identity labels.""" + +from __future__ import annotations + +from nemo_deployments_plugin.backends.docker.labels import ( + CONFIG_NAME_LABEL, + DEPLOYMENT_NAME_LABEL, + DEPLOYMENT_WORKSPACE_LABEL, + MANAGED_BY_KEY, + container_name, + deployment_identity_labels, + docker_volume_name, +) +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL + + +def test_container_name_is_dns_safe() -> None: + name = container_name("my-workspace", "my.deployment") + assert name.startswith("dep-") + assert len(name) <= 63 + + +def test_docker_volume_name_prefix() -> None: + assert docker_volume_name("ws", "vol").startswith("dep-vol-") + + +def test_deployment_identity_labels() -> None: + labels = deployment_identity_labels( + "default", + "srv", + "Always", + config_name="cfg1", + ) + assert labels[MANAGED_BY_KEY] == MANAGED_BY_LABEL + assert labels[DEPLOYMENT_WORKSPACE_LABEL] == "default" + assert labels[DEPLOYMENT_NAME_LABEL] == "srv" + assert labels[CONFIG_NAME_LABEL] == "cfg1" diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py new file mode 100644 index 0000000000..d89f73836d --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for host port allocation.""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest +from nemo_deployments_plugin.backends.docker.ports import collect_used_host_ports, is_port_free +from nemo_deployments_plugin.entities import DockerDeploymentConfig + + +def test_collect_used_host_ports() -> None: + container = MagicMock() + container.ports = {"8080/tcp": [{"HostPort": "9001"}]} + assert collect_used_host_ports([container]) == {9001} + + +def test_is_port_free_local() -> None: + # Ephemeral high port likely free in test environment + assert is_port_free(59999) or not is_port_free(59999) + + +@pytest.mark.asyncio +async def test_find_available_port_skips_used(mock_docker_client: MagicMock) -> None: + from nemo_deployments_plugin.backends.docker.ports import find_available_port + + used = MagicMock() + used.ports = {"80/tcp": [{"HostPort": "9000"}]} + mock_docker_client.containers.list.return_value = [used] + + cfg = DockerDeploymentConfig(port_range_start=9000, port_range_end=9002) + port = await find_available_port(mock_docker_client, cfg) + assert port in {9001, 9002} diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_restart_policy.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_restart_policy.py new file mode 100644 index 0000000000..78a410c953 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_restart_policy.py @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for restart policy → docker run kwargs.""" + +from __future__ import annotations + +import pytest +from nemo_deployments_plugin.backends.docker.containers import restart_policy_kwargs + + +@pytest.mark.parametrize( + ("policy", "backoff", "expected"), + [ + ("Always", 6, {"restart_policy": {"Name": "always"}}), + ("OnFailure", 3, {"restart_policy": {"Name": "on-failure", "MaximumRetryCount": 3}}), + ("Never", 6, {}), + ], +) +def test_restart_policy_kwargs(policy: str, backoff: int, expected: dict) -> None: + assert restart_policy_kwargs(policy, backoff) == expected # type: ignore[arg-type] diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_status_mapping.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_status_mapping.py new file mode 100644 index 0000000000..7283cb9b17 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_status_mapping.py @@ -0,0 +1,35 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for docker status mapping helpers.""" + +from __future__ import annotations + +import pytest +from nemo_deployments_plugin.backends.docker.status import ( + map_exited_status, + missing_container_status, +) + + +@pytest.mark.parametrize( + ("exit_code", "restart_policy", "expected"), + [ + (0, "Never", "SUCCEEDED"), + (0, "Always", "SUCCEEDED"), + (1, "Never", "FAILED"), + (1, "Always", "FAILED"), + ], +) +def test_map_exited_status(exit_code: int, restart_policy: str, expected: str) -> None: + assert map_exited_status(exit_code, restart_policy) == expected # type: ignore[arg-type] + + +def test_missing_container_lost_for_always() -> None: + update = missing_container_status("Always", container_name="dep-default-srv") + assert update.status == "LOST" + + +def test_missing_container_failed_for_never() -> None: + update = missing_container_status("Never", container_name="dep-default-job") + assert update.status == "FAILED" diff --git a/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py b/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py new file mode 100644 index 0000000000..f310ae3503 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Test helpers for docker backend unit tests.""" + +from __future__ import annotations + +from typing import Any + +from nemo_deployments_plugin.entities import Container, DeploymentConfig + + +def sample_config(*, restart_policy: str = "Always") -> DeploymentConfig: + return DeploymentConfig( + name="cfg1", + workspace="default", + containers=[ + Container( + name="main", + image="alpine:latest", + command=["echo"], + args=["hello"], + ) + ], + restart_policy=restart_policy, # type: ignore[arg-type] + ) + + +def container_attrs(*, status: str = "running", exit_code: int = 0) -> dict[str, Any]: + del status + return { + "State": {"ExitCode": exit_code, "StartedAt": "2026-01-01T00:00:00Z"}, + "RestartCount": 0, + } diff --git a/uv.lock b/uv.lock index 17fb89bea0..deaf05813c 100644 --- a/uv.lock +++ b/uv.lock @@ -4333,13 +4333,20 @@ version = "0.1.0" source = { editable = "plugins/nemo-deployments" } dependencies = [ { name = "fastapi", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, + { name = "httpx", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "nemo-platform", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "nemo-platform-plugin", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "pydantic", extra = ["email"], marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, ] +[package.optional-dependencies] +docker = [ + { name = "docker", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, +] + [package.dev-dependencies] dev = [ + { name = "docker", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "fastapi", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "httpx", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "pytest", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, @@ -4348,14 +4355,18 @@ dev = [ [package.metadata] requires-dist = [ + { name = "docker", marker = "extra == 'docker'", specifier = ">=7.0" }, { name = "fastapi", specifier = ">=0.115" }, + { name = "httpx", specifier = ">=0.27" }, { name = "nemo-platform", editable = "packages/nemo_platform" }, { name = "nemo-platform-plugin", editable = "packages/nemo_platform_plugin" }, { name = "pydantic", specifier = ">=2.10.6" }, ] +provides-extras = ["docker"] [package.metadata.requires-dev] dev = [ + { name = "docker", specifier = ">=7.0" }, { name = "fastapi", specifier = ">=0.115" }, { name = "httpx", specifier = ">=0.27" }, { name = "pytest", specifier = ">=8.3.4" }, From bad762d2f485f5d537458acb77ea7948d091045a Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Tue, 23 Jun 2026 09:28:30 -0700 Subject: [PATCH 2/9] fix(deployments): address CodeRabbit review on Docker backend (AIRCORE-756) - Exclude already-assigned host ports during multi-port allocation - Use label constants; fix HTTP/TCP probe host/port resolution - Remove TYPE_CHECKING-only docker imports - Deduplicate Docker availability check via docker_availability module - Harden port unit tests; add type hints and cleanup improvements Signed-off-by: Tyler Bray --- plugins/nemo-deployments/pyproject.toml | 2 +- .../backends/docker/backend.py | 14 ++-- .../backends/docker/ports.py | 14 ++-- .../backends/docker/probes.py | 62 ++++++++++------ .../backends/docker/status.py | 8 ++- .../backends/docker/volumes.py | 5 +- .../backends/docker/test_docker_backend.py | 23 +++--- .../tests/integration/conftest.py | 12 +--- .../tests/integration/docker_availability.py | 20 ++++++ .../integration/test_reconcile_docker.py | 37 +++++----- .../tests/unit/backends/docker/test_ports.py | 71 +++++++++++++++++-- .../tests/unit/backends/docker_helpers.py | 5 +- 12 files changed, 187 insertions(+), 86 deletions(-) create mode 100644 plugins/nemo-deployments/tests/integration/docker_availability.py diff --git a/plugins/nemo-deployments/pyproject.toml b/plugins/nemo-deployments/pyproject.toml index fe3ca7fea3..22698169db 100644 --- a/plugins/nemo-deployments/pyproject.toml +++ b/plugins/nemo-deployments/pyproject.toml @@ -38,6 +38,6 @@ dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115 [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" -pythonpath = ["src", "tests/unit"] +pythonpath = ["src", "tests/unit", "tests/integration"] [tool.nemo.openapi] diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py index d575ea5f02..0ed1a59842 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py @@ -32,9 +32,11 @@ ) from nemo_deployments_plugin.backends.docker.gpu import GPUAllocationError, get_shared_gpu_pool from nemo_deployments_plugin.backends.docker.labels import ( + BACKOFF_LIMIT_LABEL, CONFIG_NAME_LABEL, DEPLOYMENT_NAME_LABEL, DEPLOYMENT_WORKSPACE_LABEL, + MANAGED_BY_KEY, RESTART_POLICY_LABEL, container_name, deployment_identity_labels, @@ -147,7 +149,11 @@ async def create_deployment( host_ports: dict[int, int] = {} for port_spec in container_spec.ports: - host_port = await find_available_port(self._client, docker_cfg) + host_port = await find_available_port( + self._client, + docker_cfg, + exclude_ports=set(host_ports.values()), + ) if host_port is None: if gpu_ids: self._gpu_pool.release_gpu(dep_key) # type: ignore[union-attr] @@ -289,7 +295,7 @@ async def read_status(self, *, workspace: str, name: str) -> BackendStatusUpdate ) if restart_policy == "OnFailure": restart_count = int(container.attrs.get("RestartCount", 0)) - backoff_limit = int(labels.get("nmp.nvidia.com/backoff-limit", "6")) + backoff_limit = int(labels.get(BACKOFF_LIMIT_LABEL, "6")) if restart_count < backoff_limit: return BackendStatusUpdate( status="STARTING", @@ -350,7 +356,7 @@ async def list_managed_deployment_names(self) -> list[str]: seen: set[str] = set() for container in containers: container_labels = container.labels or {} - if container_labels.get("managed-by") != MANAGED_BY_LABEL: + if container_labels.get(MANAGED_BY_KEY) != MANAGED_BY_LABEL: continue ws = container_labels.get(DEPLOYMENT_WORKSPACE_LABEL) dep_name = container_labels.get(DEPLOYMENT_NAME_LABEL) @@ -417,7 +423,7 @@ def _container_matches_deployment( labels.get(DEPLOYMENT_WORKSPACE_LABEL) == workspace and labels.get(DEPLOYMENT_NAME_LABEL) == name and labels.get(CONFIG_NAME_LABEL) == config_name - and labels.get("managed-by") == MANAGED_BY_LABEL + and labels.get(MANAGED_BY_KEY) == MANAGED_BY_LABEL ) async def _resolve_restart_policy(self, workspace: str, name: str) -> RestartPolicy: diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py index 90dbc59b3a..f95884e887 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py @@ -9,13 +9,12 @@ import logging import os import socket -from typing import TYPE_CHECKING, Any +from typing import Any from nemo_deployments_plugin.backends.docker.labels import managed_by_filter from nemo_deployments_plugin.entities import DockerDeploymentConfig -if TYPE_CHECKING: - import docker +import docker logger = logging.getLogger(__name__) @@ -53,7 +52,12 @@ def collect_used_host_ports(containers: list[Any]) -> set[int]: return used -async def find_available_port(client: docker.DockerClient, docker_cfg: DockerDeploymentConfig) -> int | None: +async def find_available_port( + client: docker.DockerClient, + docker_cfg: DockerDeploymentConfig, + *, + exclude_ports: set[int] | None = None, +) -> int | None: try: containers = await asyncio.to_thread( client.containers.list, @@ -65,6 +69,8 @@ async def find_available_port(client: docker.DockerClient, docker_cfg: DockerDep return None used_ports = collect_used_host_ports(containers) + if exclude_ports: + used_ports = used_ports | exclude_ports for port in range(docker_cfg.port_range_start, docker_cfg.port_range_end + 1): if port not in used_ports and is_port_free(port): return port diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py index 55e97ea0a2..3c19a011be 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/probes.py @@ -8,15 +8,12 @@ import asyncio import logging import socket -from typing import TYPE_CHECKING -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse import httpx +from docker.models.containers import Container as DockerContainer from nemo_deployments_plugin.entities import Probe -if TYPE_CHECKING: - from docker.models.containers import Container as DockerContainer - logger = logging.getLogger(__name__) @@ -39,7 +36,7 @@ async def check_readiness_probe( return await _check_http_probe(host_url, probe, host_ports=host_ports, named_ports=named_ports) if probe.tcp_socket is not None and host_url is not None: - return await _check_tcp_probe(host_url, probe) + return await _check_tcp_probe(host_url, probe, host_ports=host_ports, named_ports=named_ports) return True, "probe type not implemented; treating as ready" @@ -75,6 +72,27 @@ def _run() -> tuple[int, str]: return False, f"exec probe exit {exit_code}: {output[:200]}" +def _probe_host(host_url: str) -> str: + return urlparse(host_url).hostname or "127.0.0.1" + + +def _resolve_probe_port( + port: int | str, + *, + host_ports: dict[int, int] | None, + named_ports: dict[str, int] | None, +) -> int | None: + if isinstance(port, int): + return host_ports.get(port, port) if host_ports else port + if named_ports and port in named_ports: + return named_ports[port] + if host_ports: + for container_port, host_port in host_ports.items(): + if str(container_port) == port: + return host_port + return None + + async def _check_http_probe( host_url: str, probe: Probe, @@ -86,16 +104,11 @@ async def _check_http_probe( port = probe.http_get.port path = probe.http_get.path scheme = probe.http_get.scheme.lower() - base = host_url - if isinstance(port, int): - base = f"{scheme}://127.0.0.1:{port}" - elif isinstance(port, str) and named_ports and port in named_ports: - base = f"{scheme}://127.0.0.1:{named_ports[port]}" - elif isinstance(port, str) and host_ports: - for container_port, host_port in host_ports.items(): - if str(container_port) == port: - base = f"{scheme}://127.0.0.1:{host_port}" - break + host = _probe_host(host_url) + resolved_port = _resolve_probe_port(port, host_ports=host_ports, named_ports=named_ports) + if resolved_port is None: + return False, f"http probe port not mapped: {port}" + base = f"{scheme}://{host}:{resolved_port}" url = urljoin(f"{base.rstrip('/')}/", path.lstrip("/")) timeout = probe.timeout_seconds try: @@ -108,16 +121,23 @@ async def _check_http_probe( return False, f"http probe failed: {exc}" -async def _check_tcp_probe(host_url: str, probe: Probe) -> tuple[bool, str]: +async def _check_tcp_probe( + host_url: str, + probe: Probe, + *, + host_ports: dict[int, int] | None = None, + named_ports: dict[str, int] | None = None, +) -> tuple[bool, str]: assert probe.tcp_socket is not None port_value = probe.tcp_socket.port - if not isinstance(port_value, int): - return False, "tcp probe requires numeric port" - host = "127.0.0.1" + host = _probe_host(host_url) + target_port = _resolve_probe_port(port_value, host_ports=host_ports, named_ports=named_ports) + if target_port is None: + return False, f"tcp probe port not mapped: {port_value}" timeout = probe.timeout_seconds def _connect() -> None: - with socket.create_connection((host, port_value), timeout=timeout): + with socket.create_connection((host, target_port), timeout=timeout): return try: diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py index 74a705070f..77b13695fd 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/status.py @@ -26,10 +26,14 @@ def format_duration(seconds: float) -> str: def map_exited_status(exit_code: int, restart_policy: RestartPolicy) -> DeploymentStatus: + """Map a stopped container's exit code to a terminal deployment status. + + Restart-policy-specific retry handling happens in the backend before this + helper is called for non-zero exits. + """ + del restart_policy if exit_code in SUCCESSFUL_EXIT_CODES: return "SUCCEEDED" - if restart_policy == "Always": - return "FAILED" return "FAILED" diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py index f50d9690eb..35047e012c 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/volumes.py @@ -7,13 +7,12 @@ import asyncio import logging -from typing import TYPE_CHECKING, Any +from typing import Any from nemo_deployments_plugin.backends.base import VolumeStatusUpdate from nemo_deployments_plugin.backends.docker.labels import docker_volume_name, volume_identity_labels -if TYPE_CHECKING: - import docker +import docker logger = logging.getLogger(__name__) diff --git a/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py index 3fd32d0ce0..9cfd0a666a 100644 --- a/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py +++ b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py @@ -10,8 +10,9 @@ import pytest from docker.errors import NotFound +from docker_availability import skip_without_docker from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend -from nemo_deployments_plugin.backends.docker.labels import container_name, docker_volume_name +from nemo_deployments_plugin.backends.docker.labels import container_name from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES from nemo_deployments_plugin.constants import MANAGED_BY_LABEL from nemo_deployments_plugin.entities import ( @@ -23,17 +24,11 @@ DockerDeploymentConfig, ) -try: - import docker - - docker.from_env().ping() - _DOCKER_AVAILABLE = True -except Exception: - _DOCKER_AVAILABLE = False +import docker pytestmark = [ pytest.mark.skipif("docker" not in BACKEND_CLASSES, reason="Docker backend not registered"), - pytest.mark.skipif(not _DOCKER_AVAILABLE, reason="Docker daemon not available"), + skip_without_docker, ] @@ -101,6 +96,7 @@ async def test_never_deployment_succeeds(docker_backend: DockerDeploymentBackend config = _never_config() docker_backend._entities.get.return_value = config # type: ignore[attr-defined] c_name = container_name("itest", "echo-job") + client = docker.from_env() try: created = await docker_backend.create_deployment( @@ -123,7 +119,7 @@ async def test_never_deployment_succeeds(docker_backend: DockerDeploymentBackend finally: await docker_backend.delete_deployment("itest", "echo-job") try: - docker_backend._client.containers.get(c_name).remove(force=True) + client.containers.get(c_name).remove(force=True) except NotFound: pass @@ -140,6 +136,7 @@ async def get_side_effect(entity_type, name, workspace=None): docker_backend._entities.get.side_effect = get_side_effect # type: ignore[attr-defined] c_name = container_name("itest", "lost-srv") + client = docker.from_env() try: created = await docker_backend.create_deployment( @@ -151,7 +148,7 @@ async def get_side_effect(entity_type, name, workspace=None): ) assert created.status == "STARTING" - container = docker_backend._client.containers.get(c_name) + container = client.containers.get(c_name) container.remove(force=True) status = await docker_backend.read_status(workspace="itest", name="lost-srv") @@ -159,6 +156,6 @@ async def get_side_effect(entity_type, name, workspace=None): finally: await docker_backend.delete_deployment("itest", "lost-srv") try: - docker_backend._client.volumes.remove(docker_volume_name("itest", "data")) - except Exception: + client.containers.get(c_name).remove(force=True) + except NotFound: pass diff --git a/plugins/nemo-deployments/tests/integration/conftest.py b/plugins/nemo-deployments/tests/integration/conftest.py index f0f3586fe4..386c970b2f 100644 --- a/plugins/nemo-deployments/tests/integration/conftest.py +++ b/plugins/nemo-deployments/tests/integration/conftest.py @@ -5,14 +5,6 @@ from __future__ import annotations -import pytest +from docker_availability import DOCKER_AVAILABLE, skip_without_docker -try: - import docker - - docker.from_env().ping() - DOCKER_AVAILABLE = True -except Exception: - DOCKER_AVAILABLE = False - -skip_without_docker = pytest.mark.skipif(not DOCKER_AVAILABLE, reason="Docker daemon not available") +__all__ = ["DOCKER_AVAILABLE", "skip_without_docker"] diff --git a/plugins/nemo-deployments/tests/integration/docker_availability.py b/plugins/nemo-deployments/tests/integration/docker_availability.py new file mode 100644 index 0000000000..baf7f173c1 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/docker_availability.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared Docker daemon availability check for integration tests.""" + +from __future__ import annotations + +import pytest + +try: + import docker + + docker.from_env().ping() + DOCKER_AVAILABLE: bool = True +except Exception: + DOCKER_AVAILABLE = False + +skip_without_docker: pytest.MarkDecorator = pytest.mark.skipif( + not DOCKER_AVAILABLE, reason="Docker daemon not available" +) diff --git a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py index d35802c822..62b399496a 100644 --- a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py +++ b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py @@ -13,6 +13,7 @@ import pytest from docker.errors import NotFound +from docker_availability import skip_without_docker from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_deployments_plugin.backends.docker.labels import container_name, docker_volume_name from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES, ExecutorRegistry @@ -30,20 +31,14 @@ from nemo_deployments_plugin.reconciler.deployment_reconciler import DeploymentReconciler from nemo_deployments_plugin.reconciler.volume_reconciler import VolumeReconciler -try: - import docker - - docker.from_env().ping() - DOCKER_AVAILABLE = True -except Exception: - DOCKER_AVAILABLE = False +import docker pytestmark = [ pytest.mark.skipif( "docker" not in BACKEND_CLASSES, reason="Requires DockerDeploymentBackend (AIRCORE-756)", ), - pytest.mark.skipif(not DOCKER_AVAILABLE, reason="Docker daemon not available"), + skip_without_docker, ] @@ -125,7 +120,11 @@ async def test_puller_server_prerequisite_chain(docker_registry: ExecutorRegistr ("itest", "server-cfg"): server_cfg, } - async def get_side_effect(entity_type, name, workspace=None): + async def get_side_effect( + entity_type: type, + name: str, + workspace: str | None = None, + ) -> Volume | Deployment | DeploymentConfig: ws = workspace or "itest" if entity_type is Volume: return volume @@ -151,6 +150,11 @@ async def get_side_effect(entity_type, name, workspace=None): deployment_reconciler.set_config_cache(config_cache) volumes_by_name = {("itest", "weights"): volume} + by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} + by_config = { + ("itest", "puller-cfg"): puller_dep, + ("itest", "server-cfg"): server_dep, + } try: await volume_reconciler.reconcile_one(volume) @@ -158,11 +162,6 @@ async def get_side_effect(entity_type, name, workspace=None): volumes_by_name[("itest", "weights")] = volume for _ in range(40): - by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} - by_config = { - ("itest", "puller-cfg"): puller_dep, - ("itest", "server-cfg"): server_dep, - } await deployment_reconciler.reconcile_one( puller_dep, deployments_by_config=by_config, @@ -175,11 +174,6 @@ async def get_side_effect(entity_type, name, workspace=None): assert puller_dep.status == "SUCCEEDED" for _ in range(10): - by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} - by_config = { - ("itest", "puller-cfg"): puller_dep, - ("itest", "server-cfg"): server_dep, - } await deployment_reconciler.reconcile_one( server_dep, deployments_by_config=by_config, @@ -191,15 +185,16 @@ async def get_side_effect(entity_type, name, workspace=None): await asyncio.sleep(0.5) assert server_dep.status in ("READY", "STARTING") finally: + client = docker.from_env() await backend.delete_deployment("itest", "puller") await backend.delete_deployment("itest", "server") await backend.delete_volume("itest", "weights") for c_name in (container_name("itest", "puller"), container_name("itest", "server")): try: - backend._client.containers.get(c_name).remove(force=True) + client.containers.get(c_name).remove(force=True) except NotFound: pass try: - backend._client.volumes.get(docker_volume_name("itest", "weights")).remove(force=True) + client.volumes.get(docker_volume_name("itest", "weights")).remove(force=True) except NotFound: pass diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py index d89f73836d..e93ef465e8 100644 --- a/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py @@ -18,19 +18,80 @@ def test_collect_used_host_ports() -> None: assert collect_used_host_ports([container]) == {9001} -def test_is_port_free_local() -> None: - # Ephemeral high port likely free in test environment - assert is_port_free(59999) or not is_port_free(59999) +def test_is_port_free_skips_check_for_remote_host(monkeypatch: pytest.MonkeyPatch) -> None: + from nemo_deployments_plugin.backends.docker import ports as ports_mod + + monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: True) + assert is_port_free(1) is True + + +def test_is_port_free_returns_false_when_bind_fails(monkeypatch: pytest.MonkeyPatch) -> None: + from nemo_deployments_plugin.backends.docker import ports as ports_mod + + class FakeSock: + def __enter__(self) -> FakeSock: + return self + + def __exit__(self, *args: object) -> None: + return None + + def setsockopt(self, *args: object, **kwargs: object) -> None: + return None + + def bind(self, addr: tuple[str, int]) -> None: + raise OSError("Address already in use") + + monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: False) + monkeypatch.setattr(ports_mod.socket, "socket", lambda *args, **kwargs: FakeSock()) + assert is_port_free(9000) is False + + +def test_is_port_free_returns_true_when_bind_succeeds(monkeypatch: pytest.MonkeyPatch) -> None: + from nemo_deployments_plugin.backends.docker import ports as ports_mod + + class FakeSock: + def __enter__(self) -> FakeSock: + return self + + def __exit__(self, *args: object) -> None: + return None + + def setsockopt(self, *args: object, **kwargs: object) -> None: + return None + + def bind(self, addr: tuple[str, int]) -> None: + return None + + monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: False) + monkeypatch.setattr(ports_mod.socket, "socket", lambda *args, **kwargs: FakeSock()) + assert is_port_free(9000) is True @pytest.mark.asyncio -async def test_find_available_port_skips_used(mock_docker_client: MagicMock) -> None: +async def test_find_available_port_skips_used(mock_docker_client: MagicMock, monkeypatch: pytest.MonkeyPatch) -> None: + from nemo_deployments_plugin.backends.docker import ports as ports_mod from nemo_deployments_plugin.backends.docker.ports import find_available_port used = MagicMock() used.ports = {"80/tcp": [{"HostPort": "9000"}]} mock_docker_client.containers.list.return_value = [used] + monkeypatch.setattr(ports_mod, "is_port_free", lambda port: port != 9001) cfg = DockerDeploymentConfig(port_range_start=9000, port_range_end=9002) port = await find_available_port(mock_docker_client, cfg) - assert port in {9001, 9002} + assert port == 9002 + + +@pytest.mark.asyncio +async def test_find_available_port_excludes_pending_assignments(mock_docker_client: MagicMock) -> None: + from nemo_deployments_plugin.backends.docker.ports import find_available_port + + mock_docker_client.containers.list.return_value = [] + cfg = DockerDeploymentConfig(port_range_start=9000, port_range_end=9002) + + first = await find_available_port(mock_docker_client, cfg) + assert first == 9000 + second = await find_available_port(mock_docker_client, cfg, exclude_ports={first}) + + assert first == 9000 + assert second == 9001 diff --git a/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py b/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py index f310ae3503..0e308633f8 100644 --- a/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py +++ b/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py @@ -8,9 +8,10 @@ from typing import Any from nemo_deployments_plugin.entities import Container, DeploymentConfig +from nemo_deployments_plugin.types import RestartPolicy -def sample_config(*, restart_policy: str = "Always") -> DeploymentConfig: +def sample_config(*, restart_policy: RestartPolicy = "Always") -> DeploymentConfig: return DeploymentConfig( name="cfg1", workspace="default", @@ -22,7 +23,7 @@ def sample_config(*, restart_policy: str = "Always") -> DeploymentConfig: args=["hello"], ) ], - restart_policy=restart_policy, # type: ignore[arg-type] + restart_policy=restart_policy, ) From af422b34bc0d2d4fd343e5cceaa9515ffdd4c5b2 Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Thu, 25 Jun 2026 14:22:45 -0700 Subject: [PATCH 3/9] fix(deployments): align Docker integration tests with #469 prerequisite model Move server prerequisite from DeploymentConfig to Deployment, reference puller by deployment name, and drop deployments_by_config from reconcile_one. Signed-off-by: Tyler Bray --- .../tests/integration/test_reconcile_docker.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py index 62b399496a..f25db77219 100644 --- a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py +++ b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py @@ -78,6 +78,7 @@ async def test_puller_server_prerequisite_chain(docker_registry: ExecutorRegistr workspace="itest", deployment_config="server-cfg", status="PENDING", + prerequisites=[Prerequisite(deployment_name="puller", condition="succeeded")], ) puller_cfg = DeploymentConfig( @@ -99,7 +100,6 @@ async def test_puller_server_prerequisite_chain(docker_registry: ExecutorRegistr name="server-cfg", workspace="itest", restart_policy="Always", - prerequisites=[Prerequisite(deployment_name="puller-cfg", condition="succeeded")], containers=[ Container( name="server", @@ -151,10 +151,6 @@ async def get_side_effect( volumes_by_name = {("itest", "weights"): volume} by_name = {("itest", "puller"): puller_dep, ("itest", "server"): server_dep} - by_config = { - ("itest", "puller-cfg"): puller_dep, - ("itest", "server-cfg"): server_dep, - } try: await volume_reconciler.reconcile_one(volume) @@ -164,7 +160,6 @@ async def get_side_effect( for _ in range(40): await deployment_reconciler.reconcile_one( puller_dep, - deployments_by_config=by_config, deployments_by_name=by_name, volumes_by_name=volumes_by_name, ) @@ -176,7 +171,6 @@ async def get_side_effect( for _ in range(10): await deployment_reconciler.reconcile_one( server_dep, - deployments_by_config=by_config, deployments_by_name=by_name, volumes_by_name=volumes_by_name, ) From 3901ed80e73acda387091497585a61a7bfa8a88b Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 26 Jun 2026 09:36:45 -0700 Subject: [PATCH 4/9] fix(deployments): filter reconciler lists on data.status (AIRCORE-756) Entity store persists deployment and volume status under the data JSON column; filtering on top-level status caused list queries to fail and left DELETING deployments stuck. Align controller and status_in list API with data.status and update unit test expectations. Signed-off-by: Tyler Bray --- .../src/nemo_deployments_plugin/api/v2/deployments.py | 2 +- .../src/nemo_deployments_plugin/controller.py | 6 +++--- .../tests/unit/reconciler/test_entity_client.py | 2 +- plugins/nemo-deployments/tests/unit/test_api_deployments.py | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py index 45e876a36b..277a13ed96 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/api/v2/deployments.py @@ -126,7 +126,7 @@ async def list_deployments( if statuses: filter_operation = ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=statuses, ) result = await entity_client.list( diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py index 51cf4d830a..2ea41c6620 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/controller.py @@ -179,7 +179,7 @@ async def _list_deployments(self) -> list[Deployment]: Deployment, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=list(NON_TERMINAL_DEPLOYMENT_STATUSES), ), ) @@ -197,7 +197,7 @@ async def _list_volumes(self) -> list[Volume]: Volume, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=list(NON_TERMINAL_VOLUME_STATUSES), ), ) @@ -218,7 +218,7 @@ async def _list_terminal_deployments_for_orphan_grace(self) -> list[Deployment]: Deployment, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=list(_TERMINAL_ORPHAN_STATUSES), ), ) diff --git a/plugins/nemo-deployments/tests/unit/reconciler/test_entity_client.py b/plugins/nemo-deployments/tests/unit/reconciler/test_entity_client.py index 3a45ead721..f0b903a420 100644 --- a/plugins/nemo-deployments/tests/unit/reconciler/test_entity_client.py +++ b/plugins/nemo-deployments/tests/unit/reconciler/test_entity_client.py @@ -37,7 +37,7 @@ async def test_list_all_pages_fetches_multiple_pages() -> None: Deployment, filter_operation=ComparisonOperation( operator=FilterOperator.IN, - field="status", + field="data.status", value=["PENDING"], ), ) diff --git a/plugins/nemo-deployments/tests/unit/test_api_deployments.py b/plugins/nemo-deployments/tests/unit/test_api_deployments.py index 6f4d09c2e1..38b8dc2099 100644 --- a/plugins/nemo-deployments/tests/unit/test_api_deployments.py +++ b/plugins/nemo-deployments/tests/unit/test_api_deployments.py @@ -104,7 +104,7 @@ def test_list_deployments_status_in(client: TestClient, mock_entity_client: Asyn assert len(resp.json()["data"]) == 1 call_kwargs = mock_entity_client.list.await_args.kwargs assert call_kwargs["filter_operation"].operator.value == "$in" - assert call_kwargs["filter_operation"].field == "status" + assert call_kwargs["filter_operation"].field == "data.status" def test_list_deployments_invalid_status_in_400(client: TestClient) -> None: From aed49706dc8be70046fed5d312e03ac6a30082d6 Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 26 Jun 2026 12:07:40 -0700 Subject: [PATCH 5/9] fix(test): add deployments integration test path to pytest pythonpath CI collection failed with ModuleNotFoundError for docker_availability because integration test helpers live outside the repo root import path. Mirror the existing unit-test helper entry for plugins/nemo-deployments. Signed-off-by: Tyler Bray --- pytest.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/pytest.ini b/pytest.ini index fe374a65fb..1e1ea99b74 100644 --- a/pytest.ini +++ b/pytest.ini @@ -11,6 +11,7 @@ pythonpath = plugins/example-plugin/src plugins/nemo-deployments/src plugins/nemo-deployments/tests/unit + plugins/nemo-deployments/tests/integration plugins/nemo-safe-synthesizer/src # Test discovery paths - packages and stable services From dcd1bfa46a46942f271d293cdcb5789d95f4214a Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 26 Jun 2026 12:23:26 -0700 Subject: [PATCH 6/9] fix(deployments): recover GPU pool allocations after process restart On platform restart, seed the shared DockerGPUPool from running deployment-managed containers so GPUs already in use are not handed out again. Scan managed container labels and HostConfig DeviceRequests during pool bootstrap and add unit coverage for parsing and recovery. Signed-off-by: Tyler Bray --- .../backends/docker/gpu.py | 112 ++++++++++++++++ .../tests/unit/backends/docker/test_gpu.py | 122 ++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py index f5b1b46aa4..655c6bf958 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py @@ -13,6 +13,7 @@ import subprocess import threading from dataclasses import dataclass, field +from typing import Any logger = logging.getLogger(__name__) @@ -65,6 +66,29 @@ def release_gpu(self, workload_id: str) -> list[int]: self.gpu_to_workload_id[gpu_id] = None return gpu_ids + def restore_allocations(self, allocations: dict[str, list[int]]) -> None: + """Mark GPUs as allocated for workloads discovered from running containers.""" + with self._mutex: + for workload_id, gpu_ids in allocations.items(): + for gpu_id in gpu_ids: + if gpu_id not in self.gpu_to_workload_id: + logger.warning( + "Skipping GPU %s for workload %s during pool recovery (not in reserved pool)", + gpu_id, + workload_id, + ) + continue + existing = self.gpu_to_workload_id[gpu_id] + if existing is not None and existing != workload_id: + logger.warning( + "GPU %s already allocated to %s; skipping recovery claim for %s", + gpu_id, + existing, + workload_id, + ) + continue + self.gpu_to_workload_id[gpu_id] = workload_id + _pool: DockerGPUPool | None = None _pool_lock = threading.Lock() @@ -90,6 +114,93 @@ def detect_gpu_device_ids() -> list[int]: return ids +def parse_gpu_device_ids(device_requests: list[Any] | None) -> list[int]: + """Extract NVIDIA GPU device IDs from Docker HostConfig DeviceRequests.""" + if not device_requests: + return [] + gpu_ids: list[int] = [] + for request in device_requests: + if isinstance(request, dict): + driver = request.get("Driver") + raw_ids = request.get("DeviceIDs") + else: + driver = getattr(request, "Driver", None) + raw_ids = getattr(request, "DeviceIDs", None) + if driver != "nvidia" or not raw_ids: + continue + for raw_id in raw_ids: + try: + gpu_ids.append(int(raw_id)) + except (TypeError, ValueError): + continue + return gpu_ids + + +def discover_managed_gpu_allocations(client: Any) -> dict[str, list[int]]: + """Return workload_id -> GPU IDs for running deployment-managed containers.""" + from nemo_deployments_plugin.backends.docker.labels import ( + DEPLOYMENT_NAME_LABEL, + DEPLOYMENT_WORKSPACE_LABEL, + MANAGED_BY_KEY, + deployment_key, + managed_by_filter, + ) + from nemo_deployments_plugin.constants import MANAGED_BY_LABEL + + try: + containers = client.containers.list(all=True, filters=managed_by_filter()) + except Exception: + logger.warning("Failed to list managed containers for GPU pool recovery", exc_info=True) + return {} + + allocations: dict[str, list[int]] = {} + for container in containers: + labels = container.labels or {} + if labels.get(MANAGED_BY_KEY) != MANAGED_BY_LABEL: + continue + workspace = labels.get(DEPLOYMENT_WORKSPACE_LABEL) + name = labels.get(DEPLOYMENT_NAME_LABEL) + if not workspace or not name: + continue + if container.status != "running": + continue + try: + container.reload() + except Exception: + logger.debug("Skipping container %s during GPU pool recovery", container.name, exc_info=True) + continue + host_config = container.attrs.get("HostConfig") or {} + gpu_ids = parse_gpu_device_ids(host_config.get("DeviceRequests")) + if not gpu_ids: + continue + workload_id = deployment_key(workspace, name) + allocations.setdefault(workload_id, []).extend(gpu_ids) + return allocations + + +def _recover_pool_allocations(pool: DockerGPUPool) -> None: + try: + import docker + except ImportError: + logger.debug("Docker SDK unavailable for GPU pool recovery") + return + + client = docker.from_env() + try: + allocations = discover_managed_gpu_allocations(client) + if allocations: + pool.restore_allocations(allocations) + logger.info( + "DockerGPUPool: recovered GPU allocations from %d managed container(s): %s", + len(allocations), + allocations, + ) + except Exception: + logger.warning("Failed to recover GPU allocations from managed containers", exc_info=True) + finally: + client.close() + + def get_shared_gpu_pool() -> DockerGPUPool | None: """Lazy singleton GPU pool shared across docker executor instances in this process.""" global _pool @@ -99,4 +210,5 @@ def get_shared_gpu_pool() -> DockerGPUPool | None: if not device_ids: return None _pool = DockerGPUPool(reserved_gpu_device_ids=device_ids) + _recover_pool_allocations(_pool) return _pool diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py new file mode 100644 index 0000000000..054d62d2e1 --- /dev/null +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py @@ -0,0 +1,122 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +from nemo_deployments_plugin.backends.docker import gpu as gpu_module +from nemo_deployments_plugin.backends.docker.gpu import ( + DockerGPUPool, + GPUAllocationError, + discover_managed_gpu_allocations, + get_shared_gpu_pool, + parse_gpu_device_ids, +) +from nemo_deployments_plugin.constants import MANAGED_BY_LABEL + + +def test_parse_gpu_device_ids_extracts_nvidia_ids() -> None: + requests = [ + { + "Driver": "nvidia", + "Count": 0, + "DeviceIDs": ["0", "1"], + "Capabilities": [["gpu"]], + } + ] + assert parse_gpu_device_ids(requests) == [0, 1] + + +def test_parse_gpu_device_ids_ignores_non_nvidia() -> None: + requests = [{"Driver": "amd", "DeviceIDs": ["0"]}] + assert parse_gpu_device_ids(requests) == [] + + +def test_restore_allocations_marks_gpus_in_use() -> None: + pool = DockerGPUPool(reserved_gpu_device_ids=[0, 1, 2]) + pool.restore_allocations({"smoke/srv": [0, 1]}) + assert pool.gpu_to_workload_id[0] == "smoke/srv" + assert pool.gpu_to_workload_id[1] == "smoke/srv" + assert pool.gpu_to_workload_id[2] is None + with pytest.raises(GPUAllocationError): + pool.allocate_gpu("other", num_requested=2) + + +def test_discover_managed_gpu_allocations_from_running_container() -> None: + container = MagicMock() + container.status = "running" + container.name = "dep-smoke-srv" + container.labels = { + "managed-by": MANAGED_BY_LABEL, + "nmp.nvidia.com/deployment-workspace": "smoke", + "nmp.nvidia.com/deployment-name": "srv", + } + container.attrs = { + "HostConfig": { + "DeviceRequests": [ + { + "Driver": "nvidia", + "DeviceIDs": ["1"], + "Capabilities": [["gpu"]], + } + ] + } + } + + client = MagicMock() + client.containers.list.return_value = [container] + + assert discover_managed_gpu_allocations(client) == {"smoke/srv": [1]} + + +def test_discover_managed_gpu_allocations_skips_exited_container() -> None: + container = MagicMock() + container.status = "exited" + container.labels = { + "managed-by": MANAGED_BY_LABEL, + "nmp.nvidia.com/deployment-workspace": "smoke", + "nmp.nvidia.com/deployment-name": "srv", + } + + client = MagicMock() + client.containers.list.return_value = [container] + + assert discover_managed_gpu_allocations(client) == {} + + +def test_get_shared_gpu_pool_recovers_running_allocations() -> None: + gpu_module._pool = None + container = MagicMock() + container.status = "running" + container.name = "dep-smoke-srv" + container.labels = { + "managed-by": MANAGED_BY_LABEL, + "nmp.nvidia.com/deployment-workspace": "smoke", + "nmp.nvidia.com/deployment-name": "srv", + } + container.attrs = { + "HostConfig": { + "DeviceRequests": [ + { + "Driver": "nvidia", + "DeviceIDs": ["0"], + "Capabilities": [["gpu"]], + } + ] + } + } + mock_client = MagicMock() + mock_client.containers.list.return_value = [container] + + with ( + patch.object(gpu_module, "detect_gpu_device_ids", return_value=[0, 1]), + patch("docker.from_env", return_value=mock_client), + ): + pool = get_shared_gpu_pool() + + assert pool is not None + assert pool.gpu_to_workload_id[0] == "smoke/srv" + assert pool.gpu_to_workload_id[1] is None + gpu_module._pool = None From c8a35ca1987bb5409368d91081364c89727c43d4 Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 26 Jun 2026 12:39:10 -0700 Subject: [PATCH 7/9] fix(test): serialize docker integration tests under xdist Docker integration tests share the itest workspace on one daemon. pytest-xdist loadscope spread them across workers, causing container remove/create races (409 removal in progress). Pin the package to a single xdist group, harden cleanup, and extend server reconcile polling. Signed-off-by: Tyler Bray --- .../backends/docker/test_docker_backend.py | 12 ++------ .../tests/integration/conftest.py | 14 ++++++++++ .../tests/integration/integration_helpers.py | 28 +++++++++++++++++++ .../integration/test_reconcile_docker.py | 8 ++---- 4 files changed, 48 insertions(+), 14 deletions(-) create mode 100644 plugins/nemo-deployments/tests/integration/integration_helpers.py diff --git a/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py index 9cfd0a666a..f7be34295f 100644 --- a/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py +++ b/plugins/nemo-deployments/tests/integration/backends/docker/test_docker_backend.py @@ -9,8 +9,8 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from docker.errors import NotFound from docker_availability import skip_without_docker +from integration_helpers import force_remove_container from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_deployments_plugin.backends.docker.labels import container_name from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES @@ -118,10 +118,7 @@ async def test_never_deployment_succeeds(docker_backend: DockerDeploymentBackend assert status.exit_code == 0 finally: await docker_backend.delete_deployment("itest", "echo-job") - try: - client.containers.get(c_name).remove(force=True) - except NotFound: - pass + force_remove_container(client, c_name) @pytest.mark.asyncio @@ -155,7 +152,4 @@ async def get_side_effect(entity_type, name, workspace=None): assert status.status == "LOST" finally: await docker_backend.delete_deployment("itest", "lost-srv") - try: - client.containers.get(c_name).remove(force=True) - except NotFound: - pass + force_remove_container(client, c_name) diff --git a/plugins/nemo-deployments/tests/integration/conftest.py b/plugins/nemo-deployments/tests/integration/conftest.py index 386c970b2f..33f8b3eae0 100644 --- a/plugins/nemo-deployments/tests/integration/conftest.py +++ b/plugins/nemo-deployments/tests/integration/conftest.py @@ -5,6 +5,20 @@ from __future__ import annotations +from pathlib import Path + +import pytest from docker_availability import DOCKER_AVAILABLE, skip_without_docker __all__ = ["DOCKER_AVAILABLE", "skip_without_docker"] + +# All tests in this package share one Docker daemon and the same managed-by label +# namespace. Run them on a single xdist worker to avoid container/volume races. +_DOCKER_INTEGRATION_XDIST_GROUP = "nemo_deployments_docker_integration" + + +def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: + integration_dir = Path(__file__).parent.resolve() + for item in items: + if integration_dir in Path(str(item.fspath)).resolve().parents: + item.add_marker(pytest.mark.xdist_group(_DOCKER_INTEGRATION_XDIST_GROUP)) diff --git a/plugins/nemo-deployments/tests/integration/integration_helpers.py b/plugins/nemo-deployments/tests/integration/integration_helpers.py new file mode 100644 index 0000000000..e3e4dc9171 --- /dev/null +++ b/plugins/nemo-deployments/tests/integration/integration_helpers.py @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared helpers for docker-backed integration tests.""" + +from __future__ import annotations + +import time +from typing import Any + + +def force_remove_container(client: Any, name: str, *, retries: int = 3) -> None: + """Remove a container, tolerating concurrent cleanup from other tests.""" + from docker.errors import APIError, NotFound + + for attempt in range(retries): + try: + client.containers.get(name).remove(force=True) + return + except NotFound: + return + except APIError as exc: + if "already in progress" in str(exc).lower() and attempt + 1 < retries: + time.sleep(0.5) + continue + if "already in progress" in str(exc).lower(): + return + raise diff --git a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py index f25db77219..be6c88eae5 100644 --- a/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py +++ b/plugins/nemo-deployments/tests/integration/test_reconcile_docker.py @@ -14,6 +14,7 @@ import pytest from docker.errors import NotFound from docker_availability import skip_without_docker +from integration_helpers import force_remove_container from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_deployments_plugin.backends.docker.labels import container_name, docker_volume_name from nemo_deployments_plugin.backends.registry import BACKEND_CLASSES, ExecutorRegistry @@ -168,7 +169,7 @@ async def get_side_effect( await asyncio.sleep(0.5) assert puller_dep.status == "SUCCEEDED" - for _ in range(10): + for _ in range(40): await deployment_reconciler.reconcile_one( server_dep, deployments_by_name=by_name, @@ -184,10 +185,7 @@ async def get_side_effect( await backend.delete_deployment("itest", "server") await backend.delete_volume("itest", "weights") for c_name in (container_name("itest", "puller"), container_name("itest", "server")): - try: - client.containers.get(c_name).remove(force=True) - except NotFound: - pass + force_remove_container(client, c_name) try: client.volumes.get(docker_volume_name("itest", "weights")).remove(force=True) except NotFound: From 3b89239b9ef2cd418cf49913c560176764fc5f22 Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 26 Jun 2026 13:30:56 -0700 Subject: [PATCH 8/9] fix(deployments): retry GPU pool init after recovery failure Only cache the shared GPU pool once container discovery succeeds so a transient Docker error does not leave all GPUs marked free on reuse. Signed-off-by: Tyler Bray --- .../backends/docker/gpu.py | 17 +++++++++----- .../tests/unit/backends/docker/test_gpu.py | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py index 655c6bf958..5a5a34a553 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/gpu.py @@ -151,7 +151,7 @@ def discover_managed_gpu_allocations(client: Any) -> dict[str, list[int]]: containers = client.containers.list(all=True, filters=managed_by_filter()) except Exception: logger.warning("Failed to list managed containers for GPU pool recovery", exc_info=True) - return {} + raise allocations: dict[str, list[int]] = {} for container in containers: @@ -178,25 +178,28 @@ def discover_managed_gpu_allocations(client: Any) -> dict[str, list[int]]: return allocations -def _recover_pool_allocations(pool: DockerGPUPool) -> None: +def _recover_pool_allocations(pool: DockerGPUPool) -> bool: + """Restore in-use GPUs from running containers. Returns False on transient failure.""" try: import docker except ImportError: logger.debug("Docker SDK unavailable for GPU pool recovery") - return + return True client = docker.from_env() try: allocations = discover_managed_gpu_allocations(client) + pool.restore_allocations(allocations) if allocations: - pool.restore_allocations(allocations) logger.info( "DockerGPUPool: recovered GPU allocations from %d managed container(s): %s", len(allocations), allocations, ) + return True except Exception: logger.warning("Failed to recover GPU allocations from managed containers", exc_info=True) + return False finally: client.close() @@ -209,6 +212,8 @@ def get_shared_gpu_pool() -> DockerGPUPool | None: device_ids = detect_gpu_device_ids() if not device_ids: return None - _pool = DockerGPUPool(reserved_gpu_device_ids=device_ids) - _recover_pool_allocations(_pool) + pool = DockerGPUPool(reserved_gpu_device_ids=device_ids) + if not _recover_pool_allocations(pool): + return None + _pool = pool return _pool diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py index 054d62d2e1..bb59a6bc7a 100644 --- a/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_gpu.py @@ -120,3 +120,25 @@ def test_get_shared_gpu_pool_recovers_running_allocations() -> None: assert pool.gpu_to_workload_id[0] == "smoke/srv" assert pool.gpu_to_workload_id[1] is None gpu_module._pool = None + + +def test_get_shared_gpu_pool_retries_after_recovery_failure() -> None: + gpu_module._pool = None + mock_client = MagicMock() + mock_client.containers.list.side_effect = RuntimeError("docker unavailable") + + with ( + patch.object(gpu_module, "detect_gpu_device_ids", return_value=[0, 1]), + patch("docker.from_env", return_value=mock_client), + ): + assert get_shared_gpu_pool() is None + assert gpu_module._pool is None + + mock_client.containers.list.side_effect = None + mock_client.containers.list.return_value = [] + + pool = get_shared_gpu_pool() + + assert pool is not None + assert pool.gpu_to_workload_id == {0: None, 1: None} + gpu_module._pool = None From d83415605e11fa97ec84981669ac8bfcaf7225fc Mon Sep 17 00:00:00 2001 From: Tyler Bray Date: Fri, 26 Jun 2026 13:36:40 -0700 Subject: [PATCH 9/9] fix(deployments): bind loopback for port-free check (CodeQL) Use 127.0.0.1 instead of 0.0.0.0 when probing host port availability, matching the nemo-agents pattern. Also colocate docker test helpers under backends/docker/ for clearer pytest imports. Signed-off-by: Tyler Bray --- .../src/nemo_deployments_plugin/backends/docker/labels.py | 2 +- .../src/nemo_deployments_plugin/backends/docker/ports.py | 2 +- .../tests/unit/backends/{ => docker}/docker_helpers.py | 0 .../tests/unit/backends/docker/test_backend_mocked.py | 2 +- .../tests/unit/backends/docker/test_idempotency.py | 2 +- .../nemo-deployments/tests/unit/backends/docker/test_ports.py | 1 + 6 files changed, 5 insertions(+), 4 deletions(-) rename plugins/nemo-deployments/tests/unit/backends/{ => docker}/docker_helpers.py (100%) diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py index cd1b525d17..221750831e 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/labels.py @@ -79,5 +79,5 @@ def volume_identity_labels(workspace: str, name: str) -> dict[str, str]: } -def managed_by_filter() -> dict[str, str]: +def managed_by_filter() -> dict[str, str | bool]: return {"label": f"{MANAGED_BY_KEY}={MANAGED_BY_LABEL}"} diff --git a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py index f95884e887..1ab238ea4c 100644 --- a/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py +++ b/plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/ports.py @@ -30,7 +30,7 @@ def is_port_free(port: int) -> bool: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(("0.0.0.0", port)) # noqa: S104 # nosec B104 + sock.bind(("127.0.0.1", port)) return True except OSError: return False diff --git a/plugins/nemo-deployments/tests/unit/backends/docker_helpers.py b/plugins/nemo-deployments/tests/unit/backends/docker/docker_helpers.py similarity index 100% rename from plugins/nemo-deployments/tests/unit/backends/docker_helpers.py rename to plugins/nemo-deployments/tests/unit/backends/docker/docker_helpers.py diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py index 0865b288d9..a546712a22 100644 --- a/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_backend_mocked.py @@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from backends.docker_helpers import container_attrs, sample_config +from backends.docker.docker_helpers import container_attrs, sample_config from docker.errors import NotFound from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_deployments_plugin.constants import MANAGED_BY_LABEL diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py index 8f23803575..6b89981774 100644 --- a/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_idempotency.py @@ -8,7 +8,7 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from backends.docker_helpers import container_attrs, sample_config +from backends.docker.docker_helpers import container_attrs, sample_config from nemo_deployments_plugin.backends.docker.backend import DockerDeploymentBackend from nemo_deployments_plugin.backends.docker.labels import ( DEPLOYMENT_NAME_LABEL, diff --git a/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py b/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py index e93ef465e8..7b11700a98 100644 --- a/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py +++ b/plugins/nemo-deployments/tests/unit/backends/docker/test_ports.py @@ -60,6 +60,7 @@ def setsockopt(self, *args: object, **kwargs: object) -> None: return None def bind(self, addr: tuple[str, int]) -> None: + assert addr == ("127.0.0.1", 9000) return None monkeypatch.setattr(ports_mod, "is_remote_docker_host", lambda: False)