Skip to content

Commit f612b6a

Browse files
committed
feat(deployments): implement DockerDeploymentBackend (AIRCORE-756)
Add Docker backend for nemo-deployments-plugin with single-container v1 lifecycle, entity-store config fetch, port allocation, exec/http probes, plugin-local GPU pool, and unit plus integration test coverage. Stacks on #315 (AIRCORE-758 reconciler prerequisite DAG). Signed-off-by: Tyler Bray <tbray@nvidia.com>
1 parent 0b92da1 commit f612b6a

23 files changed

Lines changed: 2427 additions & 11 deletions

plugins/nemo-deployments/pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ readme = "README.md"
66
requires-python = ">=3.11,<3.14"
77
dependencies = [
88
"fastapi>=0.115",
9+
"httpx>=0.27",
910
"nemo-platform",
1011
"nemo-platform-plugin",
1112
"pydantic>=2.10.6",
1213
]
1314

15+
[project.optional-dependencies]
16+
docker = ["docker>=7.0"]
17+
1418
[project.entry-points."nemo.services"]
1519
deployments = "nemo_deployments_plugin.service:DeploymentsService"
1620

@@ -29,7 +33,7 @@ nemo-platform = { workspace = true }
2933
nemo-platform-plugin = { workspace = true }
3034

3135
[dependency-groups]
32-
dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115"]
36+
dev = ["pytest>=8.3.4", "pytest-asyncio>=0.25.3", "httpx>=0.27", "fastapi>=0.115", "docker>=7.0"]
3337

3438
[tool.pytest.ini_options]
3539
testpaths = ["tests"]

plugins/nemo-deployments/src/nemo_deployments_plugin/backends/docker/backend.py

Lines changed: 509 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Executor-level Docker backend configuration."""
5+
6+
from __future__ import annotations
7+
8+
from pydantic import BaseModel, Field
9+
10+
11+
class DockerExecutorConfig(BaseModel):
12+
"""Knobs for a named docker executor instance (not entity backend_config)."""
13+
14+
docker_host: str | None = Field(default=None, description="Override DOCKER_HOST for this executor.")
15+
docker_timeout: int = Field(default=60, ge=1)
16+
pull_images: bool = Field(default=True, description="Pull container images before run when missing locally.")
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Compile DeploymentConfig into docker.containers.run kwargs."""
5+
6+
from __future__ import annotations
7+
8+
from typing import Any
9+
10+
from nemo_deployments_plugin.backends.docker.labels import docker_volume_name
11+
from nemo_deployments_plugin.entities import Container, DeploymentConfig, DockerDeploymentConfig, VolumeMount
12+
from nemo_deployments_plugin.types import RestartPolicy
13+
14+
15+
class DeploymentConfigError(ValueError):
16+
"""Invalid deployment config for docker backend."""
17+
18+
19+
def parse_docker_backend_config(backend_config: dict[str, Any]) -> DockerDeploymentConfig:
20+
docker_section = backend_config.get("docker") or {}
21+
return DockerDeploymentConfig.model_validate(docker_section)
22+
23+
24+
def validate_config_for_docker(config: DeploymentConfig) -> Container:
25+
if config.init_containers:
26+
raise DeploymentConfigError("init_containers are not supported by the docker backend in v1")
27+
if len(config.containers) != 1:
28+
raise DeploymentConfigError(f"docker backend v1 supports exactly one container; got {len(config.containers)}")
29+
return config.containers[0]
30+
31+
32+
def restart_policy_kwargs(restart_policy: RestartPolicy, backoff_limit: int) -> dict[str, Any]:
33+
if restart_policy == "Always":
34+
return {"restart_policy": {"Name": "always"}}
35+
if restart_policy == "OnFailure":
36+
return {"restart_policy": {"Name": "on-failure", "MaximumRetryCount": backoff_limit}}
37+
return {}
38+
39+
40+
def env_dict(container: Container) -> dict[str, str]:
41+
result: dict[str, str] = {}
42+
for item in container.env:
43+
if item.value is not None:
44+
result[item.name] = item.value
45+
return result
46+
47+
48+
def merged_volume_mounts(config: DeploymentConfig, container: Container) -> list[VolumeMount]:
49+
by_name: dict[str, VolumeMount] = {}
50+
for mount in config.volume_mounts:
51+
by_name[mount.name] = mount
52+
for mount in container.volume_mounts:
53+
by_name[mount.name] = mount
54+
return list(by_name.values())
55+
56+
57+
def build_volume_bindings(
58+
workspace: str,
59+
mounts: list[VolumeMount],
60+
) -> dict[str, dict[str, str]]:
61+
bindings: dict[str, dict[str, str]] = {}
62+
for mount in mounts:
63+
vol_name = docker_volume_name(workspace, mount.name)
64+
bindings[vol_name] = {
65+
"bind": mount.mount_path,
66+
"mode": "ro" if mount.read_only else "rw",
67+
}
68+
return bindings
69+
70+
71+
def build_port_bindings(
72+
container: Container,
73+
host_ports: dict[int, int],
74+
) -> dict[str, int | list[tuple[str, int]] | None]:
75+
ports: dict[str, int | list[tuple[str, int]] | None] = {}
76+
for port_spec in container.ports:
77+
container_port = port_spec.container_port
78+
protocol = port_spec.protocol.lower()
79+
key = f"{container_port}/{protocol}"
80+
host_port = host_ports.get(container_port)
81+
if host_port is not None:
82+
ports[key] = host_port
83+
else:
84+
ports[key] = container_port
85+
return ports
86+
87+
88+
def gpu_count_from_container(container: Container) -> int:
89+
limit = container.resources.limits.get("nvidia.com/gpu")
90+
if not limit:
91+
return 0
92+
try:
93+
return int(limit)
94+
except ValueError:
95+
return 0
96+
97+
98+
def device_requests_for_gpus(gpu_ids: list[int]) -> list[dict[str, Any]]:
99+
if not gpu_ids:
100+
return []
101+
return [
102+
{
103+
"Driver": "nvidia",
104+
"Count": 0,
105+
"DeviceIDs": [str(gpu_id) for gpu_id in gpu_ids],
106+
"Capabilities": [["gpu"]],
107+
}
108+
]
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Thread-safe GPU pool for Docker deployments (plugin-local; not shared with models).
5+
6+
During the 759 cutover both pools may coexist briefly — consolidate into
7+
nemo_platform_plugin when models docker backend is removed.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import logging
13+
import subprocess
14+
import threading
15+
from dataclasses import dataclass, field
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class GPUAllocationError(Exception):
21+
"""Raised when GPU allocation fails due to insufficient resources."""
22+
23+
24+
@dataclass
25+
class GPUPoolStatus:
26+
total: int
27+
available: int
28+
allocated: int
29+
allocations: dict[str, list[int]] = field(default_factory=dict)
30+
gpu_state: dict[int, str | None] = field(default_factory=dict)
31+
32+
33+
class DockerGPUPool:
34+
"""Thread-safe pool of GPU device IDs for Docker device_requests."""
35+
36+
def __init__(self, reserved_gpu_device_ids: list[int]) -> None:
37+
self.num_reserved_gpus = len(reserved_gpu_device_ids)
38+
self.gpu_to_workload_id: dict[int, str | None] = {gpu_id: None for gpu_id in reserved_gpu_device_ids}
39+
self._mutex = threading.Lock()
40+
41+
def allocate_gpu(self, workload_id: str, num_requested: int = 1) -> list[int]:
42+
with self._mutex:
43+
if num_requested <= 0:
44+
raise GPUAllocationError(f"Invalid GPU request: {num_requested}. Must be a positive integer.")
45+
available_gpus = {gpu for gpu, workload in self.gpu_to_workload_id.items() if workload is None}
46+
if len(available_gpus) < num_requested:
47+
raise GPUAllocationError(
48+
f"Not enough GPUs available. Requested {num_requested}, "
49+
f"available {len(available_gpus)} out of {self.num_reserved_gpus} total."
50+
)
51+
gpu_ids: list[int] = []
52+
for _ in range(num_requested):
53+
gpu_id = available_gpus.pop()
54+
gpu_ids.append(gpu_id)
55+
self.gpu_to_workload_id[gpu_id] = workload_id
56+
logger.info("DockerGPUPool: allocated gpu_ids %s to workload %s", gpu_ids, workload_id)
57+
return gpu_ids
58+
59+
def release_gpu(self, workload_id: str) -> list[int]:
60+
with self._mutex:
61+
gpu_ids = [gpu for gpu, workload in self.gpu_to_workload_id.items() if workload == workload_id]
62+
if gpu_ids:
63+
logger.info("DockerGPUPool: releasing gpu_ids %s from workload %s", gpu_ids, workload_id)
64+
for gpu_id in gpu_ids:
65+
self.gpu_to_workload_id[gpu_id] = None
66+
return gpu_ids
67+
68+
69+
_pool: DockerGPUPool | None = None
70+
_pool_lock = threading.Lock()
71+
72+
73+
def detect_gpu_device_ids() -> list[int]:
74+
"""Return GPU indices from nvidia-smi when available."""
75+
try:
76+
result = subprocess.run(
77+
["nvidia-smi", "--query-gpu=index", "--format=csv,noheader"],
78+
capture_output=True,
79+
text=True,
80+
check=True,
81+
timeout=10,
82+
)
83+
except (FileNotFoundError, subprocess.SubprocessError, OSError):
84+
return []
85+
ids: list[int] = []
86+
for line in result.stdout.splitlines():
87+
stripped = line.strip()
88+
if stripped.isdigit():
89+
ids.append(int(stripped))
90+
return ids
91+
92+
93+
def get_shared_gpu_pool() -> DockerGPUPool | None:
94+
"""Lazy singleton GPU pool shared across docker executor instances in this process."""
95+
global _pool
96+
with _pool_lock:
97+
if _pool is None:
98+
device_ids = detect_gpu_device_ids()
99+
if not device_ids:
100+
return None
101+
_pool = DockerGPUPool(reserved_gpu_device_ids=device_ids)
102+
return _pool
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Docker resource naming and identity labels for orphan cleanup."""
5+
6+
from __future__ import annotations
7+
8+
import hashlib
9+
import re
10+
11+
from nemo_deployments_plugin.constants import MANAGED_BY_LABEL
12+
13+
MANAGED_BY_KEY = "managed-by"
14+
DEPLOYMENT_WORKSPACE_LABEL = "nmp.nvidia.com/deployment-workspace"
15+
DEPLOYMENT_NAME_LABEL = "nmp.nvidia.com/deployment-name"
16+
RESTART_POLICY_LABEL = "nmp.nvidia.com/restart-policy"
17+
CONFIG_NAME_LABEL = "nmp.nvidia.com/deployment-config"
18+
VOLUME_WORKSPACE_LABEL = "nmp.nvidia.com/volume-workspace"
19+
VOLUME_NAME_LABEL = "nmp.nvidia.com/volume-name"
20+
21+
22+
def k8s_safe_name(base_name: str, *, max_length: int = 63, suffix: str = "") -> str:
23+
"""Generate a DNS-label-safe name (RFC 1035) from arbitrary input."""
24+
hash_suffix = hashlib.sha256(base_name.encode()).hexdigest()[:8]
25+
normalized = re.sub(r"[^a-z0-9-]", "-", base_name.lower())
26+
normalized = re.sub(r"[-]+", "-", normalized)
27+
if normalized and not normalized[0].isalpha():
28+
normalized = f"x{normalized}"
29+
normalized = normalized.rstrip("-")
30+
31+
reserved = len(suffix) + len(hash_suffix) + 1
32+
if len(normalized) + reserved > max_length:
33+
trim = max_length - reserved
34+
normalized = normalized[:trim].rstrip("-")
35+
normalized = f"{normalized}-{hash_suffix}{suffix}"
36+
elif suffix:
37+
normalized = f"{normalized}{suffix}"
38+
return normalized
39+
40+
41+
def container_name(workspace: str, deployment_name: str) -> str:
42+
return k8s_safe_name(f"dep-{workspace}-{deployment_name}")
43+
44+
45+
def docker_volume_name(workspace: str, volume_name: str) -> str:
46+
return k8s_safe_name(f"dep-vol-{workspace}-{volume_name}")
47+
48+
49+
def deployment_key(workspace: str, name: str) -> str:
50+
return f"{workspace}/{name}"
51+
52+
53+
BACKOFF_LIMIT_LABEL = "nmp.nvidia.com/backoff-limit"
54+
55+
56+
def deployment_identity_labels(
57+
workspace: str,
58+
name: str,
59+
restart_policy: str,
60+
*,
61+
config_name: str,
62+
backoff_limit: int = 6,
63+
) -> dict[str, str]:
64+
return {
65+
MANAGED_BY_KEY: MANAGED_BY_LABEL,
66+
DEPLOYMENT_WORKSPACE_LABEL: workspace,
67+
DEPLOYMENT_NAME_LABEL: name,
68+
RESTART_POLICY_LABEL: restart_policy,
69+
CONFIG_NAME_LABEL: config_name,
70+
BACKOFF_LIMIT_LABEL: str(backoff_limit),
71+
}
72+
73+
74+
def volume_identity_labels(workspace: str, name: str) -> dict[str, str]:
75+
return {
76+
MANAGED_BY_KEY: MANAGED_BY_LABEL,
77+
VOLUME_WORKSPACE_LABEL: workspace,
78+
VOLUME_NAME_LABEL: name,
79+
}
80+
81+
82+
def managed_by_filter() -> dict[str, str]:
83+
return {"label": f"{MANAGED_BY_KEY}={MANAGED_BY_LABEL}"}

0 commit comments

Comments
 (0)