Skip to content

Commit 9374ae8

Browse files
Kubernetes: report cpu/gpu/memory from parent node (#4413)
* Report cpu/gpu/memory from parent node * Upgrade kind to fix bug * swap * Add resources test * fix runtime -- bytes reporting * comment * int * cmt * fix * fix * fmt * fix * update * Update test_cli.py * Fix parse * rm return * fix * Fix test * Fix * Update test_cli.py * fix * fix --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent a139943 commit 9374ae8

6 files changed

Lines changed: 69 additions & 1 deletion

File tree

codalab/worker/runtime/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,6 @@ def kill(self, container_id: str):
7575

7676
def remove(self, container_id: str):
    """Abstract hook taking the identifier of a container to remove.

    The base implementation does nothing useful: it always raises, so a
    concrete runtime has to provide its own ``remove``.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
78+
79+
def get_node_availability_stats(self) -> dict:
    """Abstract hook returning resource availability stats as a dict.

    The base implementation always raises; a runtime that can report
    node-level resources has to override this method.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

codalab/worker/runtime/kubernetes_runtime.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from codalab.common import BundleRuntime
1313
from codalab.worker.runtime import Runtime
1414

15+
import os
1516
import urllib3
1617

1718
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -232,3 +233,15 @@ def remove(self, pod_name: str):
232233
f'Exception when calling Kubernetes api->delete_namespaced_pod...: {e}'
233234
)
234235
raise e
236+
237+
def get_node_availability_stats(self) -> dict:
    """Report the allocatable resources of the Kubernetes node named in the environment.

    The node name is read from the ``CODALAB_KUBERNETES_NODE_NAME``
    environment variable, the node is fetched through ``self.k8_api`` and
    its ``status.allocatable`` quantities are converted to plain integers.

    Returns:
        A dict with the integer keys ``cpus``, ``gpus``, ``memory_bytes``
        and ``free_disk_bytes``.

    Raises:
        ValueError: if ``CODALAB_KUBERNETES_NODE_NAME`` is unset or empty.
    """
    node_name = os.getenv("CODALAB_KUBERNETES_NODE_NAME")
    if not node_name:
        # Fail with an actionable message instead of an opaque client error.
        raise ValueError(
            'CODALAB_KUBERNETES_NODE_NAME is not set; cannot look up node resources'
        )
    node = self.k8_api.read_node(name=node_name)
    allocatable = node.status.allocatable

    return {
        # Kubernetes may report CPU as a quantity with a suffix (e.g. "3920m"
        # millicores), which a bare int() cannot parse. parse_quantity handles
        # both plain and suffixed values; int() then truncates to whole cores.
        'cpus': int(utils.parse_quantity(allocatable.get('cpu'))),
        # The GPU resource is absent on nodes without GPUs; treat that as zero.
        'gpus': int(allocatable.get('nvidia.com/gpu') or '0'),
        'memory_bytes': int(utils.parse_quantity(allocatable.get('memory'))),
        'free_disk_bytes': int(utils.parse_quantity(allocatable.get('ephemeral-storage'))),
    }

codalab/worker/worker.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,17 @@ def checkin(self):
500500
'is_terminating': self.terminate or self.terminate_and_restage,
501501
'preemptible': self.preemptible,
502502
}
503+
if self.bundle_runtime.name == BundleRuntime.KUBERNETES.value:
504+
stats = self.bundle_runtime.get_node_availability_stats()
505+
request = dict(
506+
request,
507+
**{
508+
'cpus': stats['cpus'],
509+
'gpus': stats['gpus'],
510+
'memory_bytes': stats['memory_bytes'],
511+
'free_disk_bytes': stats['free_disk_bytes'],
512+
},
513+
)
503514
try:
504515
response = self.bundle_service.checkin(self.id, request)
505516
logger.info('Connected! Successful check in!')

codalab/worker_manager/kubernetes_worker_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ def start_worker_job(self) -> None:
156156
'env': [
157157
{'name': 'CODALAB_USERNAME', 'value': self.codalab_username},
158158
{'name': 'CODALAB_PASSWORD', 'value': self.codalab_password},
159+
{
160+
'name': 'CODALAB_KUBERNETES_NODE_NAME',
161+
'valueFrom': {'fieldRef': {'fieldPath': 'spec.nodeName'}},
162+
},
159163
],
160164
'resources': {'limits': limits, 'requests': requests},
161165
'volumeMounts': [

scripts/local-k8s/kind-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ networking:
66
apiServerPort: 6443
77
nodes:
88
- role: control-plane
9-
image: kindest/node:v1.21.10@sha256:84709f09756ba4f863769bdcabe5edafc2ada72d3c8c44d6515fc581b66b029c
9+
image: kindest/node:v1.22.15@sha256:7d9708c4b0873f0fe2e171e2b1b7f45ae89482617778c1c875f1053d4cef2e41

tests/cli/test_cli.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2950,6 +2950,10 @@ def test_unicode(ctx):
29502950

29512951
@TestModule.register('workers')
29522952
def test_workers(ctx):
2953+
# Spin up a run in case a worker isn't already running, so it can be started by the worker manager.
2954+
uuid = _run_command([cl, 'run', 'echo'])
2955+
wait(uuid)
2956+
29532957
result = _run_command([cl, 'workers'])
29542958
lines = result.split("\n")
29552959

@@ -2983,6 +2987,39 @@ def test_workers(ctx):
29832987
worker_info = lines[2].split()
29842988
assert len(worker_info) >= 10
29852989

2990+
# Make sure that when we run a bundle that uses resources, the worker's available resources are decremented accordingly.
2991+
cpus_original, gpus_original, free_memory_original, free_disk_original = worker_info[1:5]
2992+
cpus_available, cpus_total = (int(i) for i in cpus_original.split("/"))
2993+
gpus_available, gpus_total = (int(i) for i in gpus_original.split("/"))
2994+
# Request every currently available CPU and GPU so that, while this run is
# active, the worker should report 0 available of each resource.
uuid = _run_command(
    [
        cl,
        'run',
        'sleep 100',
        '--request-cpus',
        str(cpus_available),
        '--request-gpus',
        # Use the GPU count here; the CPU count would over- or under-request GPUs.
        str(gpus_available),
    ],
    request_memory="100m",
    request_disk="100m",
)
3007+
wait_until_state(uuid, State.RUNNING)
3008+
result = _run_command([cl, 'workers'])
3009+
lines = result.split("\n")
3010+
worker_info = lines[2].split()
3011+
cpus, gpus, free_memory, free_disk = worker_info[1:5]
3012+
check_equals(f'0/{cpus_total}', cpus)
3013+
check_equals(f'0/{gpus_total}', gpus)
3014+
3015+
wait(uuid)
3016+
result = _run_command([cl, 'workers'])
3017+
lines = result.split("\n")
3018+
worker_info = lines[2].split()
3019+
cpus, gpus, free_memory, free_disk = worker_info[1:5]
3020+
check_equals(cpus_original, cpus)
3021+
check_equals(gpus_original, gpus)
3022+
29863023

29873024
@TestModule.register('sharing_workers')
29883025
def test_sharing_workers(ctx):

0 commit comments

Comments
 (0)