Skip to content

Commit 6ab47d3

Browse files
fix(cluster): check Kueue Workload admission for list_all_queued()
The previous implementation of list_all_queued() incorrectly filtered by RayCluster state (READY/SUSPENDED), which doesn't reliably indicate whether a cluster is queued. Instead, check the Kueue Workload admission status — a cluster is queued when its associated Workload has not been admitted yet (no admission field in status). Fixes: RHOAIENG-54734 Made-with: Cursor
1 parent 0a3c9c4 commit 6ab47d3

3 files changed

Lines changed: 135 additions & 23 deletions

File tree

src/codeflare_sdk/common/utils/unit_test_support.py

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,77 @@ def get_ray_obj(group, version, namespace, plural):
143143

144144
def get_ray_obj_with_status(group, version, namespace, plural):
145145
# To be used for mocking list_namespaced_custom_object for Ray Clusters with statuses
146+
# and Kueue Workloads for testing list_all_queued()
147+
148+
# If requesting workloads, return Kueue Workload objects
149+
if plural == "workloads":
150+
# RHOAIENG-54734: Return workloads WITHOUT admission field to indicate queued state
151+
return {
152+
"items": [
153+
{
154+
"apiVersion": "kueue.x-k8s.io/v1beta1",
155+
"kind": "Workload",
156+
"metadata": {
157+
"name": "test-cluster-a-workload",
158+
"namespace": namespace,
159+
"ownerReferences": [
160+
{
161+
"apiVersion": "ray.io/v1",
162+
"kind": "RayCluster",
163+
"name": "test-cluster-a",
164+
"uid": "test-uid-a",
165+
}
166+
],
167+
},
168+
"spec": {
169+
"queueName": "local-queue-default",
170+
},
171+
"status": {
172+
# No admission field = not admitted yet = queued
173+
"conditions": [
174+
{
175+
"type": "QuotaReserved",
176+
"status": "False",
177+
"reason": "Pending",
178+
"message": "workload didn't fit",
179+
}
180+
],
181+
},
182+
},
183+
{
184+
"apiVersion": "kueue.x-k8s.io/v1beta1",
185+
"kind": "Workload",
186+
"metadata": {
187+
"name": "test-rc-b-workload",
188+
"namespace": namespace,
189+
"ownerReferences": [
190+
{
191+
"apiVersion": "ray.io/v1",
192+
"kind": "RayCluster",
193+
"name": "test-rc-b",
194+
"uid": "test-uid-b",
195+
}
196+
],
197+
},
198+
"spec": {
199+
"queueName": "local-queue-default",
200+
},
201+
"status": {
202+
# No admission field = not admitted yet = queued
203+
"conditions": [
204+
{
205+
"type": "QuotaReserved",
206+
"status": "False",
207+
"reason": "Pending",
208+
"message": "couldn't assign flavors",
209+
}
210+
],
211+
},
212+
},
213+
]
214+
}
215+
216+
# Otherwise, return RayCluster objects (original behavior)
146217
rc_a = apply_template(
147218
f"{parent}/tests/test_cluster_yamls/support_clusters/test-rc-a.yaml",
148219
get_template_variables(),
@@ -152,39 +223,28 @@ def get_ray_obj_with_status(group, version, namespace, plural):
152223
get_template_variables(),
153224
)
154225

226+
# RHOAIENG-54734: Cluster status doesn't matter anymore since we check workload admission
227+
# But keeping some status to ensure clusters are returned in the API response
155228
rc_a.update(
156229
{
157230
"status": {
158231
"desiredWorkerReplicas": 1,
159-
"endpoints": {
160-
"client": "10001",
161-
"dashboard": "8265",
162-
"gcs": "6379",
163-
"metrics": "8080",
164-
},
165-
"head": {"serviceIP": "172.30.179.88"},
166232
"lastUpdateTime": "2024-03-05T09:55:37Z",
167233
"maxWorkerReplicas": 1,
168234
"minWorkerReplicas": 1,
169235
"observedGeneration": 1,
170-
"state": "ready",
236+
"state": "suspended", # Can be any state; admission is what matters
171237
},
172238
}
173239
)
174240
rc_b.update(
175241
{
176242
"status": {
177-
"availableWorkerReplicas": 2,
178243
"desiredWorkerReplicas": 1,
179-
"endpoints": {
180-
"client": "10001",
181-
"dashboard": "8265",
182-
"gcs": "6379",
183-
},
184244
"lastUpdateTime": "2023-02-22T16:26:16Z",
185245
"maxWorkerReplicas": 1,
186246
"minWorkerReplicas": 1,
187-
"state": "suspended",
247+
"state": "unknown", # Can be any state; admission is what matters
188248
}
189249
}
190250
)

src/codeflare_sdk/ray/cluster/cluster.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -768,17 +768,69 @@ def list_all_clusters(namespace: str, print_to_console: bool = True):
768768
return clusters
769769

770770

771+
def _list_kueue_workloads(namespace: str) -> list:
    """
    List the Kueue Workload objects in a namespace.

    This is a best-effort lookup: any failure (for example Kueue not being
    installed, so the Workload resource does not exist) yields an empty list
    instead of raising.
    """
    try:
        config_check()
        api_instance = client.CustomObjectsApi(get_api_client())
        workloads = api_instance.list_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="workloads",
            namespace=namespace,
        )
        # "items" may be missing or null; treat both as "no workloads".
        return workloads.get("items") or []
    except Exception:
        # If Kueue is not installed or the listing fails, report no workloads
        return []


def _owning_ray_cluster_names(workload: dict) -> list:
    """
    Return the names of every RayCluster listed in a workload's ownerReferences.
    """
    # "metadata" / "ownerReferences" can be absent or explicitly null.
    owner_refs = (workload.get("metadata") or {}).get("ownerReferences") or []
    return [
        owner_ref.get("name")
        for owner_ref in owner_refs
        if owner_ref.get("kind") == "RayCluster"
    ]


def _get_kueue_workload_for_cluster(
    cluster_name: str, namespace: str
) -> Optional[dict]:
    """
    Find the Kueue Workload associated with a RayCluster.

    A workload is associated with the cluster when one of its ownerReferences
    has kind "RayCluster" and the cluster's name.

    Returns the first matching workload object if found, None otherwise
    (including when the workloads could not be listed).
    """
    for workload in _list_kueue_workloads(namespace):
        if cluster_name in _owning_ray_cluster_names(workload):
            return workload
    return None


def list_all_queued(namespace: str, print_to_console: bool = True):
    """
    Returns (and prints by default) a list of all currently queued-up Ray Clusters
    in a given namespace.

    A cluster is considered queued if it has an associated Kueue Workload that has
    not been admitted yet (workload.status.admission is None or empty). Clusters
    without an associated Workload are not using Kueue and are never reported
    as queued.
    """
    # Fix for RHOAIENG-54734: Check Kueue Workload admission status instead of
    # RayCluster state. The previous approach incorrectly inferred queue status
    # from RayCluster state, which doesn't reliably indicate Kueue admission.
    all_clusters = _get_ray_clusters(namespace)

    # List the namespace's workloads once and index them by owning RayCluster
    # name, rather than re-listing every workload for each cluster.
    workload_by_cluster = {}
    if all_clusters:
        for workload in _list_kueue_workloads(namespace):
            for owner_name in _owning_ray_cluster_names(workload):
                # Keep the first workload seen for a cluster, matching the
                # first-match behaviour of _get_kueue_workload_for_cluster().
                workload_by_cluster.setdefault(owner_name, workload)

    queued_clusters = []
    for cluster in all_clusters:
        workload = workload_by_cluster.get(cluster.name)
        if not workload:
            # No workload exists: cluster is not using Kueue, so it's not "queued"
            continue

        # No admission field = workload not admitted yet = still queued
        admission = (workload.get("status") or {}).get("admission")
        if not admission:
            queued_clusters.append(cluster)

    if print_to_console:
        pretty_print.print_ray_clusters_status(queued_clusters)
    return queued_clusters
782834

783835

784836
def get_cluster(

src/codeflare_sdk/ray/cluster/test_cluster.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ def test_list_queue_rayclusters(mocker, capsys):
682682
assert "│" in captured.out # Check for vertical lines
683683
mocker.patch(
684684
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
685-
return_value=get_ray_obj_with_status("ray.io", "v1", "ns", "rayclusters"),
685+
side_effect=get_ray_obj_with_status,
686686
)
687687

688688
list_all_queued("ns")
@@ -694,9 +694,9 @@ def test_list_queue_rayclusters(mocker, capsys):
694694
"│ +----------------+-----------+ │\n"
695695
"│ | Name | Status | │\n"
696696
"│ +================+===========+ │\n"
697-
"│ | test-cluster-a | ready | │\n"
697+
"│ | test-cluster-a | suspended | │\n"
698698
"│ | | | │\n"
699-
"│ | test-rc-b | suspended | │\n"
699+
"│ | test-rc-b | unknown | │\n"
700700
"│ | | | │\n"
701701
"│ +----------------+-----------+ │\n"
702702
"╰────────────────────────────────╯\n"

0 commit comments

Comments
 (0)