From 197bc68e2dd753b1f41f59eca52630de6b128994 Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 24 May 2026 21:23:24 -0700 Subject: [PATCH] Add declarative sandbox image builds --- packages/prime-sandboxes/README.md | 61 ++ .../src/prime_sandboxes/__init__.py | 3 + .../src/prime_sandboxes/images.py | 599 ++++++++++++++++++ .../src/prime_sandboxes/sandbox.py | 86 ++- .../tests/test_declarative_images.py | 197 ++++++ packages/prime/src/prime_cli/__init__.py | 4 + 6 files changed, 946 insertions(+), 4 deletions(-) create mode 100644 packages/prime-sandboxes/src/prime_sandboxes/images.py create mode 100644 packages/prime-sandboxes/tests/test_declarative_images.py diff --git a/packages/prime-sandboxes/README.md b/packages/prime-sandboxes/README.md index fcc96b298..15ff8d4a1 100644 --- a/packages/prime-sandboxes/README.md +++ b/packages/prime-sandboxes/README.md @@ -6,6 +6,7 @@ Lightweight Python SDK for managing Prime Intellect sandboxes - secure remote co - **Synchronous and async clients** - Use with sync or async/await code - **Full sandbox lifecycle** - Create, list, execute commands, upload/download files, delete +- **Declarative images** - Build runtime images from Python and launch sandboxes from them - **Type-safe** - Full type hints and Pydantic models - **Authentication caching** - Automatic token management - **Bulk operations** - Create and manage multiple sandboxes efficiently @@ -53,6 +54,66 @@ print(result.stdout) sandbox_client.delete(sandbox.id) ``` +## Declarative Images + +```python +from prime_sandboxes import ( + APIClient, + CreateSandboxRequest, + Image, + SandboxClient, +) + +client = APIClient(api_key="your-api-key") +sandbox_client = SandboxClient(client) + +declarative_image = ( + Image.debian_slim("3.12") + .pip_install(["requests", "pytest"]) + .workdir("/home/prime") +) + +build = sandbox_client.build_image( + declarative_image, + image_name="runtime-image", + timeout_seconds=0, # wait indefinitely for the image build + on_build_log=print, +) + +sandbox = sandbox_client.create( + CreateSandboxRequest( + name="runtime-image-sandbox", + docker_image=build.image_reference, + cpu_cores=2, + memory_gb=4, + ) +) +``` + +The SDK renders the image to a Dockerfile, builds it through the Prime images +API, waits for the registry image to become ready, then creates a normal +sandbox using that image reference. + +For batch workloads, reuse that same image reference across all sandbox requests: + +```python +runtime_image = Image.debian_slim("3.12").pip_install(["requests", "pytest"]) + +build = sandbox_client.build_image( + runtime_image, + image_name="batch-runtime", + on_build_log=print, +) + +sandboxes = [ + sandbox_client.create(CreateSandboxRequest( + name=f"batch-{i}", + docker_image=build.image_reference, + )) + for i in range(10) +] +``` + ## Async Usage ```python diff --git a/packages/prime-sandboxes/src/prime_sandboxes/__init__.py b/packages/prime-sandboxes/src/prime_sandboxes/__init__.py index abdf45687..a9f5cb500 100644 --- a/packages/prime-sandboxes/src/prime_sandboxes/__init__.py +++ b/packages/prime-sandboxes/src/prime_sandboxes/__init__.py @@ -23,6 +23,7 @@ SandboxTimeoutError, UploadTimeoutError, ) +from .images import Image, ImageBuildResult from .models import ( AdvancedConfigs, BackgroundJob, @@ -63,6 +64,8 @@ "TemplateClient", "AsyncTemplateClient", # Models + "Image", + "ImageBuildResult", "Sandbox", "SandboxStatus", "SandboxListResponse", diff --git a/packages/prime-sandboxes/src/prime_sandboxes/images.py b/packages/prime-sandboxes/src/prime_sandboxes/images.py new file mode 100644 index 000000000..d1d8b5d99 --- /dev/null +++ b/packages/prime-sandboxes/src/prime_sandboxes/images.py @@ -0,0 +1,599 @@ +"""Declarative image builders and Prime image build clients.""" + +from __future__ import annotations + +import asyncio +import hashlib +import io +import json +import re +import shlex +import tarfile +import time +from dataclasses import dataclass, replace +from datetime import datetime, timezone +from typing import Any, Callable, Iterable, Mapping, Optional, Sequence + +import httpx + +from .core import APIClient, APIError, APITimeoutError, AsyncAPIClient + +PACKAGED_DOCKERFILE_PATH = ".__prime_dockerfile__" +DEFAULT_DECLARATIVE_IMAGE_NAME = "declarative-sandbox" +DEFAULT_IMAGE_PLATFORM = "linux/amd64" + +BuildLogCallback = Callable[[str], None] + +_ACTIVE_STATUSES = {"BUILDING", "PENDING", "UPLOADING"} +_TERMINAL_FAILURE_STATUSES = {"FAILED", "CANCELLED"} +_LATEST_ROW_KEYS: tuple[str, ...] = ("pushedAt", "completedAt", "startedAt", "createdAt") +_IMAGE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]*$") +_IMAGE_TAG_RE = re.compile(r"^[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}$") +_ENV_KEY_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +@dataclass(frozen=True) +class Image: + """Declarative Docker image definition for sandbox creation. + + Instances are immutable. Builder methods return a new image with the extra + Dockerfile instruction appended in call order. + """ + + base_image: str + _instructions: tuple[str, ...] = () + + @classmethod + def from_registry(cls, image: str) -> "Image": + """Start from any registry image reference.""" + _require_single_line("image", image) + if not image.strip(): + raise ValueError("image must not be empty") + return cls(base_image=image.strip()) + + @classmethod + def debian_slim(cls, python_version: str = "3.12") -> "Image": + """Start from Prime's Python-on-Debian slim base image.""" + _require_single_line("python_version", python_version) + if not python_version.strip(): + raise ValueError("python_version must not be empty") + return cls.from_registry(f"python:{python_version.strip()}-slim") + + @classmethod + def python_slim(cls, python_version: str = "3.12") -> "Image": + """Alias for :meth:`debian_slim` for users who prefer explicit Python naming.""" + return cls.debian_slim(python_version) + + def apt_install(self, packages: str | Iterable[str]) -> "Image": + """Install Debian packages with apt.""" + package_list = _coerce_non_empty_list("packages", packages) + quoted = " ".join(shlex.quote(package) for package in package_list) + return self._append( + "RUN apt-get update && apt-get install -y --no-install-recommends " + f"{quoted} && rm -rf /var/lib/apt/lists/*" + ) + + def pip_install(self, packages: str | Iterable[str]) -> "Image": + """Install Python packages with pip.""" + package_list = _coerce_non_empty_list("packages", packages) + quoted = " ".join(shlex.quote(package) for package in package_list) + return self._append(f"RUN python -m pip install --no-cache-dir {quoted}") + + def run(self, command: str) -> "Image": + """Append a Dockerfile RUN instruction.""" + _require_single_line("command", command) + if not command.strip(): + raise ValueError("command must not be empty") + return self._append(f"RUN {command.strip()}") + + def env(self, variables: Optional[Mapping[str, str]] = None, **kwargs: str) -> "Image": + """Set environment variables in the image.""" + merged: dict[str, str] = {} + if variables: + merged.update({str(key): str(value) for key, value in variables.items()}) + merged.update({str(key): str(value) for key, value in kwargs.items()}) + if not merged: + raise ValueError("at least one environment variable is required") + + image = self + for key, value in merged.items(): + if not _ENV_KEY_RE.match(key): + raise ValueError(f"invalid environment variable name: {key!r}") + _require_single_line(key, value) + image = image._append(f"ENV {key}={_dockerfile_double_quote(value)}") + return image + + def workdir(self, path: str) -> "Image": + """Set the working directory.""" + _require_single_line("path", path) + if not path.strip(): + raise ValueError("path must not be empty") + return self._append(f"WORKDIR {path.strip()}") + + def cmd(self, command: str | Sequence[str]) -> "Image": + """Set the image CMD instruction.""" + if isinstance(command, str): + _require_single_line("command", command) + if not command.strip(): + raise ValueError("command must not be empty") + return self._append(f"CMD {command.strip()}") + + command_list = _coerce_non_empty_list("command", command) + return self._append(f"CMD {json.dumps(command_list)}") + + def dockerfile(self) -> str: + """Render this declarative image as a Dockerfile.""" + lines = [f"FROM {self.base_image}", *self._instructions] + return "\n".join(lines) + "\n" + + def content_hash(self) -> str: + """Stable hash of the rendered Dockerfile.""" + return hashlib.sha256(self.dockerfile().encode("utf-8")).hexdigest() + + def default_tag(self) -> str: + """Stable default image tag derived from the image definition.""" + return f"sha-{self.content_hash()[:12]}" + + def _append(self, instruction: str) -> "Image": + return replace(self, _instructions=(*self._instructions, instruction)) + + +@dataclass(frozen=True) +class ImageBuildResult: + """Result returned after building or reusing a declarative image.""" + + image_reference: str + image_name: str + image_tag: str + status: str + build_id: Optional[str] = None + reused: bool = False + + +class ImageClient: + """Client for Prime image build APIs.""" + + def __init__(self, api_client: Optional[APIClient] = None): + self.client = api_client or APIClient() + + def build( + self, + image: Image, + *, + image_name: Optional[str] = None, + image_tag: Optional[str] = None, + platform: str = DEFAULT_IMAGE_PLATFORM, + team_id: Optional[str] = None, + visibility: Optional[str] = None, + force_rebuild: bool = False, + timeout_seconds: Optional[float] = 900, + poll_interval_seconds: float = 5.0, + on_build_log: Optional[BuildLogCallback] = None, + ) -> ImageBuildResult: + """Build a declarative image and return the registry reference. + + A ``timeout_seconds`` value of ``0`` or ``None`` waits indefinitely. + """ + image_name, image_tag = _resolve_image_name_tag(image, image_name, image_tag) + effective_team_id = _effective_team_id(self.client, team_id) + + if not force_rebuild: + existing = self._latest_image_row(image_name, image_tag, effective_team_id) + if existing is not None: + status = str(existing.get("status") or "UNKNOWN") + if status == "COMPLETED": + image_ref = _image_reference_from_row(existing, image_name, image_tag) + _emit(on_build_log, f"Using existing image {image_ref}") + return ImageBuildResult( + image_reference=image_ref, + image_name=image_name, + image_tag=image_tag, + status=status, + reused=True, + ) + if status in _ACTIVE_STATUSES: + _emit(on_build_log, f"Waiting for existing image build ({status})") + image_ref = self._wait_for_image_ready( + image_name, + image_tag, + effective_team_id, + timeout_seconds, + poll_interval_seconds, + on_build_log, + ) + return ImageBuildResult( + image_reference=image_ref, + image_name=image_name, + image_tag=image_tag, + status="COMPLETED", + reused=True, + ) + + dockerfile = image.dockerfile() + _emit(on_build_log, f"Preparing declarative image {image_name}:{image_tag}") + archive = _build_context_archive(dockerfile) + + payload: dict[str, Any] = { + "image_name": image_name, + "image_tag": image_tag, + "dockerfile_path": PACKAGED_DOCKERFILE_PATH, + "platform": platform, + } + if effective_team_id: + payload["team_id"] = effective_team_id + if visibility: + payload["visibility"] = _normalize_visibility(visibility) + + _emit(on_build_log, "Initiating image build") + build_response = self.client.request("POST", "/images/build", json=payload) + build_id = build_response.get("build_id") + upload_url = build_response.get("upload_url") + if not build_id or not upload_url: + raise APIError("Invalid response from image build API: missing build_id or upload_url") + image_reference = str(build_response.get("fullImagePath") or f"{image_name}:{image_tag}") + + _emit(on_build_log, "Uploading image build context") + upload_response = httpx.put( + str(upload_url), + content=archive, + headers={"Content-Type": "application/octet-stream"}, + timeout=600.0, + ) + upload_response.raise_for_status() + + _emit(on_build_log, "Starting image build") + self.client.request( + "POST", + f"/images/build/{build_id}/start", + json={"context_uploaded": True}, + ) + + image_reference = self._wait_for_image_ready( + image_name, + image_tag, + effective_team_id, + timeout_seconds, + poll_interval_seconds, + on_build_log, + ) + return ImageBuildResult( + image_reference=image_reference, + image_name=image_name, + image_tag=image_tag, + status="COMPLETED", + build_id=str(build_id), + ) + + def _latest_image_row( + self, image_name: str, image_tag: str, team_id: Optional[str] + ) -> Optional[dict[str, Any]]: + params: dict[str, str] = {"limit": "250", "offset": "0"} + if team_id: + params["teamId"] = team_id + response = self.client.request("GET", "/images", params=params) + return _latest_matching_row(response.get("data", []), image_name, image_tag, team_id) + + def _wait_for_image_ready( + self, + image_name: str, + image_tag: str, + team_id: Optional[str], + timeout_seconds: Optional[float], + poll_interval_seconds: float, + on_build_log: Optional[BuildLogCallback], + ) -> str: + deadline = _deadline(timeout_seconds) + last_status: Optional[str] = None + while True: + row = self._latest_image_row(image_name, image_tag, team_id) + if row is not None: + status = str(row.get("status") or "UNKNOWN") + if status != last_status: + _emit(on_build_log, f"Image build status: {status}") + last_status = status + if status == "COMPLETED": + image_ref = _image_reference_from_row(row, image_name, image_tag) + _emit(on_build_log, f"Image ready: {image_ref}") + return image_ref + if status in _TERMINAL_FAILURE_STATUSES: + raise APIError(f"Image build {status.lower()} for {image_name}:{image_tag}") + elif last_status is None: + _emit(on_build_log, "Waiting for image build to appear") + last_status = "UNKNOWN" + + _raise_if_timed_out(deadline, image_name, image_tag) + time.sleep(max(0.0, poll_interval_seconds)) + + +class AsyncImageClient: + """Async client for Prime image build APIs.""" + + def __init__(self, api_client: Optional[AsyncAPIClient] = None): + self.client = api_client or AsyncAPIClient() + + async def build( + self, + image: Image, + *, + image_name: Optional[str] = None, + image_tag: Optional[str] = None, + platform: str = DEFAULT_IMAGE_PLATFORM, + team_id: Optional[str] = None, + visibility: Optional[str] = None, + force_rebuild: bool = False, + timeout_seconds: Optional[float] = 900, + poll_interval_seconds: float = 5.0, + on_build_log: Optional[BuildLogCallback] = None, + ) -> ImageBuildResult: + """Build a declarative image and return the registry reference.""" + image_name, image_tag = _resolve_image_name_tag(image, image_name, image_tag) + effective_team_id = _effective_team_id(self.client, team_id) + + if not force_rebuild: + existing = await self._latest_image_row(image_name, image_tag, effective_team_id) + if existing is not None: + status = str(existing.get("status") or "UNKNOWN") + if status == "COMPLETED": + image_ref = _image_reference_from_row(existing, image_name, image_tag) + _emit(on_build_log, f"Using existing image {image_ref}") + return ImageBuildResult( + image_reference=image_ref, + image_name=image_name, + image_tag=image_tag, + status=status, + reused=True, + ) + if status in _ACTIVE_STATUSES: + _emit(on_build_log, f"Waiting for existing image build ({status})") + image_ref = await self._wait_for_image_ready( + image_name, + image_tag, + effective_team_id, + timeout_seconds, + poll_interval_seconds, + on_build_log, + ) + return ImageBuildResult( + image_reference=image_ref, + image_name=image_name, + image_tag=image_tag, + status="COMPLETED", + reused=True, + ) + + dockerfile = image.dockerfile() + _emit(on_build_log, f"Preparing declarative image {image_name}:{image_tag}") + archive = _build_context_archive(dockerfile) + + payload: dict[str, Any] = { + "image_name": image_name, + "image_tag": image_tag, + "dockerfile_path": PACKAGED_DOCKERFILE_PATH, + "platform": platform, + } + if effective_team_id: + payload["team_id"] = effective_team_id + if visibility: + payload["visibility"] = _normalize_visibility(visibility) + + _emit(on_build_log, "Initiating image build") + build_response = await self.client.request("POST", "/images/build", json=payload) + build_id = build_response.get("build_id") + upload_url = build_response.get("upload_url") + if not build_id or not upload_url: + raise APIError("Invalid response from image build API: missing build_id or upload_url") + image_reference = str(build_response.get("fullImagePath") or f"{image_name}:{image_tag}") + + _emit(on_build_log, "Uploading image build context") + async with httpx.AsyncClient(timeout=600.0) as http_client: + upload_response = await http_client.put( + str(upload_url), + content=archive, + headers={"Content-Type": "application/octet-stream"}, + ) + upload_response.raise_for_status() + + _emit(on_build_log, "Starting image build") + await self.client.request( + "POST", + f"/images/build/{build_id}/start", + json={"context_uploaded": True}, + ) + + image_reference = await self._wait_for_image_ready( + image_name, + image_tag, + effective_team_id, + timeout_seconds, + poll_interval_seconds, + on_build_log, + ) + return ImageBuildResult( + image_reference=image_reference, + image_name=image_name, + image_tag=image_tag, + status="COMPLETED", + build_id=str(build_id), + ) + + async def _latest_image_row( + self, image_name: str, image_tag: str, team_id: Optional[str] + ) -> Optional[dict[str, Any]]: + params: dict[str, str] = {"limit": "250", "offset": "0"} + if team_id: + params["teamId"] = team_id + response = await self.client.request("GET", "/images", params=params) + return _latest_matching_row(response.get("data", []), image_name, image_tag, team_id) + + async def _wait_for_image_ready( + self, + image_name: str, + image_tag: str, + team_id: Optional[str], + timeout_seconds: Optional[float], + poll_interval_seconds: float, + on_build_log: Optional[BuildLogCallback], + ) -> str: + deadline = _deadline(timeout_seconds) + last_status: Optional[str] = None + while True: + row = await self._latest_image_row(image_name, image_tag, team_id) + if row is not None: + status = str(row.get("status") or "UNKNOWN") + if status != last_status: + _emit(on_build_log, f"Image build status: {status}") + last_status = status + if status == "COMPLETED": + image_ref = _image_reference_from_row(row, image_name, image_tag) + _emit(on_build_log, f"Image ready: {image_ref}") + return image_ref + if status in _TERMINAL_FAILURE_STATUSES: + raise APIError(f"Image build {status.lower()} for {image_name}:{image_tag}") + elif last_status is None: + _emit(on_build_log, "Waiting for image build to appear") + last_status = "UNKNOWN" + + _raise_if_timed_out(deadline, image_name, image_tag) + await asyncio.sleep(max(0.0, poll_interval_seconds)) + + +def _build_context_archive(dockerfile: str) -> bytes: + data = dockerfile.encode("utf-8") + fileobj = io.BytesIO() + with tarfile.open(fileobj=fileobj, mode="w:gz") as tar: + info = tarfile.TarInfo(PACKAGED_DOCKERFILE_PATH) + info.size = len(data) + info.mtime = 0 + tar.addfile(info, io.BytesIO(data)) + return fileobj.getvalue() + + +def _coerce_non_empty_list(name: str, values: str | Iterable[str]) -> list[str]: + if isinstance(values, str): + result = [values] + else: + result = [str(value) for value in values] + if not result: + raise ValueError(f"{name} must not be empty") + for value in result: + _require_single_line(name, value) + if not value.strip(): + raise ValueError(f"{name} must not contain empty values") + return [value.strip() for value in result] + + +def _deadline(timeout_seconds: Optional[float]) -> Optional[float]: + if timeout_seconds is None or timeout_seconds <= 0: + return None + return time.monotonic() + timeout_seconds + + +def _dockerfile_double_quote(value: str) -> str: + return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"' + + +def _effective_team_id(client: APIClient | AsyncAPIClient, team_id: Optional[str]) -> Optional[str]: + if team_id is not None: + return team_id + return client.config.team_id + + +def _emit(callback: Optional[BuildLogCallback], message: str) -> None: + if callback is not None: + callback(message) + + +def _image_reference_from_row(row: Mapping[str, Any], image_name: str, image_tag: str) -> str: + return str( + row.get("displayRef") + or row.get("fullImagePath") + or row.get("imageReference") + or f"{image_name}:{image_tag}" + ) + + +def _latest_matching_row( + rows: Any, image_name: str, image_tag: str, team_id: Optional[str] +) -> Optional[dict[str, Any]]: + if not isinstance(rows, list): + return None + matches: list[dict[str, Any]] = [] + for row in rows: + if not isinstance(row, dict): + continue + artifact_type = str(row.get("artifactType") or "CONTAINER_IMAGE") + if artifact_type != "CONTAINER_IMAGE": + continue + if row.get("imageName") != image_name or row.get("imageTag") != image_tag: + continue + if team_id and row.get("teamId") not in {None, team_id}: + continue + matches.append(row) + if not matches: + return None + return max(matches, key=_row_timestamp) + + +def _normalize_visibility(visibility: str) -> str: + normalized = visibility.upper() + if normalized not in {"PUBLIC", "PRIVATE"}: + raise ValueError("visibility must be PUBLIC or PRIVATE") + return normalized + + +def _parse_ts(value: Any) -> datetime: + if not value: + return datetime.min.replace(tzinfo=timezone.utc) + try: + parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + except Exception: + return datetime.min.replace(tzinfo=timezone.utc) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed + + +def _raise_if_timed_out( + deadline: Optional[float], image_name: str, image_tag: str +) -> None: + if deadline is not None and time.monotonic() >= deadline: + raise APITimeoutError(f"Timed out waiting for image {image_name}:{image_tag} to build") + + +def _require_single_line(name: str, value: str) -> None: + if "\n" in value or "\r" in value or "\0" in value: + raise ValueError(f"{name} must be a single line") + + +def _resolve_image_name_tag( + image: Image, image_name: Optional[str], image_tag: Optional[str] +) -> tuple[str, str]: + resolved_name = image_name or DEFAULT_DECLARATIVE_IMAGE_NAME + resolved_tag = image_tag + if ":" in resolved_name and resolved_tag is None: + resolved_name, resolved_tag = resolved_name.rsplit(":", 1) + if resolved_tag is None: + resolved_tag = image.default_tag() + + if "/" in resolved_name or ":" in resolved_name or not _IMAGE_NAME_RE.match(resolved_name): + raise ValueError( + "image_name must be a simple registry image name without '/' " + "or ':' characters" + ) + if not _IMAGE_TAG_RE.match(resolved_tag): + raise ValueError("image_tag must be a valid Docker tag") + return resolved_name, resolved_tag + + +def _row_timestamp(row: Mapping[str, Any]) -> datetime: + for key in _LATEST_ROW_KEYS: + parsed = _parse_ts(row.get(key)) + if parsed != datetime.min.replace(tzinfo=timezone.utc): + return parsed + return datetime.min.replace(tzinfo=timezone.utc) + + +__all__ = [ + "Image", + "ImageBuildResult", +] diff --git a/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py b/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py index b36092fbc..8f99e6703 100644 --- a/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py +++ b/packages/prime-sandboxes/src/prime_sandboxes/sandbox.py @@ -36,6 +36,14 @@ SandboxTimeoutError, UploadTimeoutError, ) +from .images import ( + DEFAULT_IMAGE_PLATFORM, + AsyncImageClient, + BuildLogCallback, + Image, + ImageBuildResult, + ImageClient, +) from .models import ( BackgroundJob, BackgroundJobStatus, @@ -628,8 +636,43 @@ def _guard_vm_unsupported(self, sandbox_id: str, feature_name: str) -> None: if self._auth_cache.is_vm(sandbox_id): raise APIError(f"{feature_name} is not yet supported for VM sandboxes.") - def create(self, request: CreateSandboxRequest) -> Sandbox: - """Create a new sandbox""" + def build_image( + self, + image: Image, + *, + image_name: Optional[str] = None, + image_tag: Optional[str] = None, + platform: str = DEFAULT_IMAGE_PLATFORM, + team_id: Optional[str] = None, + visibility: Optional[str] = None, + force_rebuild: bool = False, + timeout_seconds: Optional[float] = 900, + poll_interval_seconds: float = 5.0, + on_build_log: Optional[BuildLogCallback] = None, + ) -> ImageBuildResult: + """Build or reuse a declarative image and return its registry reference. + + Use this when many sandboxes should share one runtime image. + """ + image_client = ImageClient(self.client) + return image_client.build( + image, + image_name=image_name, + image_tag=image_tag, + platform=platform, + team_id=team_id, + visibility=visibility, + force_rebuild=force_rebuild, + timeout_seconds=timeout_seconds, + poll_interval_seconds=poll_interval_seconds, + on_build_log=on_build_log, + ) + + def create( + self, + request: CreateSandboxRequest, + ) -> Sandbox: + """Create a new sandbox.""" payload = request.model_dump(by_alias=False, exclude_none=True) # Auto-populate team_id from config if not specified if request.team_id is None and self.client.config.team_id is not None: @@ -1541,8 +1584,43 @@ async def _guard_vm_unsupported(self, sandbox_id: str, feature_name: str) -> Non if await self._auth_cache.is_vm(sandbox_id): raise APIError(f"{feature_name} is not yet supported for VM sandboxes.") - async def create(self, request: CreateSandboxRequest) -> Sandbox: - """Create a new sandbox""" + async def build_image( + self, + image: Image, + *, + image_name: Optional[str] = None, + image_tag: Optional[str] = None, + platform: str = DEFAULT_IMAGE_PLATFORM, + team_id: Optional[str] = None, + visibility: Optional[str] = None, + force_rebuild: bool = False, + timeout_seconds: Optional[float] = 900, + poll_interval_seconds: float = 5.0, + on_build_log: Optional[BuildLogCallback] = None, + ) -> ImageBuildResult: + """Build or reuse a declarative image and return its registry reference. + + Use this when many sandboxes should share one runtime image. + """ + image_client = AsyncImageClient(self.client) + return await image_client.build( + image, + image_name=image_name, + image_tag=image_tag, + platform=platform, + team_id=team_id, + visibility=visibility, + force_rebuild=force_rebuild, + timeout_seconds=timeout_seconds, + poll_interval_seconds=poll_interval_seconds, + on_build_log=on_build_log, + ) + + async def create( + self, + request: CreateSandboxRequest, + ) -> Sandbox: + """Create a new sandbox.""" payload = request.model_dump(by_alias=False, exclude_none=True) if request.team_id is None and self.client.config.team_id is not None: payload["team_id"] = self.client.config.team_id diff --git a/packages/prime-sandboxes/tests/test_declarative_images.py b/packages/prime-sandboxes/tests/test_declarative_images.py new file mode 100644 index 000000000..0943d2d95 --- /dev/null +++ b/packages/prime-sandboxes/tests/test_declarative_images.py @@ -0,0 +1,197 @@ +import io +import tarfile +from types import SimpleNamespace +from typing import Any + +from prime_sandboxes import ( + CreateSandboxRequest, + Image, + ImageBuildResult, + SandboxClient, +) +from prime_sandboxes.images import PACKAGED_DOCKERFILE_PATH, ImageClient + + +def test_declarative_image_renders_dockerfile() -> None: + image = ( + Image.debian_slim("3.12") + .apt_install(["git", "curl"]) + .pip_install(["requests", "pytest"]) + .env({"PRIME_ENV": "test value"}) + .workdir("/home/daytona") + .cmd(["python", "-m", "pytest"]) + ) + + assert image.dockerfile() == ( + "FROM python:3.12-slim\n" + "RUN apt-get update && apt-get install -y --no-install-recommends " + "git curl && rm -rf /var/lib/apt/lists/*\n" + "RUN python -m pip install --no-cache-dir requests pytest\n" + 'ENV PRIME_ENV="test value"\n' + "WORKDIR /home/daytona\n" + 'CMD ["python", "-m", "pytest"]\n' + ) + assert image.default_tag().startswith("sha-") + + +def test_image_client_builds_uploads_and_waits(monkeypatch: Any) -> None: + image = Image.debian_slim("3.12").pip_install(["requests"]).workdir("/workspace") + image_tag = image.default_tag() + captured: dict[str, Any] = {} + + class DummyAPIClient: + config = SimpleNamespace(team_id=None) + + def __init__(self) -> None: + self.started = False + + def request(self, method: str, path: str, json: Any = None, params: Any = None) -> dict: + captured.setdefault("requests", []).append((method, path, json, params)) + if method == "GET" and path == "/images": + if self.started: + return { + "data": [ + { + "imageName": "declarative-sandbox", + "imageTag": image_tag, + "artifactType": "CONTAINER_IMAGE", + "status": "COMPLETED", + "fullImagePath": f"user/declarative-sandbox:{image_tag}", + "pushedAt": "2026-05-24T00:00:00Z", + } + ] + } + return {"data": []} + if method == "POST" and path == "/images/build": + captured["build_payload"] = json + return { + "build_id": "build-123", + "upload_url": "https://example.test/upload", + "fullImagePath": f"user/declarative-sandbox:{image_tag}", + } + if method == "POST" and path == "/images/build/build-123/start": + self.started = True + captured["start_payload"] = json + return {} + raise AssertionError(f"unexpected request: {method} {path}") + + class DummyUploadResponse: + def raise_for_status(self) -> None: + return None + + def fake_put(url: str, content: bytes, headers: dict[str, str], timeout: float) -> Any: + captured["upload_url"] = url + captured["archive"] = content + captured["upload_headers"] = headers + captured["upload_timeout"] = timeout + return DummyUploadResponse() + + monkeypatch.setattr("prime_sandboxes.images.httpx.put", fake_put) + + logs: list[str] = [] + result = ImageClient(DummyAPIClient()).build( + image, + poll_interval_seconds=0, + on_build_log=logs.append, + ) + + assert result.image_reference == f"user/declarative-sandbox:{image_tag}" + assert result.status == "COMPLETED" + assert result.build_id == "build-123" + assert captured["build_payload"] == { + "image_name": "declarative-sandbox", + "image_tag": image_tag, + "dockerfile_path": PACKAGED_DOCKERFILE_PATH, + "platform": "linux/amd64", + } + assert captured["start_payload"] == {"context_uploaded": True} + assert captured["upload_url"] == "https://example.test/upload" + assert captured["upload_headers"] == {"Content-Type": "application/octet-stream"} + assert captured["upload_timeout"] == 600.0 + assert "Image ready" in logs[-1] + + with tarfile.open(fileobj=io.BytesIO(captured["archive"]), mode="r:gz") as tar: + assert PACKAGED_DOCKERFILE_PATH in tar.getnames() + dockerfile_member = tar.extractfile(PACKAGED_DOCKERFILE_PATH) + assert dockerfile_member is not None + assert dockerfile_member.read().decode() == image.dockerfile() + + +def test_image_client_reuses_existing_completed_image() -> None: + image = Image.debian_slim("3.12") + image_tag = image.default_tag() + + class DummyAPIClient: + config = SimpleNamespace(team_id="team-123") + + def request(self, method: str, path: str, json: Any = None, params: Any = None) -> dict: + assert method == "GET" + assert path == "/images" + assert params == {"limit": "250", "offset": "0", "teamId": "team-123"} + return { + "data": [ + { + "imageName": "declarative-sandbox", + "imageTag": image_tag, + "teamId": "team-123", + "artifactType": "CONTAINER_IMAGE", + "status": "COMPLETED", + "fullImagePath": f"team-team-123/declarative-sandbox:{image_tag}", + "pushedAt": "2026-05-24T00:00:00Z", + } + ] + } + + result = ImageClient(DummyAPIClient()).build(image) + + assert result.reused is True + assert result.build_id is None + assert result.image_reference == f"team-team-123/declarative-sandbox:{image_tag}" + + +def test_sandbox_client_build_image_supports_batch_fanout( + monkeypatch: Any, tmp_path: Any +) -> None: + captured: dict[str, Any] = {} + + class DummyImageClient: + def __init__(self, api_client: Any) -> None: + captured["image_client_api_client"] = api_client + + def build(self, image: Image, **kwargs: Any) -> ImageBuildResult: + captured["image"] = image + captured["build_kwargs"] = kwargs + return ImageBuildResult( + image_reference="user/batch-runtime:sha-123", + image_name="batch-runtime", + image_tag="sha-123", + status="COMPLETED", + build_id="build-123", + ) + + class DummyAPIClient: + config = SimpleNamespace(team_id=None, config_dir=tmp_path) + + monkeypatch.setattr("prime_sandboxes.sandbox.ImageClient", DummyImageClient) + + image = Image.debian_slim("3.12").pip_install("requests") + build = SandboxClient(DummyAPIClient()).build_image( + image, + image_name="batch-runtime", + timeout_seconds=0, + poll_interval_seconds=0, + ) + requests = [ + CreateSandboxRequest(name=f"batch-{i}", docker_image=build.image_reference) + for i in range(3) + ] + + assert build.image_reference == "user/batch-runtime:sha-123" + assert captured["image"] is image + assert captured["build_kwargs"]["image_name"] == "batch-runtime" + assert captured["build_kwargs"]["timeout_seconds"] == 0 + assert [request.docker_image for request in requests] == [ + "user/batch-runtime:sha-123", + "user/batch-runtime:sha-123", + "user/batch-runtime:sha-123", + ] diff --git a/packages/prime/src/prime_cli/__init__.py b/packages/prime/src/prime_cli/__init__.py index 8f82f656c..983096f96 100644 --- a/packages/prime/src/prime_cli/__init__.py +++ b/packages/prime/src/prime_cli/__init__.py @@ -6,6 +6,8 @@ CommandResponse, CommandTimeoutError, CreateSandboxRequest, + Image, + ImageBuildResult, Sandbox, SandboxClient, SandboxNotRunningError, @@ -34,6 +36,8 @@ "CommandTimeoutError", "Config", "CreateSandboxRequest", + "Image", + "ImageBuildResult", "Sandbox", "SandboxClient", "SandboxNotRunningError",