Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions src/codeflare_sdk/ray/cluster/build_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
This sub-module exists primarily to be used internally by the Cluster object
(in the cluster sub-module) for RayCluster generation.
"""

from typing import List, Union, Tuple, Dict
from ...common import _kube_api_error_handling
from ...common.kubernetes_cluster import get_api_client, config_check
Expand Down Expand Up @@ -54,6 +55,24 @@
from codeflare_sdk.common.utils import constants

FORBIDDEN_CUSTOM_RESOURCE_TYPES = ["GPU", "CPU", "memory"]


def _cpu_limit_to_num_cpus(cpu_limit: Union[int, str]) -> str:
"""Convert a Kubernetes CPU limit to an integer string for Ray's num-cpus.

Ray auto-detects host CPUs when num-cpus is not set, which can vastly
overcount in containerised environments (e.g. KinD on a beefy laptop).
Pinning num-cpus to the container CPU limit keeps the autoscaler's view
of available resources accurate.
"""
if isinstance(cpu_limit, int):
return str(max(cpu_limit, 0))
s = str(cpu_limit).strip()
if s.endswith("m"):
return str(max(int(float(s[:-1]) / 1000), 1))
return str(max(int(float(s)), 1))


VOLUME_MOUNTS = [
V1VolumeMount(
mount_path="/etc/pki/tls/certs/odh-trusted-ca-bundle.crt",
Expand Down Expand Up @@ -156,6 +175,7 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
"rayStartParams": {
"dashboard-host": "0.0.0.0",
"block": "true",
"num-cpus": _cpu_limit_to_num_cpus(cluster.config.head_cpu_limits),
"num-gpus": str(head_gpu_count),
"resources": head_resources,
},
Expand All @@ -180,6 +200,9 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
"groupName": f"small-group-{cluster.config.name}",
"rayStartParams": {
"block": "true",
"num-cpus": _cpu_limit_to_num_cpus(
cluster.config.worker_cpu_limits
),
"num-gpus": str(worker_gpu_count),
"resources": worker_resources,
},
Expand Down Expand Up @@ -209,9 +232,9 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
gcs_ft_options = {"redisAddress": cluster.config.redis_address}

if cluster.config.external_storage_namespace:
gcs_ft_options[
"externalStorageNamespace"
] = cluster.config.external_storage_namespace
gcs_ft_options["externalStorageNamespace"] = (
cluster.config.external_storage_namespace
)

if cluster.config.redis_password_secret:
gcs_ft_options["redisPassword"] = {
Expand Down Expand Up @@ -456,28 +479,18 @@ def head_worker_extended_resources_from_cluster(
resource_type = cluster.config.extended_resource_mapping[k]
if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES:
continue
head_worker_extended_resources[0][
resource_type
] = cluster.config.head_extended_resource_requests[
k
] + head_worker_extended_resources[
0
].get(
resource_type, 0
head_worker_extended_resources[0][resource_type] = (
cluster.config.head_extended_resource_requests[k]
+ head_worker_extended_resources[0].get(resource_type, 0)
)

for k in cluster.config.worker_extended_resource_requests.keys():
resource_type = cluster.config.extended_resource_mapping[k]
if resource_type in FORBIDDEN_CUSTOM_RESOURCE_TYPES:
continue
head_worker_extended_resources[1][
resource_type
] = cluster.config.worker_extended_resource_requests[
k
] + head_worker_extended_resources[
1
].get(
resource_type, 0
head_worker_extended_resources[1][resource_type] = (
cluster.config.worker_extended_resource_requests[k]
+ head_worker_extended_resources[1].get(resource_type, 0)
)
return head_worker_extended_resources

Expand Down
20 changes: 19 additions & 1 deletion src/codeflare_sdk/ray/cluster/test_build_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
# limitations under the License.
from collections import namedtuple
import sys
from .build_ray_cluster import gen_names, update_image, build_ray_cluster
from .build_ray_cluster import (
gen_names,
update_image,
build_ray_cluster,
_cpu_limit_to_num_cpus,
)
import uuid
from codeflare_sdk.ray.cluster.cluster import ClusterConfiguration, Cluster

Expand Down Expand Up @@ -66,6 +71,19 @@ def test_update_image_without_supported_python_version(mocker):
assert image is None


def test_cpu_limit_to_num_cpus():
    """Check integer, millicore and plain-string CPU limits map to num-cpus."""
    # (input limit, expected num-cpus string)
    expectations = [
        # Integer limits pass through unchanged, including zero.
        (2, "2"),
        (0, "0"),
        (1, "1"),
        # Millicore strings are truncated to whole CPUs with a floor of one.
        ("500m", "1"),
        ("1000m", "1"),
        ("2000m", "2"),
        ("250m", "1"),
        # Plain numeric strings.
        ("1", "1"),
        ("4", "4"),
        ("8", "8"),
    ]
    for cpu_limit, expected in expectations:
        assert _cpu_limit_to_num_cpus(cpu_limit) == expected


def test_build_ray_cluster_with_gcs_ft(mocker):
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
mocker.patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object")
Expand Down
31 changes: 31 additions & 0 deletions tests/e2e/autoscaling_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Workload used by E2E autoscaling tests.

This script is submitted via Ray Job submission to generate enough queued work
to trigger Ray in-tree autoscaling on a KinD cluster.
"""

import os
import time

import ray


def main():
    """Queue a batch of 1-CPU sleeping Ray tasks and block until all finish.

    The number of tasks comes from ``AUTOSCALING_TASKS`` (default 4) and the
    per-task sleep, in seconds, from ``AUTOSCALING_TASK_SLEEP_S`` (default 120).
    """
    # Expect to run inside the Ray cluster environment (dashboard job submission)
    ray.init(address="auto")

    task_count = int(os.getenv("AUTOSCALING_TASKS", "4"))
    task_duration_s = int(os.getenv("AUTOSCALING_TASK_SLEEP_S", "120"))

    @ray.remote(num_cpus=1)
    def burn_cpu():
        # Each task reserves one CPU for the sleep duration.
        time.sleep(task_duration_s)
        return True

    pending = []
    for _ in range(task_count):
        pending.append(burn_cpu.remote())

    # Block until every submitted task has completed.
    ray.get(pending)


if __name__ == "__main__":
main()
60 changes: 60 additions & 0 deletions tests/e2e/autoscaling_raycluster_sdk_kind_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest

from codeflare_sdk import Cluster, ClusterConfiguration

from support import *


@pytest.mark.kind
class TestRayClusterAutoscalingSDKKind:
    """E2E autoscaling test for an SDK-created RayCluster, marked for KinD."""

    def setup_method(self):
        # NOTE(review): helper lives in `support`; presumably it attaches the
        # Kubernetes API handles that the other support helpers read from self.
        initialize_kubernetes_client(self)

    def teardown_method(self):
        # Drop the namespace created by the test.
        delete_namespace(self)

    def test_autoscaling_scale_up_and_down_kind(self):
        """Scale from min_workers to max_workers under load, then back down."""

        def at_min_workers(count):
            return count == 1

        def scaled_up(count):
            return count >= 2

        self.setup_method()
        create_namespace(self)

        cluster_name = f"autoscale-{random_choice()}"
        ray_image = get_ray_image()

        # One CPU on the head and on each worker keeps the capacity maths
        # in the scale-up step below simple.
        configuration = ClusterConfiguration(
            name=cluster_name,
            namespace=self.namespace,
            enable_autoscaling=True,
            min_workers=1,
            max_workers=2,
            head_cpu_requests=1,
            head_cpu_limits=1,
            worker_cpu_requests=1,
            worker_cpu_limits=1,
            worker_memory_requests=1,
            worker_memory_limits=4,
            image=ray_image,
            write_to_file=True,
            verify_tls=False,
        )
        cluster = Cluster(configuration)

        cluster.apply()
        cluster.wait_ready(timeout=600, dashboard_check=False)

        # Verify initial state: 1 worker (min_workers)
        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=300)

        # Trigger scale-up: 3 tasks @ 1 CPU each exceeds head(1) + worker(1) = 2 CPUs
        load_proc = run_autoscaling_load_in_head_pod(self, cluster_name, tasks=3)

        # Verify scale-up while load is still running
        wait_for_worker_count(self, cluster_name, scaled_up, timeout_s=600)

        # Wait for load to finish, then verify scale-down back to min_workers
        load_proc.wait(timeout=600)
        assert load_proc.returncode == 0, "Load script failed"

        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=600)

        cluster.down()
65 changes: 65 additions & 0 deletions tests/e2e/autoscaling_raycluster_sdk_oauth_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pytest

from codeflare_sdk import Cluster, ClusterConfiguration

from support import *


@pytest.mark.openshift
@pytest.mark.tier1
class TestRayClusterAutoscalingSDKOauth:
    """E2E autoscaling test for an SDK-created RayCluster, marked for OpenShift."""

    def setup_method(self):
        # NOTE(review): helper lives in `support`; presumably it attaches the
        # Kubernetes API handles that the other support helpers read from self.
        initialize_kubernetes_client(self)

    def teardown_method(self):
        # auth_instance only exists once the test got as far as authenticating.
        if hasattr(self, "auth_instance"):
            cleanup_authentication(self.auth_instance)
        delete_namespace(self)

    @pytest.mark.timeout(1800)
    def test_autoscaling_scale_up_and_down_openshift_oauth(self):
        """Scale from min_workers to max_workers under load, then back down."""

        def at_min_workers(count):
            return count == 1

        def scaled_up(count):
            return count >= 2

        self.setup_method()

        create_namespace(self)

        ray_image = get_ray_image()
        resources = get_platform_appropriate_resources()
        self.auth_instance = authenticate_for_tests()

        cluster_name = f"autoscale-{random_choice()}"

        # CPU/memory sizing is supplied by the platform-specific resources dict.
        configuration = ClusterConfiguration(
            name=cluster_name,
            namespace=self.namespace,
            enable_autoscaling=True,
            min_workers=1,
            max_workers=2,
            image=ray_image,
            write_to_file=True,
            verify_tls=False,
            **resources,
        )
        cluster = Cluster(configuration)

        cluster.apply()
        wait_ready_with_stuck_detection(cluster, timeout=900, dashboard_check=False)

        # Verify initial state: 1 worker (min_workers)
        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=600)

        # Trigger scale-up via load script in head pod (async)
        load_proc = run_autoscaling_load_in_head_pod(
            self, cluster_name, tasks=2, sleep_s=180
        )

        # Verify scale-up while load is still running
        wait_for_worker_count(self, cluster_name, scaled_up, timeout_s=900)

        # Wait for load to finish, then verify scale-down back to min_workers
        load_proc.wait(timeout=600)
        assert load_proc.returncode == 0, "Load script failed"

        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=900)

        cluster.down()
62 changes: 62 additions & 0 deletions tests/e2e/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import random
import string
import subprocess
import time
import warnings
from time import sleep

from codeflare_sdk import get_cluster
from kubernetes import client, config
from kubernetes.client import V1Toleration
Expand Down Expand Up @@ -344,6 +346,66 @@ def run_kubectl_command(args):
return None


def wait_for_worker_count(self, cluster_name, predicate, timeout_s=600):
    """Wait until the number of worker pods for cluster_name satisfies predicate.

    Worker pods are selected in ``self.namespace`` by the labels
    ``ray.io/node-type=worker`` and ``ray.io/cluster=<cluster_name>``.

    Args:
        self: Object exposing ``api_instance`` (with ``list_namespaced_pod``)
            and ``namespace``.
        cluster_name: Name of the Ray cluster whose workers are counted.
        predicate: Callable taking the current worker pod count and returning
            a truthy value once the desired state is reached.
        timeout_s: Maximum number of seconds to keep polling.

    Returns:
        The worker pod count that satisfied ``predicate``.

    Raises:
        TimeoutError: If ``predicate`` is still unsatisfied at the deadline.
    """
    label = f"ray.io/node-type=worker,ray.io/cluster={cluster_name}"
    poll_interval_s = 10
    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + timeout_s
    last = None
    # Always poll at least once so a zero/negative timeout still reports the
    # real worker count instead of failing with last=None.
    while True:
        pods = self.api_instance.list_namespaced_pod(
            self.namespace, label_selector=label
        )
        last = len(pods.items or [])
        if predicate(last):
            return last
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        sleep(min(poll_interval_s, remaining))
    raise TimeoutError(
        f"Timed out waiting for worker count. cluster={cluster_name} last={last}"
    )


def run_autoscaling_load_in_head_pod(self, cluster_name, tasks=2, sleep_s=120):
    """
    Copy autoscaling_load.py into the head pod and start it without blocking.

    The Popen handle is returned so the caller can check for scale-up while
    the workload is still running (a blocking run would let workers scale
    back down before the assertion executes).

    Args:
        self: Object exposing ``api_instance`` and ``namespace``.
        cluster_name: Ray cluster whose head pod should run the load.
        tasks: Value exported to the script as ``AUTOSCALING_TASKS``.
        sleep_s: Value exported to the script as ``AUTOSCALING_TASK_SLEEP_S``.

    Returns:
        The ``subprocess.Popen`` handle of the ``kubectl exec`` process.

    Raises:
        RuntimeError: If no head pod is found for ``cluster_name``.
        subprocess.CalledProcessError: If ``kubectl cp`` exits non-zero.
    """
    head_selector = f"ray.io/node-type=head,ray.io/cluster={cluster_name}"
    head_pods = self.api_instance.list_namespaced_pod(
        self.namespace, label_selector=head_selector
    )
    if not head_pods.items:
        raise RuntimeError(f"No head pod found for cluster {cluster_name}")
    head_pod = head_pods.items[0].metadata.name

    # The load script sits next to this support module.
    this_dir = os.path.dirname(os.path.abspath(__file__))
    load_script = os.path.join(this_dir, "autoscaling_load.py")

    copy_cmd = [
        "kubectl",
        "cp",
        load_script,
        f"{self.namespace}/{head_pod}:/tmp/autoscaling_load.py",
    ]
    subprocess.check_call(copy_cmd)

    # Task count and sleep duration reach the script as environment variables.
    shell_line = (
        f"AUTOSCALING_TASKS={tasks} AUTOSCALING_TASK_SLEEP_S={sleep_s} "
        f"python /tmp/autoscaling_load.py"
    )
    exec_cmd = [
        "kubectl",
        "exec",
        "-n",
        self.namespace,
        head_pod,
        "--",
        "bash",
        "-lc",
        shell_line,
    ]
    return subprocess.Popen(exec_cmd)


def create_cluster_queue(self, cluster_queue, flavor):
cluster_queue_json = {
"apiVersion": "kueue.x-k8s.io/v1beta1",
Expand Down
2 changes: 2 additions & 0 deletions tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ spec:
rayStartParams:
block: 'true'
dashboard-host: 0.0.0.0
num-cpus: '2'
num-gpus: '0'
resources: '"{}"'
serviceType: ClusterIP
Expand Down Expand Up @@ -92,6 +93,7 @@ spec:
minReplicas: 2
rayStartParams:
block: 'true'
num-cpus: '4'
num-gpus: '0'
resources: '"{}"'
replicas: 2
Expand Down
Loading
Loading