diff --git a/docs/docs/concepts/dev-environments.md b/docs/docs/concepts/dev-environments.md index 145316ad5..f67c2351f 100644 --- a/docs/docs/concepts/dev-environments.md +++ b/docs/docs/concepts/dev-environments.md @@ -175,6 +175,8 @@ name: vscode ide: vscode resources: + # 16 or more x86_64 cores + cpu: 16.. # 200GB or more RAM memory: 200GB.. # 4 GPUs from 40GB to 80GB @@ -187,10 +189,16 @@ resources: +The `cpu` property also allows you to specify the CPU architecture, `x86` or `arm`. Examples: +`x86:16` (16 x86-64 cores), `arm:8..` (at least 8 ARM64 cores). +If the architecture is not specified, `dstack` tries to infer it from the `gpu` specification +using `x86` as the fallback value. + The `gpu` property allows specifying not only memory size but also GPU vendor, names and their quantity. Examples: `nvidia` (one NVIDIA GPU), `A100` (one A100), `A10G,A100` (either A10G or A100), `A100:80GB` (one A100 of 80GB), `A100:2` (two A100), `24GB..40GB:2` (two GPUs between 24GB and 40GB), `A100:40GB:2` (two A100 GPUs of 40GB). +If the vendor is not specified, `dstack` tries to infer it from the GPU name using `nvidia` as the fallback value. ??? info "Google Cloud TPU" To use TPUs, specify its architecture via the `gpu` property. diff --git a/docs/docs/concepts/services.md b/docs/docs/concepts/services.md index 4d13bc3e6..1f7b515b8 100644 --- a/docs/docs/concepts/services.md +++ b/docs/docs/concepts/services.md @@ -316,6 +316,8 @@ commands: port: 8000 resources: + # 16 or more x86_64 cores + cpu: 16.. # 2 GPUs of 80GB gpu: 80GB:2 @@ -325,10 +327,16 @@ resources: +The `cpu` property also allows you to specify the CPU architecture, `x86` or `arm`. Examples: +`x86:16` (16 x86-64 cores), `arm:8..` (at least 8 ARM64 cores). +If the architecture is not specified, `dstack` tries to infer it from the `gpu` specification +using `x86` as the fallback value. + The `gpu` property allows specifying not only memory size but also GPU vendor, names and their quantity. Examples: `nvidia` (one NVIDIA GPU), `A100` (one A100), `A10G,A100` (either A10G or A100), `A100:80GB` (one A100 of 80GB), `A100:2` (two A100), `24GB..40GB:2` (two GPUs between 24GB and 40GB), `A100:40GB:2` (two A100 GPUs of 40GB). +If the vendor is not specified, `dstack` tries to infer it from the GPU name using `nvidia` as the fallback value. ??? info "Google Cloud TPU" To use TPUs, specify its architecture via the `gpu` property. diff --git a/docs/docs/concepts/tasks.md b/docs/docs/concepts/tasks.md index cf3c6fdd5..dcdc6b530 100644 --- a/docs/docs/concepts/tasks.md +++ b/docs/docs/concepts/tasks.md @@ -192,6 +192,8 @@ commands: - python fine-tuning/qlora/train.py resources: + # 16 or more x86_64 cores + cpu: 16.. # 200GB or more RAM memory: 200GB.. # 4 GPUs from 40GB to 80GB @@ -204,10 +206,16 @@ resources: +The `cpu` property also allows you to specify the CPU architecture, `x86` or `arm`. Examples: +`x86:16` (16 x86-64 cores), `arm:8..` (at least 8 ARM64 cores). +If the architecture is not specified, `dstack` tries to infer it from the `gpu` specification +using `x86` as the fallback value. + The `gpu` property allows specifying not only memory size but also GPU vendor, names and their quantity. Examples: `nvidia` (one NVIDIA GPU), `A100` (one A100), `A10G,A100` (either A10G or A100), `A100:80GB` (one A100 of 80GB), `A100:2` (two A100), `24GB..40GB:2` (two GPUs between 24GB and 40GB), `A100:40GB:2` (two A100 GPUs of 40GB). +If the vendor is not specified, `dstack` tries to infer it from the GPU name using `nvidia` as the fallback value. ??? info "Google Cloud TPU" To use TPUs, specify its architecture via the `gpu` property. diff --git a/docs/docs/reference/api/python/index.md b/docs/docs/reference/api/python/index.md index f2974c1ba..82cac4a5e 100644 --- a/docs/docs/reference/api/python/index.md +++ b/docs/docs/reference/api/python/index.md @@ -136,10 +136,21 @@ finally: show_root_toc_entry: false heading_level: 4 item_id_mapping: + cpu: dstack.api.CPU gpu: dstack.api.GPU memory: dstack.api.Memory Range: dstack.api.Range +### `dstack.api.CPU` { #dstack.api.CPU data-toc-label="CPU" } + +#SCHEMA# dstack.api.CPU + overrides: + show_root_heading: false + show_root_toc_entry: false + heading_level: 4 + item_id_mapping: + Range: dstack.api.Range + ### `dstack.api.GPU` { #dstack.api.GPU data-toc-label="GPU" } #SCHEMA# dstack.api.GPU diff --git a/docs/docs/reference/dstack.yml/dev-environment.md b/docs/docs/reference/dstack.yml/dev-environment.md index 7f4c9189f..0c872e1d1 100644 --- a/docs/docs/reference/dstack.yml/dev-environment.md +++ b/docs/docs/reference/dstack.yml/dev-environment.md @@ -35,6 +35,14 @@ The `dev-environment` configuration type allows running [dev environments](../.. required: true item_id_prefix: resources- +#### `resources.cpu` { #resources-cpu data-toc-label="cpu" } + +#SCHEMA# dstack._internal.core.models.resources.CPUSpec + overrides: + show_root_heading: false + type: + required: true + #### `resources.gpu` { #resources-gpu data-toc-label="gpu" } #SCHEMA# dstack._internal.core.models.resources.GPUSpec diff --git a/docs/docs/reference/dstack.yml/fleet.md b/docs/docs/reference/dstack.yml/fleet.md index d913c726f..b6d055385 100644 --- a/docs/docs/reference/dstack.yml/fleet.md +++ b/docs/docs/reference/dstack.yml/fleet.md @@ -46,7 +46,15 @@ The `fleet` configuration type allows creating and updating fleets. required: true item_id_prefix: resources- -#### `resouces.gpu` { #resources-gpu data-toc-label="gpu" } +#### `resources.cpu` { #resources-cpu data-toc-label="cpu" } + +#SCHEMA# dstack._internal.core.models.resources.CPUSpec + overrides: + show_root_heading: false + type: + required: true + +#### `resources.gpu` { #resources-gpu data-toc-label="gpu" } #SCHEMA# dstack._internal.core.models.resources.GPUSpec overrides: @@ -54,7 +62,7 @@ The `fleet` configuration type allows creating and updating fleets. type: required: true -#### `resouces.disk` { #resources-disk data-toc-label="disk" } +#### `resources.disk` { #resources-disk data-toc-label="disk" } #SCHEMA# dstack._internal.core.models.resources.DiskSpec overrides: diff --git a/docs/docs/reference/dstack.yml/service.md b/docs/docs/reference/dstack.yml/service.md index 10b54506f..d7cb13acd 100644 --- a/docs/docs/reference/dstack.yml/service.md +++ b/docs/docs/reference/dstack.yml/service.md @@ -129,7 +129,15 @@ The `service` configuration type allows running [services](../../concepts/servic required: true item_id_prefix: resources- -#### `resouces.gpu` { #resources-gpu data-toc-label="gpu" } +#### `resources.cpu` { #resources-cpu data-toc-label="cpu" } + +#SCHEMA# dstack._internal.core.models.resources.CPUSpec + overrides: + show_root_heading: false + type: + required: true + +#### `resources.gpu` { #resources-gpu data-toc-label="gpu" } #SCHEMA# dstack._internal.core.models.resources.GPUSpec overrides: @@ -137,7 +145,7 @@ The `service` configuration type allows running [services](../../concepts/servic type: required: true -#### `resouces.disk` { #resources-disk data-toc-label="disk" } +#### `resources.disk` { #resources-disk data-toc-label="disk" } #SCHEMA# dstack._internal.core.models.resources.DiskSpec overrides: diff --git a/docs/docs/reference/dstack.yml/task.md b/docs/docs/reference/dstack.yml/task.md index 65afc2d11..0565dbf6f 100644 --- a/docs/docs/reference/dstack.yml/task.md +++ b/docs/docs/reference/dstack.yml/task.md @@ -35,7 +35,15 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md). required: true item_id_prefix: resources- -#### `resouces.gpu` { #resources-gpu data-toc-label="gpu" } +#### `resources.cpu` { #resources-cpu data-toc-label="cpu" } + +#SCHEMA# dstack._internal.core.models.resources.CPUSpec + overrides: + show_root_heading: false + type: + required: true + +#### `resources.gpu` { #resources-gpu data-toc-label="gpu" } #SCHEMA# dstack._internal.core.models.resources.GPUSpec overrides: @@ -43,7 +51,7 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md). type: required: true -#### `resouces.disk` { #resources-disk data-toc-label="disk" } +#### `resources.disk` { #resources-disk data-toc-label="disk" } #SCHEMA# dstack._internal.core.models.resources.DiskSpec overrides: diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index ce633e1c6..2ea78a723 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -117,8 +117,11 @@ For more details on the options below, refer to the [server deployment](../guide * `DSTACK_SERVER_MAX_OFFERS_TRIED` - Sets how many instance offers to try when starting a job. Setting a high value can degrade server performance. * `DSTACK_RUNNER_VERSION` – Sets exact runner version for debug. Defaults to `latest`. Ignored if `DSTACK_RUNNER_DOWNLOAD_URL` is set. - * `DSTACK_RUNNER_DOWNLOAD_URL` – Overrides `dstack-runner` binary download URL. - * `DSTACK_SHIM_DOWNLOAD_URL` – Overrides `dstack-shim` binary download URL. + * `DSTACK_RUNNER_DOWNLOAD_URL` – Overrides `dstack-runner` binary download URL. The URL can contain `{version}` and/or `{arch}` placeholders, + where `{version}` is `dstack` version in the `X.Y.Z` format or `latest`, and `{arch}` is either `amd64` or `arm64`, for example, + `https://dstack.example.com/{arch}/{version}/dstack-runner`. + * `DSTACK_SHIM_DOWNLOAD_URL` – Overrides `dstack-shim` binary download URL. The URL can contain `{version}` and/or `{arch}` placeholders, + see `DSTACK_RUNNER_DOWNLOAD_URL` for the details. * `DSTACK_DEFAULT_CREDS_DISABLED` – Disables default credentials detection if set. Defaults to `None`. * `DSTACK_LOCAL_BACKEND_ENABLED` – Enables local backend for debug if set. Defaults to `None`. diff --git a/src/dstack/_internal/cli/services/args.py b/src/dstack/_internal/cli/services/args.py index 66217a470..a18998450 100644 --- a/src/dstack/_internal/cli/services/args.py +++ b/src/dstack/_internal/cli/services/args.py @@ -19,8 +19,8 @@ def port_mapping(v: str) -> PortMapping: return PortMapping.parse(v) -def cpu_spec(v: str) -> resources.Range[int]: - return parse_obj_as(resources.Range[int], v) +def cpu_spec(v: str) -> dict: + return resources.CPUSpec.parse(v) def memory_spec(v: str) -> resources.Range[resources.Memory]: diff --git a/src/dstack/_internal/cli/services/configurators/run.py b/src/dstack/_internal/cli/services/configurators/run.py index da166ac44..b20e33c09 100644 --- a/src/dstack/_internal/cli/services/configurators/run.py +++ b/src/dstack/_internal/cli/services/configurators/run.py @@ -6,9 +6,10 @@ from typing import Dict, List, Optional, Set, Tuple import gpuhunt +from pydantic import parse_obj_as import dstack._internal.core.models.resources as resources -from dstack._internal.cli.services.args import disk_spec, gpu_spec, port_mapping +from dstack._internal.cli.services.args import cpu_spec, disk_spec, gpu_spec, port_mapping from dstack._internal.cli.services.configurators.base import ( ApplyEnvVarsConfiguratorMixin, BaseApplyConfigurator, @@ -39,6 +40,7 @@ TaskConfiguration, ) from dstack._internal.core.models.repos.base import Repo +from dstack._internal.core.models.resources import CPUSpec from dstack._internal.core.models.runs import JobSubmission, JobTerminationReason, RunStatus from dstack._internal.core.services.configs import ConfigManager from dstack._internal.core.services.diff import diff_models @@ -72,6 +74,7 @@ def apply_configuration( ): self.apply_args(conf, configurator_args, unknown_args) self.validate_gpu_vendor_and_image(conf) + self.validate_cpu_arch_and_image(conf) if repo is None: repo = self.api.repos.load(Path.cwd()) config_manager = ConfigManager() @@ -289,6 +292,14 @@ def register_args(cls, parser: argparse.ArgumentParser, default_max_offers: int default=default_max_offers, ) cls.register_env_args(configuration_group) + configuration_group.add_argument( + "--cpu", + type=cpu_spec, + help="Request CPU for the run. " + "The format is [code]ARCH[/]:[code]COUNT[/] (all parts are optional)", + dest="cpu_spec", + metavar="SPEC", + ) configuration_group.add_argument( "--gpu", type=gpu_spec, @@ -310,6 +321,8 @@ def apply_args(self, conf: BaseRunConfiguration, args: argparse.Namespace, unkno apply_profile_args(args, conf) if args.run_name: conf.name = args.run_name + if args.cpu_spec: + conf.resources.cpu = resources.CPUSpec.parse_obj(args.cpu_spec) if args.gpu_spec: conf.resources.gpu = resources.GPUSpec.parse_obj(args.gpu_spec) if args.disk_spec: @@ -342,7 +355,7 @@ def interpolate_env(self, conf: BaseRunConfiguration): def validate_gpu_vendor_and_image(self, conf: BaseRunConfiguration) -> None: """ - Infers `resources.gpu.vendor` if not set, requires `image` if the vendor is AMD. + Infers and sets `resources.gpu.vendor` if not set, requires `image` if the vendor is AMD. """ gpu_spec = conf.resources.gpu if gpu_spec is None: @@ -400,6 +413,29 @@ def validate_gpu_vendor_and_image(self, conf: BaseRunConfiguration) -> None: "`image` is required if `resources.gpu.vendor` is `tenstorrent`" ) + def validate_cpu_arch_and_image(self, conf: BaseRunConfiguration) -> None: + """ + Infers `resources.cpu.arch` if not set, requires `image` if the architecture is ARM. + """ + # TODO: Remove in 0.20. Use conf.resources.cpu directly + cpu_spec = parse_obj_as(CPUSpec, conf.resources.cpu) + arch = cpu_spec.arch + if arch is None: + gpu_spec = conf.resources.gpu + if ( + gpu_spec is not None + and gpu_spec.vendor in [None, gpuhunt.AcceleratorVendor.NVIDIA] + and gpu_spec.name + and any(map(gpuhunt.is_nvidia_superchip, gpu_spec.name)) + ): + arch = gpuhunt.CPUArchitecture.ARM + else: + arch = gpuhunt.CPUArchitecture.X86 + # NOTE: We don't set the inferred resources.cpu.arch for compatibility with older servers. + # Servers with ARM support set the arch using the same logic. + if arch == gpuhunt.CPUArchitecture.ARM and conf.image is None: + raise ConfigurationError("`image` is required if `resources.cpu.arch` is `arm`") + class RunWithPortsConfigurator(BaseRunConfigurator): @classmethod diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index c9ff5f2b1..7a6b7653f 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from functools import lru_cache from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Literal, Optional import git import requests @@ -45,6 +45,8 @@ DSTACK_SHIM_BINARY_NAME = "dstack-shim" DSTACK_RUNNER_BINARY_NAME = "dstack-runner" +GoArchType = Literal["amd64", "arm64"] + class Compute(ABC): """ @@ -523,13 +525,14 @@ def get_shim_env( base_path: Optional[PathLike] = None, bin_path: Optional[PathLike] = None, backend_shim_env: Optional[Dict[str, str]] = None, + arch: Optional[str] = None, ) -> Dict[str, str]: log_level = "6" # Trace envs = { "DSTACK_SHIM_HOME": get_dstack_working_dir(base_path), "DSTACK_SHIM_HTTP_PORT": str(DSTACK_SHIM_HTTP_PORT), "DSTACK_SHIM_LOG_LEVEL": log_level, - "DSTACK_RUNNER_DOWNLOAD_URL": get_dstack_runner_download_url(), + "DSTACK_RUNNER_DOWNLOAD_URL": get_dstack_runner_download_url(arch), "DSTACK_RUNNER_BINARY_PATH": get_dstack_runner_binary_path(bin_path), "DSTACK_RUNNER_HTTP_PORT": str(DSTACK_RUNNER_HTTP_PORT), "DSTACK_RUNNER_SSH_PORT": str(DSTACK_RUNNER_SSH_PORT), @@ -549,16 +552,19 @@ def get_shim_commands( base_path: Optional[PathLike] = None, bin_path: Optional[PathLike] = None, backend_shim_env: Optional[Dict[str, str]] = None, + arch: Optional[str] = None, ) -> List[str]: commands = get_shim_pre_start_commands( base_path=base_path, bin_path=bin_path, + arch=arch, ) shim_env = get_shim_env( authorized_keys=authorized_keys, base_path=base_path, bin_path=bin_path, backend_shim_env=backend_shim_env, + arch=arch, ) for k, v in shim_env.items(): commands += [f'export "{k}={v}"'] @@ -579,35 +585,63 @@ def get_dstack_runner_version() -> str: return version or "latest" -def get_dstack_runner_download_url() -> str: - if url := os.environ.get("DSTACK_RUNNER_DOWNLOAD_URL"): - return url - build = get_dstack_runner_version() - if settings.DSTACK_VERSION is not None: - bucket = "dstack-runner-downloads" - else: - bucket = "dstack-runner-downloads-stgn" - return ( - f"https://{bucket}.s3.eu-west-1.amazonaws.com/{build}/binaries/dstack-runner-linux-amd64" - ) - - -def get_dstack_shim_download_url() -> str: - if url := os.environ.get("DSTACK_SHIM_DOWNLOAD_URL"): - return url - build = get_dstack_runner_version() - if settings.DSTACK_VERSION is not None: - bucket = "dstack-runner-downloads" - else: - bucket = "dstack-runner-downloads-stgn" - return f"https://{bucket}.s3.eu-west-1.amazonaws.com/{build}/binaries/dstack-shim-linux-amd64" +def normalize_arch(arch: Optional[str] = None) -> GoArchType: + """ + Converts the given free-form architecture string to the Go GOARCH format. + Only 64-bit x86 and ARM are supported. If the word size is not specified (e.g., `x86`, `arm`), + 64-bit is implied. + If the arch is not specified, falls back to `amd64`. + """ + if not arch: + return "amd64" + arch_lower = arch.lower() + if "32" in arch_lower or arch_lower in ["i386", "i686"]: + raise ValueError(f"32-bit architectures are not supported: {arch}") + if arch_lower.startswith("x86") or arch_lower.startswith("amd"): + return "amd64" + if arch_lower.startswith("arm") or arch_lower.startswith("aarch"): + return "arm64" + raise ValueError(f"Unsupported architecture: {arch}") + + +def get_dstack_runner_download_url(arch: Optional[str] = None) -> str: + url_template = os.environ.get("DSTACK_RUNNER_DOWNLOAD_URL") + if not url_template: + if settings.DSTACK_VERSION is not None: + bucket = "dstack-runner-downloads" + else: + bucket = "dstack-runner-downloads-stgn" + url_template = ( + f"https://{bucket}.s3.eu-west-1.amazonaws.com" + "/{version}/binaries/dstack-runner-linux-{arch}" + ) + version = get_dstack_runner_version() + arch = normalize_arch(arch) + return url_template.format(version=version, arch=arch) + + +def get_dstack_shim_download_url(arch: Optional[str] = None) -> str: + url_template = os.environ.get("DSTACK_SHIM_DOWNLOAD_URL") + if not url_template: + if settings.DSTACK_VERSION is not None: + bucket = "dstack-runner-downloads" + else: + bucket = "dstack-runner-downloads-stgn" + url_template = ( + f"https://{bucket}.s3.eu-west-1.amazonaws.com" + "/{version}/binaries/dstack-shim-linux-{arch}" + ) + version = get_dstack_runner_version() + arch = normalize_arch(arch) + return url_template.format(version=version, arch=arch) def get_shim_pre_start_commands( base_path: Optional[PathLike] = None, bin_path: Optional[PathLike] = None, + arch: Optional[str] = None, ) -> List[str]: - url = get_dstack_shim_download_url() + url = get_dstack_shim_download_url(arch) dstack_shim_binary_path = get_dstack_shim_binary_path(bin_path) dstack_working_dir = get_dstack_working_dir(base_path) return [ diff --git a/src/dstack/_internal/core/backends/base/offers.py b/src/dstack/_internal/core/backends/base/offers.py index 5f5e96fd3..828332ae4 100644 --- a/src/dstack/_internal/core/backends/base/offers.py +++ b/src/dstack/_internal/core/backends/base/offers.py @@ -2,6 +2,7 @@ from typing import Callable, List, Optional import gpuhunt +from pydantic import parse_obj_as from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.instances import ( @@ -11,13 +12,14 @@ InstanceType, Resources, ) -from dstack._internal.core.models.resources import DEFAULT_DISK, Memory, Range +from dstack._internal.core.models.resources import DEFAULT_DISK, CPUSpec, Memory, Range from dstack._internal.core.models.runs import Requirements # Offers not supported by all dstack versions are hidden behind one or more flags. # This list enables the flags that are currently supported. SUPPORTED_GPUHUNT_FLAGS = [ "oci-spot", + "lambda-arm", ] @@ -71,6 +73,7 @@ def catalog_item_to_offer( if disk_size_mib is None: return None resources = Resources( + cpu_arch=item.cpu_arch, cpus=item.cpu, memory_mib=round(item.memory * 1024), gpus=gpus, @@ -90,6 +93,9 @@ def catalog_item_to_offer( def offer_to_catalog_item(offer: InstanceOffer) -> gpuhunt.CatalogItem: + cpu_arch = offer.instance.resources.cpu_arch + if cpu_arch is None: + cpu_arch = gpuhunt.CPUArchitecture.X86 gpu_count = len(offer.instance.resources.gpus) gpu_vendor = None gpu_name = None @@ -104,6 +110,7 @@ def offer_to_catalog_item(offer: InstanceOffer) -> gpuhunt.CatalogItem: instance_name=offer.instance.name, location=offer.region, price=offer.price, + cpu_arch=cpu_arch, cpu=offer.instance.resources.cpus, memory=offer.instance.resources.memory_mib / 1024, gpu_count=gpu_count, @@ -125,8 +132,11 @@ def requirements_to_query_filter(req: Optional[Requirements]) -> gpuhunt.QueryFi res = req.resources if res.cpu: - q.min_cpu = res.cpu.min - q.max_cpu = res.cpu.max + # TODO: Remove in 0.20. Use res.cpu directly + cpu = parse_obj_as(CPUSpec, res.cpu) + q.cpu_arch = cpu.arch + q.min_cpu = cpu.count.min + q.max_cpu = cpu.count.max if res.memory: q.min_memory = res.memory.min q.max_memory = res.memory.max diff --git a/src/dstack/_internal/core/backends/lambdalabs/compute.py b/src/dstack/_internal/core/backends/lambdalabs/compute.py index 5e7deb599..865b8972a 100644 --- a/src/dstack/_internal/core/backends/lambdalabs/compute.py +++ b/src/dstack/_internal/core/backends/lambdalabs/compute.py @@ -93,7 +93,10 @@ def update_provisioning_data( instance_info = _get_instance_info(self.api_client, provisioning_data.instance_id) if instance_info is not None and instance_info["status"] != "booting": provisioning_data.hostname = instance_info["ip"] - commands = get_shim_commands(authorized_keys=[project_ssh_public_key]) + commands = get_shim_commands( + authorized_keys=[project_ssh_public_key], + arch=provisioning_data.instance_type.resources.cpu_arch, + ) # shim is assumed to be run under root launch_command = "sudo sh -c '" + "&& ".join(commands) + "'" thread = Thread( diff --git a/src/dstack/_internal/core/backends/remote/provisioning.py b/src/dstack/_internal/core/backends/remote/provisioning.py index 7a2398b7c..9c65ec66a 100644 --- a/src/dstack/_internal/core/backends/remote/provisioning.py +++ b/src/dstack/_internal/core/backends/remote/provisioning.py @@ -6,8 +6,9 @@ from typing import Any, Dict, Generator, List, Optional import paramiko -from gpuhunt import AcceleratorVendor, correct_gpu_memory_gib +from gpuhunt import AcceleratorVendor, CPUArchitecture, correct_gpu_memory_gib +from dstack._internal.core.backends.base.compute import GoArchType, normalize_arch from dstack._internal.core.consts import DSTACK_SHIM_HTTP_PORT # FIXME: ProvisioningError is a subclass of ComputeError and should not be used outside of Compute @@ -36,6 +37,22 @@ HOST_INFO_FILE = "host_info.json" +def detect_cpu_arch(client: paramiko.SSHClient) -> GoArchType: + cmd = "uname -m" + try: + _, stdout, stderr = client.exec_command(cmd, timeout=20) + except (paramiko.SSHException, OSError) as e: + raise ProvisioningError(f"detect_cpu_arch: {e}") from e + out = stdout.read().strip().decode() + err = stderr.read().strip().decode() + if err: + raise ProvisioningError(f"detect_cpu_arch: {cmd} failed, stdout: {out}, stderr: {err}") + try: + return normalize_arch(out) + except ValueError as e: + raise ProvisioningError(f"detect_cpu_arch: failed to normalize arch: {e}") from e + + def sftp_upload(client: paramiko.SSHClient, path: str, body: str) -> None: try: sftp = client.open_sftp() @@ -226,7 +243,14 @@ def get_shim_healthcheck(client: paramiko.SSHClient) -> str: raise ProvisioningError(f"get_shim_healthcheck failed: {e}") from e -def host_info_to_instance_type(host_info: Dict[str, Any]) -> InstanceType: +def host_info_to_instance_type(host_info: Dict[str, Any], cpu_arch: GoArchType) -> InstanceType: + _cpu_arch: CPUArchitecture + if cpu_arch == "amd64": + _cpu_arch = CPUArchitecture.X86 + elif cpu_arch == "arm64": + _cpu_arch = CPUArchitecture.ARM + else: + raise ValueError(f"Unexpected cpu_arch: {cpu_arch}") gpu_count = host_info.get("gpu_count", 0) if gpu_count > 0: gpu_vendor = AcceleratorVendor.cast(host_info.get("gpu_vendor", "nvidia")) @@ -251,6 +275,7 @@ def host_info_to_instance_type(host_info: Dict[str, Any]) -> InstanceType: instance_type = InstanceType( name="instance", resources=Resources( + cpu_arch=_cpu_arch, cpus=host_info["cpus"], memory_mib=host_info["memory"] / 1024 / 1024, spot=False, diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index dc6f9fb77..54b14e383 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -49,11 +49,13 @@ class Resources(CoreModel): spot: bool disk: Disk = Disk(size_mib=102400) # the default value (100GB) for backward compatibility description: str = "" + cpu_arch: Optional[gpuhunt.CPUArchitecture] = None def pretty_format(self, include_spot: bool = False) -> str: resources = {} if self.cpus > 0: resources["cpus"] = self.cpus + resources["cpu_arch"] = self.cpu_arch if self.memory_mib > 0: resources["memory"] = f"{self.memory_mib / 1024:.0f}GB" if self.disk.size_mib > 0: diff --git a/src/dstack/_internal/core/models/resources.py b/src/dstack/_internal/core/models/resources.py index da155c8e8..0e6b2b8e9 100644 --- a/src/dstack/_internal/core/models/resources.py +++ b/src/dstack/_internal/core/models/resources.py @@ -1,8 +1,9 @@ import math +from collections.abc import Mapping from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar, Union import gpuhunt -from pydantic import Field, root_validator, validator +from pydantic import Field, parse_obj_as, root_validator, validator from pydantic.generics import GenericModel from typing_extensions import Annotated @@ -128,6 +129,67 @@ def __str__(self): DEFAULT_GPU_COUNT = Range[int](min=1, max=1) +class CPUSpec(CoreModel): + class Config: + @staticmethod + def schema_extra(schema: Dict[str, Any]): + add_extra_schema_types( + schema["properties"]["count"], + extra_types=[{"type": "integer"}, {"type": "string"}], + ) + + arch: Annotated[ + Optional[gpuhunt.CPUArchitecture], + Field(description="The CPU architecture, one of: `x86`, `arm`"), + ] = None + count: Annotated[Range[int], Field(description="The number of CPU cores")] = DEFAULT_CPU_COUNT + + @classmethod + def __get_validators__(cls): + yield cls.parse + yield cls.validate + + @classmethod + def parse(cls, v: Any) -> Any: + if isinstance(v, int): + v = str(v) + if isinstance(v, str): + tokens = v.replace(" ", "").split(":") + spec = {} + for token in tokens: + if not token: + raise ValueError(f"CPU spec contains empty token: {v}") + if ".." in token or token.isdigit(): + if "count" in spec: + raise ValueError(f"CPU spec count conflict: {v}") + spec["count"] = token + else: + try: + arch = gpuhunt.CPUArchitecture.cast(token) + except ValueError: + raise ValueError(f"Invalid CPU architecture: {v}") + if "arch" in spec: + raise ValueError(f"CPU spec arch conflict: {v}") + spec["arch"] = arch + return spec + # Range and min/max dict - for backward compatibility + if isinstance(v, Range): + return {"arch": None, "count": v} + if isinstance(v, Mapping) and v.keys() == {"min", "max"}: + return {"arch": None, "count": v} + return v + + @validator("arch", pre=True) + def _validate_arch(cls, v: Any) -> Any: + if v is None: + return None + if isinstance(v, gpuhunt.CPUArchitecture): + return v + if isinstance(v, str): + return gpuhunt.CPUArchitecture.cast(v) + return v + + class GPUSpec(CoreModel): class Config: @staticmethod @@ -302,7 +364,10 @@ def schema_extra(schema: Dict[str, Any]): extra_types=[{"type": "integer"}, {"type": "string"}], ) - cpu: Annotated[Range[int], Field(description="The number of CPU cores")] = DEFAULT_CPU_COUNT + # TODO: Remove Range[int] in 0.20. Range[int] for backward compatibility only. + cpu: Annotated[Union[CPUSpec, Range[int]], Field(description="The CPU requirements")] = ( + CPUSpec() + ) memory: Annotated[Range[Memory], Field(description="The RAM size (e.g., `8GB`)")] = ( DEFAULT_MEMORY_SIZE ) @@ -317,8 +382,18 @@ def schema_extra(schema: Dict[str, Any]): gpu: Annotated[Optional[GPUSpec], Field(description="The GPU requirements")] = None disk: Annotated[Optional[DiskSpec], Field(description="The disk resources")] = DEFAULT_DISK + # TODO: Remove in 0.20. Added for backward compatibility. + @root_validator + def _post_validate(cls, values): + cpu = values.get("cpu") + if isinstance(cpu, CPUSpec) and cpu.arch in [None, gpuhunt.CPUArchitecture.X86]: + values["cpu"] = cpu.count + return values + def pretty_format(self) -> str: - resources: Dict[str, Any] = dict(cpus=self.cpu, memory=self.memory) + # TODO: Remove in 0.20. Use self.cpu directly + cpu = parse_obj_as(CPUSpec, self.cpu) + resources: Dict[str, Any] = dict(cpu_arch=cpu.arch, cpus=cpu.count, memory=self.memory) if self.gpu: gpu = self.gpu resources.update( diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index afac2da19..8f39afcc2 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -439,9 +439,14 @@ class Run(CoreModel): @root_validator def _error(cls, values) -> Dict: + try: + termination_reason = values["termination_reason"] + jobs = values["jobs"] + except KeyError: + return values values["error"] = _get_run_error( - run_termination_reason=values["termination_reason"], - run_jobs=values["jobs"], + run_termination_reason=termination_reason, + run_jobs=jobs, ) return values diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index ecc1523e5..f1d164769 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -19,6 +19,7 @@ from dstack._internal.core.backends.base.compute import ( ComputeWithCreateInstanceSupport, ComputeWithPlacementGroupSupport, + GoArchType, generate_unique_placement_group_name, get_dstack_runner_binary_path, get_dstack_shim_binary_path, @@ -27,6 +28,7 @@ get_shim_pre_start_commands, ) from dstack._internal.core.backends.remote.provisioning import ( + detect_cpu_arch, get_host_info, get_paramiko_connection, get_shim_healthcheck, @@ -270,7 +272,7 @@ async def _add_remote(instance: InstanceModel) -> None: ) deploy_timeout = 20 * 60 # 20 minutes result = await asyncio.wait_for(future, timeout=deploy_timeout) - health, host_info = result + health, host_info, cpu_arch = result except (asyncio.TimeoutError, TimeoutError) as e: raise ProvisioningError(f"Deploy timeout: {e}") from e except Exception as e: @@ -291,7 +293,7 @@ async def _add_remote(instance: InstanceModel) -> None: instance.last_retry_at = get_current_datetime() return - instance_type = host_info_to_instance_type(host_info) + instance_type = host_info_to_instance_type(host_info, cpu_arch) instance_network = None internal_ip = None try: @@ -394,7 +396,7 @@ def _deploy_instance( pkeys: List[PKey], ssh_proxy_pkeys: Optional[list[PKey]], authorized_keys: List[str], -) -> Tuple[HealthStatus, Dict[str, Any]]: +) -> Tuple[HealthStatus, Dict[str, Any], GoArchType]: with get_paramiko_connection( remote_details.ssh_user, remote_details.host, @@ -405,13 +407,16 @@ def _deploy_instance( ) as client: logger.info(f"Connected to {remote_details.ssh_user} {remote_details.host}") + arch = detect_cpu_arch(client) + logger.info("%s: CPU arch is %s", remote_details.host, arch) + # Execute pre start commands - shim_pre_start_commands = get_shim_pre_start_commands() + shim_pre_start_commands = get_shim_pre_start_commands(arch=arch) run_pre_start_commands(client, shim_pre_start_commands, authorized_keys) logger.debug("The script for installing dstack has been executed") # Upload envs - shim_envs = get_shim_env(authorized_keys) + shim_envs = get_shim_env(authorized_keys, arch=arch) try: fleet_configuration_envs = remote_details.env.as_dict() except ValueError as e: @@ -446,7 +451,7 @@ def _deploy_instance( raise ProvisioningError("Cannot read HealthcheckResponse") from e health = runner_client.health_response_to_health_status(health_response) - return health, host_info + return health, host_info, arch async def _create_instance(session: AsyncSession, instance: InstanceModel) -> None: diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index ce1b76b40..cca73e25c 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -61,6 +61,7 @@ list_project_models, list_user_project_models, ) +from dstack._internal.server.services.resources import set_resources_defaults from dstack._internal.utils import random_names from dstack._internal.utils.logging import get_logger from dstack._internal.utils.ssh import pkey_from_str @@ -242,6 +243,7 @@ async def get_plan( spec=effective_spec, ) effective_spec = FleetSpec.parse_obj(effective_spec.dict()) + _validate_fleet_spec_and_set_defaults(spec) current_fleet: Optional[Fleet] = None current_fleet_id: Optional[uuid.UUID] = None if effective_spec.configuration.name is not None: @@ -346,7 +348,7 @@ async def create_fleet( spec=spec, ) spec = FleetSpec.parse_obj(spec.dict()) - _validate_fleet_spec(spec) + _validate_fleet_spec_and_set_defaults(spec) if spec.configuration.ssh_config is not None: _check_can_manage_ssh_fleets(user=user, project=project) @@ -646,7 +648,7 @@ def _remove_fleet_spec_sensitive_info(spec: FleetSpec): host.ssh_key = None -def _validate_fleet_spec(spec: FleetSpec): +def _validate_fleet_spec_and_set_defaults(spec: FleetSpec): if spec.configuration.name is not None: validate_dstack_resource_name(spec.configuration.name) if spec.configuration.ssh_config is None and spec.configuration.nodes is None: @@ -659,6 +661,8 @@ def _validate_fleet_spec(spec: FleetSpec): if isinstance(host, SSHHostParams) and host.ssh_key is not None: _validate_ssh_key(host.ssh_key) _validate_internal_ips(spec.configuration.ssh_config) + if spec.configuration.resources is not None: + set_resources_defaults(spec.configuration.resources) def _validate_all_ssh_params_specified(ssh_config: SSHParams): diff --git a/src/dstack/_internal/server/services/resources.py b/src/dstack/_internal/server/services/resources.py new file mode 100644 index 000000000..17cd80a66 --- /dev/null +++ b/src/dstack/_internal/server/services/resources.py @@ -0,0 +1,21 @@ +import gpuhunt +from pydantic import parse_obj_as + +from dstack._internal.core.models.resources import CPUSpec, ResourcesSpec + + +def set_resources_defaults(resources: ResourcesSpec) -> None: + # TODO: Remove in 0.20. Use resources.cpu directly + cpu = parse_obj_as(CPUSpec, resources.cpu) + if cpu.arch is None: + gpu = resources.gpu + if ( + gpu is not None + and gpu.vendor in [None, gpuhunt.AcceleratorVendor.NVIDIA] + and gpu.name + and any(map(gpuhunt.is_nvidia_superchip, gpu.name)) + ): + cpu.arch = gpuhunt.CPUArchitecture.ARM + else: + cpu.arch = gpuhunt.CPUArchitecture.X86 + resources.cpu = cpu diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 354151401..d2df9eda4 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -81,6 +81,7 @@ from dstack._internal.server.services.offers import get_offers_by_requirements from dstack._internal.server.services.plugins import apply_plugin_policies from dstack._internal.server.services.projects import list_project_models, list_user_project_models +from dstack._internal.server.services.resources import set_resources_defaults from dstack._internal.server.services.users import get_user_model_by_name from dstack._internal.utils.logging import get_logger from dstack._internal.utils.random_names import generate_name @@ -301,12 +302,14 @@ async def get_plan( project=project, run_name=effective_run_spec.run_name, ) - if ( - current_resource is not None - and not current_resource.status.is_finished() - and _can_update_run_spec(current_resource.run_spec, effective_run_spec) - ): - action = ApplyAction.UPDATE + if current_resource is not None: + # For backward compatibility (current_resource may has been submitted before + # some fields, e.g., CPUSpec.arch, were added) + set_resources_defaults(current_resource.run_spec.configuration.resources) + if not current_resource.status.is_finished() and _can_update_run_spec( + current_resource.run_spec, effective_run_spec + ): + action = ApplyAction.UPDATE jobs = await get_jobs_from_run_spec(effective_run_spec, replica_num=0) @@ -406,6 +409,10 @@ async def apply_plan( project=project, run_spec=run_spec, ) + + # For backward compatibility (current_resource may has been submitted before + # some fields, e.g., CPUSpec.arch, were added) + set_resources_defaults(current_resource.run_spec.configuration.resources) try: _check_can_update_run_spec(current_resource.run_spec, run_spec) except ServerClientError: @@ -414,6 +421,8 @@ async def apply_plan( raise ServerClientError("Cannot override active run. Stop the run first.") raise if not force: + if plan.current_resource is not None: + set_resources_defaults(plan.current_resource.run_spec.configuration.resources) if ( plan.current_resource is None or plan.current_resource.id != current_resource.id @@ -866,6 +875,7 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): raise ServerClientError( f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_TTL_SECONDS}s" ) + set_resources_defaults(run_spec.configuration.resources) _UPDATABLE_SPEC_FIELDS = ["repo_code_hash", "configuration"] diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 32f268df0..03e95a57f 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -56,7 +56,7 @@ ) from dstack._internal.core.models.repos.base import RepoType from dstack._internal.core.models.repos.local import LocalRunRepoData -from dstack._internal.core.models.resources import Memory, Range, ResourcesSpec +from dstack._internal.core.models.resources import CPUSpec, Memory, Range, ResourcesSpec from dstack._internal.core.models.runs import ( JobProvisioningData, JobRuntimeData, @@ -556,7 +556,7 @@ async def create_instance( profile = Profile(name="test_name") if requirements is None: - requirements = Requirements(resources=ResourcesSpec(cpu=1)) + requirements = Requirements(resources=ResourcesSpec(cpu=CPUSpec.parse("1"))) if instance_configuration is None: instance_configuration = get_instance_configuration() @@ -666,20 +666,7 @@ def get_remote_connection_info( env: Optional[Union[Env, dict]] = None, ): if ssh_keys is None: - ssh_keys = [ - SSHKey( - public="ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIO6mJxVbNtm0zXgMLvByrhXJCmJRveSrJxLB5/OzcyCk", - private=""" - -----BEGIN OPENSSH PRIVATE KEY----- - b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW - QyNTUxOQAAACDupicVWzbZtM14DC7wcq4VyQpiUb3kqycSwefzs3MgpAAAAJCiWa5Volmu - VQAAAAtzc2gtZWQyNTUxOQAAACDupicVWzbZtM14DC7wcq4VyQpiUb3kqycSwefzs3MgpA - AAAEAncHi4AhS6XdMp5Gzd+IMse/4ekyQ54UngByf0Sp0uH+6mJxVbNtm0zXgMLvByrhXJ - CmJRveSrJxLB5/OzcyCkAAAACWRlZkBkZWZwYwECAwQ= - -----END OPENSSH PRIVATE KEY----- - """, - ) - ] + ssh_keys = [get_ssh_key()] if env is None: env = Env() elif isinstance(env, dict): @@ -693,6 +680,21 @@ def get_remote_connection_info( ) +def get_ssh_key() -> SSHKey: + return SSHKey( + public="ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIO6mJxVbNtm0zXgMLvByrhXJCmJRveSrJxLB5/OzcyCk", + private=""" + -----BEGIN OPENSSH PRIVATE KEY----- + b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW + QyNTUxOQAAACDupicVWzbZtM14DC7wcq4VyQpiUb3kqycSwefzs3MgpAAAAJCiWa5Volmu + VQAAAAtzc2gtZWQyNTUxOQAAACDupicVWzbZtM14DC7wcq4VyQpiUb3kqycSwefzs3MgpA + AAAEAncHi4AhS6XdMp5Gzd+IMse/4ekyQ54UngByf0Sp0uH+6mJxVbNtm0zXgMLvByrhXJ + CmJRveSrJxLB5/OzcyCkAAAACWRlZkBkZWZwYwECAwQ= + -----END OPENSSH PRIVATE KEY----- + """, + ) + + async def create_volume( session: AsyncSession, project: ProjectModel, diff --git a/src/dstack/_internal/utils/common.py b/src/dstack/_internal/utils/common.py index 7611062b8..33c05d3b4 100644 --- a/src/dstack/_internal/utils/common.py +++ b/src/dstack/_internal/utils/common.py @@ -1,4 +1,5 @@ import asyncio +import enum import itertools import re import time @@ -83,6 +84,8 @@ def pretty_date(time: datetime) -> str: def pretty_resources( + *, + cpu_arch: Optional[Any] = None, cpus: Optional[Any] = None, memory: Optional[Any] = None, gpu_count: Optional[Any] = None, @@ -110,7 +113,16 @@ def pretty_resources( """ parts = [] if cpus is not None: - parts.append(f"cpu={cpus}") + cpu_arch_lower: Optional[str] = None + if isinstance(cpu_arch, enum.Enum): + cpu_arch_lower = str(cpu_arch.value).lower() + elif isinstance(cpu_arch, str): + cpu_arch_lower = cpu_arch.lower() + if cpu_arch_lower == "arm": + cpu_arch_prefix = "arm:" + else: + cpu_arch_prefix = "" + parts.append(f"cpu={cpu_arch_prefix}{cpus}") if memory is not None: parts.append(f"mem={memory}") if disk_size: diff --git a/src/dstack/_internal/utils/json_schema.py b/src/dstack/_internal/utils/json_schema.py index 65ec5934b..73ee64317 100644 --- a/src/dstack/_internal/utils/json_schema.py +++ b/src/dstack/_internal/utils/json_schema.py @@ -1,6 +1,9 @@ def add_extra_schema_types(schema_property: dict, extra_types: list[dict]): if "allOf" in schema_property: - ref = schema_property.pop("allOf")[0] + refs = [schema_property.pop("allOf")[0]] + elif "anyOf" in schema_property: + refs = schema_property.pop("anyOf") else: - ref = {"type": schema_property.pop("type")} - schema_property["anyOf"] = [ref, *extra_types] + refs = [{"type": schema_property.pop("type")}] + refs.extend(extra_types) + schema_property["anyOf"] = refs diff --git a/src/dstack/api/__init__.py b/src/dstack/api/__init__.py index 0a4dce191..384c8be82 100644 --- a/src/dstack/api/__init__.py +++ b/src/dstack/api/__init__.py @@ -14,6 +14,7 @@ from dstack._internal.core.models.repos.remote import RemoteRepo from dstack._internal.core.models.repos.virtual import VirtualRepo from dstack._internal.core.models.resources import ComputeCapability, Memory, Range +from dstack._internal.core.models.resources import CPUSpec as CPU from dstack._internal.core.models.resources import DiskSpec as Disk from dstack._internal.core.models.resources import GPUSpec as GPU from dstack._internal.core.models.resources import ResourcesSpec as Resources diff --git a/src/dstack/api/server/_fleets.py b/src/dstack/api/server/_fleets.py index 3b8d4466c..f08f24b5b 100644 --- a/src/dstack/api/server/_fleets.py +++ b/src/dstack/api/server/_fleets.py @@ -3,6 +3,7 @@ from pydantic import parse_obj_as from dstack._internal.core.models.fleets import ApplyFleetPlanInput, Fleet, FleetPlan, FleetSpec +from dstack._internal.core.models.instances import Instance from dstack._internal.server.schemas.fleets import ( ApplyFleetPlanRequest, CreateFleetRequest, @@ -83,9 +84,24 @@ def _get_apply_plan_excludes(plan_input: ApplyFleetPlanInput) -> Dict: spec_excludes = _get_fleet_spec_excludes(plan_input.spec) if spec_excludes: apply_plan_excludes["spec"] = apply_plan_excludes + current_resource = plan_input.current_resource + if current_resource is not None: + current_resource_excludes = {} + apply_plan_excludes["current_resource"] = current_resource_excludes + if all(map(_should_exclude_instance_cpu_arch, current_resource.instances)): + current_resource_excludes["instances"] = { + "__all__": {"instance_type": {"resources": {"cpu_arch"}}} + } return {"plan": apply_plan_excludes} +def _should_exclude_instance_cpu_arch(instance: Instance) -> bool: + try: + return instance.instance_type.resources.cpu_arch is None + except AttributeError: + return True + + def _get_create_fleet_excludes(fleet_spec: FleetSpec) -> Dict: create_fleet_excludes = {} spec_excludes = _get_fleet_spec_excludes(fleet_spec) diff --git a/src/dstack/api/server/_runs.py b/src/dstack/api/server/_runs.py index 34494002c..0b863a32d 100644 --- a/src/dstack/api/server/_runs.py +++ b/src/dstack/api/server/_runs.py @@ -7,6 +7,7 @@ from dstack._internal.core.models.configurations import ServiceConfiguration from dstack._internal.core.models.runs import ( ApplyRunPlanInput, + JobSubmission, Run, RunPlan, RunSpec, @@ -96,13 +97,53 @@ def _get_apply_plan_excludes(plan: ApplyRunPlanInput) -> Optional[Dict]: run_spec_excludes = _get_run_spec_excludes(plan.run_spec) if run_spec_excludes is not None: apply_plan_excludes["run_spec"] = run_spec_excludes - if plan.current_resource is not None: - apply_plan_excludes["current_resource"] = { - "run_spec": _get_run_spec_excludes(plan.current_resource.run_spec) + current_resource = plan.current_resource + if current_resource is not None: + current_resource_excludes = {} + apply_plan_excludes["current_resource"] = current_resource_excludes + current_resource_excludes["run_spec"] = _get_run_spec_excludes(current_resource.run_spec) + job_submissions_excludes = {} + current_resource_excludes["jobs"] = { + "__all__": {"job_submissions": {"__all__": job_submissions_excludes}} } + job_submissions = [js for j in current_resource.jobs for js in j.job_submissions] + if all(map(_should_exclude_job_submission_jpd_cpu_arch, job_submissions)): + job_submissions_excludes["job_provisioning_data"] = { + "instance_type": {"resources": {"cpu_arch"}} + } + if all(map(_should_exclude_job_submission_jrd_cpu_arch, job_submissions)): + job_submissions_excludes["job_runtime_data"] = { + "offer": {"instance": {"resources": {"cpu_arch"}}} + } + latest_job_submission = current_resource.latest_job_submission + if latest_job_submission is not None: + latest_job_submission_excludes = {} + current_resource_excludes["latest_job_submission"] = latest_job_submission_excludes + if _should_exclude_job_submission_jpd_cpu_arch(latest_job_submission): + latest_job_submission_excludes["job_provisioning_data"] = { + "instance_type": {"resources": {"cpu_arch"}} + } + if _should_exclude_job_submission_jrd_cpu_arch(latest_job_submission): + latest_job_submission_excludes["job_runtime_data"] = { + "offer": {"instance": {"resources": {"cpu_arch"}}} + } return {"plan": apply_plan_excludes} +def _should_exclude_job_submission_jpd_cpu_arch(job_submission: JobSubmission) -> bool: + try: + return job_submission.job_provisioning_data.instance_type.resources.cpu_arch is None + except AttributeError: + return True + + +def _should_exclude_job_submission_jrd_cpu_arch(job_submission: JobSubmission) -> bool: + try: + return job_submission.job_runtime_data.offer.instance.resources.cpu_arch is None + except AttributeError: + return True + + def _get_get_plan_excludes(request: GetRunPlanRequest) -> Optional[Dict]: """ Excludes new fields when they are not set to keep diff --git a/src/tests/_internal/cli/services/configurators/test_run.py b/src/tests/_internal/cli/services/configurators/test_run.py index f2c8de9f5..fb8c7ac86 100644 --- a/src/tests/_internal/cli/services/configurators/test_run.py +++ b/src/tests/_internal/cli/services/configurators/test_run.py @@ -194,13 +194,17 @@ def test_two_vendors_including_amd_inferred_with_image(self, gpu_spec): def test_amd_vendor_declared_no_image(self): conf = self.prepare_conf(gpu_spec="AMD") - with pytest.raises(ConfigurationError, match=r"`image` is required"): + with pytest.raises( + ConfigurationError, match=r"`image` is required if `resources.gpu.vendor` is `amd`" + ): self.validate(conf) @pytest.mark.parametrize("gpu_spec", ["MI300X", "MI300x", "mi300x"]) def test_amd_vendor_inferred_no_image(self, gpu_spec): conf = self.prepare_conf(gpu_spec=gpu_spec) - with pytest.raises(ConfigurationError, match=r"`image` is required"): + with pytest.raises( + ConfigurationError, match=r"`image` is required if `resources.gpu.vendor` is `amd`" + ): self.validate(conf) @pytest.mark.parametrize( @@ -213,5 +217,56 @@ def test_amd_vendor_inferred_no_image(self, gpu_spec): ) def test_two_vendors_including_amd_inferred_no_image(self, gpu_spec): conf = self.prepare_conf(gpu_spec=gpu_spec) - with pytest.raises(ConfigurationError, match=r"`image` is required"): + with pytest.raises( + ConfigurationError, match=r"`image` is required if `resources.gpu.vendor` is `amd`" + ): self.validate(conf) + + +class TestValidateCPUArchAndImage: + def prepare_conf( + self, + *, + cpu_spec: str, + gpu_spec: Optional[str] = None, + image: Optional[str] = None, + ) -> BaseRunConfiguration: + conf_dict = { + "type": "none", + "resources": { + "cpu": cpu_spec, + }, + } + if image is not None: + conf_dict["image"] = image + if gpu_spec is not None: + conf_dict["resources"]["gpu"] = gpu_spec + return BaseRunConfiguration.parse_obj(conf_dict) + + def validate(self, conf: BaseRunConfiguration) -> None: + # validate_gpu_vendor_and_image sets GPU vendor if not set + BaseRunConfigurator(api_client=Mock()).validate_gpu_vendor_and_image(conf) + BaseRunConfigurator(api_client=Mock()).validate_cpu_arch_and_image(conf) + + @pytest.mark.parametrize("gpu_spec", [None, "GH200", "H100"]) + def test_explicit_arm_with_image(self, gpu_spec: Optional[str]): + conf = self.prepare_conf(cpu_spec="arm:1..", gpu_spec=gpu_spec, image="ubuntu") + self.validate(conf) + + def test_inferred_arm_with_image(self): + conf = self.prepare_conf(cpu_spec="1..", gpu_spec="GH200", image="ubuntu") + self.validate(conf) + + @pytest.mark.parametrize("cpu_spec", ["1..", "arm:1.."]) + def test_arm_no_image(self, cpu_spec: str): + conf = self.prepare_conf(cpu_spec=cpu_spec, gpu_spec="GH200") + with pytest.raises( + ConfigurationError, match=r"`image` is required if `resources.cpu.arch` is `arm`" + ): + self.validate(conf) + + @pytest.mark.parametrize("cpu_spec", ["1..", "x86:1.."]) + @pytest.mark.parametrize("image", [None, "ubuntu"]) + def test_x86(self, cpu_spec: str, image: Optional[str]): + conf = self.prepare_conf(cpu_spec=cpu_spec, gpu_spec="H100", image=image) + self.validate(conf) diff --git a/src/tests/_internal/core/backends/base/test_compute.py b/src/tests/_internal/core/backends/base/test_compute.py index 68f4564cf..8b50893c5 100644 --- a/src/tests/_internal/core/backends/base/test_compute.py +++ b/src/tests/_internal/core/backends/base/test_compute.py @@ -1,10 +1,14 @@ import re +from typing import Optional + +import pytest from dstack._internal.core.backends.base.compute import ( generate_unique_backend_name, generate_unique_gateway_instance_name, generate_unique_instance_name, generate_unique_volume_name, + normalize_arch, ) from dstack._internal.server.testing.common import ( get_gateway_compute_configuration, @@ -54,3 +58,22 @@ def test_truncates_long_names(self): def test_validates_project_name(self): name = generate_unique_backend_name("instance", "invalid_project!@", 60) assert re.match(r"^dstack-instance-[a-z0-9]{8}$", name) + + +class TestNormalizeArch: + @pytest.mark.parametrize("arch", [None, "", "X86", "x86_64", "AMD64"]) + def test_amd64(self, arch: Optional[str]): + assert normalize_arch(arch) == "amd64" + + @pytest.mark.parametrize("arch", ["arm", "ARM64", "AArch64"]) + def test_arm64(self, arch: str): + assert normalize_arch(arch) == "arm64" + + @pytest.mark.parametrize("arch", ["IA32", "i686", "ARM32", "aarch32"]) + def test_32bit_not_supported(self, arch: str): + with pytest.raises(ValueError, match="32-bit architectures are not supported"): + normalize_arch(arch) + + def test_unknown_arch(self): + with pytest.raises(ValueError, match="Unsupported architecture: MIPS"): + normalize_arch("MIPS") diff --git a/src/tests/_internal/core/models/test_resources.py b/src/tests/_internal/core/models/test_resources.py index ed537512f..5da32ec8f 100644 --- a/src/tests/_internal/core/models/test_resources.py +++ b/src/tests/_internal/core/models/test_resources.py @@ -1,8 +1,17 @@ +from typing import Optional + import pytest -from gpuhunt import AcceleratorVendor +from gpuhunt import AcceleratorVendor, CPUArchitecture from pydantic import ValidationError, parse_obj_as -from dstack._internal.core.models.resources import ComputeCapability, GPUSpec, Memory, Range +from dstack._internal.core.models.resources import ( + DEFAULT_CPU_COUNT, + ComputeCapability, + CPUSpec, + GPUSpec, + Memory, + Range, +) class TestMemory: @@ -100,6 +109,67 @@ def test_dict(self): ) +class TestCPU: + def test_integer(self): + assert parse_obj_as(CPUSpec, 1).dict() == {"arch": None, "count": {"min": 1, "max": 1}} + + @pytest.mark.parametrize( + ["value", "expected_arch", "expected_min", "expected_max"], + [ + ["1..2", None, 1, 2], + ["X86", CPUArchitecture.X86, DEFAULT_CPU_COUNT.min, DEFAULT_CPU_COUNT.max], + ["x86:2", CPUArchitecture.X86, 2, 2], + ["2..:ARM", CPUArchitecture.ARM, 2, None], + ], + ) + def test_valid_string( + self, + value: str, + expected_arch: Optional[CPUArchitecture], + expected_min: Optional[int], + expected_max: Optional[int], + ): + assert parse_obj_as(CPUSpec, value).dict() == { + "arch": expected_arch, + "count": {"min": expected_min, "max": expected_max}, + } + + @pytest.mark.parametrize( + ["value", "error"], + [ + ["arm:", "CPU spec contains empty token"], + ["2:foo", "Invalid CPU architecture"], + ["arm:x86", "CPU spec arch conflict"], + ["2:arm:2", "CPU spec count conflict"], + ], + ) + def test_invalid_string(self, value: str, error: str): + with pytest.raises(ValidationError, match=error): + parse_obj_as(CPUSpec, value) + + def test_range_object(self): + assert parse_obj_as(CPUSpec, Range[int](min=1, max=2)).dict() == { + "arch": None, + "count": {"min": 1, "max": 2}, + } + + def test_range_dict(self): + assert parse_obj_as(CPUSpec, {"min": 1, "max": 2}).dict() == { + "arch": None, + "count": {"min": 1, "max": 2}, + } + + def test_valid_dict(self): + assert parse_obj_as(CPUSpec, {"arch": "ARM", "count": {"min": 1, "max": 2}}).dict() == { + "arch": CPUArchitecture.ARM, + "count": {"min": 1, "max": 2}, + } + + def test_invalid_dict(self): + with pytest.raises(ValidationError): + parse_obj_as(CPUSpec, {"arch": "x86", "min": 1, "max": 2}) + + class TestGPU: def test_count(self): assert parse_obj_as(GPUSpec, "1") == parse_obj_as(GPUSpec, {"count": 1}) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index 7152b637a..7569d6f8d 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -881,7 +881,7 @@ def host_info(self) -> dict: @pytest.fixture def deploy_instance_mock(self, monkeypatch: pytest.MonkeyPatch, host_info: dict): - mock = Mock(return_value=(HealthStatus(healthy=True, reason="OK"), host_info)) + mock = Mock(return_value=(HealthStatus(healthy=True, reason="OK"), host_info, "amd64")) monkeypatch.setattr( "dstack._internal.server.background.tasks.process_instances._deploy_instance", mock ) diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 434b7fa39..39176efab 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -504,6 +504,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "instance_type": { "name": "ssh", "resources": { + "cpu_arch": None, "cpus": 2, "memory_mib": 8, "gpus": [], diff --git a/src/tests/_internal/server/services/test_fleets.py b/src/tests/_internal/server/services/test_fleets.py index a4df504a9..2c7ed4d80 100644 --- a/src/tests/_internal/server/services/test_fleets.py +++ b/src/tests/_internal/server/services/test_fleets.py @@ -23,6 +23,7 @@ create_project, create_user, get_fleet_spec, + get_ssh_key, ) @@ -36,7 +37,12 @@ def get_project_backends_mock(self, monkeypatch: pytest.MonkeyPatch) -> list[Bac def get_ssh_fleet_spec( self, name: Optional[str], hosts: list[Union[SSHHostParams, str]] ) -> FleetSpec: - ssh_config = SSHParams(hosts=hosts, network=None) + ssh_config = SSHParams( + hosts=hosts, + network=None, + user="ubuntu", + ssh_key=get_ssh_key(), + ) fleet_conf = FleetConfiguration(name=name, ssh_config=ssh_config) return get_fleet_spec(conf=fleet_conf)