Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions tests/e2e/rayjob/rayjob_existing_cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def test_existing_kueue_cluster(self):
cluster.wait_ready(timeout=600)
print(f"✓ Cluster '{cluster_name}' is ready")

self.job_api = RayjobApi()

# RayJob with explicit local_queue (will be ignored for existing clusters)
# Kueue does not manage RayJobs targeting existing clusters
rayjob_explicit = RayJob(
Expand All @@ -92,12 +94,16 @@ def test_existing_kueue_cluster(self):
)

try:
# Test RayJob with explicit queue
assert rayjob_explicit.submit() == "job-explicit-queue"
unsuspend_existing_cluster_rayjob(
self.job_api, rayjob_explicit.name, rayjob_explicit.namespace
)
self._wait_completion(rayjob_explicit)

# Test RayJob with default queue
assert rayjob_default.submit() == "job-default-queue"
unsuspend_existing_cluster_rayjob(
self.job_api, rayjob_default.name, rayjob_default.namespace
)
self._wait_completion(rayjob_default)
finally:
rayjob_explicit.delete()
Expand Down
98 changes: 73 additions & 25 deletions tests/e2e/rayjob/rayjob_lifecycled_cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,18 @@ def test_lifecycled_kueue_managed(self):
)

assert rayjob.submit() == job_name
print(f"✓ RayJob {job_name} submitted successfully")

# Verify Secret was created with owner reference
self.verify_secret_with_owner_reference(rayjob)

assert self.job_api.wait_until_job_running(
name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=600
)
assert wait_for_kueue_admission(
self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
), f"RayJob {rayjob.name} was not admitted by Kueue"

job_running = self._wait_for_job_running_with_retry(rayjob, max_retries=2)
assert (
job_running
), f"RayJob {rayjob.name} failed to reach running state after retries"

assert self.job_api.wait_until_job_finished(
name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
Expand All @@ -93,7 +98,6 @@ def test_lifecycled_kueue_managed(self):
except Exception:
pass # Job might already be deleted
verify_rayjob_cluster_cleanup(cluster_api, rayjob.name, rayjob.namespace)
# Clean up the temporary script file
if "script_filename" in locals():
try:
os.remove(script_filename)
Expand All @@ -109,7 +113,6 @@ def test_lifecycled_kueue_resource_queueing(self):
self.job_api = RayjobApi()
cluster_api = RayClusterApi()

# Get platform-appropriate resource configurations
resources = get_platform_appropriate_resources()

cluster_config = ManagedClusterConfig(
Expand All @@ -132,9 +135,13 @@ def test_lifecycled_kueue_resource_queueing(self):
local_queue=self.local_queues[0],
)
assert job1.submit() == "holder"
assert self.job_api.wait_until_job_running(
name=job1.name, k8s_namespace=job1.namespace, timeout=60
)
assert wait_for_kueue_admission(
self, self.job_api, job1.name, job1.namespace, timeout=120
), f"RayJob {job1.name} was not admitted by Kueue"
job_running = self._wait_for_job_running_with_retry(job1, max_retries=1)
assert (
job_running
), f"RayJob {job1.name} failed to reach running state after retries"

job2 = RayJob(
job_name="waiter",
Expand All @@ -146,39 +153,32 @@ def test_lifecycled_kueue_resource_queueing(self):
)
assert job2.submit() == "waiter"

# Wait for Kueue to process the job
sleep(5)
job2_cr = self.job_api.get_job(name=job2.name, k8s_namespace=job2.namespace)

# For RayJobs with managed clusters, check if Kueue is holding resources
job2_status = job2_cr.get("status", {})
ray_cluster_name = job2_status.get("rayClusterName", "")

# If RayCluster is not created yet, it means Kueue is holding the job
if not ray_cluster_name:
# This is the expected behavior
job_is_queued = True
else:
# Check RayCluster resources - if all are 0, it's queued
ray_cluster_status = job2_status.get("rayClusterStatus", {})
desired_cpu = ray_cluster_status.get("desiredCPU", "0")
desired_memory = ray_cluster_status.get("desiredMemory", "0")

# Kueue creates the RayCluster but with 0 resources when queued
job_is_queued = desired_cpu == "0" and desired_memory == "0"

assert job_is_queued, "Job2 should be queued by Kueue while Job1 is running"

assert self.job_api.wait_until_job_finished(
name=job1.name, k8s_namespace=job1.namespace, timeout=60
name=job1.name, k8s_namespace=job1.namespace, timeout=120
)

assert wait_for_kueue_admission(
self, self.job_api, job2.name, job2.namespace, timeout=30
self, self.job_api, job2.name, job2.namespace, timeout=120
)

assert self.job_api.wait_until_job_finished(
name=job2.name, k8s_namespace=job2.namespace, timeout=60
name=job2.name, k8s_namespace=job2.namespace, timeout=300
)
finally:
for job in [job1, job2]:
Expand All @@ -191,25 +191,74 @@ def test_lifecycled_kueue_resource_queueing(self):
except:
pass

def _wait_for_job_running_with_retry(
self, rayjob: RayJob, max_retries: int = 2
) -> bool:
"""
Wait for RayJob to reach running state with retry logic.

Kueue-managed lifecycled jobs start suspended; wait_until_job_running treats
Suspended as failure, so ensure Kueue admission before each attempt.
"""
for attempt in range(max_retries + 1):
try:
print(
f"Waiting for RayJob {rayjob.name} to reach running state "
f"(attempt {attempt + 1}/{max_retries + 1})"
)

if not wait_for_kueue_admission(
self,
self.job_api,
rayjob.name,
rayjob.namespace,
timeout=120,
):
print(
f"✗ RayJob {rayjob.name} not admitted by Kueue on attempt "
f"{attempt + 1}"
)
elif self.job_api.wait_until_job_running(
name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=720
):
print(
f"✓ RayJob {rayjob.name} reached running state on attempt "
f"{attempt + 1}"
)
return True
else:
print(
f"✗ RayJob {rayjob.name} failed to reach running state on "
f"attempt {attempt + 1}"
)

except Exception as e:
print(f"Exception during RayJob wait attempt {attempt + 1}: {e}")

if attempt < max_retries:
print("Retrying in 30 seconds...")
sleep(30)

print(
f"✗ RayJob {rayjob.name} failed to reach running state after "
f"{max_retries + 1} attempts"
)
return False

def verify_secret_with_owner_reference(self, rayjob: RayJob):
"""Verify that the Secret was created with proper owner reference to the RayJob."""
v1 = client.CoreV1Api()
secret_name = f"{rayjob.name}-files"

try:
# Get the Secret
secret = v1.read_namespaced_secret(
name=secret_name, namespace=rayjob.namespace
)

# Verify Secret exists
assert secret is not None, f"Secret {secret_name} not found"

# Verify it contains the script
assert secret.data is not None, "Secret has no data"
assert len(secret.data) > 0, "Secret data is empty"

# Verify owner reference
assert (
secret.metadata.owner_references is not None
), "Secret has no owner references"
Expand All @@ -230,7 +279,6 @@ def verify_secret_with_owner_reference(self, rayjob: RayJob):
owner_ref.block_owner_deletion is True
), "Owner reference blockOwnerDeletion not set to true"

# Verify labels
assert secret.metadata.labels.get("ray.io/job-name") == rayjob.name
assert (
secret.metadata.labels.get("app.kubernetes.io/managed-by")
Expand Down
32 changes: 31 additions & 1 deletion tests/e2e/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ def create_namespace(self):
try:
self.namespace = f"test-ns-{random_choice()}"
namespace_body = client.V1Namespace(
metadata=client.V1ObjectMeta(name=self.namespace)
metadata=client.V1ObjectMeta(
name=self.namespace,
labels={"kueue.openshift.io/managed": "true"},
)
)
self.api_instance.create_namespace(namespace_body)
except Exception as e:
Expand Down Expand Up @@ -780,6 +783,33 @@ def wait_for_kueue_admission(self, job_api, job_name, namespace, timeout=120):
return False


def unsuspend_existing_cluster_rayjob(job_api, job_name, namespace):
"""
Clear suspend on RayJobs targeting an existing cluster.

In Kueue-managed namespaces the mutating webhook may set suspend=true on RayJobs
that use clusterSelector. Those jobs use shutdownAfterJobFinishes=false and
must not remain suspended for Kueue lifecycled admission.
"""
try:
job_cr = job_api.get_job(name=job_name, k8s_namespace=namespace)
if not job_cr.get("spec", {}).get("clusterSelector"):
return

if not job_cr.get("spec", {}).get("suspend", False):
return

print(
f"Unsuspending existing-cluster RayJob '{job_name}' "
f"(Kueue webhook suspend override)"
)
assert job_api.resubmit_job(
name=job_name, k8s_namespace=namespace
), f"Failed to unsuspend RayJob '{job_name}'"
except Exception as e:
print(f"Warning: could not unsuspend RayJob '{job_name}': {e}")


def create_limited_kueue_resources(self):
print("Creating limited Kueue resources for preemption testing...")

Expand Down
Loading