diff --git a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py index e5526756a..e6aec3bd0 100644 --- a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py @@ -16,6 +16,7 @@ This sub-module exists primarily to be used internally by the Cluster object (in the cluster sub-module) for RayCluster generation. """ + from typing import List, Union, Tuple, Dict from ...common import _kube_api_error_handling from ...common.kubernetes_cluster import get_api_client, config_check @@ -54,6 +55,24 @@ from codeflare_sdk.common.utils import constants FORBIDDEN_CUSTOM_RESOURCE_TYPES = ["GPU", "CPU", "memory"] + + +def _cpu_limit_to_num_cpus(cpu_limit: Union[int, str]) -> str: + """Convert a Kubernetes CPU limit to an integer string for Ray's num-cpus. + + Ray auto-detects host CPUs when num-cpus is not set, which can vastly + overcount in containerised environments (e.g. KinD on a beefy laptop). + Pinning num-cpus to the container CPU limit keeps the autoscaler's view + of available resources accurate. + """ + if isinstance(cpu_limit, int): + return str(max(cpu_limit, 0)) + s = str(cpu_limit).strip() + if s.endswith("m"): + return str(max(int(float(s[:-1]) / 1000), 1)) + return str(max(int(float(s)), 1)) + + VOLUME_MOUNTS = [ V1VolumeMount( mount_path="/etc/pki/tls/certs/odh-trusted-ca-bundle.crt", @@ -156,6 +175,7 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"): "rayStartParams": { "dashboard-host": "0.0.0.0", "block": "true", + "num-cpus": _cpu_limit_to_num_cpus(cluster.config.head_cpu_limits), "num-gpus": str(head_gpu_count), "resources": head_resources, }, @@ -180,6 +200,9 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"): "groupName": f"small-group-{cluster.config.name}", "rayStartParams": { "block": "true", + "num-cpus": _cpu_limit_to_num_cpus( + cluster.config.worker_cpu_limits + ), "num-gpus": str(worker_gpu_count), "resources": worker_resources, }, @@ -209,9 +232,9 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"): gcs_ft_options = {"redisAddress": cluster.config.redis_address} if cluster.config.external_storage_namespace: - gcs_ft_options[ - "externalStorageNamespace" - ] = cluster.config.external_storage_namespace + gcs_ft_options["externalStorageNamespace"] = ( + cluster.config.external_storage_namespace + ) if cluster.config.redis_password_secret: gcs_ft_options["redisPassword"] = { @@ -456,28 +479,18 @@ def head_worker_extended_resources_from_cluster( resource_type = cluster.config.extended_resource_mapping[k] if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES: continue - head_worker_extended_resources[0][ - resource_type - ] = cluster.config.head_extended_resource_requests[ - k - ] + head_worker_extended_resources[ - 0 - ].get( - resource_type, 0 + head_worker_extended_resources[0][resource_type] = ( + cluster.config.head_extended_resource_requests[k] + + head_worker_extended_resources[0].get(resource_type, 0) ) for k in cluster.config.worker_extended_resource_requests.keys(): resource_type = cluster.config.extended_resource_mapping[k] if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES: continue - head_worker_extended_resources[1][ - resource_type - ] = cluster.config.worker_extended_resource_requests[ - k - ] + head_worker_extended_resources[ - 1 - ].get( - resource_type, 0 + head_worker_extended_resources[1][resource_type] = ( + cluster.config.worker_extended_resource_requests[k] + + head_worker_extended_resources[1].get(resource_type, 0) ) return head_worker_extended_resources diff --git a/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py index cb1175c63..ec567f359 100644 --- a/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py @@ -13,7 +13,12 @@ # limitations under the License. from collections import namedtuple import sys -from .build_ray_cluster import gen_names, update_image, build_ray_cluster +from .build_ray_cluster import ( + gen_names, + update_image, + build_ray_cluster, + _cpu_limit_to_num_cpus, +) import uuid from codeflare_sdk.ray.cluster.cluster import ClusterConfiguration, Cluster @@ -66,6 +71,19 @@ def test_update_image_without_supported_python_version(mocker): assert image is None +def test_cpu_limit_to_num_cpus(): + assert _cpu_limit_to_num_cpus(2) == "2" + assert _cpu_limit_to_num_cpus(0) == "0" + assert _cpu_limit_to_num_cpus(1) == "1" + assert _cpu_limit_to_num_cpus("500m") == "1" + assert _cpu_limit_to_num_cpus("1000m") == "1" + assert _cpu_limit_to_num_cpus("2000m") == "2" + assert _cpu_limit_to_num_cpus("250m") == "1" + assert _cpu_limit_to_num_cpus("1") == "1" + assert _cpu_limit_to_num_cpus("4") == "4" + assert _cpu_limit_to_num_cpus("8") == "8" + + def test_build_ray_cluster_with_gcs_ft(mocker): mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") diff --git a/tests/e2e/autoscaling_load.py b/tests/e2e/autoscaling_load.py new file mode 100644 index 000000000..6e4504248 --- /dev/null +++ b/tests/e2e/autoscaling_load.py @@ -0,0 +1,31 @@ +""" +Workload used by E2E autoscaling tests. + +This script is submitted via Ray Job submission to generate enough queued work +to trigger Ray in-tree autoscaling on a KinD cluster. +""" + +import os +import time + +import ray + + +def main(): + # Expect to run inside the Ray cluster environment (dashboard job submission) + ray.init(address="auto") + + concurrency = int(os.getenv("AUTOSCALING_TASKS", "4")) + sleep_s = int(os.getenv("AUTOSCALING_TASK_SLEEP_S", "120")) + + @ray.remote(num_cpus=1) + def burn_cpu(): + time.sleep(sleep_s) + return True + + futures = [burn_cpu.remote() for _ in range(concurrency)] + ray.get(futures) + + +if __name__ == "__main__": + main() diff --git a/tests/e2e/autoscaling_raycluster_sdk_kind_test.py b/tests/e2e/autoscaling_raycluster_sdk_kind_test.py new file mode 100644 index 000000000..70c236d9e --- /dev/null +++ b/tests/e2e/autoscaling_raycluster_sdk_kind_test.py @@ -0,0 +1,60 @@ +import pytest + +from codeflare_sdk import Cluster, ClusterConfiguration + +from support import * + + +@pytest.mark.kind +class TestRayClusterAutoscalingSDKKind: + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + + def test_autoscaling_scale_up_and_down_kind(self): + self.setup_method() + create_namespace(self) + + cluster_name = f"autoscale-{random_choice()}" + ray_image = get_ray_image() + + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + enable_autoscaling=True, + min_workers=1, + max_workers=2, + head_cpu_requests=1, + head_cpu_limits=1, + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests=1, + worker_memory_limits=4, + image=ray_image, + write_to_file=True, + verify_tls=False, + ) + ) + + cluster.apply() + cluster.wait_ready(timeout=600, dashboard_check=False) + + # Verify initial state: 1 worker (min_workers) + wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=300) + + # Trigger scale-up: 3 tasks @ 1 CPU each exceeds head(1) + worker(1) = 2 CPUs + load_proc = run_autoscaling_load_in_head_pod(self, cluster_name, tasks=3) + + # Verify scale-up while load is still running + wait_for_worker_count(self, cluster_name, lambda n: n >= 2, timeout_s=600) + + # Wait for load to finish, then verify scale-down back to min_workers + load_proc.wait(timeout=600) + assert load_proc.returncode == 0, "Load script failed" + + wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=600) + + cluster.down() diff --git a/tests/e2e/autoscaling_raycluster_sdk_oauth_test.py b/tests/e2e/autoscaling_raycluster_sdk_oauth_test.py new file mode 100644 index 000000000..f5c4f8cac --- /dev/null +++ b/tests/e2e/autoscaling_raycluster_sdk_oauth_test.py @@ -0,0 +1,65 @@ +import pytest + +from codeflare_sdk import Cluster, ClusterConfiguration + +from support import * + + +@pytest.mark.openshift +@pytest.mark.tier1 +class TestRayClusterAutoscalingSDKOauth: + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + if hasattr(self, "auth_instance"): + cleanup_authentication(self.auth_instance) + delete_namespace(self) + + @pytest.mark.timeout(1800) + def test_autoscaling_scale_up_and_down_openshift_oauth(self): + self.setup_method() + + create_namespace(self) + + ray_image = get_ray_image() + resources = get_platform_appropriate_resources() + self.auth_instance = authenticate_for_tests() + + cluster_name = f"autoscale-{random_choice()}" + + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + enable_autoscaling=True, + min_workers=1, + max_workers=2, + image=ray_image, + write_to_file=True, + verify_tls=False, + **resources, + ) + ) + + cluster.apply() + wait_ready_with_stuck_detection(cluster, timeout=900, dashboard_check=False) + + # Verify initial state: 1 worker (min_workers) + wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=600) + + # Trigger scale-up via load script in head pod (async) + load_proc = run_autoscaling_load_in_head_pod( + self, cluster_name, tasks=2, sleep_s=180 + ) + + # Verify scale-up while load is still running + wait_for_worker_count(self, cluster_name, lambda n: n >= 2, timeout_s=900) + + # Wait for load to finish, then verify scale-down back to min_workers + load_proc.wait(timeout=600) + assert load_proc.returncode == 0, "Load script failed" + + wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=900) + + cluster.down() diff --git a/tests/e2e/support.py b/tests/e2e/support.py index 46f6c375b..a0642076a 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -2,8 +2,10 @@ import random import string import subprocess +import time import warnings from time import sleep + from codeflare_sdk import get_cluster from kubernetes import client, config from kubernetes.client import V1Toleration @@ -344,6 +346,66 @@ def run_kubectl_command(args): return None +def wait_for_worker_count(self, cluster_name, predicate, timeout_s=600): + """Wait until the number of worker pods for cluster_name satisfies predicate.""" + label = f"ray.io/node-type=worker,ray.io/cluster={cluster_name}" + start = time.time() + last = None + while time.time() - start < timeout_s: + pods = self.api_instance.list_namespaced_pod( + self.namespace, label_selector=label + ) + last = len(pods.items or []) + if predicate(last): + return last + sleep(10) + raise TimeoutError( + f"Timed out waiting for worker count. cluster={cluster_name} last={last}" + ) + + +def run_autoscaling_load_in_head_pod(self, cluster_name, tasks=2, sleep_s=120): + """ + Copy autoscaling_load.py into the head pod and run it asynchronously. + Returns the Popen handle so the caller can check for scale-up while + the workload is still running (avoids the race where blocking execution + lets workers scale back down before the assertion runs). + """ + label = f"ray.io/node-type=head,ray.io/cluster={cluster_name}" + pods = self.api_instance.list_namespaced_pod(self.namespace, label_selector=label) + if not pods.items: + raise RuntimeError(f"No head pod found for cluster {cluster_name}") + head_pod = pods.items[0].metadata.name + + load_script = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "autoscaling_load.py" + ) + + subprocess.check_call( + [ + "kubectl", + "cp", + load_script, + f"{self.namespace}/{head_pod}:/tmp/autoscaling_load.py", + ] + ) + + return subprocess.Popen( + [ + "kubectl", + "exec", + "-n", + self.namespace, + head_pod, + "--", + "bash", + "-lc", + f"AUTOSCALING_TASKS={tasks} AUTOSCALING_TASK_SLEEP_S={sleep_s} " + f"python /tmp/autoscaling_load.py", + ] + ) + + def create_cluster_queue(self, cluster_queue, flavor): cluster_queue_json = { "apiVersion": "kueue.x-k8s.io/v1beta1", diff --git a/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml b/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml index 66dfbdd7f..189720346 100644 --- a/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml +++ b/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml @@ -24,6 +24,7 @@ spec: rayStartParams: block: 'true' dashboard-host: 0.0.0.0 + num-cpus: '2' num-gpus: '0' resources: '"{}"' serviceType: ClusterIP @@ -92,6 +93,7 @@ spec: minReplicas: 2 rayStartParams: block: 'true' + num-cpus: '4' num-gpus: '0' resources: '"{}"' replicas: 2 diff --git a/tests/test_cluster_yamls/ray/default-ray-cluster.yaml b/tests/test_cluster_yamls/ray/default-ray-cluster.yaml index 024fc5ffd..51c1de267 100644 --- a/tests/test_cluster_yamls/ray/default-ray-cluster.yaml +++ b/tests/test_cluster_yamls/ray/default-ray-cluster.yaml @@ -23,6 +23,7 @@ spec: rayStartParams: block: 'true' dashboard-host: 0.0.0.0 + num-cpus: '2' num-gpus: '0' resources: '"{}"' serviceType: ClusterIP @@ -91,6 +92,7 @@ spec: minReplicas: 1 rayStartParams: block: 'true' + num-cpus: '1' num-gpus: '0' resources: '"{}"' replicas: 1 diff --git a/tests/test_cluster_yamls/ray/unit-test-all-params.yaml b/tests/test_cluster_yamls/ray/unit-test-all-params.yaml index 036b946ff..03952672f 100644 --- a/tests/test_cluster_yamls/ray/unit-test-all-params.yaml +++ b/tests/test_cluster_yamls/ray/unit-test-all-params.yaml @@ -30,6 +30,7 @@ spec: rayStartParams: block: 'true' dashboard-host: 0.0.0.0 + num-cpus: '8' num-gpus: '1' resources: '"{\"TPU\": 2}"' serviceType: ClusterIP @@ -137,6 +138,7 @@ spec: minReplicas: 10 rayStartParams: block: 'true' + num-cpus: '8' num-gpus: '1' resources: '"{}"' replicas: 10