Skip to content

Commit 01f3b46

Browse files
fix(cluster): check Kueue Workload admission for list_all_queued()
The previous implementation of list_all_queued() incorrectly filtered by RayCluster state (READY/SUSPENDED), which doesn't reliably indicate whether a cluster is queued. Instead, check the Kueue Workload admission status — a cluster is queued when its associated Workload has not been admitted yet (no admission field in status).

Fixes: RHOAIENG-54734
Made-with: Cursor
1 parent d87aac4 commit 01f3b46

3 files changed

Lines changed: 133 additions & 23 deletions

File tree

src/codeflare_sdk/common/utils/unit_test_support.py

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,77 @@ def get_ray_obj(group, version, namespace, plural):
143143

144144
def get_ray_obj_with_status(group, version, namespace, plural):
145145
# To be used for mocking list_namespaced_custom_object for Ray Clusters with statuses
146+
# and Kueue Workloads for testing list_all_queued()
147+
148+
# If requesting workloads, return Kueue Workload objects
149+
if plural == "workloads":
150+
# RHOAIENG-54734: Return workloads WITHOUT admission field to indicate queued state
151+
return {
152+
"items": [
153+
{
154+
"apiVersion": "kueue.x-k8s.io/v1beta1",
155+
"kind": "Workload",
156+
"metadata": {
157+
"name": "test-cluster-a-workload",
158+
"namespace": namespace,
159+
"ownerReferences": [
160+
{
161+
"apiVersion": "ray.io/v1",
162+
"kind": "RayCluster",
163+
"name": "test-cluster-a",
164+
"uid": "test-uid-a",
165+
}
166+
],
167+
},
168+
"spec": {
169+
"queueName": "local-queue-default",
170+
},
171+
"status": {
172+
# No admission field = not admitted yet = queued
173+
"conditions": [
174+
{
175+
"type": "QuotaReserved",
176+
"status": "False",
177+
"reason": "Pending",
178+
"message": "workload didn't fit",
179+
}
180+
],
181+
},
182+
},
183+
{
184+
"apiVersion": "kueue.x-k8s.io/v1beta1",
185+
"kind": "Workload",
186+
"metadata": {
187+
"name": "test-rc-b-workload",
188+
"namespace": namespace,
189+
"ownerReferences": [
190+
{
191+
"apiVersion": "ray.io/v1",
192+
"kind": "RayCluster",
193+
"name": "test-rc-b",
194+
"uid": "test-uid-b",
195+
}
196+
],
197+
},
198+
"spec": {
199+
"queueName": "local-queue-default",
200+
},
201+
"status": {
202+
# No admission field = not admitted yet = queued
203+
"conditions": [
204+
{
205+
"type": "QuotaReserved",
206+
"status": "False",
207+
"reason": "Pending",
208+
"message": "couldn't assign flavors",
209+
}
210+
],
211+
},
212+
},
213+
]
214+
}
215+
216+
# Otherwise, return RayCluster objects (original behavior)
146217
rc_a = apply_template(
147218
f"{parent}/tests/test_cluster_yamls/support_clusters/test-rc-a.yaml",
148219
get_template_variables(),
@@ -152,39 +223,28 @@ def get_ray_obj_with_status(group, version, namespace, plural):
152223
get_template_variables(),
153224
)
154225

226+
# RHOAIENG-54734: Cluster status doesn't matter anymore since we check workload admission
227+
# But keeping some status to ensure clusters are returned in the API response
155228
rc_a.update(
156229
{
157230
"status": {
158231
"desiredWorkerReplicas": 1,
159-
"endpoints": {
160-
"client": "10001",
161-
"dashboard": "8265",
162-
"gcs": "6379",
163-
"metrics": "8080",
164-
},
165-
"head": {"serviceIP": "172.30.179.88"},
166232
"lastUpdateTime": "2024-03-05T09:55:37Z",
167233
"maxWorkerReplicas": 1,
168234
"minWorkerReplicas": 1,
169235
"observedGeneration": 1,
170-
"state": "ready",
236+
"state": "suspended", # Can be any state; admission is what matters
171237
},
172238
}
173239
)
174240
rc_b.update(
175241
{
176242
"status": {
177-
"availableWorkerReplicas": 2,
178243
"desiredWorkerReplicas": 1,
179-
"endpoints": {
180-
"client": "10001",
181-
"dashboard": "8265",
182-
"gcs": "6379",
183-
},
184244
"lastUpdateTime": "2023-02-22T16:26:16Z",
185245
"maxWorkerReplicas": 1,
186246
"minWorkerReplicas": 1,
187-
"state": "suspended",
247+
"state": "unknown", # Can be any state; admission is what matters
188248
}
189249
}
190250
)

src/codeflare_sdk/ray/cluster/cluster.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -762,17 +762,67 @@ def list_all_clusters(namespace: str, print_to_console: bool = True):
762762
return clusters
763763

764764

765+
def _get_kueue_workload_for_cluster(
    cluster_name: str, namespace: str
) -> Optional[dict]:
    """
    Find the Kueue Workload associated with a RayCluster.

    Lists the ``kueue.x-k8s.io/v1beta1`` Workloads in ``namespace`` and returns
    the first one that carries an owner reference of kind ``RayCluster`` whose
    name equals ``cluster_name``.

    Args:
        cluster_name (str):
            Name of the RayCluster whose Workload is wanted.
        namespace (str):
            Namespace in which Workloads are listed.

    Returns:
        Optional[dict]:
            The matching Workload object, or None when no Workload matches or
            the lookup fails. The lookup is best-effort: failures are reported
            as None rather than raised.
    """
    import logging

    try:
        config_check()
        api_instance = client.CustomObjectsApi(get_api_client())
        workloads = api_instance.list_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="workloads",
            namespace=namespace,
        )

        # ``or`` guards keep a Workload with a null metadata/ownerReferences
        # field from aborting the search for every other Workload.
        for workload in workloads.get("items") or []:
            metadata = workload.get("metadata") or {}
            for owner_ref in metadata.get("ownerReferences") or []:
                if (
                    owner_ref.get("kind") == "RayCluster"
                    and owner_ref.get("name") == cluster_name
                ):
                    return workload

        return None
    except Exception as err:
        # Still best-effort, but no longer silent. A 404 is presumably what the
        # API server answers when the Kueue Workload CRD is not installed, so it
        # stays quiet; anything else (e.g. permission or connection problems)
        # is surfaced in the log because it makes queued clusters look unqueued.
        if getattr(err, "status", None) != 404:
            logging.getLogger(__name__).warning(
                "Could not look up Kueue Workload for RayCluster %s in namespace %s: %s",
                cluster_name,
                namespace,
                err,
            )
        return None
796+
797+
765798
def list_all_queued(namespace: str, print_to_console: bool = True):
    """
    Returns (and prints by default) a list of all currently queued-up Ray Clusters
    in a given namespace.

    A cluster is considered queued if it has an associated Kueue Workload that has
    not been admitted yet (workload.status.admission is None or empty). Clusters
    without any Workload are not managed by Kueue and are never reported as queued.

    Args:
        namespace (str):
            Namespace whose RayClusters and Kueue Workloads are inspected.
        print_to_console (bool):
            When True (the default), the queued clusters are also printed.

    Returns:
        list:
            The RayCluster entries returned by ``_get_ray_clusters`` whose
            Workload has no admission recorded.
    """
    # Fix for RHOAIENG-54734: Check Kueue Workload admission status instead of
    # RayCluster state, which doesn't reliably indicate Kueue admission.
    all_clusters = _get_ray_clusters(namespace)

    # Workloads are listed once for the whole namespace instead of once per
    # cluster; skip the call entirely when there is nothing to classify.
    workloads_by_cluster = (
        _index_kueue_workloads_by_raycluster(namespace) if all_clusters else {}
    )

    queued_clusters = []
    for cluster in all_clusters:
        workload = workloads_by_cluster.get(cluster.name)
        # No workload -> cluster is not using Kueue, so it's not "queued".
        # No admission field -> workload not admitted yet -> still queued.
        if workload and not (workload.get("status") or {}).get("admission"):
            queued_clusters.append(cluster)

    if print_to_console:
        pretty_print.print_ray_clusters_status(queued_clusters)
    return queued_clusters


def _index_kueue_workloads_by_raycluster(namespace: str) -> dict:
    """
    Map RayCluster names to their Kueue Workload using a single list call.

    Args:
        namespace (str):
            Namespace in which ``kueue.x-k8s.io/v1beta1`` Workloads are listed.

    Returns:
        dict:
            ``{raycluster_name: workload}`` for every Workload that has an owner
            reference of kind ``RayCluster``. When several Workloads reference
            the same cluster, the first one in the API response wins. An empty
            dict is returned when the lookup fails (best-effort).
    """
    import logging

    try:
        config_check()
        api_instance = client.CustomObjectsApi(get_api_client())
        workloads = api_instance.list_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="workloads",
            namespace=namespace,
        )

        by_cluster = {}
        for workload in workloads.get("items") or []:
            metadata = workload.get("metadata") or {}
            for owner_ref in metadata.get("ownerReferences") or []:
                owner_name = owner_ref.get("name")
                if owner_ref.get("kind") == "RayCluster" and owner_name:
                    # setdefault keeps the first Workload seen for a cluster.
                    by_cluster.setdefault(owner_name, workload)
        return by_cluster
    except Exception as err:
        # Best-effort: a failed lookup means "nothing is queued". A 404 is
        # presumably the answer when the Kueue Workload CRD is not installed,
        # so it stays quiet; other failures are logged instead of hidden.
        if getattr(err, "status", None) != 404:
            logging.getLogger(__name__).warning(
                "Could not list Kueue Workloads in namespace %s: %s",
                namespace,
                err,
            )
        return {}
776826

777827

778828
def get_cluster(

src/codeflare_sdk/ray/cluster/test_cluster.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ def test_list_queue_rayclusters(mocker, capsys):
682682
assert "│" in captured.out # Check for vertical lines
683683
mocker.patch(
684684
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
685-
return_value=get_ray_obj_with_status("ray.io", "v1", "ns", "rayclusters"),
685+
side_effect=get_ray_obj_with_status,
686686
)
687687

688688
list_all_queued("ns")
@@ -694,9 +694,9 @@ def test_list_queue_rayclusters(mocker, capsys):
694694
"│ +----------------+-----------+ │\n"
695695
"│ | Name | Status | │\n"
696696
"│ +================+===========+ │\n"
697-
"│ | test-cluster-a | ready | │\n"
697+
"│ | test-cluster-a | suspended | │\n"
698698
"│ | | | │\n"
699-
"│ | test-rc-b | suspended | │\n"
699+
"│ | test-rc-b | unknown | │\n"
700700
"│ | | | │\n"
701701
"│ +----------------+-----------+ │\n"
702702
"╰────────────────────────────────╯\n"

0 commit comments

Comments
 (0)