diff --git a/scripts/migration/MIGRATION_GUIDE.md b/scripts/migration/MIGRATION_GUIDE.md new file mode 100644 index 000000000..801d79570 --- /dev/null +++ b/scripts/migration/MIGRATION_GUIDE.md @@ -0,0 +1,224 @@ +# RHOAI RayCluster Migration Guide + +This guide walks you through migrating your RayClusters from RHOAI 2.x to RHOAI 3.x. + +## Overview + +The migration tool helps you: +1. **Back up** your RayCluster configurations before the upgrade +2. **Verify** your cluster is ready for the upgrade (automatic pre-flight checks) +3. **Migrate** your RayClusters after the upgrade is complete + +The tool is designed to be safe and predictable: +- **Staged approach**: Test on a single cluster before migrating everything +- **Idempotent**: Safe to run multiple times +- **Non-destructive**: Backups are created, nothing is deleted automatically + +--- + +## Prerequisites + +### 1. Python Environment + +Python 3.6 or later is required. + +```bash +python3 --version +``` + +### 2. Install Required Packages + +```bash +pip install -r ray_cluster_migration_requirements.txt +``` + +Or install directly (the quotes keep the shell from treating `>=` as an output redirect): + +```bash +pip install "kubernetes>=28.1.0" "PyYAML>=6.0" +``` + +### 3. Cluster Access + +Verify you can connect to your OpenShift cluster: + +```bash +oc whoami +oc get rayclusters --all-namespaces +``` + +--- + +## Step 1: Pre-Upgrade (Before RHOAI Upgrade) + +Run the pre-upgrade command to verify prerequisites and back up your RayClusters: + +```bash +python ray_cluster_migration.py pre-upgrade +``` + +The script will: +1. Prompt for a backup directory (default: `./raycluster-backups`) +2. Run automatic pre-flight checks (permissions, cert-manager, codeflare-operator status) +3. 
Back up your RayCluster configurations + +**If any required checks fail, the script will stop and tell you exactly what needs to be fixed before you can proceed.** + +### Pre-Upgrade Options + +```bash +# Back up a specific namespace only +python ray_cluster_migration.py pre-upgrade --namespace my-namespace + +# Back up a single cluster (for testing) +python ray_cluster_migration.py pre-upgrade --cluster my-cluster --namespace my-namespace + +# Specify backup directory directly +python ray_cluster_migration.py pre-upgrade ./my-backup-directory +``` + +--- + +## Step 2: Perform the RHOAI Upgrade + +Follow your standard RHOAI upgrade procedure to upgrade from RHOAI 2.x to RHOAI 3.x. + +--- + +## Step 3: Post-Upgrade (After RHOAI Upgrade) + +After the RHOAI upgrade is complete, migrate your RayClusters. + +### Recommended: Staged Migration + +We recommend migrating in stages to verify everything works correctly: + +**Stage 1: Test with a single cluster** +```bash +# Preview first +python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-namespace --dry-run + +# Run the migration +python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-namespace +``` + +**Stage 2: Migrate a namespace** +```bash +python ray_cluster_migration.py post-upgrade --namespace my-namespace +``` + +**Stage 3: Migrate all remaining clusters** +```bash +python ray_cluster_migration.py post-upgrade +``` + +### Post-Upgrade Options + +```bash +# Skip confirmation prompt (for automation) +python ray_cluster_migration.py post-upgrade --yes + +# Preview changes without making them +python ray_cluster_migration.py post-upgrade --dry-run +``` + +### Restore from Backup + +If your clusters were deleted during the upgrade or you prefer to restore from backup files: + +```bash +# Restore all clusters from backup directory +python ray_cluster_migration.py post-upgrade --from-backup ./raycluster-backups + +# Restore a single cluster from backup +python 
ray_cluster_migration.py post-upgrade --from-backup ./raycluster-backups --cluster my-cluster --namespace my-namespace + +# Restore from a single backup file +python ray_cluster_migration.py post-upgrade --from-backup ./raycluster-backups/raycluster-my-cluster-my-namespace.yaml +``` + +**Important:** `--from-backup` will **delete** any existing cluster with the same name before creating it from the backup. This ensures a clean restore. + +--- + +## Check Migration Status + +Check which clusters need migration at any time: + +```bash +python ray_cluster_migration.py list +``` + +--- + +## Troubleshooting + +### Pre-flight check failed: cert-manager not detected + +Install cert-manager via OperatorHub in your OpenShift cluster: + +1. Go to OperatorHub in the OpenShift console +2. Search for "cert-manager" +3. Install the cert-manager operator +4. Wait for it to be ready +5. Run pre-upgrade again + +### Pre-flight check failed: codeflare-operator not Removed + +Update your DataScienceCluster: + +```bash +oc patch datasciencecluster --type merge -p '{"spec":{"components":{"codeflare":{"managementState":"Removed"}}}}' +``` + +### Migration failed for a cluster + +1. Check the error message for details +2. Verify cluster health: `oc get raycluster -n ` +3. Retry just that cluster: `python ray_cluster_migration.py post-upgrade --cluster --namespace ` + +### Route not available after migration + +Routes may take 30-60 seconds to be created. 
Check directly: + +```bash +oc get httproute -n +``` + +--- + +## Quick Reference + +```bash +# Before RHOAI upgrade +python ray_cluster_migration.py pre-upgrade + +# After RHOAI upgrade (staged approach) +python ray_cluster_migration.py post-upgrade --cluster test-cluster --namespace dev --dry-run +python ray_cluster_migration.py post-upgrade --cluster test-cluster --namespace dev +python ray_cluster_migration.py post-upgrade --namespace dev +python ray_cluster_migration.py post-upgrade + +# Check status anytime +python ray_cluster_migration.py list +``` + +## Command Reference + +| Command | Description | +|---------|-------------| +| `pre-upgrade` | Run pre-flight checks and backup RayClusters | +| `post-upgrade` | Migrate RayClusters after RHOAI upgrade | +| `list` | Show all RayClusters and their migration status | +| `delete` | [Advanced] Delete RayClusters | +| `import` | [Advanced] Restore RayClusters from backup | + +### Common Options + +| Option | Description | +|--------|-------------| +| `--cluster NAME` | Target a specific cluster (requires `--namespace`) | +| `--namespace NS` | Target a specific namespace | +| `--dry-run` | Preview changes without applying them | +| `--yes` | Skip confirmation prompts | +| `--from-backup PATH` | (post-upgrade only) Restore from backup file or directory. Deletes existing clusters before recreating. | diff --git a/scripts/migration/RAY_CLUSTER_MIGRATION_README.md b/scripts/migration/RAY_CLUSTER_MIGRATION_README.md new file mode 100644 index 000000000..d06619d10 --- /dev/null +++ b/scripts/migration/RAY_CLUSTER_MIGRATION_README.md @@ -0,0 +1,482 @@ +# RayCluster Migration Tool for RHOAI 2.x to RHOAI 3.x + +A standalone script for migrating RayClusters during RHOAI upgrades. + +## Overview + +This tool helps you safely migrate RayClusters when upgrading from RHOAI 2.x to RHOAI 3.x. 
It is designed to be run in stages, allowing you to test the migration on a single cluster before proceeding to an entire namespace or the whole Kubernetes cluster. + +### Key Features + +- **Staged Migration**: Test on a single cluster → proceed to namespace → then cluster-wide +- **Idempotent Operations**: Safe to run multiple times - already-migrated clusters are skipped +- **Dry-Run Support**: Preview all changes before applying them +- **Clear Guidance**: Commands named for when to use them, not what they do + +## Requirements + +### Python Packages + +Install the required Python packages (the quotes keep the shell from treating `>=` as an output redirect): + +```bash +pip install "kubernetes>=28.1.0" "PyYAML>=6.0" +``` + +Or use the included requirements file: + +```bash +pip install -r ray_cluster_migration_requirements.txt +``` + +### RBAC Permissions + +The script requires different permissions depending on the command: + +| Command | Permissions | +|---------|-------------| +| `list` | Read-only (RayClusters, Pods, Namespaces) | +| `pre-upgrade` | Read-only (creates local backup files only) | +| `post-upgrade` | Read + Write (RayClusters, ServiceAccounts) | +| `post-upgrade --from-backup` | Read + Write (RayClusters, ServiceAccounts) | + +See [PERMISSIONS.md](PERMISSIONS.md) for detailed RBAC requirements and example Role/ClusterRole YAML. + +## Authentication + +The script uses standard Kubernetes authentication methods: +1. `KUBECONFIG` environment variable +2. `~/.kube/config` file +3. In-cluster config (when running inside a Kubernetes pod) + +## Migration Workflow + +The recommended migration workflow follows three stages: + +### Stage 1: Pre-Upgrade (Before RHOAI Upgrade) + +Back up your RayCluster configurations. This only creates backup files - it does NOT modify or delete any clusters. 
+ +```bash +# Backup all clusters (you'll be prompted for backup directory) +python ray_cluster_migration.py pre-upgrade + +# Or specify the backup directory directly +python ray_cluster_migration.py pre-upgrade ./my-backup-dir + +# Backup a specific namespace +python ray_cluster_migration.py pre-upgrade --namespace my-ns + +# Backup a single cluster +python ray_cluster_migration.py pre-upgrade --cluster my-cluster --namespace my-ns +``` + +#### Backup Directory Structure + +The pre-upgrade command creates two subdirectories in your backup location: + +``` +my-backup-dir/ + rhoai-2.x/ # RHOAI 2.x compatible (with CodeFlare components) + rhoai-3.x/ # RHOAI 3.x compatible (ready for post-upgrade) +``` + +- **`rhoai-2.x/`**: Contains the original RayCluster YAMLs with all CodeFlare-operator components (TLS, OAuth proxy, etc.). Use these if you **did not proceed with the RHOAI 3.x upgrade** and need to restore your RayClusters. + +- **`rhoai-3.x/`**: Contains cleaned RayCluster YAMLs with CodeFlare components removed. Use these with `post-upgrade --from-backup` to migrate to RHOAI 3.x. + +**Important**: If you decide not to proceed with the upgrade and need to restore your clusters on RHOAI 2.x, use the files in `rhoai-2.x/`, not `rhoai-3.x/`. + +### Stage 2: Perform the RHOAI Upgrade + +Follow your standard RHOAI upgrade procedure. + +### Stage 3: Post-Upgrade (After RHOAI Upgrade) + +Migrate your RayClusters to be compatible with RHOAI 3.x. + +```bash +# ALWAYS use --dry-run first to preview changes! 
+ +# Start with the same single cluster you tested backup with +python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns --dry-run +python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns + +# Then migrate the entire namespace (you'll be prompted to confirm) +python ray_cluster_migration.py post-upgrade --namespace my-ns --dry-run +python ray_cluster_migration.py post-upgrade --namespace my-ns + +# Finally, migrate all clusters (you'll be prompted to confirm) +python ray_cluster_migration.py post-upgrade --dry-run +python ray_cluster_migration.py post-upgrade +``` + +## Commands Reference + +### `list` - Discover RayClusters and Migration Status + +See all RayClusters and whether they need migration: + +```bash +# List all clusters across all namespaces +python ray_cluster_migration.py list + +# List clusters in a specific namespace +python ray_cluster_migration.py list --namespace my-ns + +# Output as YAML for scripting +python ray_cluster_migration.py list --format yaml +``` + +Example output: +``` +Found 3 RayCluster(s): + +Name Namespace Status Workers Migration Status +---------------------------------------------------------------------------------------------------- +production-cluster production ready 5 [OK] +staging-cluster staging ready 3 [NEEDS MIGRATION] +dev-cluster dev ready 2 [NEEDS MIGRATION] + +Migration Summary: 1 migrated, 2 need migration +``` + +### `pre-upgrade` - Backup Before RHOAI Upgrade + +Runs pre-flight checks and creates backup YAML files of your RayCluster configurations. Run this BEFORE performing the RHOAI upgrade. 
+ +```bash +# Backup all clusters (you'll be prompted for backup directory) +python ray_cluster_migration.py pre-upgrade + +# Or specify the backup directory directly +python ray_cluster_migration.py pre-upgrade ./my-backup-dir + +# Backup all clusters in a namespace +python ray_cluster_migration.py pre-upgrade --namespace my-ns + +# Backup a single cluster +python ray_cluster_migration.py pre-upgrade --cluster my-cluster --namespace my-ns +``` + +**What it does:** +- Runs pre-flight checks to verify the cluster is ready for upgrade +- Creates a backup directory if it doesn't exist +- Exports each RayCluster to a separate YAML file +- Does NOT delete or modify any clusters + +**Pre-flight checks:** +- **cert-manager**: Verifies cert-manager is installed (required for RHOAI 3.x) + +If a required check fails, you'll be warned and asked to confirm before proceeding: +``` +Running pre-upgrade checks... +------------------------------------------------------------ + [FAIL] cert-manager: cert-manager not detected + cert-manager is required for RHOAI 3.x. Install it via OperatorHub + before proceeding with the upgrade. +------------------------------------------------------------ + +Pre-upgrade checks failed. Please resolve the issues above before +proceeding with the RHOAI upgrade. +``` + +**Idempotency:** Running this multiple times simply overwrites the backup files. + +### `post-upgrade` - Migrate After RHOAI Upgrade + +Migrates RayClusters to be compatible with RHOAI 3.x. Run this AFTER the RHOAI upgrade. + +```bash +# Migrate a single cluster (always use --dry-run first!) 
+python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns --dry-run +python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns + +# Migrate all clusters in a namespace (you'll be prompted to confirm) +python ray_cluster_migration.py post-upgrade --namespace my-ns --dry-run +python ray_cluster_migration.py post-upgrade --namespace my-ns + +# Migrate all clusters across all namespaces (you'll be prompted to confirm) +python ray_cluster_migration.py post-upgrade --dry-run +python ray_cluster_migration.py post-upgrade + +# Skip confirmation prompt (for automation) +python ray_cluster_migration.py post-upgrade --yes +``` + +**What it does:** +- Removes CodeFlare-operator TLS/OAuth components from the RayCluster spec +- The KubeRay operator handles pod recreation with the new configuration +- The `odh.ray.io/secure-trusted-network: "true"` annotation is added automatically by the KubeRay mutating webhook +- Displays the new Gateway API routes for accessing your clusters after migration + +**Important:** Migration will cause temporary downtime for each RayCluster as pods are restarted with the updated configuration. 
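The first item under "What it does" can be illustrated with a minimal sketch of stripping CodeFlare TLS/OAuth pieces from one pod spec. The container, env var, volume, and initContainer names are the ones the script's detection logic checks for; the helper `strip_codeflare_components` and the removal of matching `volumeMounts` are assumptions made for illustration, not the script's actual implementation.

```python
import copy

# Names the migration script looks for when detecting CodeFlare components.
TLS_ENV_VARS = {"RAY_USE_TLS", "RAY_TLS_SERVER_CERT", "RAY_TLS_SERVER_KEY", "RAY_TLS_CA_CERT"}
TLS_VOLUMES = {"ca-vol", "proxy-tls-secret", "server-cert"}
SIDECAR_CONTAINERS = {"oauth-proxy"}
CERT_INIT_CONTAINERS = {"create-cert"}


def _drop_named(items, names):
    """Return the items whose 'name' is not in the given set."""
    return [item for item in items if item.get("name") not in names]


def strip_codeflare_components(pod_spec: dict) -> dict:
    """Hypothetical helper: return a cleaned copy of a pod spec.

    The input is left untouched so the original can still be backed up.
    """
    cleaned = copy.deepcopy(pod_spec)
    if "containers" in cleaned:
        cleaned["containers"] = _drop_named(cleaned["containers"], SIDECAR_CONTAINERS)
        for container in cleaned["containers"]:
            if "env" in container:
                container["env"] = _drop_named(container["env"], TLS_ENV_VARS)
            # Assumption: mounts of removed volumes must go too, or the pod is invalid.
            if "volumeMounts" in container:
                container["volumeMounts"] = _drop_named(container["volumeMounts"], TLS_VOLUMES)
    if "initContainers" in cleaned:
        cleaned["initContainers"] = _drop_named(cleaned["initContainers"], CERT_INIT_CONTAINERS)
    if "volumes" in cleaned:
        cleaned["volumes"] = _drop_named(cleaned["volumes"], TLS_VOLUMES)
    return cleaned


head_pod_spec = {
    "containers": [
        {
            "name": "ray-head",
            "env": [{"name": "RAY_USE_TLS", "value": "1"}, {"name": "MY_VAR", "value": "x"}],
            "volumeMounts": [{"name": "ca-vol", "mountPath": "/home/ray/workspace/ca"}],
        },
        {"name": "oauth-proxy"},
    ],
    "initContainers": [{"name": "create-cert"}],
    "volumes": [{"name": "ca-vol"}, {"name": "data"}],
}
cleaned_spec = strip_codeflare_components(head_pod_spec)
print([c["name"] for c in cleaned_spec["containers"]])
```

Working on a deep copy keeps the original spec available for comparison, which is also how a dry run could report what would change without touching the cluster.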
+ +**Migration modes:** +- **Live migration** (default): Modifies existing RayClusters in-place +- **Restore from backup** (`--from-backup`): Deletes existing clusters and recreates from backup files + +### Restore from Backup (`--from-backup`) + +Use this mode if your clusters were deleted during the upgrade or you prefer a clean restore: + +```bash +# Restore all clusters from the rhoai-3.x/ backup directory +python ray_cluster_migration.py post-upgrade --from-backup ./my-backup-dir/rhoai-3.x + +# Restore clusters in a specific namespace +python ray_cluster_migration.py post-upgrade --from-backup ./my-backup-dir/rhoai-3.x --namespace my-ns + +# Restore a single cluster +python ray_cluster_migration.py post-upgrade --from-backup ./my-backup-dir/rhoai-3.x --cluster my-cluster --namespace my-ns + +# Restore from a single backup file +python ray_cluster_migration.py post-upgrade --from-backup ./my-backup-dir/rhoai-3.x/raycluster-my-cluster-my-ns.yaml +``` + +**Note**: Use the `rhoai-3.x/` subdirectory for 3.x migration. The `rhoai-2.x/` subdirectory contains 2.x-compatible backups for rollback scenarios. + +**What `--from-backup` does:** +1. If a cluster with the same name exists, it is **deleted first** +2. Waits for the deletion to complete +3. Creates the cluster from the backup configuration +4. Waits for the cluster to become ready +5. Displays the dashboard URL + +**Warning shown:** +``` +WARNING: Restore from backup will DELETE and RECREATE each RayCluster. + - If a cluster currently exists, it will be deleted first. + - All running pods, jobs, and workloads will be terminated. + - Existing job state and logs will be lost. + - The cluster will be recreated from the backup configuration. +``` + +**Idempotency:** Already-migrated clusters are automatically detected and skipped. 
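As a rough sketch of how skipping already-migrated clusters could work, the check below treats the `odh.ray.io/secure-trusted-network: "true"` annotation as the marker of a migrated cluster. The helper names `is_migrated` and `plan_migration` are hypothetical, and the real script may also inspect the spec for leftover TLS/OAuth components before deciding.

```python
from typing import Dict, List, Tuple

# Annotation the script treats as the marker of a migrated cluster.
SECURE_NETWORK_ANNOTATION = "odh.ray.io/secure-trusted-network"


def is_migrated(ray_cluster: Dict) -> bool:
    """Hypothetical check: a cluster counts as migrated if the annotation is "true"."""
    annotations = ray_cluster.get("metadata", {}).get("annotations") or {}
    return annotations.get(SECURE_NETWORK_ANNOTATION) == "true"


def plan_migration(clusters: List[Dict]) -> Tuple[List[str], List[str]]:
    """Split cluster names into (to_migrate, skipped) without modifying anything."""
    to_migrate, skipped = [], []
    for cluster in clusters:
        name = cluster["metadata"]["name"]
        (skipped if is_migrated(cluster) else to_migrate).append(name)
    return to_migrate, skipped


clusters = [
    {"metadata": {"name": "production-cluster",
                  "annotations": {SECURE_NETWORK_ANNOTATION: "true"}}},
    {"metadata": {"name": "staging-cluster", "annotations": None}},
    {"metadata": {"name": "dev-cluster"}},
]
to_migrate, skipped = plan_migration(clusters)
print(f"{len(to_migrate)} to migrate, {len(skipped)} already migrated")
```

Because the plan is computed from the cluster's current state rather than from a record of past runs, rerunning it after a partial migration picks up only the clusters that still need work.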
+ +**Example output (live migration):** +``` +Analyzing 2 RayCluster(s) (all clusters in namespace 'my-ns') + + [MIGRATE] my-cluster (ns: my-ns) - needs migration + [MIGRATE] test-cluster (ns: my-ns) - needs migration + +Summary: 2 to migrate, 0 already migrated + + [my-cluster] Applying migration... + [my-cluster] Waiting for cluster to become ready... + Status: ready, workers: 2/2 (45s) + [OK] Migrated: my-cluster (ns: my-ns) + Dashboard: https://my-cluster-my-ns.apps.example.com + +============================================================ +Migration Summary: + Migrated: 2 + Skipped (already migrated): 0 + Failed: 0 +``` + +**Example output (restore from backup):** +``` +Found 1 RayCluster(s) in backup to migrate (cluster 'my-cluster' in namespace 'my-ns'): + + - my-cluster (ns: my-ns) from raycluster-my-cluster-my-ns.yaml + + [my-cluster] Deleting existing cluster... + [my-cluster] Waiting for cluster deletion to complete... + [my-cluster] Cluster deleted successfully + [my-cluster] Creating cluster from backup... + [my-cluster] Waiting for cluster to become ready... + Status: ready, workers: 2/2 (60s) + [OK] Restored from backup: my-cluster (ns: my-ns) + Dashboard: https://my-cluster-my-ns.apps.example.com + +============================================================ +Restore from Backup Summary: + Restored: 1 + Failed: 0 +``` + +### `delete` - [Advanced] Delete RayClusters + +Permanently deletes RayClusters from the cluster. This is an advanced operation. + +```bash +# Preview what would be deleted +python ray_cluster_migration.py delete --cluster my-cluster --namespace my-ns --dry-run +python ray_cluster_migration.py delete --namespace my-ns --all --dry-run + +# Delete (with confirmation) +python ray_cluster_migration.py delete --cluster my-cluster --namespace my-ns +``` + +### `import` - [Advanced] Restore from Backup + +Restores RayClusters from backup YAML files. This is an advanced operation. 
+ +```bash +# Preview what would be imported +python ray_cluster_migration.py import ./backup --dry-run + +# Import from backup +python ray_cluster_migration.py import ./backup + +# Force import (overwrite conflicts) +python ray_cluster_migration.py import ./backup --force +``` + +## Scoping Options + +All commands support consistent scoping options: + +| Scope | Options | Description | +|-------|---------|-------------| +| Single Cluster | `--cluster NAME --namespace NS` | Target one specific cluster | +| Namespace | `--namespace NS` | Target all clusters in a namespace | +| Cluster-wide | (no flags) | Target all clusters across all namespaces | + +When targeting multiple clusters, you'll be prompted to confirm before proceeding. + +## Safety Features + +### Dry-Run Mode + +Always preview changes before applying: + +```bash +# See what post-upgrade would do +python ray_cluster_migration.py post-upgrade --dry-run +``` + +### Confirmation Prompts + +Migration operations require confirmation: + +**Live migration:** +``` +IMPORTANT: Migration will cause temporary downtime for each RayCluster. + - Pods will be restarted as the KubeRay operator recreates them with the new configuration. + - Existing job state and logs will be lost. + - Currently running workloads/jobs will be interrupted and progress lost. + +Proceed with migration? (yes/no): +``` + +**Restore from backup:** +``` +WARNING: Restore from backup will DELETE and RECREATE each RayCluster. + - If a cluster currently exists, it will be deleted first. + - All running pods, jobs, and workloads will be terminated. + - Existing job state and logs will be lost. + - The cluster will be recreated from the backup configuration. + +Proceed with restore from backup? 
(yes/no): +``` + +### Skip Confirmation for Automation + +Use `--yes` to skip confirmation in CI/CD: + +```bash +python ray_cluster_migration.py post-upgrade --yes +python ray_cluster_migration.py post-upgrade --namespace my-ns --yes +``` + +## Idempotency Details + +### Pre-Upgrade (Backup) + +- Running multiple times overwrites existing backup files +- Safe to run repeatedly - no cluster modifications + +### Post-Upgrade (Migrate) + +The script automatically detects if a cluster has already been migrated and skips it. This means you can safely run the migration multiple times without causing issues. + +## Example: Complete Migration + +```bash +# 1. Check current state +python ray_cluster_migration.py list + +# 2. Backup everything before upgrade +python ray_cluster_migration.py pre-upgrade ./backup-$(date +%Y%m%d) + +# 3. [Perform RHOAI upgrade using your standard procedure] + +# 4. Check what needs migration +python ray_cluster_migration.py list + +# 5. Test migration on a single cluster +python ray_cluster_migration.py post-upgrade --cluster test-cluster --namespace dev --dry-run +python ray_cluster_migration.py post-upgrade --cluster test-cluster --namespace dev + +# 6. Verify the test cluster works correctly + +# 7. Migrate the rest of the dev namespace +python ray_cluster_migration.py post-upgrade --namespace dev --dry-run +python ray_cluster_migration.py post-upgrade --namespace dev + +# 8. Continue with other namespaces +python ray_cluster_migration.py post-upgrade --namespace staging +python ray_cluster_migration.py post-upgrade --namespace production + +# 9. Or migrate everything at once +python ray_cluster_migration.py post-upgrade + +# 10. 
Verify final state +python ray_cluster_migration.py list +``` + +## Troubleshooting + +### "No Kubernetes configuration found" + +Ensure you have valid kubeconfig: +```bash +kubectl cluster-info +``` + +### "RayCluster CRD not found" + +Verify the RayCluster CRD is installed: +```bash +kubectl get crd rayclusters.ray.io +``` + +### "Permission denied" + +Check your RBAC permissions: +```bash +kubectl auth can-i list rayclusters.ray.io --all-namespaces +kubectl auth can-i update rayclusters.ray.io --all-namespaces +``` + +### Cluster shows as needing migration but was already migrated + +The cluster may be missing the annotation. Run post-upgrade again - it will add the annotation: +```bash +python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns +``` + +### Migration seems stuck or fails repeatedly + +Check cluster status: +```bash +kubectl get raycluster my-cluster -n my-ns -o yaml +``` + +Try with verbose output and investigate any error messages. + +## Output File Format + +Backup files are saved as `raycluster-{name}-{namespace}.yaml`: + +``` +backup/ +├── raycluster-production-cluster-production.yaml +├── raycluster-staging-cluster-staging.yaml +└── raycluster-dev-cluster-dev.yaml +``` + +Each file contains the RayCluster configuration ready for restoration if needed. diff --git a/scripts/migration/ray_cluster_migration.py b/scripts/migration/ray_cluster_migration.py new file mode 100644 index 000000000..cf2b3933b --- /dev/null +++ b/scripts/migration/ray_cluster_migration.py @@ -0,0 +1,2424 @@ +#!/usr/bin/env python3 +# Copyright 2024 IBM, Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +RayCluster Migration Tool for RHOAI 2.x to RHOAI 3.x + +This script helps migrate RayClusters during RHOAI upgrades. It is designed +to be run in stages, allowing you to test the migration on a single cluster +before proceeding to an entire namespace or the whole Kubernetes cluster. + +RECOMMENDED WORKFLOW: + 1. Run pre-upgrade to backup your RayCluster configurations + 2. Perform the RHOAI upgrade + 3. Run post-upgrade to migrate your RayClusters + +REQUIREMENTS: + - kubernetes>=28.1.0 + - PyYAML>=6.0 + +USAGE EXAMPLES: + + # Step 1: Backup - All clusters (you'll be prompted for backup directory) + python ray_cluster_migration.py pre-upgrade + + # Step 1: Backup - With specific directory + python ray_cluster_migration.py pre-upgrade ./my-backup-dir + + # Step 1: Backup - Specific namespace + python ray_cluster_migration.py pre-upgrade --namespace my-ns + + # Step 1: Backup - Single cluster + python ray_cluster_migration.py pre-upgrade --cluster my-cluster --namespace my-ns + + # ... Perform RHOAI upgrade ... 
+ + # Step 2: Migrate - Start with the same single cluster + python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns --dry-run + python ray_cluster_migration.py post-upgrade --cluster my-cluster --namespace my-ns + + # Step 2: Migrate - Then proceed with entire namespace + python ray_cluster_migration.py post-upgrade --namespace my-ns --dry-run + python ray_cluster_migration.py post-upgrade --namespace my-ns + + # Step 2: Migrate - Finally, migrate all clusters (you'll be prompted to confirm) + python ray_cluster_migration.py post-upgrade --dry-run + python ray_cluster_migration.py post-upgrade + + # Discovery - List all RayClusters and their migration status + python ray_cluster_migration.py list + python ray_cluster_migration.py list --namespace my-ns + +IDEMPOTENCY: + All operations are idempotent and safe to run multiple times: + - pre-upgrade: Overwrites backup files if they already exist + - post-upgrade: Detects already-migrated clusters and skips them + +AUTHENTICATION: + The script uses standard Kubernetes authentication methods: + 1. KUBECONFIG environment variable + 2. ~/.kube/config file + 3. In-cluster config (when running in a pod) +""" + +import os +import sys +import time +import yaml +import copy +import argparse +from typing import List, Dict, Optional, Tuple +from kubernetes import client, config +from kubernetes.dynamic import DynamicClient +from kubernetes.client.rest import ApiException + +# Field manager identifier for server-side apply +CF_SDK_FIELD_MANAGER = "codeflare-sdk" + +# Annotation that marks a cluster as migrated +SECURE_NETWORK_ANNOTATION = "odh.ray.io/secure-trusted-network" + +# Gateway API constants +GATEWAY_API_GROUP = "gateway.networking.k8s.io" +GATEWAY_API_VERSION = "v1" + + +def config_check(): + """ + Check and load the Kubernetes config from the default location. + + This function checks if a Kubernetes config file exists at the default path + (~/.kube/config). 
If `KUBECONFIG` is set, that file is used instead; if neither is available, it tries to load in-cluster config. + + Raises: + RuntimeError: If no valid credentials or config file is found. + """ + home_directory = os.path.expanduser("~") + + # Try to load kube config if not already loaded + try: + # First try KUBECONFIG (honored by load_kube_config) or the default location + if os.environ.get("KUBECONFIG") or os.path.isfile(f"{home_directory}/.kube/config"): + config.load_kube_config() + # Then try in-cluster config + elif "KUBERNETES_PORT" in os.environ: + config.load_incluster_config() + else: + raise RuntimeError( + "No Kubernetes configuration found. Please set KUBECONFIG, ensure you have a valid " + "~/.kube/config file, or run inside a Kubernetes cluster." + ) + except config.ConfigException as e: + raise RuntimeError(f"Failed to load Kubernetes configuration: {e}") from e + + +def get_api_client() -> client.ApiClient: + """ + Retrieve the Kubernetes API client with the default configuration. + + Returns: + client.ApiClient: The Kubernetes API client object. + """ + return client.ApiClient() + + +def remove_autogenerated_fields(resource): + """ + Recursively remove autogenerated fields from a dictionary. + + This removes Kubernetes metadata fields that are auto-generated and should + not be included when re-applying resources to a different cluster. 
+ + Args: + resource: Dictionary or list to process (modified in-place) + """ + if isinstance(resource, dict): + for key in list(resource.keys()): + if key in [ + "creationTimestamp", + "resourceVersion", + "uid", + "selfLink", + "managedFields", + "finalizers", + "generation", + "status", + "workload.codeflare.dev/user", # AppWrapper field + "workload.codeflare.dev/userid", # AppWrapper field + "podSetInfos", # AppWrapper field + ]: + del resource[key] + else: + remove_autogenerated_fields(resource[key]) + + elif isinstance(resource, list): + for item in resource: + remove_autogenerated_fields(item) + + +def _has_tls_oauth_components(ray_cluster_yaml: dict) -> Tuple[bool, List[str]]: + """ + Check if a RayCluster has TLS/OAuth components that need migration. + + Returns: + Tuple[bool, List[str]]: (has_components, list of component descriptions) + """ + components_found = [] + + # Environment variable names to check (TLS/OAuth related) + tls_env_vars = { + "RAY_USE_TLS", + "RAY_TLS_SERVER_CERT", + "RAY_TLS_SERVER_KEY", + "RAY_TLS_CA_CERT", + } + + # Volume names to check + volumes_to_check = {"ca-vol", "proxy-tls-secret", "server-cert"} + + # Container names to check (sidecar containers) + containers_to_check = {"oauth-proxy"} + + def check_pod_spec(pod_spec, group_name): + """Check a pod spec for TLS/OAuth components.""" + # Check containers for oauth-proxy sidecar + if "containers" in pod_spec: + for container in pod_spec["containers"]: + if container.get("name") in containers_to_check: + components_found.append( + f"{group_name}: oauth-proxy sidecar container" + ) + # Check for TLS env vars + if "env" in container: + for env_var in container["env"]: + if env_var.get("name") in tls_env_vars: + components_found.append( + f"{group_name}: TLS env var {env_var.get('name')}" + ) + break # Only report once per container + + # Check for CodeFlare certificate generation initContainer + codeflare_init_containers = {"create-cert"} + if "initContainers" in pod_spec: + for 
init_container in pod_spec["initContainers"]: + if init_container.get("name") in codeflare_init_containers: + components_found.append( + f"{group_name}: initContainer '{init_container.get('name')}' (cert generation)" + ) + + # Check volumes + if "volumes" in pod_spec: + for volume in pod_spec["volumes"]: + if volume.get("name") in volumes_to_check: + components_found.append( + f"{group_name}: TLS volume {volume.get('name')}" + ) + + # Check headGroupSpec + if "spec" in ray_cluster_yaml and "headGroupSpec" in ray_cluster_yaml["spec"]: + head_spec = ray_cluster_yaml["spec"]["headGroupSpec"] + if "template" in head_spec and "spec" in head_spec["template"]: + check_pod_spec(head_spec["template"]["spec"], "head") + if head_spec.get("enableIngress") is True: + components_found.append("head: enableIngress=true") + + # Check workerGroupSpecs + if "spec" in ray_cluster_yaml and "workerGroupSpecs" in ray_cluster_yaml["spec"]: + for i, worker_spec in enumerate(ray_cluster_yaml["spec"]["workerGroupSpecs"]): + if "template" in worker_spec and "spec" in worker_spec["template"]: + check_pod_spec(worker_spec["template"]["spec"], f"worker[{i}]") + + return len(components_found) > 0, components_found + + +def _suspend_cluster( + api_instance, name: str, namespace: str, suspend: bool = True +) -> None: + """ + Suspend or unsuspend a RayCluster. + + Args: + api_instance: CustomObjectsApi instance + name: Name of the RayCluster + namespace: Namespace of the RayCluster + suspend: True to suspend, False to unsuspend + """ + patch_body = {"spec": {"suspend": suspend}} + api_instance.patch_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=name, + body=patch_body, + ) + + +def _wait_for_cluster_suspended( + api_instance, core_api, name: str, namespace: str, timeout_seconds: int = 120 +) -> bool: + """ + Wait for a RayCluster to be fully suspended (all pods terminated). 
+ + Args: + api_instance: CustomObjectsApi instance + core_api: CoreV1Api instance + name: Name of the RayCluster + namespace: Namespace of the RayCluster + timeout_seconds: Maximum time to wait + + Returns: + bool: True if cluster is suspended, False if timeout + """ + poll_interval = 3 + elapsed = 0 + + while elapsed < timeout_seconds: + # Check for pods with the ray cluster label + label_selector = f"ray.io/cluster={name}" + pods = core_api.list_namespaced_pod( + namespace=namespace, label_selector=label_selector + ) + + # Count running/pending pods + active_pods = [ + p + for p in pods.items + if p.status.phase in ("Running", "Pending", "ContainerCreating") + ] + + if len(active_pods) == 0: + return True + + time.sleep(poll_interval) + elapsed += poll_interval + + return False + + +def _wait_for_cluster_ready( + api_instance, core_api, name: str, namespace: str, timeout_seconds: int = 300 +) -> bool: + """ + Wait for a RayCluster to be fully ready after migration. + + After migration, pods are recreated. This function: + 1. First waits for pods to start being recreated (head pod recreated or workers dropping) + 2. 
Then waits for cluster to become fully ready again + + Args: + api_instance: CustomObjectsApi instance + core_api: CoreV1Api instance + name: Name of the RayCluster + namespace: Namespace of the RayCluster + timeout_seconds: Maximum time to wait + + Returns: + bool: True if cluster is ready, False if timeout + """ + poll_interval = 5 + elapsed = 0 + + # Get the current head pod UID to detect when it's recreated + initial_head_uid = None + try: + pods = core_api.list_namespaced_pod( + namespace=namespace, + label_selector=f"ray.io/cluster={name},ray.io/node-type=head", + ) + if pods.items: + initial_head_uid = pods.items[0].metadata.uid + except Exception: + pass + + # Phase 1: Wait for pods to start being recreated + # We detect this by the head pod UID changing or workers dropping to 0 + recreation_detected = False + phase1_timeout = min(60, timeout_seconds // 2) # Max 60s for phase 1 + + print(f" Waiting for pods to be recreated...", end="\r") + + while elapsed < phase1_timeout and not recreation_detected: + try: + # Check if head pod has been recreated (different UID) + pods = core_api.list_namespaced_pod( + namespace=namespace, + label_selector=f"ray.io/cluster={name},ray.io/node-type=head", + ) + + if not pods.items: + # Head pod is gone, recreation in progress + recreation_detected = True + elif initial_head_uid and pods.items[0].metadata.uid != initial_head_uid: + # Head pod has a new UID, it was recreated + recreation_detected = True + + # Also check if workers dropped (indicates recreation) + rc = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=name, + ) + status = rc.get("status", {}) + available_workers = status.get("availableWorkerReplicas", 0) + desired_workers = status.get("desiredWorkerReplicas", 0) + state = status.get("state", "").lower() + + if available_workers == 0 and desired_workers > 0: + recreation_detected = True + + if state != "ready": + recreation_detected 
= True + + except Exception: + pass + + if not recreation_detected: + time.sleep(poll_interval) + elapsed += poll_interval + + # Phase 2: Wait for cluster to become fully ready + print(f" Waiting for cluster to become ready...", end="\r") + + while elapsed < timeout_seconds: + try: + rc = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=name, + ) + status = rc.get("status", {}) + + # Get desired and available workers + desired_workers = status.get("desiredWorkerReplicas", 0) + available_workers = status.get("availableWorkerReplicas", 0) + + # Check head status + state = status.get("state", "").lower() + head_ready = state == "ready" + + # Show progress + print( + f" Status: {state}, workers: {available_workers}/{desired_workers} ({elapsed}s) ", + end="\r", + ) + + # Cluster is ready when head is ready AND all workers are available + if head_ready and available_workers >= desired_workers: + # Clear the progress line + print(" " * 80, end="\r") + return True + + except Exception: + pass + + time.sleep(poll_interval) + elapsed += poll_interval + + # Clear the progress line + print(" " * 80, end="\r") + return False + + +def _is_cluster_migrated(ray_cluster: dict) -> Tuple[bool, str]: + """ + Check if a RayCluster has already been migrated. + + A cluster is considered migrated if: + 1. It has the secure-trusted-network annotation set to "true" + 2. 
It has no TLS/OAuth components + + Returns: + Tuple[bool, str]: (is_migrated, reason) + """ + # Check for secure network annotation + annotations = ray_cluster.get("metadata", {}).get("annotations", {}) + has_annotation = annotations.get(SECURE_NETWORK_ANNOTATION) == "true" + + # Check for TLS/OAuth components + has_tls_oauth, components = _has_tls_oauth_components(ray_cluster) + + if has_annotation and not has_tls_oauth: + return True, "Already migrated (has annotation, no TLS/OAuth components)" + elif has_annotation and has_tls_oauth: + return ( + False, + f"Partially migrated (has annotation but still has: {', '.join(components[:3])})", + ) + elif not has_annotation and not has_tls_oauth: + return False, "Needs annotation only (no TLS/OAuth components found)" + else: + return False, f"Needs migration (has: {', '.join(components[:3])})" + + +def _process_ray_cluster_yaml(ray_cluster_yaml: dict) -> dict: + """ + Processes a RayCluster YAML to remove TLS/OAuth-related components. + + This function removes hardcoded components that are typically added by RHOAI + for security and OAuth proxy support: + - TLS-related environment variables (RAY_USE_TLS, RAY_TLS_*) + - TLS-related volume mounts (ca-vol, server-cert workspace mounts) + - OAuth proxy sidecar container + - Certificate generation initContainers + - TLS/OAuth related volumes + - Service account configuration + + This processing is applied to both head and worker pod specs. 
+ + Args: + ray_cluster_yaml (dict): The RayCluster YAML dictionary from Kubernetes API + + Returns: + dict: The processed YAML with TLS/OAuth components removed + """ + # Environment variable names to remove (TLS/OAuth related) + tls_env_vars = { + "RAY_USE_TLS", + "RAY_TLS_SERVER_CERT", + "RAY_TLS_SERVER_KEY", + "RAY_TLS_CA_CERT", + } + + # Volume names to remove + volumes_to_remove = {"ca-vol", "proxy-tls-secret", "server-cert"} + + # Volume mount names to remove from containers + volume_mounts_to_remove = {"ca-vol", "server-cert"} + + # Container names to remove (sidecar containers) + containers_to_remove = {"oauth-proxy"} + + def process_container_spec(container_spec): + """Process a single container spec to remove TLS/OAuth env vars and mounts.""" + if "env" in container_spec: + # Filter out TLS-related environment variables + container_spec["env"] = [ + env_var + for env_var in container_spec["env"] + if env_var.get("name") not in tls_env_vars + ] + # Remove env key if empty + if not container_spec["env"]: + del container_spec["env"] + + if "volumeMounts" in container_spec: + # Filter out TLS/OAuth related volume mounts + container_spec["volumeMounts"] = [ + mount + for mount in container_spec["volumeMounts"] + if mount.get("name") not in volume_mounts_to_remove + ] + # Remove volumeMounts key if empty + if not container_spec["volumeMounts"]: + del container_spec["volumeMounts"] + + def process_pod_spec(pod_spec): + """Process a pod spec (in template.spec) to clean up TLS/OAuth components.""" + # Process containers + if "containers" in pod_spec: + # Remove sidecar containers like oauth-proxy + pod_spec["containers"] = [ + container + for container in pod_spec["containers"] + if container.get("name") not in containers_to_remove + ] + # Process remaining containers to remove TLS env/mounts + for container in pod_spec["containers"]: + process_container_spec(container) + + # Remove only CodeFlare-specific initContainers (create-cert) + # Preserve user-provided 
initContainers + codeflare_init_containers = {"create-cert"} + if "initContainers" in pod_spec: + pod_spec["initContainers"] = [ + init_container + for init_container in pod_spec["initContainers"] + if init_container.get("name") not in codeflare_init_containers + ] + # Remove initContainers key if empty + if not pod_spec["initContainers"]: + del pod_spec["initContainers"] + + # Remove serviceAccountName field + if "serviceAccountName" in pod_spec: + del pod_spec["serviceAccountName"] + + # Process volumes + if "volumes" in pod_spec: + # Filter out TLS/OAuth related volumes + pod_spec["volumes"] = [ + volume + for volume in pod_spec["volumes"] + if volume.get("name") not in volumes_to_remove + ] + + # Process headGroupSpec + if "spec" in ray_cluster_yaml and "headGroupSpec" in ray_cluster_yaml["spec"]: + head_spec = ray_cluster_yaml["spec"]["headGroupSpec"] + if "template" in head_spec and "spec" in head_spec["template"]: + process_pod_spec(head_spec["template"]["spec"]) + # Disable enableIngress if it's set to true + if head_spec.get("enableIngress") is True: + head_spec["enableIngress"] = False + + # Process workerGroupSpecs + if "spec" in ray_cluster_yaml and "workerGroupSpecs" in ray_cluster_yaml["spec"]: + for worker_spec in ray_cluster_yaml["spec"]["workerGroupSpecs"]: + if "template" in worker_spec and "spec" in worker_spec["template"]: + process_pod_spec(worker_spec["template"]["spec"]) + + return ray_cluster_yaml + + +def _get_clusters( + api_instance, + core_api, + cluster_name: Optional[str] = None, + namespace: Optional[str] = None, +) -> List[dict]: + """ + Get RayClusters based on the specified scope. 
+ + Args: + api_instance: CustomObjectsApi instance + core_api: CoreV1Api instance + cluster_name: Specific cluster name (requires namespace) + namespace: Specific namespace (optional, if None queries all namespaces) + + Returns: + List[dict]: List of RayCluster resources + """ + clusters = [] + + # Case 1: Specific cluster + if cluster_name: + if not namespace: + raise ValueError( + "Namespace must be specified when targeting a specific cluster" + ) + try: + rc = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=cluster_name, + ) + clusters.append(rc) + except ApiException as e: + if e.status == 404: + print( + f"RayCluster '{cluster_name}' not found in namespace '{namespace}'" + ) + else: + print( + f"Error retrieving RayCluster '{cluster_name}' from namespace '{namespace}': {e}" + ) + return clusters + + # Case 2: All clusters in specific namespace or all namespaces + if namespace: + namespace_names = [namespace] + else: + try: + namespaces = core_api.list_namespace() + namespace_names = [ns.metadata.name for ns in namespaces.items] + except Exception as e: + print(f"Error listing namespaces: {e}") + return clusters + + for ns in namespace_names: + try: + rcs = api_instance.list_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + ) + clusters.extend(rcs.get("items", [])) + except Exception as e: + if namespace: # Only print error if user specified a namespace + print(f"Warning: Could not list RayClusters in namespace '{ns}': {e}") + continue + + return clusters + + +def _get_cluster_route( + api_instance, cluster_name: str, namespace: str +) -> Optional[str]: + """ + Get the HTTPRoute URL for a RayCluster. + + Searches for HTTPRoute resources labeled with the cluster name and namespace, + then extracts the Gateway hostname to construct the full dashboard URL. 
+ + The URL format is: https://{gateway-hostname}/ray/{namespace}/{cluster-name} + + Args: + api_instance: CustomObjectsApi instance + cluster_name: Name of the RayCluster + namespace: Namespace of the RayCluster + + Returns: + Optional[str]: The route URL if found, None otherwise + """ + try: + label_selector = ( + f"ray.io/cluster-name={cluster_name},ray.io/cluster-namespace={namespace}" + ) + + httproute = None + debug = True # Set to False to disable debug output + + # Try cluster-wide search first (if permissions allow) + try: + httproutes = api_instance.list_cluster_custom_object( + group=GATEWAY_API_GROUP, + version=GATEWAY_API_VERSION, + plural="httproutes", + label_selector=label_selector, + ) + items = httproutes.get("items", []) + if debug: + print(f" [DEBUG] Cluster-wide search found {len(items)} items") + if items: + httproute = items[0] + except Exception as e: + if debug: + print(f" [DEBUG] Cluster-wide search failed: {e}") + pass # Will fall through to namespace search + + # If cluster-wide search didn't find anything, try namespace-specific search + if not httproute: + search_namespaces = [ + "redhat-ods-applications", + "opendatahub", + "default", + "ray-system", + ] + + for ns in search_namespaces: + try: + httproutes = api_instance.list_namespaced_custom_object( + group=GATEWAY_API_GROUP, + version=GATEWAY_API_VERSION, + namespace=ns, + plural="httproutes", + label_selector=label_selector, + ) + items = httproutes.get("items", []) + if debug: + print(f" [DEBUG] Search in {ns}: found {len(items)} items") + if items: + httproute = items[0] + break + except ApiException as e: + if debug: + print( + f" [DEBUG] Search in {ns} failed (ApiException): {e.reason}" + ) + continue + except Exception as e: + if debug: + print(f" [DEBUG] Search in {ns} failed: {e}") + continue + + if not httproute: + if debug: + print(f" [DEBUG] No HTTPRoute found for {cluster_name}/{namespace}") + return None + + # Extract Gateway reference from HTTPRoute + parent_refs = 
httproute.get("spec", {}).get("parentRefs", []) + if not parent_refs: + return None + + gateway_ref = parent_refs[0] + gateway_name = gateway_ref.get("name") + gateway_namespace = gateway_ref.get("namespace") + + if not gateway_name or not gateway_namespace: + return None + + hostname = None + + # Try to get hostname from Gateway spec.listeners[0].hostname + try: + gateway = api_instance.get_namespaced_custom_object( + group=GATEWAY_API_GROUP, + version=GATEWAY_API_VERSION, + namespace=gateway_namespace, + plural="gateways", + name=gateway_name, + ) + listeners = gateway.get("spec", {}).get("listeners", []) + if listeners: + hostname = listeners[0].get("hostname") + except Exception: + pass + + # If no hostname in Gateway spec, look up the OpenShift Route that exposes the Gateway + # On OpenShift, Gateways are typically exposed via a Route with the same name + if not hostname: + try: + route = api_instance.get_namespaced_custom_object( + group="route.openshift.io", + version="v1", + namespace=gateway_namespace, + plural="routes", + name=gateway_name, + ) + hostname = route.get("spec", {}).get("host") + if debug: + print(f" [DEBUG] Found Gateway hostname from Route: {hostname}") + except Exception as e: + if debug: + print( + f" [DEBUG] Failed to get Route for Gateway {gateway_name}: {e}" + ) + + if not hostname: + if debug: + print( + f" [DEBUG] Could not determine hostname for Gateway {gateway_name}" + ) + return None + + # Construct dashboard URL: https://{hostname}/ray/{namespace}/{cluster-name} + return f"https://{hostname}/ray/{namespace}/{cluster_name}" + + except Exception as e: + if debug: + print(f" [DEBUG] Error getting cluster route: {e}") + return None + + +def _get_cluster_routes(api_instance, migrated_clusters: List[Dict]) -> Dict[str, str]: + """ + Get HTTPRoute URLs for a list of migrated clusters. 
+ + Args: + api_instance: CustomObjectsApi instance + migrated_clusters: List of dicts with 'name' and 'namespace' keys + + Returns: + Dict[str, str]: Mapping of "name/namespace" to route URL + """ + routes = {} + for cluster in migrated_clusters: + name = cluster["name"] + ns = cluster["namespace"] + route_url = _get_cluster_route(api_instance, name, ns) + if route_url: + routes[f"{name}/{ns}"] = route_url + return routes + + +def _check_cert_manager_installed(api_client) -> Tuple[bool, str]: + """ + Check if cert-manager is installed on the cluster. + + Checks for: + 1. cert-manager CRDs (certificates.cert-manager.io) + 2. cert-manager namespace + 3. openshift-cert-manager namespace (OpenShift operator) + + Returns: + Tuple[bool, str]: (is_installed, message) + """ + # Check for cert-manager CRD + try: + api_ext = client.ApiextensionsV1Api(api_client) + api_ext.read_custom_resource_definition("certificates.cert-manager.io") + return True, "cert-manager CRD found" + except Exception: + pass # CRD not found or lookup failed; fall through to the namespace checks + + # Check for cert-manager namespace + try: + core_api = client.CoreV1Api(api_client) + core_api.read_namespace("cert-manager") + return True, "cert-manager namespace found" + except Exception: + pass # Namespace not found or lookup failed; try the next location + + # Check for openshift-cert-manager namespace (OpenShift operator installs here) + try: + core_api = client.CoreV1Api(api_client) + core_api.read_namespace("openshift-cert-manager") + return True, "openshift-cert-manager namespace found" + except Exception: + pass # Namespace not found or lookup failed + + return False, "cert-manager not detected" + + +def _check_codeflare_operator_status(api_client) -> Tuple[bool, str]: + """ + Check if the codeflare-operator is set 
to Removed or not present in the DataScienceCluster. + + Returns: + Tuple[bool, str]: (is_removed_or_not_present, message) + """ + try: + custom_api = client.CustomObjectsApi(api_client) + + # List DataScienceClusters + dscs = custom_api.list_cluster_custom_object( + group="datasciencecluster.opendatahub.io", + version="v1", + plural="datascienceclusters", + ) + + if not dscs.get("items"): + return True, "No DataScienceCluster found (OK to proceed)" + + # Check the first DSC (typically there's only one) + dsc = dscs["items"][0] + dsc_name = dsc.get("metadata", {}).get("name", "unknown") + + # Check if codeflare component exists in the DSC + components = dsc.get("spec", {}).get("components", {}) + + if "codeflare" not in components: + # codeflare not present in DSC - this is OK (common after RHOAI 3.x upgrade) + return True, f"codeflare not present in DSC '{dsc_name}' (OK to proceed)" + + # Get codeflare managementState from spec + codeflare_state = components.get("codeflare", {}).get("managementState", "") + + if codeflare_state.lower() == "removed": + return True, f"codeflare is Removed in DSC '{dsc_name}'" + elif codeflare_state.lower() == "unmanaged": + return True, f"codeflare is Unmanaged in DSC '{dsc_name}'" + elif codeflare_state.lower() == "managed": + return ( + False, + f"codeflare is Managed in DSC '{dsc_name}' (should be Removed)", + ) + elif not codeflare_state: + # codeflare entry exists but managementState not set - treat as OK + return ( + True, + f"codeflare present without managementState in DSC '{dsc_name}' (OK to proceed)", + ) + else: + return False, f"codeflare is '{codeflare_state}' in DSC '{dsc_name}'" + + except ApiException as e: + if e.status == 404: + return True, "DataScienceCluster CRD not found (OK to proceed)" + else: + return True, f"Could not check DataScienceCluster (skipping): {e.reason}" + except Exception as e: + return True, f"Could not check DataScienceCluster (skipping): {str(e)}" + + +def _check_permission( + api_client, 
verb: str, resource: str, group: str = "", all_namespaces: bool = True +) -> Tuple[bool, str]: + """ + Check if the current user has a specific permission. + + Args: + api_client: Kubernetes API client + verb: The verb to check (list, get, update, etc.) + resource: The resource type (rayclusters, namespaces, etc.) + group: The API group (ray.io, gateway.networking.k8s.io, etc.) + all_namespaces: If True, check cluster-wide; if False, check only the "default" namespace + + Returns: + Tuple[bool, str]: (has_permission, message) + """ + try: + auth_api = client.AuthorizationV1Api(api_client) + + # Build the resource attributes + resource_attributes = { + "verb": verb, + "resource": resource, + } + if group: + resource_attributes["group"] = group + if not all_namespaces: + resource_attributes["namespace"] = "default" + + # Create the access review + review = client.V1SelfSubjectAccessReview( + spec=client.V1SelfSubjectAccessReviewSpec( + resource_attributes=resource_attributes + ) + ) + + response = auth_api.create_self_subject_access_review(review) + + resource_name = f"{resource}.{group}" if group else resource + if response.status.allowed: + return True, f"can {verb} {resource_name}" + else: + return False, f"cannot {verb} {resource_name}" + + except Exception as e: + return False, f"error checking {verb} {resource}: {e}" + + +def _run_pre_upgrade_checks(api_client) -> List[Dict[str, object]]: + """ + Run pre-upgrade checks to identify potential issues. 
+ + Returns: + List[Dict]: List of check results with 'name', 'passed', 'message' keys + """ + checks = [] + + # Permission checks for pre-upgrade + permission_checks = [ + ("list", "namespaces", "", "List namespaces"), + ("list", "rayclusters", "ray.io", "List RayClusters"), + ("get", "rayclusters", "ray.io", "Get RayClusters"), + ("update", "rayclusters", "ray.io", "Update RayClusters"), + ] + + all_permissions_ok = True + permission_messages = [] + + for verb, resource, group, desc in permission_checks: + has_perm, msg = _check_permission(api_client, verb, resource, group) + if not has_perm: + all_permissions_ok = False + permission_messages.append(f"{desc}: {'OK' if has_perm else 'DENIED'}") + + checks.append( + { + "name": "Permissions", + "passed": all_permissions_ok, + "message": ( + "All required permissions granted" + if all_permissions_ok + else "Missing permissions" + ), + "required": True, + "help": ( + "Missing permissions: " + + ", ".join([m for m in permission_messages if "DENIED" in m]) + if not all_permissions_ok + else "" + ), + "details": permission_messages, + } + ) + + # Check cert-manager installation + cert_manager_installed, cert_manager_msg = _check_cert_manager_installed(api_client) + checks.append( + { + "name": "cert-manager", + "passed": cert_manager_installed, + "message": cert_manager_msg, + "required": True, + "help": "cert-manager is required for RHOAI 3.x. 
" + "Install it via OperatorHub before proceeding with the upgrade.", + } + ) + + # Check codeflare-operator status in DSC + codeflare_ok, codeflare_msg = _check_codeflare_operator_status(api_client) + checks.append( + { + "name": "codeflare-operator", + "passed": codeflare_ok, + "message": codeflare_msg, + "required": True, + "help": "Set codeflare to Removed in your DataScienceCluster before upgrading: " + "oc patch datasciencecluster --type merge -p " + '\'{"spec":{"components":{"codeflare":{"managementState":"Removed"}}}}\'', + } + ) + + return checks + + +def _post_upgrade_from_backup( + backup_path: str, + cluster_name: Optional[str] = None, + namespace: Optional[str] = None, + dry_run: bool = False, + auto_confirm: bool = False, +) -> Dict[str, int]: + """ + Migrate RayClusters using pre-cleaned backup files from pre-upgrade. + + This function reads the backup YAML files created during pre-upgrade and + replaces the existing RayClusters with the cleaned versions. + + Args: + backup_path: Path to a backup YAML file or directory containing backup files + cluster_name: Specific cluster to migrate (requires namespace) + namespace: Specific namespace to migrate + dry_run: If True, show what would be changed without making changes + auto_confirm: If True, skip confirmation prompt + + Returns: + Dict[str, int]: Counts of migrated, skipped, and failed clusters + """ + if not os.path.exists(backup_path): + raise ValueError(f"Backup path does not exist: {backup_path}") + + # Handle both file and directory paths + yaml_files = [] + if os.path.isfile(backup_path): + # Single file provided + if backup_path.endswith(".yaml") or backup_path.endswith(".yml"): + yaml_files.append(backup_path) + else: + raise ValueError( + f"Backup file must be a YAML file (.yaml or .yml): {backup_path}" + ) + elif os.path.isdir(backup_path): + # Directory provided - find all YAML files + for filename in os.listdir(backup_path): + if filename.endswith(".yaml") or filename.endswith(".yml"): + 
yaml_files.append(os.path.join(backup_path, filename)) + + # If no YAML files found, check if this is a parent backup directory with rhoai-3.x/ subdirectory + if not yaml_files: + migrated_subdir = os.path.join(backup_path, "rhoai-3.x") + if os.path.isdir(migrated_subdir): + print( + f"No YAML files in '{backup_path}', using '{migrated_subdir}' subdirectory..." + ) + for filename in os.listdir(migrated_subdir): + if filename.endswith(".yaml") or filename.endswith(".yml"): + yaml_files.append(os.path.join(migrated_subdir, filename)) + else: + raise ValueError(f"Backup path is not a file or directory: {backup_path}") + + if not yaml_files: + print(f"No YAML files found in: {backup_path}") + # Check if there are subdirectories that might contain backups + if os.path.isdir(backup_path): + subdirs = [ + d + for d in os.listdir(backup_path) + if os.path.isdir(os.path.join(backup_path, d)) + ] + if subdirs: + print(f"Found subdirectories: {', '.join(subdirs)}") + print( + f"Hint: Use --from-backup {backup_path}/rhoai-3.x for RHOAI 3.x migration" + ) + print( + f" or --from-backup {backup_path}/rhoai-2.x for RHOAI 2.x rollback" + ) + return {"migrated": 0, "skipped": 0, "failed": 0} + + # Load and filter backup files + clusters_to_migrate = [] + for yaml_file in yaml_files: + try: + with open(yaml_file, "r") as f: + doc = yaml.safe_load(f) + if doc is None: + continue + + kind = doc.get("kind", "") + if kind != "RayCluster": + continue + + name = doc.get("metadata", {}).get("name", "unknown") + ns = doc.get("metadata", {}).get("namespace", "default") + + # Filter by cluster_name and namespace if specified + if cluster_name and name != cluster_name: + continue + if namespace and ns != namespace: + continue + + clusters_to_migrate.append( + { + "name": name, + "namespace": ns, + "file": yaml_file, + "doc": doc, + } + ) + except Exception as e: + print(f"Warning: Could not read {yaml_file}: {e}") + + if not clusters_to_migrate: + scope_msg = "" + if cluster_name and 
namespace: + scope_msg = f" matching cluster '{cluster_name}' in namespace '{namespace}'" + elif namespace: + scope_msg = f" in namespace '{namespace}'" + print(f"No matching RayClusters found in backup files{scope_msg}") + return {"migrated": 0, "skipped": 0, "failed": 0} + + if dry_run: + print("=== DRY RUN MODE - No changes will be made ===\n") + + # Show scope message + scope_msg = "all namespaces" + if cluster_name and namespace: + scope_msg = f"cluster '{cluster_name}' in namespace '{namespace}'" + elif namespace: + scope_msg = f"namespace '{namespace}'" + + print( + f"Found {len(clusters_to_migrate)} RayCluster(s) in backup to migrate ({scope_msg}):\n" + ) + for item in clusters_to_migrate: + print( + f" - {item['name']} (ns: {item['namespace']}) from {os.path.basename(item['file'])}" + ) + + # Confirmation + if not dry_run and not auto_confirm: + print( + "\nWARNING: Restore from backup will DELETE and RECREATE each RayCluster." + ) + print(" - If a cluster currently exists, it will be deleted first.") + print(" - All running pods, jobs, and workloads will be terminated.") + print(" - Existing job state and logs will be lost.") + print(" - The cluster will be recreated from the backup configuration.") + response = ( + input("\nProceed with restore from backup? 
(yes/no): ").strip().lower() + ) + if response not in ["yes", "y"]: + print("Restore cancelled.") + return {"migrated": 0, "skipped": 0, "failed": 0} + print() + + api_instance = client.CustomObjectsApi(get_api_client()) + core_api = client.CoreV1Api(get_api_client()) + + migrated_count = 0 + failed_count = 0 + successfully_migrated = [] + + for item in clusters_to_migrate: + name = item["name"] + ns = item["namespace"] + doc = item["doc"] + + try: + if dry_run: + print(f" [DRY RUN] Would restore from backup: {name} (ns: {ns})") + print( + f" (will delete existing cluster if present, then create from backup)" + ) + migrated_count += 1 + continue + + # Check if cluster exists and delete it first + cluster_exists = False + try: + api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + name=name, + ) + cluster_exists = True + except ApiException as e: + if e.status != 404: + raise + + # Note: The odh.ray.io/secure-trusted-network annotation will be added + # automatically by the KubeRay mutating webhook + + if cluster_exists: + # Delete existing cluster first + print(f" [{name}] Deleting existing cluster...") + api_instance.delete_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + name=name, + ) + # Wait for deletion to complete + print(f" [{name}] Waiting for cluster deletion to complete...") + deletion_timeout = 120 # 2 minutes + start_time = time.time() + while time.time() - start_time < deletion_timeout: + try: + api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + name=name, + ) + # Still exists, wait + time.sleep(2) + except ApiException as e: + if e.status == 404: + # Cluster deleted + print(f" [{name}] Cluster deleted successfully") + break + raise + else: + print( + f" [{name}] Warning: Cluster deletion timed out, proceeding anyway..." 
+ ) + + # Create cluster from backup + print(f" [{name}] Creating cluster from backup...") + # Remove resourceVersion and uid if present (not valid for create) + if "metadata" in doc: + doc["metadata"].pop("resourceVersion", None) + doc["metadata"].pop("uid", None) + api_instance.create_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + body=doc, + ) + + # Wait for cluster to become ready + print(f" [{name}] Waiting for cluster to become ready...") + cluster_ready = _wait_for_cluster_ready( + api_instance, core_api, name, ns, timeout_seconds=300 + ) + + if cluster_ready: + # Cluster is ready, get the route URL + route_url = _get_cluster_route(api_instance, name, ns) + if route_url: + print(f" [OK] Restored from backup: {name} (ns: {ns})") + print(f" Dashboard: {route_url}") + else: + print(f" [OK] Restored from backup: {name} (ns: {ns})") + print(" Dashboard: Route not yet available (check later)") + else: + print( + f" [OK] Restored from backup: {name} (ns: {ns}) - cluster starting up" + ) + + migrated_count += 1 + successfully_migrated.append({"name": name, "namespace": ns}) + + except Exception as e: + print(f" [FAIL] {name} (ns: {ns}): {e}") + failed_count += 1 + + # Print summary + print(f"\n{'='*60}") + if dry_run: + print("DRY RUN Summary:") + print(f" Would restore: {migrated_count}") + else: + print("Restore from Backup Summary:") + print(f" Restored: {migrated_count}") + print(f" Failed: {failed_count}") + + # Use the same "migrated" key as the early returns and the docstring + return {"migrated": migrated_count, "skipped": 0, "failed": failed_count} + + +def pre_upgrade( + output_dir: Optional[str] = None, + cluster_name: Optional[str] = None, + namespace: Optional[str] = None, +) -> List[str]: + """ + Back up RayCluster configurations before the RHOAI upgrade. + + This function: + 1. Runs pre-flight checks to verify the cluster is ready for upgrade + 2. Exports RayClusters to YAML files for backup purposes + + It does NOT delete any clusters - it only creates backup files. 
+ + This operation is idempotent - running it multiple times will simply + overwrite the backup files. + + Args: + output_dir: Directory to save backup YAML files (prompts if not provided) + cluster_name: Specific cluster to backup (requires namespace) + namespace: Specific namespace to backup (optional) + + Returns: + List[str]: List of backup file paths created + """ + # Prompt for output directory if not provided + if not output_dir: + default_dir = "./raycluster-backups" + user_input = input(f"Enter backup directory [{default_dir}]: ").strip() + output_dir = user_input if user_input else default_dir + print() + + try: + config_check() + except Exception as e: + raise RuntimeError(f"Failed to connect to Kubernetes cluster: {e}") + + api_client = get_api_client() + api_instance = client.CustomObjectsApi(api_client) + core_api = client.CoreV1Api(api_client) + + # Run pre-flight checks + print("Running pre-upgrade checks...") + print("-" * 60) + + checks = _run_pre_upgrade_checks(api_client) + all_passed = True + has_required_failures = False + + for check in checks: + status = "[OK]" if check["passed"] else "[FAIL]" + print(f" {status} {check['name']}: {check['message']}") + + # Show details if present (e.g., individual permission results) + if check.get("details") and not check["passed"]: + for detail in check["details"]: + print(f" - {detail}") + + if not check["passed"]: + all_passed = False + if check.get("required", False): + has_required_failures = True + if check.get("help"): + print(f" {check['help']}") + + print("-" * 60) + + if has_required_failures: + print("\nPre-upgrade checks failed. Please resolve the issues above before") + print("proceeding with the RHOAI upgrade.") + print() + print( + "WARNING: Proceeding with the RHOAI upgrade without resolving these issues" + ) + print( + "may result in your Ray infrastructure becoming unavailable or unrecoverable." 
+ ) + return [] + elif all_passed: + print("All pre-upgrade checks passed.\n") + else: + print("Pre-upgrade checks completed with warnings.\n") + + # Create output directory if it doesn't exist + if not os.path.exists(output_dir): + try: + os.makedirs(output_dir) + print(f"Created backup directory: {output_dir}") + except OSError as e: + print(f"ERROR: Failed to create backup directory '{output_dir}': {e}") + print( + "Please check that you have write permissions to the parent directory." + ) + return [] + + # Get clusters based on scope + clusters = _get_clusters(api_instance, core_api, cluster_name, namespace) + + if not clusters: + print("No RayClusters found to backup") + return [] + + # Describe the scope + if cluster_name: + scope_msg = f"cluster '{cluster_name}' in namespace '{namespace}'" + elif namespace: + scope_msg = f"all clusters in namespace '{namespace}'" + else: + scope_msg = "all clusters across all namespaces" + + print(f"\nBacking up {len(clusters)} RayCluster(s) ({scope_msg})\n") + + # Create subdirectories for RHOAI 2.x and 3.x compatible backups + rhoai_2x_dir = os.path.join(output_dir, "rhoai-2.x") + rhoai_3x_dir = os.path.join(output_dir, "rhoai-3.x") + + for subdir in [rhoai_2x_dir, rhoai_3x_dir]: + if not os.path.exists(subdir): + try: + os.makedirs(subdir) + except OSError as e: + print(f"ERROR: Failed to create backup subdirectory '{subdir}': {e}") + return [] + + saved_files = [] + + for rc in clusters: + try: + name = rc.get("metadata", {}).get("name", "unknown") + ns = rc.get("metadata", {}).get("namespace", "unknown") + + # --- Save RHOAI 2.x backup (with CodeFlare components) --- + # This preserves the 2.x-compatible configuration in case of rollback + rc_2x = copy.deepcopy(rc) + remove_autogenerated_fields(rc_2x) + + rhoai_2x_filename = os.path.join( + rhoai_2x_dir, f"raycluster-{name}-{ns}.yaml" + ) + with open(rhoai_2x_filename, "w") as outfile: + yaml.dump(rc_2x, outfile, default_flow_style=False) + + # --- Save RHOAI 3.x backup 
(cleaned for 3.x) --- + # This is the processed version ready for post-upgrade migration + rc_3x = copy.deepcopy(rc) + remove_autogenerated_fields(rc_3x) + rc_3x = _process_ray_cluster_yaml(rc_3x) + + rhoai_3x_filename = os.path.join( + rhoai_3x_dir, f"raycluster-{name}-{ns}.yaml" + ) + with open(rhoai_3x_filename, "w") as outfile: + yaml.dump(rc_3x, outfile, default_flow_style=False) + + saved_files.append(rhoai_3x_filename) + print(f" Backed up: {name} (namespace: {ns})") + + except Exception as e: + name = rc.get("metadata", {}).get("name", "unknown") + print(f" Error backing up '{name}': {e}") + continue + + print(f"\nBackup complete: {len(saved_files)} RayCluster(s) saved to {output_dir}") + print(f"\nBackup directory structure:") + print(f" {output_dir}/") + print( + f" rhoai-2.x/ - RHOAI 2.x compatible (use if you did not proceed with the 3.x upgrade)" + ) + print( + f" rhoai-3.x/ - RHOAI 3.x compatible (use with post-upgrade --from-backup)" + ) + print() + print("WARNING: The 'rhoai-2.x/' backups contain CodeFlare-operator components.") + print( + " Use 'rhoai-2.x/' ONLY to restore RayClusters if you did not proceed with the RHOAI 3.x upgrade." + ) + print(" Use 'rhoai-3.x/' when proceeding with the RHOAI 3.x upgrade.") + print("\nNext steps:") + print(" 1. Perform the RHOAI upgrade") + print(" 2. Run 'post-upgrade' to migrate the RayClusters") + return saved_files + + +def post_upgrade( + cluster_name: Optional[str] = None, + namespace: Optional[str] = None, + dry_run: bool = False, + auto_confirm: bool = False, + from_backup: Optional[str] = None, +) -> Dict[str, int]: + """ + Migrate RayClusters after RHOAI upgrade. + + This function can work in two modes: + + 1. Live migration (default): Modifies existing RayClusters in-place by removing + TLS/OAuth components and adding the secure-trusted-network annotation. + + 2. From backup (--from-backup): Applies the pre-cleaned backup file(s) from the + pre-upgrade step.
Can be a single YAML file or a directory containing YAML files. + This is more reliable as the backup files are already cleaned of TLS/OAuth components. + + This operation is idempotent - clusters that are already migrated will be + skipped automatically. + + Recommended workflow: + 1. First test on a single cluster: --cluster NAME --namespace NS + 2. Then proceed with namespace: --namespace NS + 3. Finally migrate all: (no flags) + + Args: + cluster_name: Specific cluster to migrate (requires namespace) + namespace: Specific namespace to migrate (if not specified, targets all namespaces) + dry_run: If True, show what would be changed without making changes + auto_confirm: If True, skip confirmation prompt + from_backup: Path to a backup YAML file, or a directory of backup files, from pre-upgrade + + Returns: + Dict[str, int]: Counts of migrated, skipped, and failed clusters + (the from-backup mode reports "restored" instead of "migrated") + """ + try: + config_check() + except Exception as e: + raise RuntimeError(f"Failed to connect to Kubernetes cluster: {e}") + + # If using backup files, delegate to the backup-based migration + if from_backup: + return _post_upgrade_from_backup( + backup_path=from_backup, + cluster_name=cluster_name, + namespace=namespace, + dry_run=dry_run, + auto_confirm=auto_confirm, + ) + + # Validate arguments + if cluster_name and not namespace: + raise ValueError( + "Namespace must be specified when migrating a specific cluster" + ) + + api_instance = client.CustomObjectsApi(get_api_client()) + core_api = client.CoreV1Api(get_api_client()) + + # Describe the scope + if cluster_name: + scope_msg = f"cluster '{cluster_name}' in namespace '{namespace}'" + elif namespace: + scope_msg = f"all clusters in namespace '{namespace}'" + else: + scope_msg = "all clusters across all namespaces" + + if dry_run: + print("=== DRY RUN MODE - No changes will be made ===\n") + + # Get clusters based on scope + print(f"Fetching RayClusters ({scope_msg})...") + clusters = _get_clusters(api_instance, core_api, cluster_name, namespace) + + if not
clusters: + print("No RayClusters found to migrate") + return {"migrated": 0, "skipped": 0, "failed": 0} + + print(f"Found {len(clusters)} RayCluster(s)\n") + print("Analyzing clusters for migration status...") + + # Analyze clusters and categorize them + to_migrate = [] + already_migrated = [] + total = len(clusters) + + for idx, rc in enumerate(clusters, 1): + name = rc.get("metadata", {}).get("name", "unknown") + ns = rc.get("metadata", {}).get("namespace", "unknown") + + print(f" [{idx}/{total}] Checking {name} (ns: {ns})...", end=" ") + is_migrated, _ = _is_cluster_migrated(rc) + + if is_migrated: + already_migrated.append({"rc": rc, "name": name, "namespace": ns}) + print("already migrated") + else: + to_migrate.append({"rc": rc, "name": name, "namespace": ns}) + print("needs migration") + + print( + f"\nSummary: {len(to_migrate)} to migrate, {len(already_migrated)} already migrated" + ) + + if not to_migrate: + print("\nAll clusters are already migrated. Nothing to do.") + return {"migrated": 0, "skipped": len(already_migrated), "failed": 0} + + # Confirmation for non-dry-run + if not dry_run and not auto_confirm: + # Show scope warning for multi-cluster operations + if not cluster_name: + print("\n" + "=" * 60) + if namespace: + print( + f"WARNING: You are about to migrate ALL clusters in namespace '{namespace}'" + ) + else: + print( + "WARNING: You are about to migrate ALL clusters across ALL namespaces" + ) + print("=" * 60) + + print(f"\nThe following {len(to_migrate)} cluster(s) will be migrated:") + for item in to_migrate: + print(f" - {item['name']} (namespace: {item['namespace']})") + + print( + "\nIMPORTANT: Migration will cause temporary downtime for each RayCluster." + ) + print( + " - Pods will be restarted as the KubeRay operator recreates them with the new configuration." + ) + print(" - Existing job state and logs will be lost.") + print( + " - Currently running workloads/jobs will be interrupted and progress lost." 
+ ) + + response = input("\nProceed with migration? (yes/no): ").strip().lower() + if response not in ["yes", "y"]: + print("Migration cancelled.") + return {"migrated": 0, "skipped": len(already_migrated), "failed": 0} + print() + + # Perform migration + migrated_count = 0 + failed_count = 0 + successfully_migrated = [] # Track for route lookup + + for item in to_migrate: + rc = item["rc"] + name = item["name"] + ns = item["namespace"] + + try: + if dry_run: + print(f" [DRY RUN] Would migrate: {name} (ns: {ns})") + migrated_count += 1 + continue + + # Step 1: Fetch the cluster to get latest resourceVersion + print(f" [{name}] Fetching current cluster state...") + rc = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + name=name, + ) + + # Make a copy + rc_copy = copy.deepcopy(rc) + + # Save fields needed for replace operation + resource_version = rc_copy.get("metadata", {}).get("resourceVersion") + uid = rc_copy.get("metadata", {}).get("uid") + + # Remove auto-generated fields + remove_autogenerated_fields(rc_copy) + + # Process to remove TLS/OAuth components + rc_cleaned = _process_ray_cluster_yaml(rc_copy) + + # Restore fields needed for replace operation + if "metadata" not in rc_cleaned: + rc_cleaned["metadata"] = {} + if resource_version: + rc_cleaned["metadata"]["resourceVersion"] = resource_version + if uid: + rc_cleaned["metadata"]["uid"] = uid + + # Note: The odh.ray.io/secure-trusted-network annotation will be added + # automatically by the KubeRay mutating webhook + + # Step 2: Delete old CodeFlare ServiceAccount (uses different naming convention) + # CodeFlare uses: {cluster}-oauth-proxy-{hash} + # KubeRay uses: {cluster}-oauth-proxy-sa + print(f" [{name}] Cleaning up old CodeFlare ServiceAccounts...") + try: + # List all ServiceAccounts in the namespace + sa_list = core_api.list_namespaced_service_account(namespace=ns) + prefix = f"{name}-oauth-proxy-" + kuberay_sa_name = 
f"{name}-oauth-proxy-sa" + + for sa in sa_list.items: + sa_name = sa.metadata.name + # Delete ServiceAccounts that match CodeFlare pattern but not KubeRay pattern + if sa_name.startswith(prefix) and sa_name != kuberay_sa_name: + print(f" [{name}] Deleting old ServiceAccount: {sa_name}") + core_api.delete_namespaced_service_account( + name=sa_name, + namespace=ns, + ) + except Exception as sa_err: + print( + f" [{name}] Warning: Failed to cleanup old ServiceAccounts: {sa_err}" + ) + # Continue with migration even if SA cleanup fails + + # Step 3: Apply the changes using replace + # KubeRay operator will handle pod recreation and ServiceAccount creation + print(f" [{name}] Applying migration changes...") + api_instance.replace_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + name=name, + body=rc_cleaned, + ) + + # Step 4: Wait for cluster to become ready (KubeRay handles pod recreation) + print(f" [{name}] Waiting for cluster to become ready...") + cluster_ready = _wait_for_cluster_ready( + api_instance, core_api, name, ns, timeout_seconds=300 + ) + + if cluster_ready: + # Cluster is ready, get the route URL + route_url = _get_cluster_route(api_instance, name, ns) + if route_url: + print(f" [OK] Migrated: {name} (ns: {ns})") + print(f" Dashboard: {route_url}") + else: + print(f" [OK] Migrated: {name} (ns: {ns})") + print(f" Dashboard: Route not yet available (check later)") + else: + print( + f" [OK] Migrated: {name} (ns: {ns}) - cluster starting up (may take a moment)" + ) + + migrated_count += 1 + successfully_migrated.append({"name": name, "namespace": ns}) + + except Exception as e: + print(f" [FAIL] {name} (ns: {ns}): {e}") + failed_count += 1 + + # Print final summary + print(f"\n{'='*60}") + if dry_run: + print(f"DRY RUN Summary:") + print(f" Would migrate: {migrated_count}") + else: + print(f"Migration Summary:") + print(f" Migrated: {migrated_count}") + print(f" Skipped (already migrated): 
{len(already_migrated)}") + print(f" Failed: {failed_count}") + + return { + "migrated": migrated_count, + "skipped": len(already_migrated), + "failed": failed_count, + } + + +def list_ray_clusters( + namespace: Optional[str] = None, output_format: str = "table" +) -> List[Dict]: + """ + Lists all RayClusters with their migration status. + + This function helps you understand which clusters need migration and which + are already migrated. + + Args: + namespace: Specific namespace to query (optional, if None queries all) + output_format: Output format - "table" (default) or "yaml" + + Returns: + List[Dict]: List of RayCluster information dictionaries + """ + try: + config_check() + except Exception as e: + raise RuntimeError(f"Failed to connect to Kubernetes cluster: {e}") + + api_instance = client.CustomObjectsApi(get_api_client()) + core_api = client.CoreV1Api(get_api_client()) + + # Describe scope + scope_msg = f"namespace '{namespace}'" if namespace else "all namespaces" + print(f"Fetching RayClusters ({scope_msg})...") + + clusters = _get_clusters(api_instance, core_api, namespace=namespace) + + if not clusters: + print("No RayClusters found") + return [] + + print(f"Found {len(clusters)} RayCluster(s)") + print("Analyzing clusters...") + + clusters_info = [] + total = len(clusters) + + for idx, rc in enumerate(clusters, 1): + name = rc.get("metadata", {}).get("name", "unknown") + ns = rc.get("metadata", {}).get("namespace", "unknown") + status = rc.get("status", {}).get("state", "unknown") + + print(f" [{idx}/{total}] Analyzing {name} (ns: {ns})...", end="\r") + + # Check migration status + is_migrated, migration_reason = _is_cluster_migrated(rc) + has_tls_oauth, components = _has_tls_oauth_components(rc) + + # Extract resource information + head_resources = {} + worker_resources = {} + num_workers = 0 + + try: + head_spec = rc["spec"]["headGroupSpec"]["template"]["spec"]["containers"][ + 0 + ]["resources"] + head_resources = { + "cpu_request": 
head_spec.get("requests", {}).get("cpu", "N/A"), + "cpu_limit": head_spec.get("limits", {}).get("cpu", "N/A"), + "memory_request": head_spec.get("requests", {}).get("memory", "N/A"), + "memory_limit": head_spec.get("limits", {}).get("memory", "N/A"), + } + + worker_spec = rc["spec"]["workerGroupSpecs"][0] + num_workers = worker_spec.get("replicas", 0) + worker_container_resources = worker_spec["template"]["spec"]["containers"][ + 0 + ]["resources"] + worker_resources = { + "cpu_request": worker_container_resources.get("requests", {}).get( + "cpu", "N/A" + ), + "cpu_limit": worker_container_resources.get("limits", {}).get( + "cpu", "N/A" + ), + "memory_request": worker_container_resources.get("requests", {}).get( + "memory", "N/A" + ), + "memory_limit": worker_container_resources.get("limits", {}).get( + "memory", "N/A" + ), + } + except (KeyError, IndexError): + pass + + cluster_info = { + "name": name, + "namespace": ns, + "status": status, + "num_workers": num_workers, + "migrated": is_migrated, + "migration_status": migration_reason, + "tls_oauth_components": components if has_tls_oauth else [], + "head_resources": head_resources, + "worker_resources": worker_resources, + } + clusters_info.append(cluster_info) + + # Clear progress line + print(" " * 80, end="\r") + print("Analysis complete.\n") + + # Display results + if output_format == "yaml": + print(yaml.dump(clusters_info, default_flow_style=False)) + else: + print(f"RayCluster Migration Status:\n") + print( + f"{'Name':<25} {'Namespace':<18} {'Status':<12} {'Workers':<8} {'Migration Status':<30}" + ) + print("-" * 100) + + migrated_count = 0 + needs_migration_count = 0 + + for cluster in clusters_info: + migration_indicator = "[OK]" if cluster["migrated"] else "[NEEDS MIGRATION]" + if cluster["migrated"]: + migrated_count += 1 + else: + needs_migration_count += 1 + + print( + f"{cluster['name']:<25} " + f"{cluster['namespace']:<18} " + f"{cluster['status']:<12} " + f"{cluster['num_workers']:<8} " + 
f"{migration_indicator}" + ) + + print( + f"\nMigration Summary: {migrated_count} migrated, {needs_migration_count} need migration" + ) + + return clusters_info + + +def delete_ray_clusters( + cluster_name: Optional[str] = None, + namespace: Optional[str] = None, + all_clusters: bool = False, + dry_run: bool = False, + auto_confirm: bool = False, +) -> Dict[str, int]: + """ + Delete RayClusters from the cluster. + + This is an advanced operation for removing clusters entirely. + For normal migration workflow, use pre-upgrade and post-upgrade instead. + + Args: + cluster_name: Specific cluster to delete (requires namespace) + namespace: Namespace to delete from + all_clusters: If True, delete all clusters in scope + dry_run: If True, show what would be deleted without deleting + auto_confirm: If True, skip confirmation prompt + + Returns: + Dict[str, int]: Counts of deleted and failed clusters + """ + try: + config_check() + except Exception as e: + raise RuntimeError(f"Failed to connect to Kubernetes cluster: {e}") + + if cluster_name and not namespace: + raise ValueError("Namespace must be specified when deleting a specific cluster") + + if not cluster_name and not all_clusters: + raise ValueError( + "Either specify a cluster name or use --all to delete multiple clusters" + ) + + api_instance = client.CustomObjectsApi(get_api_client()) + core_api = client.CoreV1Api(get_api_client()) + + clusters = _get_clusters(api_instance, core_api, cluster_name, namespace) + + if not clusters: + print("No RayClusters found to delete") + return {"deleted": 0, "failed": 0} + + if dry_run: + print("=== DRY RUN MODE - No clusters will be deleted ===\n") + + print(f"Found {len(clusters)} RayCluster(s) to delete\n") + + # Confirmation + if not dry_run and not auto_confirm: + print("WARNING: This will PERMANENTLY DELETE the following RayCluster(s):") + for rc in clusters: + name = rc.get("metadata", {}).get("name", "unknown") + ns = rc.get("metadata", {}).get("namespace", "unknown") + 
print(f" - {name} (namespace: {ns})") + + print("\nThis operation is IRREVERSIBLE!") + response = input("\nDelete these clusters? (yes/no): ").strip().lower() + if response not in ["yes", "y"]: + print("Delete cancelled.") + return {"deleted": 0, "failed": 0} + print() + + deleted_count = 0 + failed_count = 0 + + for rc in clusters: + name = rc.get("metadata", {}).get("name", "unknown") + ns = rc.get("metadata", {}).get("namespace", "unknown") + + try: + if dry_run: + print(f" [DRY RUN] Would delete: {name} (ns: {ns})") + deleted_count += 1 + else: + api_instance.delete_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + name=name, + ) + print(f" Deleted: {name} (ns: {ns})") + deleted_count += 1 + except Exception as e: + print(f" Failed to delete {name} (ns: {ns}): {e}") + failed_count += 1 + + if dry_run: + print(f"\n[DRY RUN] Would delete: {deleted_count}, Would fail: {failed_count}") + else: + print(f"\nDeleted: {deleted_count}, Failed: {failed_count}") + + return {"deleted": deleted_count, "failed": failed_count} + + +def import_ray_clusters( + source_path: str, force: bool = False, dry_run: bool = False +) -> List[Dict]: + """ + Import RayClusters from backup YAML files. + + This is an advanced operation for restoring clusters from backups. + Use this if you need to restore clusters after the pre-upgrade backup. 
+ + Args: + source_path: Path to YAML file or directory containing YAML files + force: Force apply in case of conflicts + dry_run: Preview what would be imported without applying + + Returns: + List[Dict]: Results for each import attempt + """ + if not os.path.exists(source_path): + raise ValueError(f"Source path does not exist: {source_path}") + + try: + config_check() + except Exception as e: + raise RuntimeError(f"Failed to connect to Kubernetes cluster: {e}") + + results = [] + files_to_process = [] + + if os.path.isfile(source_path): + if source_path.endswith(".yaml") or source_path.endswith(".yml"): + files_to_process.append(source_path) + else: + raise ValueError(f"File is not a YAML file: {source_path}") + elif os.path.isdir(source_path): + for filename in os.listdir(source_path): + if filename.endswith(".yaml") or filename.endswith(".yml"): + files_to_process.append(os.path.join(source_path, filename)) + else: + raise ValueError(f"Source path is neither file nor directory: {source_path}") + + if not files_to_process: + print("No YAML files found to import") + return results + + if dry_run: + print("=== DRY RUN MODE - No changes will be applied ===\n") + + try: + crds = DynamicClient(get_api_client()).resources + api_instance = crds.get(api_version="ray.io/v1", kind="RayCluster") + except Exception as e: + raise RuntimeError(f"Failed to initialize DynamicClient for RayCluster: {e}") + + for yaml_file in files_to_process: + try: + with open(yaml_file, "r") as f: + yaml_documents = yaml.safe_load_all(f) + + for doc in yaml_documents: + if doc is None: + continue + + try: + name = doc.get("metadata", {}).get("name", "unknown") + ns = doc.get("metadata", {}).get("namespace", "default") + kind = doc.get("kind", "") + + if kind != "RayCluster": + print(f" Skipping non-RayCluster: {name} (kind: {kind})") + continue + + # Add secure network annotation + if "metadata" not in doc: + doc["metadata"] = {} + if "annotations" not in doc["metadata"]: + 
doc["metadata"]["annotations"] = {} + doc["metadata"]["annotations"][ + SECURE_NETWORK_ANNOTATION + ] = "true" + + if dry_run: + message = f"[DRY RUN] Would import: {name} (ns: {ns})" + else: + api_instance.server_side_apply( + field_manager=CF_SDK_FIELD_MANAGER, + group="ray.io", + version="v1", + namespace=ns, + plural="rayclusters", + body=doc, + force_conflicts=force, + ) + message = f"Imported: {name} (ns: {ns})" + + results.append( + { + "cluster_name": name, + "namespace": ns, + "file": yaml_file, + "status": "success", + "message": message, + } + ) + print(f" {message}") + + except Exception as e: + name = doc.get("metadata", {}).get("name", "unknown") + ns = doc.get("metadata", {}).get("namespace", "default") + results.append( + { + "cluster_name": name, + "namespace": ns, + "file": yaml_file, + "status": "error", + "message": str(e), + } + ) + print(f" Error importing {name}: {e}") + + except Exception as e: + results.append( + { + "cluster_name": "unknown", + "namespace": "unknown", + "file": yaml_file, + "status": "error", + "message": str(e), + } + ) + print(f" Error reading {yaml_file}: {e}") + + success = sum(1 for r in results if r["status"] == "success") + errors = sum(1 for r in results if r["status"] == "error") + + if dry_run: + print(f"\n[DRY RUN] Would import: {success}, Would fail: {errors}") + else: + print(f"\nImported: {success}, Failed: {errors}") + + return results + + +def main(): + """Main CLI entry point.""" + parser = argparse.ArgumentParser( + description="RayCluster Migration Tool for RHOAI 2.x to RHOAI 3.x", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +RECOMMENDED MIGRATION WORKFLOW: + + 1. 
BEFORE UPGRADE - Backup your clusters: + + # Backup all clusters (you'll be prompted for backup directory) + %(prog)s pre-upgrade + + # Or specify the backup directory directly + %(prog)s pre-upgrade ./my-backup-dir + + # Backup a specific namespace + %(prog)s pre-upgrade --namespace my-ns + + # Backup a single cluster + %(prog)s pre-upgrade --cluster my-cluster --namespace my-ns + + 2. PERFORM THE RHOAI UPGRADE + + 3. AFTER UPGRADE - Migrate your clusters: + + # Test with a single cluster first (always use --dry-run first!) + %(prog)s post-upgrade --cluster my-cluster --namespace my-ns --dry-run + %(prog)s post-upgrade --cluster my-cluster --namespace my-ns + + # Then migrate the whole namespace + %(prog)s post-upgrade --namespace my-ns --dry-run + %(prog)s post-upgrade --namespace my-ns + + # Or migrate everything (you'll be prompted to confirm) + %(prog)s post-upgrade --dry-run + %(prog)s post-upgrade + + DISCOVERY - See what needs migration: + %(prog)s list + %(prog)s list --namespace my-ns + +All operations are idempotent and safe to run multiple times. + """, + ) + + subparsers = parser.add_subparsers(dest="command", help="Command to execute") + + # Pre-upgrade command + pre_parser = subparsers.add_parser( + "pre-upgrade", + help="Backup RayCluster configurations BEFORE the RHOAI upgrade", + description="Runs pre-flight checks and creates backup YAML files of your " + "RayCluster configurations. 
Run this BEFORE performing the RHOAI upgrade.", + ) + pre_parser.add_argument( + "output_dir", + nargs="?", + default=None, + help="Directory to save backup YAML files (you'll be prompted if not provided)", + ) + pre_parser.add_argument( + "--cluster", + "-c", + dest="cluster_name", + help="Backup a specific cluster (requires --namespace)", + ) + pre_parser.add_argument( + "--namespace", + "-n", + help="Backup all clusters in this namespace", + ) + + # Post-upgrade command + post_parser = subparsers.add_parser( + "post-upgrade", + help="Migrate RayClusters AFTER the RHOAI upgrade", + description="Migrates RayClusters by removing TLS/OAuth components and adding " + "the secure-trusted-network annotation. Run this AFTER the RHOAI upgrade.", + ) + post_parser.add_argument( + "--cluster", + "-c", + dest="cluster_name", + help="Migrate a specific cluster (requires --namespace)", + ) + post_parser.add_argument( + "--namespace", + "-n", + help="Migrate all clusters in this namespace (if omitted, targets all namespaces)", + ) + post_parser.add_argument( + "--dry-run", + action="store_true", + help="Preview what would be changed without making changes", + ) + post_parser.add_argument( + "--yes", + "-y", + action="store_true", + help="Skip confirmation prompt", + ) + post_parser.add_argument( + "--from-backup", + dest="from_backup", + metavar="PATH", + help="Migrate using pre-cleaned backup file or directory from pre-upgrade (alternative to live migration)", + ) + + # List command + list_parser = subparsers.add_parser( + "list", + help="List RayClusters and their migration status", + description="Shows all RayClusters and whether they need migration.", + ) + list_parser.add_argument( + "--namespace", + "-n", + help="List clusters in this namespace only", + ) + list_parser.add_argument( + "--format", + "-f", + choices=["table", "yaml"], + default="table", + help="Output format (default: table)", + ) + + # Delete command (advanced) + delete_parser = subparsers.add_parser( + 
"delete", + help="[Advanced] Delete RayClusters from the cluster", + description="Permanently deletes RayClusters. This is an advanced operation - " + "for normal migration, use pre-upgrade and post-upgrade instead.", + ) + delete_parser.add_argument( + "--cluster", + "-c", + dest="cluster_name", + help="Delete a specific cluster (requires --namespace)", + ) + delete_parser.add_argument( + "--namespace", + "-n", + help="Delete from this namespace", + ) + delete_parser.add_argument( + "--all", + action="store_true", + dest="all_clusters", + help="Delete all clusters in scope", + ) + delete_parser.add_argument( + "--dry-run", + action="store_true", + help="Preview what would be deleted", + ) + delete_parser.add_argument( + "--yes", + "-y", + action="store_true", + help="Skip confirmation prompt", + ) + + # Import command (advanced) + import_parser = subparsers.add_parser( + "import", + help="[Advanced] Import RayClusters from backup YAML files", + description="Restores RayClusters from backup files. 
This is an advanced operation - " + "for normal migration, use pre-upgrade and post-upgrade instead.", + ) + import_parser.add_argument( + "source_path", help="Path to YAML file or directory containing backups" + ) + import_parser.add_argument( + "--force", + action="store_true", + help="Force apply in case of conflicts", + ) + import_parser.add_argument( + "--dry-run", + action="store_true", + help="Preview what would be imported", + ) + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + try: + if args.command == "pre-upgrade": + pre_upgrade( + output_dir=args.output_dir, + cluster_name=args.cluster_name, + namespace=args.namespace, + ) + elif args.command == "post-upgrade": + post_upgrade( + cluster_name=args.cluster_name, + namespace=args.namespace, + dry_run=args.dry_run, + auto_confirm=args.yes, + from_backup=args.from_backup, + ) + elif args.command == "list": + list_ray_clusters(namespace=args.namespace, output_format=args.format) + elif args.command == "delete": + delete_ray_clusters( + cluster_name=args.cluster_name, + namespace=args.namespace, + all_clusters=args.all_clusters, + dry_run=args.dry_run, + auto_confirm=args.yes, + ) + elif args.command == "import": + import_ray_clusters( + source_path=args.source_path, + force=args.force, + dry_run=args.dry_run, + ) + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/scripts/migration/ray_cluster_migration_requirements.txt b/scripts/migration/ray_cluster_migration_requirements.txt new file mode 100644 index 000000000..fbaa2f909 --- /dev/null +++ b/scripts/migration/ray_cluster_migration_requirements.txt @@ -0,0 +1,6 @@ +# Requirements for ray_cluster_migration.py standalone script +# +# Install with: pip install -r ray_cluster_migration_requirements.txt + +kubernetes>=28.1.0 +PyYAML>=6.0
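
Review note on `main()` in `ray_cluster_migration.py`: the `post-upgrade` and `delete` commands catch per-cluster errors and report them only through the `failed` count in the dictionary they return. `main()` does not inspect that return value, so the process appears to exit with status 0 even when some clusters fail. That matters for automated runs with `--yes`. A minimal sketch of one way to surface it, assuming a hypothetical helper `exit_code_from_counts` that is not part of this PR:

```python
from typing import Dict


def exit_code_from_counts(counts: Dict[str, int]) -> int:
    """Hypothetical helper (not in the PR): 1 if any cluster failed, else 0."""
    # Only the "failed" key is read, so the helper works for both the
    # live-migration result and the from-backup result, whose other keys differ.
    return 1 if counts.get("failed", 0) > 0 else 0


# Example dictionaries shaped like the ones the script returns.
live_result = {"migrated": 3, "skipped": 1, "failed": 0}
backup_result = {"restored": 2, "skipped": 0, "failed": 1}

print(exit_code_from_counts(live_result))    # prints 0
print(exit_code_from_counts(backup_result))  # prints 1
```

In `main()`, the result of `post_upgrade(...)` or `delete_ray_clusters(...)` could be passed through such a helper before calling `sys.exit`. Whether a partial failure should fail the whole run is a design choice for the authors.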