Skip to content

Commit 0d1ffab

Browse files
committed
test(RHOAIENG-57445): added E2E test for RayCluster Autoscaling
1 parent 112357e commit 0d1ffab

4 files changed

Lines changed: 210 additions & 0 deletions

File tree

tests/e2e/autoscaling_load.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""
2+
Workload used by E2E autoscaling tests.
3+
4+
This script is submitted via Ray Job submission to generate enough queued work
5+
to trigger Ray in-tree autoscaling on a KinD cluster.
6+
"""
7+
8+
import os
9+
import time
10+
11+
import ray
12+
13+
14+
def main():
    """Queue sleeping Ray tasks and block until every one of them has finished.

    Configuration is read from the environment:

    * ``AUTOSCALING_TASKS`` - number of tasks to submit (default ``4``).
    * ``AUTOSCALING_TASK_SLEEP_S`` - seconds each task sleeps (default ``120``).

    Each task requests one CPU (``num_cpus=1``).
    """
    # Attach to the already-running Ray cluster this script is executed in.
    ray.init(address="auto")

    task_count = int(os.environ.get("AUTOSCALING_TASKS", "4"))
    task_sleep_s = int(os.environ.get("AUTOSCALING_TASK_SLEEP_S", "120"))

    @ray.remote(num_cpus=1)
    def burn_cpu():
        # Occupy the requested CPU slot for the configured duration.
        time.sleep(task_sleep_s)
        return True

    pending = []
    for _ in range(task_count):
        pending.append(burn_cpu.remote())

    # Block until all submitted tasks have completed.
    ray.get(pending)


if __name__ == "__main__":
    main()
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from time import sleep
2+
3+
import pytest
4+
5+
from codeflare_sdk import Cluster, ClusterConfiguration
6+
7+
from support import *
8+
9+
10+
@pytest.mark.kind
class TestRayClusterAutoscalingSDKKind:
    """E2E test: an autoscaling-enabled RayCluster scales up under load and back down."""

    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)

    def test_autoscaling_scale_up_and_down_kind(self):
        """Create the cluster, run the load script, and check worker counts."""
        self.setup_method()
        create_namespace(self)

        cluster_name = f"autoscale-{random_choice()}"
        ray_image = get_ray_image()

        def at_min_workers(count):
            # min_workers is 1 in the configuration below.
            return count == 1

        def scaled_up(count):
            return count >= 2

        configuration = ClusterConfiguration(
            name=cluster_name,
            namespace=self.namespace,
            enable_autoscaling=True,
            min_workers=1,
            max_workers=4,
            head_cpu_requests="500m",
            head_cpu_limits="500m",
            worker_cpu_requests="500m",
            worker_cpu_limits=1,
            worker_memory_requests=1,
            worker_memory_limits=4,
            image=ray_image,
            write_to_file=True,
            verify_tls=False,
        )
        cluster = Cluster(configuration)

        cluster.apply()
        cluster.wait_ready(timeout=600, dashboard_check=False)

        # Initial state: exactly min_workers worker pods.
        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=300)

        # Generate load from inside the head pod to trigger scale-up.
        run_autoscaling_load_in_head_pod(self, cluster_name)

        # At least one additional worker must have appeared.
        wait_for_worker_count(self, cluster_name, scaled_up, timeout_s=600)

        # Give the idle timeout a chance to elapse, then expect min_workers again.
        sleep(90)
        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=600)

        cluster.down()
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from time import sleep
2+
3+
import pytest
4+
5+
from codeflare_sdk import Cluster, ClusterConfiguration
6+
7+
from support import *
8+
9+
10+
@pytest.mark.openshift
@pytest.mark.tier1
class TestRayClusterAutoscalingSDKOauth:
    """E2E test (OpenShift): an autoscaling RayCluster scales up under load and back down."""

    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        # auth_instance only exists if the test got as far as authenticating.
        if hasattr(self, "auth_instance"):
            cleanup_authentication(self.auth_instance)
        delete_namespace(self)

    @pytest.mark.timeout(1800)
    def test_autoscaling_scale_up_and_down_openshift_oauth(self):
        """Create the cluster, run the load script, and check worker counts."""
        self.setup_method()

        create_namespace(self)

        ray_image = get_ray_image()
        resources = get_platform_appropriate_resources()
        self.auth_instance = authenticate_for_tests()

        cluster_name = f"autoscale-{random_choice()}"

        def at_min_workers(count):
            # min_workers is 1 in the configuration below.
            return count == 1

        def scaled_up(count):
            return count >= 2

        configuration = ClusterConfiguration(
            name=cluster_name,
            namespace=self.namespace,
            enable_autoscaling=True,
            min_workers=1,
            max_workers=4,
            image=ray_image,
            write_to_file=True,
            verify_tls=False,
            **resources,
        )
        cluster = Cluster(configuration)

        cluster.apply()
        wait_ready_with_stuck_detection(cluster, timeout=900, dashboard_check=False)

        # Initial state: exactly min_workers worker pods.
        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=600)

        # Generate load from inside the head pod to trigger scale-up.
        run_autoscaling_load_in_head_pod(self, cluster_name, tasks=4, sleep_s=180)

        # At least one additional worker must have appeared.
        wait_for_worker_count(self, cluster_name, scaled_up, timeout_s=900)

        # Give the idle timeout a chance to elapse, then expect min_workers again.
        sleep(120)
        wait_for_worker_count(self, cluster_name, at_min_workers, timeout_s=900)

        cluster.down()

tests/e2e/support.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import random
33
import string
44
import subprocess
5+
import time
56
import warnings
67
from time import sleep
8+
79
from codeflare_sdk import get_cluster
810
from kubernetes import client, config
911
from kubernetes.client import V1Toleration
@@ -344,6 +346,60 @@ def run_kubectl_command(args):
344346
return None
345347

346348

349+
def wait_for_worker_count(
    self, cluster_name, predicate, timeout_s=600, poll_interval_s=10
):
    """Wait until the number of worker pods for cluster_name satisfies predicate.

    Pods are listed in ``self.namespace`` via ``self.api_instance`` using the
    label selector ``ray.io/node-type=worker,ray.io/cluster=<cluster_name>``.
    Every pod returned by that query is counted.

    Args:
        self: Object exposing ``api_instance`` (with ``list_namespaced_pod``)
            and ``namespace``.
        cluster_name: Name used in the ``ray.io/cluster`` label.
        predicate: Callable taking the current pod count and returning a
            truthy value once the desired state is reached.
        timeout_s: Maximum number of seconds to keep polling.
        poll_interval_s: Seconds to wait between polls.

    Returns:
        The pod count that satisfied ``predicate``.

    Raises:
        TimeoutError: If ``predicate`` was not satisfied before the deadline.
    """
    label = f"ray.io/node-type=worker,ray.io/cluster={cluster_name}"
    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + timeout_s
    last = None
    while True:
        # Poll first, so even a zero timeout performs one check and the state
        # reached during the final sleep is still observed.
        pods = self.api_instance.list_namespaced_pod(
            self.namespace, label_selector=label
        )
        last = len(pods.items or [])
        if predicate(last):
            return last
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(max(0, min(poll_interval_s, remaining)))
    raise TimeoutError(
        f"Timed out waiting for worker count. cluster={cluster_name} last={last}"
    )
365+
366+
367+
def run_autoscaling_load_in_head_pod(self, cluster_name, tasks=4, sleep_s=120):
    """
    Copy autoscaling_load.py into the head pod and run it.
    Avoids port-forwarding / Ray Dashboard API dependency.

    The head pod is the first pod in ``self.namespace`` matching
    ``ray.io/node-type=head,ray.io/cluster=<cluster_name>``. The script is
    copied with ``kubectl cp`` and run with ``kubectl exec``; the call blocks
    until the script exits.

    Args:
        self: Object exposing ``api_instance`` and ``namespace``.
        cluster_name: Name used in the ``ray.io/cluster`` label.
        tasks: Value exported to the script as ``AUTOSCALING_TASKS``.
        sleep_s: Value exported to the script as ``AUTOSCALING_TASK_SLEEP_S``.

    Raises:
        RuntimeError: If no head pod is found for ``cluster_name``.
        subprocess.CalledProcessError: If either kubectl command fails.
    """
    # Function-scope import keeps this helper independent of the module header.
    from pathlib import Path

    label = f"ray.io/node-type=head,ray.io/cluster={cluster_name}"
    pods = self.api_instance.list_namespaced_pod(self.namespace, label_selector=label)
    if not pods.items:
        raise RuntimeError(f"No head pod found for cluster {cluster_name}")
    head_pod = pods.items[0].metadata.name

    # Locate the script next to this module so the helper works regardless of
    # the directory pytest was started from.
    load_script = Path(__file__).resolve().parent / "autoscaling_load.py"

    subprocess.check_call(
        [
            "kubectl",
            "cp",
            str(load_script),
            f"{self.namespace}/{head_pod}:/tmp/autoscaling_load.py",
        ]
    )

    subprocess.check_call(
        [
            "kubectl",
            "exec",
            "-n",
            self.namespace,
            head_pod,
            "--",
            "bash",
            "-lc",
            f"AUTOSCALING_TASKS={tasks} AUTOSCALING_TASK_SLEEP_S={sleep_s} "
            f"python /tmp/autoscaling_load.py",
        ]
    )
401+
402+
347403
def create_cluster_queue(self, cluster_queue, flavor):
348404
cluster_queue_json = {
349405
"apiVersion": "kueue.x-k8s.io/v1beta1",

0 commit comments

Comments
 (0)