Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dstack/_internal/core/backends/base/offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"gcp-a4",
"gcp-g4",
"gcp-dws-calendar-mode",
"runpod-cpu",
"runpod-cluster",
]

Expand Down
112 changes: 112 additions & 0 deletions src/dstack/_internal/core/backends/runpod/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@ def create_pod(
data = resp.json()["data"]
return data["podRentInterruptable"] if bid_per_gpu else data["podFindAndDeployOnDemand"]

def create_cpu_pod(
    self,
    name: str,
    image_name: str,
    instance_id: str,
    cloud_type: str,
    deploy_cost: float,
    start_ssh: bool = True,
    data_center_id: Optional[str] = None,
    container_disk_in_gb: Optional[int] = None,
    docker_args: Optional[str] = None,
    ports: Optional[str] = None,
    volume_mount_path: Optional[str] = None,
    env: Optional[Dict[str, Any]] = None,
    template_id: Optional[str] = None,
    network_volume_id: Optional[str] = None,
    container_registry_auth_id: Optional[str] = None,
) -> Dict:
    """Deploy a CPU-only pod via Runpod's ``deployCpuPod`` GraphQL mutation.

    All arguments are forwarded verbatim to the mutation builder; the
    returned dict is the ``deployCpuPod`` payload of the GraphQL response.
    """
    mutation = _generate_cpu_pod_deployment_mutation(
        name=name,
        image_name=image_name,
        instance_id=instance_id,
        cloud_type=cloud_type,
        deploy_cost=deploy_cost,
        start_ssh=start_ssh,
        data_center_id=data_center_id,
        container_disk_in_gb=container_disk_in_gb,
        docker_args=docker_args,
        ports=ports,
        volume_mount_path=volume_mount_path,
        env=env,
        template_id=template_id,
        network_volume_id=network_volume_id,
        container_registry_auth_id=container_registry_auth_id,
    )
    response = self._make_request({"query": mutation})
    return response.json()["data"]["deployCpuPod"]

def edit_pod(
self,
pod_id: str,
Expand Down Expand Up @@ -499,6 +540,77 @@ def _generate_pod_deployment_mutation(
"""


def _generate_cpu_pod_deployment_mutation(
name: str,
image_name: str,
instance_id: str,
cloud_type: str,
deploy_cost: float,
start_ssh: bool = True,
data_center_id: Optional[str] = None,
container_disk_in_gb: Optional[int] = None,
docker_args: Optional[str] = None,
ports: Optional[str] = None,
volume_mount_path: Optional[str] = None,
env: Optional[Dict[str, Any]] = None,
template_id: Optional[str] = None,
network_volume_id: Optional[str] = None,
container_registry_auth_id: Optional[str] = None,
) -> str:
"""
Generates a mutation to deploy CPU pod.
"""
input_fields = []
input_fields.append(f'name: "{name}"')
input_fields.append(f'imageName: "{image_name}"')
input_fields.append(f'instanceId: "{instance_id}"')
input_fields.append(f"cloudType: {cloud_type}")
input_fields.append(f"deployCost: {deploy_cost}")

if start_ssh:
input_fields.append("startSsh: true")
if data_center_id is not None:
input_fields.append(f'dataCenterId: "{data_center_id}"')
if container_disk_in_gb is not None:
input_fields.append(f"containerDiskInGb: {container_disk_in_gb}")
if docker_args is not None:
input_fields.append(f'dockerArgs: "{docker_args}"')
if ports is not None:
ports = ports.replace(" ", "")
input_fields.append(f'ports: "{ports}"')
if volume_mount_path is not None:
input_fields.append(f'volumeMountPath: "{volume_mount_path}"')
if env is not None:
env_string = ", ".join(
[f'{{ key: "{key}", value: "{value}" }}' for key, value in env.items()]
)
input_fields.append(f"env: [{env_string}]")
if template_id is not None:
input_fields.append(f'templateId: "{template_id}"')
if network_volume_id is not None:
input_fields.append(f'networkVolumeId: "{network_volume_id}"')
if container_registry_auth_id is not None:
input_fields.append(f'containerRegistryAuthId: "{container_registry_auth_id}"')

input_string = ", ".join(input_fields)
return f"""
mutation {{
deployCpuPod(
input: {{
{input_string}
}}
) {{
id
lastStatusChange
imageName
machine {{
podHostId
}}
}}
}}
"""


def _generate_pod_terminate_mutation(pod_id: str) -> str:
"""
Generates a mutation to terminate a pod.
Expand Down
104 changes: 69 additions & 35 deletions src/dstack/_internal/core/backends/runpod/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,24 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability
return offers

def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]:
    """Return the disk-size modifier applied to each Runpod offer.

    GPU offers use the backend-wide configurable disk range. CPU offers
    are handled per offer, since their gpuhunt-reported disk size is the
    per-flavor maximum, not a fixed size.

    NOTE: the pasted diff retained the removed pre-image line
    (`return [get_offers_disk_modifier(...)]`) above the new body, which
    made everything after it unreachable; only the post-image
    implementation is kept here.
    """
    gpu_disk_modifier = get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)

    def disk_modifier(
        offer: InstanceOfferWithAvailability,
    ) -> Optional[InstanceOfferWithAvailability]:
        if len(offer.instance.resources.gpus) > 0:
            return gpu_disk_modifier(offer)

        # For Runpod CPU offers, gpuhunt disk is the per-flavor max.
        # Choose requested disk within [1GB, max] or filter the offer out.
        cpu_max_disk_size_gb = Memory(offer.instance.resources.disk.size_mib / 1024)
        cpu_configurable_disk_size = Range[Memory](
            min=Memory.parse("1GB"),
            max=cpu_max_disk_size_gb,
        )
        return get_offers_disk_modifier(cpu_configurable_disk_size, requirements)(offer)

    return [disk_modifier]

def get_offers_post_filter(
self, requirements: Requirements
Expand Down Expand Up @@ -140,46 +157,63 @@ def run_job(
job.job_spec.registry_auth
)
gpu_count = len(instance_offer.instance.resources.gpus)
bid_per_gpu = None
if instance_offer.instance.resources.spot and gpu_count:
bid_per_gpu = instance_offer.price / gpu_count
if _is_secure_cloud(instance_offer.region):
cloud_type = "SECURE"
data_center_id = instance_offer.region
country_code = None
if gpu_count == 0:
if not _is_secure_cloud(instance_offer.region):
raise ComputeError("Runpod CPU offers are only supported in secure cloud regions")
resp = self.api_client.create_cpu_pod(
name=pod_name,
image_name=job.job_spec.image_name,
instance_id=instance_offer.instance.name,
cloud_type="SECURE",
deploy_cost=instance_offer.price,
data_center_id=instance_offer.region,
container_disk_in_gb=disk_size,
start_ssh=True,
docker_args=_get_docker_args(authorized_keys),
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
network_volume_id=network_volume_id,
volume_mount_path=volume_mount_path,
env={"RUNPOD_POD_USER": "0"},
)
else:
cloud_type = "COMMUNITY"
data_center_id = None
country_code = instance_offer.region

resp = self.api_client.create_pod(
name=pod_name,
image_name=job.job_spec.image_name,
gpu_type_id=instance_offer.instance.name,
cloud_type=cloud_type,
data_center_id=data_center_id,
country_code=country_code,
gpu_count=gpu_count,
container_disk_in_gb=disk_size,
min_vcpu_count=instance_offer.instance.resources.cpus,
min_memory_in_gb=memory_size,
support_public_ip=True,
docker_args=_get_docker_args(authorized_keys),
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
bid_per_gpu=bid_per_gpu,
network_volume_id=network_volume_id,
volume_mount_path=volume_mount_path,
env={"RUNPOD_POD_USER": "0"},
)
bid_per_gpu = None
if instance_offer.instance.resources.spot:
bid_per_gpu = instance_offer.price / gpu_count
if _is_secure_cloud(instance_offer.region):
cloud_type = "SECURE"
data_center_id = instance_offer.region
country_code = None
else:
cloud_type = "COMMUNITY"
data_center_id = None
country_code = instance_offer.region

resp = self.api_client.create_pod(
name=pod_name,
image_name=job.job_spec.image_name,
gpu_type_id=instance_offer.instance.name,
cloud_type=cloud_type,
data_center_id=data_center_id,
country_code=country_code,
gpu_count=gpu_count,
container_disk_in_gb=disk_size,
min_vcpu_count=instance_offer.instance.resources.cpus,
min_memory_in_gb=memory_size,
support_public_ip=True,
docker_args=_get_docker_args(authorized_keys),
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
bid_per_gpu=bid_per_gpu,
network_volume_id=network_volume_id,
volume_mount_path=volume_mount_path,
env={"RUNPOD_POD_USER": "0"},
)

instance_id = resp["id"]

# Call edit_pod to pass container_registry_auth_id.
# Expect a long time (~5m) for the pod to pick up the creds.
# TODO: remove editPod once createPod supports docker's username and password
# editPod is temporary solution to set container_registry_auth_id because createPod does not
# support it currently. This will be removed once createPod supports container_registry_auth_id
# or username and password
# TODO: remove editPod once Runpod's create mutations support docker's username/password
# (or a reliable containerRegistryAuthId at create time).
if container_registry_auth_id is not None:
instance_id = self.api_client.edit_pod(
pod_id=instance_id,
Expand Down
71 changes: 71 additions & 0 deletions src/tests/_internal/core/backends/runpod/test_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from dstack._internal.core.backends.runpod.api_client import (
RunpodApiClient,
_generate_cpu_pod_deployment_mutation,
)


class _Response:
def __init__(self, payload):
self._payload = payload

def json(self):
return self._payload


def test_generate_cpu_pod_deployment_mutation():
    """Every provided argument must surface as a correctly rendered input field."""
    mutation = _generate_cpu_pod_deployment_mutation(
        name="cpu-test",
        image_name="python:3.11-slim",
        instance_id="cpu3g-2-8",
        cloud_type="SECURE",
        deploy_cost=0.08,
        start_ssh=True,
        data_center_id="AP-JP-1",
        container_disk_in_gb=5,
        docker_args='{"cmd":["echo hi"]}',
        ports="22/tcp, 8080/http",
        volume_mount_path="/workspace",
        env={"HELLO": "WORLD"},
        template_id="runpod-ubuntu",
        network_volume_id="vol-1",
        container_registry_auth_id="cred-1",
    )

    expected_fragments = [
        "deployCpuPod",
        'name: "cpu-test"',
        'imageName: "python:3.11-slim"',
        'instanceId: "cpu3g-2-8"',
        "cloudType: SECURE",
        "deployCost: 0.08",
        "startSsh: true",
        'dataCenterId: "AP-JP-1"',
        "containerDiskInGb: 5",
        # Spaces in the ports list are stripped by the generator.
        'ports: "22/tcp,8080/http"',
        'volumeMountPath: "/workspace"',
        'env: [{ key: "HELLO", value: "WORLD" }]',
        'templateId: "runpod-ubuntu"',
        'networkVolumeId: "vol-1"',
        'containerRegistryAuthId: "cred-1"',
    ]
    for fragment in expected_fragments:
        assert fragment in mutation


def test_create_cpu_pod_uses_deploy_cpu_pod(monkeypatch):
    """create_cpu_pod must issue the deployCpuPod mutation, not the GPU deploy one."""
    client = RunpodApiClient(api_key="test")
    sent_queries = []

    def record_request(data):
        # Capture the GraphQL query and return a canned success payload.
        sent_queries.append(data["query"])
        return _Response({"data": {"deployCpuPod": {"id": "cpu-pod-1"}}})

    monkeypatch.setattr(client, "_make_request", record_request)

    pod = client.create_cpu_pod(
        name="cpu-test",
        image_name="python:3.11-slim",
        instance_id="cpu3g-2-8",
        cloud_type="SECURE",
        deploy_cost=0.08,
    )

    assert pod["id"] == "cpu-pod-1"
    sent_query = sent_queries[-1]
    assert "deployCpuPod" in sent_query
    assert "podFindAndDeployOnDemand" not in sent_query
Loading