Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions tests/e2e/mnist_raycluster_sdk_oauth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,10 @@ def assert_jobsubmit_withoutLogin(self, cluster):
# API endpoint is directly under the hostname
api_url = dashboard_url + "/api/jobs/"

job_spec = get_mnist_job_submission_spec()
jobdata = {
"entrypoint": "python mnist.py",
"runtime_env": {
"working_dir": "./tests/e2e/",
"pip": "./tests/e2e/mnist_pip_requirements.txt",
"env_vars": get_setup_env_variables(),
},
"entrypoint": job_spec["entrypoint"],
"runtime_env": job_spec["runtime_env"],
}

# Try to submit a job without authentication
Expand Down Expand Up @@ -189,13 +186,11 @@ def assert_jobsubmit_withlogin(self, cluster):
"Verified: No jobs exist from the previous unauthenticated submission attempt."
)

job_spec = get_mnist_job_submission_spec()
print(f"Submitting job: {job_spec['entrypoint']}")
submission_id = client.submit_job(
entrypoint="python mnist.py",
runtime_env={
"working_dir": "./tests/e2e/",
"pip": "./tests/e2e/mnist_pip_requirements.txt",
"env_vars": get_setup_env_variables(),
},
entrypoint=job_spec["entrypoint"],
runtime_env=job_spec["runtime_env"],
entrypoint_num_cpus=1,
)
print(f"Submitted job with ID: {submission_id}")
Expand All @@ -221,9 +216,10 @@ def assert_jobsubmit_withlogin(self, cluster):
client.delete_job(submission_id)

def assert_job_completion(self, status):
if status == "SUCCEEDED":
print(f"Job has completed: '{status}'")
status_value = getattr(status, "value", status)
if status_value == "SUCCEEDED":
print(f"Job has completed: '{status_value}'")
assert True
else:
print(f"Job has completed: '{status}'")
print(f"Job has completed: '{status_value}'")
assert False
253 changes: 242 additions & 11 deletions tests/e2e/support.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
import random
import string
Expand Down Expand Up @@ -142,6 +143,127 @@ def get_setup_env_variables(**kwargs):
return env_vars


def _env_flag_enabled(name):
return os.environ.get(name, "").strip().lower() in ("1", "true", "yes")


def _env_flag_disabled(name):
return os.environ.get(name, "").strip().lower() in ("0", "false", "no", "off")


def _disconnected_cluster_signals():
"""Return True when the cluster is likely disconnected / air-gapped."""
if _env_flag_enabled("DISCONNECTED_CLUSTER") or _env_flag_enabled(
"IS_DISCONNECTED_CLUSTER"
):
return True
try:
server = (run_oc_command(["whoami", "--show-server=true"]) or "").lower()
if "-dis-" in server or "disconnected" in server:
return True
except Exception:
pass
return False


def _mnist_prerequisites_met():
"""
Return True when full MNIST can run (pip packages + dataset reachable).
"""
if not _disconnected_cluster_signals():
return True

pip_url = (os.environ.get("PIP_INDEX_URL") or "").strip()
aws_endpoint = (os.environ.get("AWS_DEFAULT_ENDPOINT") or "").strip()
pip_ok = bool(pip_url) and "pypi.org" not in pip_url
aws_ok = bool(aws_endpoint)
if pip_ok and aws_ok:
print(
"Disconnected cluster with PIP mirror and S3 endpoint configured; "
"using full MNIST job"
)
return True
return False


def use_smoke_job():
"""
Use a lightweight Ray job when full MNIST is not viable.

Detection order (first match wins):
1. USE_SMOKE_JOB / UPGRADE_USE_SMOKE_JOB=true|false (explicit override)
2. Full MNIST prerequisites met (connected, or disconnected with mirrors)
3. DISCONNECTED_CLUSTER / IS_DISCONNECTED_CLUSTER env (Jenkins)
4. API server URL heuristic (-dis- / disconnected), last resort

ImageDigestMirrorSet / ICSP are intentionally not used: many connected
OpenShift clusters mirror container registries without blocking pip/PyPI.
"""
for name in ("USE_SMOKE_JOB", "UPGRADE_USE_SMOKE_JOB"):
if _env_flag_enabled(name):
print(f"{name} enabled; using smoke job (no pip install)")
return True
if _env_flag_disabled(name):
print(f"{name} disabled; using full MNIST job")
return False

if _mnist_prerequisites_met():
return False

if _env_flag_enabled("DISCONNECTED_CLUSTER") or _env_flag_enabled(
"IS_DISCONNECTED_CLUSTER"
):
print(
"Disconnected cluster env set without PIP/S3 mirrors; "
"using smoke job (no pip install)"
)
return True

try:
server = (run_oc_command(["whoami", "--show-server=true"]) or "").lower()
if "-dis-" in server or "disconnected" in server:
print(
"Detected disconnected cluster from API server URL; "
"using smoke job (no pip install)"
)
return True
except Exception:
pass

return False


def use_upgrade_smoke_job():
"""Backward-compatible alias for upgrade tests."""
return use_smoke_job()


def get_mnist_job_submission_spec(**kwargs):
"""Return entrypoint and runtime_env for tier1 / upgrade MNIST job submission tests."""
env_vars = get_setup_env_variables(**kwargs)
if use_smoke_job():
return {
"entrypoint": "python upgrade_job_smoke.py",
"runtime_env": {
"working_dir": "./tests/e2e/",
"env_vars": env_vars,
},
}
return {
"entrypoint": "python mnist.py",
"runtime_env": {
"working_dir": "./tests/e2e/",
"pip": "./tests/e2e/mnist_pip_requirements.txt",
"env_vars": env_vars,
},
}


def get_upgrade_job_submission_spec(**kwargs):
"""Backward-compatible alias for post-upgrade job submission tests."""
return get_mnist_job_submission_spec(**kwargs)


def random_choice():
alphabet = string.ascii_lowercase + string.digits
return "".join(random.choices(alphabet, k=5))
Expand Down Expand Up @@ -585,18 +707,12 @@ def assert_get_cluster_and_jobsubmit(
client = cluster.job_client

# Submit a job and get the submission ID
env_vars = (
get_setup_env_variables(ACCELERATOR=accelerator)
if accelerator
else get_setup_env_variables()
)
spec_kwargs = {"ACCELERATOR": accelerator} if accelerator else {}
job_spec = get_mnist_job_submission_spec(**spec_kwargs)
print(f"Submitting job: {job_spec['entrypoint']}")
submission_id = client.submit_job(
entrypoint="python mnist.py",
runtime_env={
"working_dir": "./tests/e2e/",
"pip": "./tests/e2e/mnist_pip_requirements.txt",
"env_vars": env_vars,
},
entrypoint=job_spec["entrypoint"],
runtime_env=job_spec["runtime_env"],
entrypoint_num_cpus=1 if number_of_gpus is None else None,
entrypoint_num_gpus=number_of_gpus,
)
Expand Down Expand Up @@ -1399,3 +1515,118 @@ def verify_network_policy_spec(
result["allowed_ports"].append(port.port)

return result


def is_byoidc_cluster_detected():
"""
BYOIDC cluster detection by checking OpenShift cluster Authentication resource.
"""
try:
custom_api = client.CustomObjectsApi()
auth_resource = custom_api.get_cluster_custom_object(
group="config.openshift.io",
version="v1",
plural="authentications",
name="cluster",
)

spec = auth_resource.get("spec", {})

if (spec.get("type") or "").upper() == "OIDC":
print("Detected BYOIDC cluster: Authentication spec.type is OIDC")
return True

if "oidcProviders" in spec and spec["oidcProviders"]:
for provider in spec["oidcProviders"]:
issuer_url = provider.get("issuer", {}).get("issuerURL", "")
if "keycloak" in issuer_url.lower() and (
"rh-ods.com" in issuer_url or "qe.rh-ods.com" in issuer_url
):
print(f"Detected BYOIDC cluster with OIDC issuer: {issuer_url}")
return True

if spec.get("webhookTokenAuthenticators"):
for webhook in spec["webhookTokenAuthenticators"]:
if webhook.get("kubeConfig", {}):
print("Detected BYOIDC cluster with webhook token authenticator")
return True

status = auth_resource.get("status", {})
oidc_clients_blob = json.dumps(status.get("oidcClients", []))
if "oc-cli" in oidc_clients_blob:
print("Detected BYOIDC cluster from status.oidcClients (oc-cli client)")
return True

print("No BYOIDC indicators found in cluster Authentication resource")
return False

except Exception as e:
print(f"Could not check cluster authentication method: {e}")
return False


def get_byoidc_issuer_url():
"""Get OIDC issuer URL from cluster Authentication resource."""
try:
custom_api = client.CustomObjectsApi()
auth_resource = custom_api.get_cluster_custom_object(
group="config.openshift.io",
version="v1",
plural="authentications",
name="cluster",
)

spec = auth_resource.get("spec", {})
if "oidcProviders" in spec and spec["oidcProviders"]:
for provider in spec["oidcProviders"]:
issuer_url = provider.get("issuer", {}).get("issuerURL", "")
if issuer_url:
return issuer_url

return "https://keycloak.qe.rh-ods.com"

except Exception as e:
print(f"Could not get OIDC issuer URL from cluster: {e}")
return "https://keycloak.qe.rh-ods.com"


def get_oidc_tokens(username, password, issuer_url):
"""Get OIDC tokens (id_token and refresh_token) for a user."""
try:
import requests

if "/realms/" in issuer_url:
token_url = f"{issuer_url}/protocol/openid-connect/token"
else:
token_url = f"{issuer_url}/realms/openshift/protocol/openid-connect/token"

print(f"Requesting OIDC tokens from: {token_url}")

data = {
"grant_type": "password",
"client_id": "oc-cli",
"username": username,
"password": password,
"scope": "openid profile email",
}

response = requests.post(token_url, data=data, verify=False, timeout=30)

if response.status_code == 200:
token_data = response.json()
id_token = token_data.get("id_token")
refresh_token = token_data.get("refresh_token")

if id_token and refresh_token:
print("✓ Successfully obtained OIDC tokens")
return id_token, refresh_token
print("ERROR: Token response missing id_token or refresh_token")
return None, None

print(f"ERROR: Token request failed with status {response.status_code}")
print(f"Response: {response.text}")
return None, None

except Exception as e:
print(f"Error getting OIDC tokens: {e}")
return None, None
16 changes: 16 additions & 0 deletions tests/e2e/upgrade_job_smoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2024 IBM, Red Hat
#
# Minimal Ray job for upgrade qualification on disconnected clusters.
# Validates job submission and execution without pip installs or external datasets.

import sys


def main() -> int:
print("upgrade-job-smoke: job started")
print("upgrade-job-smoke: job finished successfully")
return 0


if __name__ == "__main__":
sys.exit(main())
Loading
Loading