Skip to content

Commit af6a35a

Browse files
peterschmidt85 and Andrey Cheptsov authored
runpod: support on-demand CPU offers and provisioning (#3726)
* runpod: support on-demand CPU offers and provisioning
  - enable runpod-cpu flagged offers from gpuhunt
  - add deployCpuPod API path in the RunPod client
  - route gpu=0 jobs through CPU pod provisioning
  - enforce per-offer CPU disk limits during offer selection
  - add RunPod API client tests for CPU deployment mutation
* runpod: restore edit_pod rationale comment
  - keep explicit context for why edit_pod is used
  - restore TODO describing future removal path

---------

Co-authored-by: Andrey Cheptsov <andrey.cheptsov@github.com>
1 parent 5b9e8e3 commit af6a35a

File tree

4 files changed

+253
-35
lines changed

4 files changed

+253
-35
lines changed

src/dstack/_internal/core/backends/base/offers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"gcp-a4",
2727
"gcp-g4",
2828
"gcp-dws-calendar-mode",
29+
"runpod-cpu",
2930
"runpod-cluster",
3031
]
3132

src/dstack/_internal/core/backends/runpod/api_client.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,47 @@ def create_pod(
101101
data = resp.json()["data"]
102102
return data["podRentInterruptable"] if bid_per_gpu else data["podFindAndDeployOnDemand"]
103103

104+
def create_cpu_pod(
    self,
    name: str,
    image_name: str,
    instance_id: str,
    cloud_type: str,
    deploy_cost: float,
    start_ssh: bool = True,
    data_center_id: Optional[str] = None,
    container_disk_in_gb: Optional[int] = None,
    docker_args: Optional[str] = None,
    ports: Optional[str] = None,
    volume_mount_path: Optional[str] = None,
    env: Optional[Dict[str, Any]] = None,
    template_id: Optional[str] = None,
    network_volume_id: Optional[str] = None,
    container_registry_auth_id: Optional[str] = None,
) -> Dict:
    """
    Deploys a CPU pod using the `deployCpuPod` GraphQL mutation.

    All arguments are forwarded unchanged to
    `_generate_cpu_pod_deployment_mutation`, which renders them into the
    mutation text; the text is then sent as the `query` of a single request.

    Returns:
        The `deployCpuPod` object found under `data` in the JSON response.
    """
    mutation = _generate_cpu_pod_deployment_mutation(
        name=name,
        image_name=image_name,
        instance_id=instance_id,
        cloud_type=cloud_type,
        deploy_cost=deploy_cost,
        start_ssh=start_ssh,
        data_center_id=data_center_id,
        container_disk_in_gb=container_disk_in_gb,
        docker_args=docker_args,
        ports=ports,
        volume_mount_path=volume_mount_path,
        env=env,
        template_id=template_id,
        network_volume_id=network_volume_id,
        container_registry_auth_id=container_registry_auth_id,
    )
    resp = self._make_request({"query": mutation})
    return resp.json()["data"]["deployCpuPod"]
144+
104145
def edit_pod(
105146
self,
106147
pod_id: str,
@@ -499,6 +540,77 @@ def _generate_pod_deployment_mutation(
499540
"""
500541

501542

543+
def _generate_cpu_pod_deployment_mutation(
544+
name: str,
545+
image_name: str,
546+
instance_id: str,
547+
cloud_type: str,
548+
deploy_cost: float,
549+
start_ssh: bool = True,
550+
data_center_id: Optional[str] = None,
551+
container_disk_in_gb: Optional[int] = None,
552+
docker_args: Optional[str] = None,
553+
ports: Optional[str] = None,
554+
volume_mount_path: Optional[str] = None,
555+
env: Optional[Dict[str, Any]] = None,
556+
template_id: Optional[str] = None,
557+
network_volume_id: Optional[str] = None,
558+
container_registry_auth_id: Optional[str] = None,
559+
) -> str:
560+
"""
561+
Generates a mutation to deploy CPU pod.
562+
"""
563+
input_fields = []
564+
input_fields.append(f'name: "{name}"')
565+
input_fields.append(f'imageName: "{image_name}"')
566+
input_fields.append(f'instanceId: "{instance_id}"')
567+
input_fields.append(f"cloudType: {cloud_type}")
568+
input_fields.append(f"deployCost: {deploy_cost}")
569+
570+
if start_ssh:
571+
input_fields.append("startSsh: true")
572+
if data_center_id is not None:
573+
input_fields.append(f'dataCenterId: "{data_center_id}"')
574+
if container_disk_in_gb is not None:
575+
input_fields.append(f"containerDiskInGb: {container_disk_in_gb}")
576+
if docker_args is not None:
577+
input_fields.append(f'dockerArgs: "{docker_args}"')
578+
if ports is not None:
579+
ports = ports.replace(" ", "")
580+
input_fields.append(f'ports: "{ports}"')
581+
if volume_mount_path is not None:
582+
input_fields.append(f'volumeMountPath: "{volume_mount_path}"')
583+
if env is not None:
584+
env_string = ", ".join(
585+
[f'{{ key: "{key}", value: "{value}" }}' for key, value in env.items()]
586+
)
587+
input_fields.append(f"env: [{env_string}]")
588+
if template_id is not None:
589+
input_fields.append(f'templateId: "{template_id}"')
590+
if network_volume_id is not None:
591+
input_fields.append(f'networkVolumeId: "{network_volume_id}"')
592+
if container_registry_auth_id is not None:
593+
input_fields.append(f'containerRegistryAuthId: "{container_registry_auth_id}"')
594+
595+
input_string = ", ".join(input_fields)
596+
return f"""
597+
mutation {{
598+
deployCpuPod(
599+
input: {{
600+
{input_string}
601+
}}
602+
) {{
603+
id
604+
lastStatusChange
605+
imageName
606+
machine {{
607+
podHostId
608+
}}
609+
}}
610+
}}
611+
"""
612+
613+
502614
def _generate_pod_terminate_mutation(pod_id: str) -> str:
503615
"""
504616
Generates a mutation to terminate a pod.

src/dstack/_internal/core/backends/runpod/compute.py

Lines changed: 69 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,24 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability
8888
return offers
8989

9090
def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]:
    """
    Returns the offer modifiers for the given requirements.

    A single disk modifier is returned. For offers with at least one GPU it
    delegates to the modifier built from `CONFIGURABLE_DISK_SIZE`; for offers
    without GPUs it builds a per-offer range capped by the disk size the
    offer itself advertises.
    """
    # Built once: the GPU range does not depend on the individual offer.
    gpu_disk_modifier = get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)

    def disk_modifier(
        offer: InstanceOfferWithAvailability,
    ) -> Optional[InstanceOfferWithAvailability]:
        if len(offer.instance.resources.gpus) > 0:
            return gpu_disk_modifier(offer)

        # For Runpod CPU offers, gpuhunt disk is the per-flavor max.
        # Choose requested disk within [1GB, max] or filter the offer out.
        cpu_max_disk_size_gb = Memory(offer.instance.resources.disk.size_mib / 1024)
        cpu_configurable_disk_size = Range[Memory](
            min=Memory.parse("1GB"),
            max=cpu_max_disk_size_gb,
        )
        # The range differs per offer, so the modifier is built per call.
        return get_offers_disk_modifier(cpu_configurable_disk_size, requirements)(offer)

    return [disk_modifier]
92109

93110
def get_offers_post_filter(
94111
self, requirements: Requirements
@@ -140,46 +157,63 @@ def run_job(
140157
job.job_spec.registry_auth
141158
)
142159
gpu_count = len(instance_offer.instance.resources.gpus)
143-
bid_per_gpu = None
144-
if instance_offer.instance.resources.spot and gpu_count:
145-
bid_per_gpu = instance_offer.price / gpu_count
146-
if _is_secure_cloud(instance_offer.region):
147-
cloud_type = "SECURE"
148-
data_center_id = instance_offer.region
149-
country_code = None
160+
if gpu_count == 0:
161+
if not _is_secure_cloud(instance_offer.region):
162+
raise ComputeError("Runpod CPU offers are only supported in secure cloud regions")
163+
resp = self.api_client.create_cpu_pod(
164+
name=pod_name,
165+
image_name=job.job_spec.image_name,
166+
instance_id=instance_offer.instance.name,
167+
cloud_type="SECURE",
168+
deploy_cost=instance_offer.price,
169+
data_center_id=instance_offer.region,
170+
container_disk_in_gb=disk_size,
171+
start_ssh=True,
172+
docker_args=_get_docker_args(authorized_keys),
173+
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
174+
network_volume_id=network_volume_id,
175+
volume_mount_path=volume_mount_path,
176+
env={"RUNPOD_POD_USER": "0"},
177+
)
150178
else:
151-
cloud_type = "COMMUNITY"
152-
data_center_id = None
153-
country_code = instance_offer.region
154-
155-
resp = self.api_client.create_pod(
156-
name=pod_name,
157-
image_name=job.job_spec.image_name,
158-
gpu_type_id=instance_offer.instance.name,
159-
cloud_type=cloud_type,
160-
data_center_id=data_center_id,
161-
country_code=country_code,
162-
gpu_count=gpu_count,
163-
container_disk_in_gb=disk_size,
164-
min_vcpu_count=instance_offer.instance.resources.cpus,
165-
min_memory_in_gb=memory_size,
166-
support_public_ip=True,
167-
docker_args=_get_docker_args(authorized_keys),
168-
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
169-
bid_per_gpu=bid_per_gpu,
170-
network_volume_id=network_volume_id,
171-
volume_mount_path=volume_mount_path,
172-
env={"RUNPOD_POD_USER": "0"},
173-
)
179+
bid_per_gpu = None
180+
if instance_offer.instance.resources.spot:
181+
bid_per_gpu = instance_offer.price / gpu_count
182+
if _is_secure_cloud(instance_offer.region):
183+
cloud_type = "SECURE"
184+
data_center_id = instance_offer.region
185+
country_code = None
186+
else:
187+
cloud_type = "COMMUNITY"
188+
data_center_id = None
189+
country_code = instance_offer.region
190+
191+
resp = self.api_client.create_pod(
192+
name=pod_name,
193+
image_name=job.job_spec.image_name,
194+
gpu_type_id=instance_offer.instance.name,
195+
cloud_type=cloud_type,
196+
data_center_id=data_center_id,
197+
country_code=country_code,
198+
gpu_count=gpu_count,
199+
container_disk_in_gb=disk_size,
200+
min_vcpu_count=instance_offer.instance.resources.cpus,
201+
min_memory_in_gb=memory_size,
202+
support_public_ip=True,
203+
docker_args=_get_docker_args(authorized_keys),
204+
ports=f"{DSTACK_RUNNER_SSH_PORT}/tcp",
205+
bid_per_gpu=bid_per_gpu,
206+
network_volume_id=network_volume_id,
207+
volume_mount_path=volume_mount_path,
208+
env={"RUNPOD_POD_USER": "0"},
209+
)
174210

175211
instance_id = resp["id"]
176212

177213
# Call edit_pod to pass container_registry_auth_id.
178214
# Expect a long time (~5m) for the pod to pick up the creds.
179-
# TODO: remove editPod once createPod supports docker's username and password
180-
# editPod is temporary solution to set container_registry_auth_id because createPod does not
181-
# support it currently. This will be removed once createPod supports container_registry_auth_id
182-
# or username and password
215+
# TODO: remove editPod once Runpod's create mutations support docker's username/password
216+
# (or a reliable containerRegistryAuthId at create time).
183217
if container_registry_auth_id is not None:
184218
instance_id = self.api_client.edit_pod(
185219
pod_id=instance_id,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from dstack._internal.core.backends.runpod.api_client import (
2+
RunpodApiClient,
3+
_generate_cpu_pod_deployment_mutation,
4+
)
5+
6+
7+
class _Response:
8+
def __init__(self, payload):
9+
self._payload = payload
10+
11+
def json(self):
12+
return self._payload
13+
14+
15+
def test_generate_cpu_pod_deployment_mutation():
    """Checks that every supplied argument is rendered into the mutation text."""
    mutation = _generate_cpu_pod_deployment_mutation(
        name="cpu-test",
        image_name="python:3.11-slim",
        instance_id="cpu3g-2-8",
        cloud_type="SECURE",
        deploy_cost=0.08,
        start_ssh=True,
        data_center_id="AP-JP-1",
        container_disk_in_gb=5,
        docker_args='{"cmd":["echo hi"]}',
        ports="22/tcp, 8080/http",
        volume_mount_path="/workspace",
        env={"HELLO": "WORLD"},
        template_id="runpod-ubuntu",
        network_volume_id="vol-1",
        container_registry_auth_id="cred-1",
    )

    assert "deployCpuPod" in mutation
    assert 'name: "cpu-test"' in mutation
    assert 'imageName: "python:3.11-slim"' in mutation
    assert 'instanceId: "cpu3g-2-8"' in mutation
    assert "cloudType: SECURE" in mutation
    assert "deployCost: 0.08" in mutation
    assert "startSsh: true" in mutation
    assert 'dataCenterId: "AP-JP-1"' in mutation
    assert "containerDiskInGb: 5" in mutation
    # docker_args is supplied above, so the field must be emitted as well.
    assert 'dockerArgs: "' in mutation
    # Spaces inside the ports string are removed by the generator.
    assert 'ports: "22/tcp,8080/http"' in mutation
    assert 'volumeMountPath: "/workspace"' in mutation
    assert 'env: [{ key: "HELLO", value: "WORLD" }]' in mutation
    assert 'templateId: "runpod-ubuntu"' in mutation
    assert 'networkVolumeId: "vol-1"' in mutation
    assert 'containerRegistryAuthId: "cred-1"' in mutation
49+
50+
51+
def test_create_cpu_pod_uses_deploy_cpu_pod(monkeypatch):
    """Checks that `create_cpu_pod` sends a `deployCpuPod` mutation and unwraps its result."""
    client = RunpodApiClient(api_key="test")
    query = {}

    def fake_make_request(data):
        # Capture the outgoing mutation text instead of sending a request.
        query["value"] = data["query"]
        return _Response({"data": {"deployCpuPod": {"id": "cpu-pod-1"}}})

    monkeypatch.setattr(client, "_make_request", fake_make_request)

    response = client.create_cpu_pod(
        name="cpu-test",
        image_name="python:3.11-slim",
        instance_id="cpu3g-2-8",
        cloud_type="SECURE",
        deploy_cost=0.08,
    )

    assert response["id"] == "cpu-pod-1"
    assert "deployCpuPod" in query["value"]
    assert "podFindAndDeployOnDemand" not in query["value"]
    # Defaults: SSH is enabled and optional fields left as None are omitted.
    assert "startSsh: true" in query["value"]
    assert "dataCenterId" not in query["value"]
    assert "networkVolumeId" not in query["value"]

0 commit comments

Comments
 (0)