Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dstack/_internal/core/backends/base/offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"gcp-a4",
"gcp-g4",
"gcp-dws-calendar-mode",
"runpod-cpu",
"runpod-cluster",
]

Expand Down
112 changes: 112 additions & 0 deletions src/dstack/_internal/core/backends/runpod/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@ def create_pod(
data = resp.json()["data"]
return data["podRentInterruptable"] if bid_per_gpu else data["podFindAndDeployOnDemand"]

def create_cpu_pod(
    self,
    name: str,
    image_name: str,
    instance_id: str,
    cloud_type: str,
    deploy_cost: float,
    start_ssh: bool = True,
    data_center_id: Optional[str] = None,
    container_disk_in_gb: Optional[int] = None,
    docker_args: Optional[str] = None,
    ports: Optional[str] = None,
    volume_mount_path: Optional[str] = None,
    env: Optional[Dict[str, Any]] = None,
    template_id: Optional[str] = None,
    network_volume_id: Optional[str] = None,
    container_registry_auth_id: Optional[str] = None,
) -> Dict:
    """Deploy a CPU-only pod via RunPod's `deployCpuPod` GraphQL mutation.

    Thin wrapper: renders the mutation document from the given fields,
    sends it through `_make_request`, and unwraps the `deployCpuPod`
    payload from the GraphQL response envelope.
    """
    query = _generate_cpu_pod_deployment_mutation(
        name=name,
        image_name=image_name,
        instance_id=instance_id,
        cloud_type=cloud_type,
        deploy_cost=deploy_cost,
        start_ssh=start_ssh,
        data_center_id=data_center_id,
        container_disk_in_gb=container_disk_in_gb,
        docker_args=docker_args,
        ports=ports,
        volume_mount_path=volume_mount_path,
        env=env,
        template_id=template_id,
        network_volume_id=network_volume_id,
        container_registry_auth_id=container_registry_auth_id,
    )
    # GraphQL responses wrap the result under data.<mutationName>.
    return self._make_request({"query": query}).json()["data"]["deployCpuPod"]

def edit_pod(
self,
pod_id: str,
Expand Down Expand Up @@ -499,6 +540,77 @@ def _generate_pod_deployment_mutation(
"""


def _generate_cpu_pod_deployment_mutation(
name: str,
image_name: str,
instance_id: str,
cloud_type: str,
deploy_cost: float,
start_ssh: bool = True,
data_center_id: Optional[str] = None,
container_disk_in_gb: Optional[int] = None,
docker_args: Optional[str] = None,
ports: Optional[str] = None,
volume_mount_path: Optional[str] = None,
env: Optional[Dict[str, Any]] = None,
template_id: Optional[str] = None,
network_volume_id: Optional[str] = None,
container_registry_auth_id: Optional[str] = None,
) -> str:
"""
Generates a mutation to deploy CPU pod.
"""
input_fields = []
input_fields.append(f'name: "{name}"')
input_fields.append(f'imageName: "{image_name}"')
input_fields.append(f'instanceId: "{instance_id}"')
input_fields.append(f"cloudType: {cloud_type}")
input_fields.append(f"deployCost: {deploy_cost}")

if start_ssh:
input_fields.append("startSsh: true")
if data_center_id is not None:
input_fields.append(f'dataCenterId: "{data_center_id}"')
if container_disk_in_gb is not None:
input_fields.append(f"containerDiskInGb: {container_disk_in_gb}")
if docker_args is not None:
input_fields.append(f'dockerArgs: "{docker_args}"')
if ports is not None:
ports = ports.replace(" ", "")
input_fields.append(f'ports: "{ports}"')
if volume_mount_path is not None:
input_fields.append(f'volumeMountPath: "{volume_mount_path}"')
if env is not None:
env_string = ", ".join(
[f'{{ key: "{key}", value: "{value}" }}' for key, value in env.items()]
)
input_fields.append(f"env: [{env_string}]")
if template_id is not None:
input_fields.append(f'templateId: "{template_id}"')
if network_volume_id is not None:
input_fields.append(f'networkVolumeId: "{network_volume_id}"')
if container_registry_auth_id is not None:
input_fields.append(f'containerRegistryAuthId: "{container_registry_auth_id}"')

input_string = ", ".join(input_fields)
return f"""
mutation {{
deployCpuPod(
input: {{
{input_string}
}}
) {{
id
lastStatusChange
imageName
machine {{
podHostId
}}
}}
}}
"""


def _generate_pod_terminate_mutation(pod_id: str) -> str:
"""
Generates a mutation to terminate a pod.
Expand Down
104 changes: 68 additions & 36 deletions src/dstack/_internal/core/backends/runpod/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,24 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability
return offers

def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]:
    """Return the offer modifiers that adjust/filter offers by disk size.

    GPU offers use the backend-wide configurable disk range; CPU offers
    derive a per-offer range from the flavor's maximum disk.
    """
    modify_gpu_offer = get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)

    def _modify_disk(
        offer: InstanceOfferWithAvailability,
    ) -> Optional[InstanceOfferWithAvailability]:
        if offer.instance.resources.gpus:
            return modify_gpu_offer(offer)
        # For Runpod CPU offers, gpuhunt disk is the per-flavor max.
        # Choose requested disk within [1GB, max] or filter the offer out.
        flavor_max_disk = Memory(offer.instance.resources.disk.size_mib / 1024)
        cpu_disk_range = Range[Memory](
            min=Memory.parse("1GB"),
            max=flavor_max_disk,
        )
        return get_offers_disk_modifier(cpu_disk_range, requirements)(offer)

    return [_modify_disk]

def get_offers_post_filter(
self, requirements: Requirements
Expand Down Expand Up @@ -140,46 +157,61 @@ def run_job(
job.job_spec.registry_auth
)
gpu_count = len(instance_offer.instance.resources.gpus)
bid_per_gpu = None
if instance_offer.instance.resources.spot and gpu_count:
bid_per_gpu = instance_offer.price / gpu_count
if _is_secure_cloud(instance_offer.region):
cloud_type = "SECURE"
data_center_id = instance_offer.region
country_code = None
if gpu_count == 0:
if not _is_secure_cloud(instance_offer.region):
raise ComputeError("Runpod CPU offers are only supported in secure cloud regions")
resp = self.api_client.create_cpu_pod(
name=pod_name,
image_name=job.job_spec.image_name,
instance_id=instance_offer.instance.name,
cloud_type="SECURE",
deploy_cost=instance_offer.price,
data_center_id=instance_offer.region,
container_disk_in_gb=disk_size,
start_ssh=True,
docker_args=_get_docker_args(authorized_keys),
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
network_volume_id=network_volume_id,
volume_mount_path=volume_mount_path,
env={"RUNPOD_POD_USER": "0"},
)
else:
cloud_type = "COMMUNITY"
data_center_id = None
country_code = instance_offer.region

resp = self.api_client.create_pod(
name=pod_name,
image_name=job.job_spec.image_name,
gpu_type_id=instance_offer.instance.name,
cloud_type=cloud_type,
data_center_id=data_center_id,
country_code=country_code,
gpu_count=gpu_count,
container_disk_in_gb=disk_size,
min_vcpu_count=instance_offer.instance.resources.cpus,
min_memory_in_gb=memory_size,
support_public_ip=True,
docker_args=_get_docker_args(authorized_keys),
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
bid_per_gpu=bid_per_gpu,
network_volume_id=network_volume_id,
volume_mount_path=volume_mount_path,
env={"RUNPOD_POD_USER": "0"},
)
bid_per_gpu = None
if instance_offer.instance.resources.spot:
bid_per_gpu = instance_offer.price / gpu_count
if _is_secure_cloud(instance_offer.region):
cloud_type = "SECURE"
data_center_id = instance_offer.region
country_code = None
else:
cloud_type = "COMMUNITY"
data_center_id = None
country_code = instance_offer.region

resp = self.api_client.create_pod(
name=pod_name,
image_name=job.job_spec.image_name,
gpu_type_id=instance_offer.instance.name,
cloud_type=cloud_type,
data_center_id=data_center_id,
country_code=country_code,
gpu_count=gpu_count,
container_disk_in_gb=disk_size,
min_vcpu_count=instance_offer.instance.resources.cpus,
min_memory_in_gb=memory_size,
support_public_ip=True,
docker_args=_get_docker_args(authorized_keys),
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
bid_per_gpu=bid_per_gpu,
network_volume_id=network_volume_id,
volume_mount_path=volume_mount_path,
env={"RUNPOD_POD_USER": "0"},
)

instance_id = resp["id"]

# Call edit_pod to pass container_registry_auth_id.
# Keep the same post-create registry-auth flow for both GPU and CPU paths.
# Expect a long time (~5m) for the pod to pick up the creds.
# TODO: remove editPod once createPod supports docker's username and password
# editPod is temporary solution to set container_registry_auth_id because createPod does not
# support it currently. This will be removed once createPod supports container_registry_auth_id
# or username and password
Comment thread
peterschmidt85 marked this conversation as resolved.
if container_registry_auth_id is not None:
instance_id = self.api_client.edit_pod(
pod_id=instance_id,
Expand Down
71 changes: 71 additions & 0 deletions src/tests/_internal/core/backends/runpod/test_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from dstack._internal.core.backends.runpod.api_client import (
RunpodApiClient,
_generate_cpu_pod_deployment_mutation,
)


class _Response:
def __init__(self, payload):
self._payload = payload

def json(self):
return self._payload


def test_generate_cpu_pod_deployment_mutation():
    """The rendered mutation must contain every provided input field."""
    mutation = _generate_cpu_pod_deployment_mutation(
        name="cpu-test",
        image_name="python:3.11-slim",
        instance_id="cpu3g-2-8",
        cloud_type="SECURE",
        deploy_cost=0.08,
        start_ssh=True,
        data_center_id="AP-JP-1",
        container_disk_in_gb=5,
        docker_args='{"cmd":["echo hi"]}',
        ports="22/tcp, 8080/http",
        volume_mount_path="/workspace",
        env={"HELLO": "WORLD"},
        template_id="runpod-ubuntu",
        network_volume_id="vol-1",
        container_registry_auth_id="cred-1",
    )

    expected_fragments = [
        "deployCpuPod",
        'name: "cpu-test"',
        'imageName: "python:3.11-slim"',
        'instanceId: "cpu3g-2-8"',
        "cloudType: SECURE",
        "deployCost: 0.08",
        "startSsh: true",
        'dataCenterId: "AP-JP-1"',
        "containerDiskInGb: 5",
        'ports: "22/tcp,8080/http"',  # whitespace in ports is stripped
        'volumeMountPath: "/workspace"',
        'env: [{ key: "HELLO", value: "WORLD" }]',
        'templateId: "runpod-ubuntu"',
        'networkVolumeId: "vol-1"',
        'containerRegistryAuthId: "cred-1"',
    ]
    for fragment in expected_fragments:
        assert fragment in mutation


def test_create_cpu_pod_uses_deploy_cpu_pod(monkeypatch):
    """create_cpu_pod must send a deployCpuPod mutation and unwrap its payload."""
    client = RunpodApiClient(api_key="test")
    captured = {}

    def fake_make_request(data):
        # Record the GraphQL document the client would have sent.
        captured["value"] = data["query"]
        return _Response({"data": {"deployCpuPod": {"id": "cpu-pod-1"}}})

    monkeypatch.setattr(client, "_make_request", fake_make_request)

    response = client.create_cpu_pod(
        name="cpu-test",
        image_name="python:3.11-slim",
        instance_id="cpu3g-2-8",
        cloud_type="SECURE",
        deploy_cost=0.08,
    )

    assert response["id"] == "cpu-pod-1"
    # CPU deployments must use the CPU mutation, not the GPU one.
    assert "deployCpuPod" in captured["value"]
    assert "podFindAndDeployOnDemand" not in captured["value"]
Loading