Skip to content

Commit 005c2ff

Browse files
RHOAIENG-69862: RHOAI 3.3: fix lifecycled tests
1 parent b71cdb9 commit 005c2ff

3 files changed

Lines changed: 112 additions & 28 deletions

File tree

tests/e2e/rayjob/rayjob_existing_cluster_test.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,8 @@ def test_existing_kueue_cluster(self):
7171
cluster.wait_ready(timeout=600)
7272
print(f"✓ Cluster '{cluster_name}' is ready")
7373

74+
self.job_api = RayjobApi()
75+
7476
# RayJob with explicit local_queue (will be ignored for existing clusters)
7577
# Kueue does not manage RayJobs targeting existing clusters
7678
rayjob_explicit = RayJob(
@@ -92,12 +94,16 @@ def test_existing_kueue_cluster(self):
9294
)
9395

9496
try:
95-
# Test RayJob with explicit queue
9697
assert rayjob_explicit.submit() == "job-explicit-queue"
98+
unsuspend_existing_cluster_rayjob(
99+
self.job_api, rayjob_explicit.name, rayjob_explicit.namespace
100+
)
97101
self._wait_completion(rayjob_explicit)
98102

99-
# Test RayJob with default queue
100103
assert rayjob_default.submit() == "job-default-queue"
104+
unsuspend_existing_cluster_rayjob(
105+
self.job_api, rayjob_default.name, rayjob_default.namespace
106+
)
101107
self._wait_completion(rayjob_default)
102108
finally:
103109
rayjob_explicit.delete()

tests/e2e/rayjob/rayjob_lifecycled_cluster_test.py

Lines changed: 73 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -76,13 +76,18 @@ def test_lifecycled_kueue_managed(self):
7676
)
7777

7878
assert rayjob.submit() == job_name
79+
print(f"✓ RayJob {job_name} submitted successfully")
7980

80-
# Verify Secret was created with owner reference
8181
self.verify_secret_with_owner_reference(rayjob)
8282

83-
assert self.job_api.wait_until_job_running(
84-
name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=600
85-
)
83+
assert wait_for_kueue_admission(
84+
self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
85+
), f"RayJob {rayjob.name} was not admitted by Kueue"
86+
87+
job_running = self._wait_for_job_running_with_retry(rayjob, max_retries=2)
88+
assert (
89+
job_running
90+
), f"RayJob {rayjob.name} failed to reach running state after retries"
8691

8792
assert self.job_api.wait_until_job_finished(
8893
name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
@@ -93,7 +98,6 @@ def test_lifecycled_kueue_managed(self):
9398
except Exception:
9499
pass # Job might already be deleted
95100
verify_rayjob_cluster_cleanup(cluster_api, rayjob.name, rayjob.namespace)
96-
# Clean up the temporary script file
97101
if "script_filename" in locals():
98102
try:
99103
os.remove(script_filename)
@@ -109,7 +113,6 @@ def test_lifecycled_kueue_resource_queueing(self):
109113
self.job_api = RayjobApi()
110114
cluster_api = RayClusterApi()
111115

112-
# Get platform-appropriate resource configurations
113116
resources = get_platform_appropriate_resources()
114117

115118
cluster_config = ManagedClusterConfig(
@@ -132,9 +135,13 @@ def test_lifecycled_kueue_resource_queueing(self):
132135
local_queue=self.local_queues[0],
133136
)
134137
assert job1.submit() == "holder"
135-
assert self.job_api.wait_until_job_running(
136-
name=job1.name, k8s_namespace=job1.namespace, timeout=60
137-
)
138+
assert wait_for_kueue_admission(
139+
self, self.job_api, job1.name, job1.namespace, timeout=120
140+
), f"RayJob {job1.name} was not admitted by Kueue"
141+
job_running = self._wait_for_job_running_with_retry(job1, max_retries=1)
142+
assert (
143+
job_running
144+
), f"RayJob {job1.name} failed to reach running state after retries"
138145

139146
job2 = RayJob(
140147
job_name="waiter",
@@ -146,39 +153,32 @@ def test_lifecycled_kueue_resource_queueing(self):
146153
)
147154
assert job2.submit() == "waiter"
148155

149-
# Wait for Kueue to process the job
150156
sleep(5)
151157
job2_cr = self.job_api.get_job(name=job2.name, k8s_namespace=job2.namespace)
152158

153-
# For RayJobs with managed clusters, check if Kueue is holding resources
154159
job2_status = job2_cr.get("status", {})
155160
ray_cluster_name = job2_status.get("rayClusterName", "")
156161

157-
# If RayCluster is not created yet, it means Kueue is holding the job
158162
if not ray_cluster_name:
159-
# This is the expected behavior
160163
job_is_queued = True
161164
else:
162-
# Check RayCluster resources - if all are 0, it's queued
163165
ray_cluster_status = job2_status.get("rayClusterStatus", {})
164166
desired_cpu = ray_cluster_status.get("desiredCPU", "0")
165167
desired_memory = ray_cluster_status.get("desiredMemory", "0")
166-
167-
# Kueue creates the RayCluster but with 0 resources when queued
168168
job_is_queued = desired_cpu == "0" and desired_memory == "0"
169169

170170
assert job_is_queued, "Job2 should be queued by Kueue while Job1 is running"
171171

172172
assert self.job_api.wait_until_job_finished(
173-
name=job1.name, k8s_namespace=job1.namespace, timeout=60
173+
name=job1.name, k8s_namespace=job1.namespace, timeout=120
174174
)
175175

176176
assert wait_for_kueue_admission(
177-
self, self.job_api, job2.name, job2.namespace, timeout=30
177+
self, self.job_api, job2.name, job2.namespace, timeout=120
178178
)
179179

180180
assert self.job_api.wait_until_job_finished(
181-
name=job2.name, k8s_namespace=job2.namespace, timeout=60
181+
name=job2.name, k8s_namespace=job2.namespace, timeout=300
182182
)
183183
finally:
184184
for job in [job1, job2]:
@@ -191,25 +191,74 @@ def test_lifecycled_kueue_resource_queueing(self):
191191
except:
192192
pass
193193

194+
def _wait_for_job_running_with_retry(
    self,
    rayjob: RayJob,
    max_retries: int = 2,
    *,
    admission_timeout: int = 120,
    running_timeout: int = 720,
    retry_delay: int = 30,
) -> bool:
    """
    Wait for a RayJob to reach the running state, retrying on failure.

    Kueue-managed lifecycled jobs start suspended; wait_until_job_running treats
    Suspended as failure, so Kueue admission is awaited before each attempt.

    Args:
        rayjob: The submitted RayJob; its ``name`` and ``namespace`` are used
            for both the admission check and the running-state wait.
        max_retries: Number of additional attempts after the first one, so
            ``max_retries + 1`` attempts are made in total.
        admission_timeout: Seconds passed as ``timeout`` to
            ``wait_for_kueue_admission`` on each attempt.
        running_timeout: Seconds passed as ``timeout`` to
            ``job_api.wait_until_job_running`` on each attempt.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        True as soon as one attempt observes the job running; False once
        every attempt has failed or raised.
    """
    total_attempts = max_retries + 1

    for attempt in range(total_attempts):
        attempt_no = attempt + 1
        try:
            print(
                f"Waiting for RayJob {rayjob.name} to reach running state "
                f"(attempt {attempt_no}/{total_attempts})"
            )

            if not wait_for_kueue_admission(
                self,
                self.job_api,
                rayjob.name,
                rayjob.namespace,
                timeout=admission_timeout,
            ):
                print(
                    f"✗ RayJob {rayjob.name} not admitted by Kueue on attempt "
                    f"{attempt_no}"
                )
            elif self.job_api.wait_until_job_running(
                name=rayjob.name,
                k8s_namespace=rayjob.namespace,
                timeout=running_timeout,
            ):
                print(
                    f"✓ RayJob {rayjob.name} reached running state on attempt "
                    f"{attempt_no}"
                )
                return True
            else:
                print(
                    f"✗ RayJob {rayjob.name} failed to reach running state on "
                    f"attempt {attempt_no}"
                )

        except Exception as e:
            # Deliberately broad: any error during an attempt is logged and
            # counted as a failed attempt so the remaining retries still run.
            print(f"Exception during RayJob wait attempt {attempt_no}: {e}")

        # No pause after the final attempt; fall through to the failure report.
        if attempt < max_retries:
            print(f"Retrying in {retry_delay} seconds...")
            sleep(retry_delay)

    print(
        f"✗ RayJob {rayjob.name} failed to reach running state after "
        f"{total_attempts} attempts"
    )
    return False
247+
194248
def verify_secret_with_owner_reference(self, rayjob: RayJob):
195249
"""Verify that the Secret was created with proper owner reference to the RayJob."""
196250
v1 = client.CoreV1Api()
197251
secret_name = f"{rayjob.name}-files"
198252

199253
try:
200-
# Get the Secret
201254
secret = v1.read_namespaced_secret(
202255
name=secret_name, namespace=rayjob.namespace
203256
)
204257

205-
# Verify Secret exists
206258
assert secret is not None, f"Secret {secret_name} not found"
207-
208-
# Verify it contains the script
209259
assert secret.data is not None, "Secret has no data"
210260
assert len(secret.data) > 0, "Secret data is empty"
211261

212-
# Verify owner reference
213262
assert (
214263
secret.metadata.owner_references is not None
215264
), "Secret has no owner references"
@@ -230,7 +279,6 @@ def verify_secret_with_owner_reference(self, rayjob: RayJob):
230279
owner_ref.block_owner_deletion is True
231280
), "Owner reference blockOwnerDeletion not set to true"
232281

233-
# Verify labels
234282
assert secret.metadata.labels.get("ray.io/job-name") == rayjob.name
235283
assert (
236284
secret.metadata.labels.get("app.kubernetes.io/managed-by")

tests/e2e/support.py

Lines changed: 31 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -359,7 +359,10 @@ def create_namespace(self):
359359
try:
360360
self.namespace = f"test-ns-{random_choice()}"
361361
namespace_body = client.V1Namespace(
362-
metadata=client.V1ObjectMeta(name=self.namespace)
362+
metadata=client.V1ObjectMeta(
363+
name=self.namespace,
364+
labels={"kueue.openshift.io/managed": "true"},
365+
)
363366
)
364367
self.api_instance.create_namespace(namespace_body)
365368
except Exception as e:
@@ -780,6 +783,33 @@ def wait_for_kueue_admission(self, job_api, job_name, namespace, timeout=120):
780783
return False
781784

782785

786+
def unsuspend_existing_cluster_rayjob(job_api, job_name, namespace):
    """
    Clear suspend on RayJobs targeting an existing cluster.

    In Kueue-managed namespaces the mutating webhook may set suspend=true on RayJobs
    that use clusterSelector. Those jobs use shutdownAfterJobFinishes=false and
    must not remain suspended for Kueue lifecycled admission.

    This is best-effort: every failure (lookup error, unexpected object shape,
    unsuccessful resubmit) is reported as a printed warning and never raised.

    Args:
        job_api: Object exposing ``get_job(name=..., k8s_namespace=...)`` and
            ``resubmit_job(name=..., k8s_namespace=...)``.
        job_name: Name of the RayJob to inspect.
        namespace: Namespace the RayJob lives in.

    Returns:
        None.
    """
    try:
        job_cr = job_api.get_job(name=job_name, k8s_namespace=namespace)
        spec = job_cr.get("spec", {})

        # Only jobs bound to an existing cluster are handled here.
        if not spec.get("clusterSelector"):
            return

        # Nothing to do when the job is not suspended.
        if not spec.get("suspend", False):
            return

        print(
            f"Unsuspending existing-cluster RayJob '{job_name}' "
            f"(Kueue webhook suspend override)"
        )
        # The resubmit call is made unconditionally rather than inside an
        # ``assert``: assert statements are stripped under ``python -O``, which
        # would silently skip the unsuspend altogether.
        if not job_api.resubmit_job(name=job_name, k8s_namespace=namespace):
            print(
                f"Warning: could not unsuspend RayJob '{job_name}': "
                f"Failed to unsuspend RayJob '{job_name}'"
            )
    except Exception as e:
        print(f"Warning: could not unsuspend RayJob '{job_name}': {e}")
811+
812+
783813
def create_limited_kueue_resources(self):
784814
print("Creating limited Kueue resources for preemption testing...")
785815

0 commit comments

Comments
 (0)