From da5e3239b228d9f74650f0cf40146853dac56ed2 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 27 May 2025 13:20:53 -0700 Subject: [PATCH 1/2] Add user scoping for k8s backend and log level support for Ray APIs Signed-off-by: Hemil Desai --- nemo_run/core/execution/kuberay.py | 20 +++-- nemo_run/core/frontend/console/api.py | 2 +- nemo_run/run/ray/cluster.py | 3 + nemo_run/run/ray/job.py | 3 + nemo_run/run/ray/kuberay.py | 108 ++++++++++++++++---------- nemo_run/run/ray/slurm.py | 34 ++++---- 6 files changed, 101 insertions(+), 69 deletions(-) diff --git a/nemo_run/core/execution/kuberay.py b/nemo_run/core/execution/kuberay.py index 9cc5b3ef..7f4dec2b 100644 --- a/nemo_run/core/execution/kuberay.py +++ b/nemo_run/core/execution/kuberay.py @@ -80,7 +80,8 @@ class KubeRayExecutor(Executor): image: str = "" # Will be set in __post_init__ if empty head_cpu: str = "1" head_memory: str = "2Gi" - ray_start_params: dict[str, Any] = field(default_factory=dict) + ray_head_start_params: dict[str, Any] = field(default_factory=dict) + ray_worker_start_params: dict[str, Any] = field(default_factory=dict) worker_groups: list[KubeRayWorkerGroup] = field(default_factory=list) labels: dict[str, Any] = field(default_factory=dict) service_type: str = "ClusterIP" @@ -122,7 +123,7 @@ def get_cluster_body(self, name: str) -> dict[str, Any]: memory_requests=self.head_memory, cpu_limits=self.head_cpu, memory_limits=self.head_memory, - ray_start_params=self.ray_start_params, + ray_start_params=self.ray_head_start_params, head_ports=self.head_ports, env_vars=self.env_vars, volumes=self.volumes, @@ -144,7 +145,7 @@ def get_cluster_body(self, name: str) -> dict[str, Any]: replicas=worker_group.replicas, min_replicas=worker_group.min_replicas or worker_group.replicas, max_replicas=worker_group.max_replicas or worker_group.replicas, - ray_start_params=self.ray_start_params, + ray_start_params=self.ray_worker_start_params, volume_mounts=worker_group.volume_mounts, volumes=worker_group.volumes, labels=worker_group.labels, @@ -459,13 +460,13 @@ def is_valid_label(name: str) -> bool: def sync_workdir_via_pod( *, - name: str, + pod_name: str, namespace: str, + user_workspace_path: str, workdir: str, core_v1_api: CoreV1Api, volumes: list[dict[str, object]], volume_mounts: list[dict[str, object]], - workspace_path: str = "/workspace", image: str = "alpine:3.19", cleanup: bool = False, cleanup_timeout: int = 5, @@ -477,9 +478,6 @@ def sync_workdir_via_pod( Requires that the *kubectl* binary is available in PATH and can access the same cluster context as the Kubernetes Python client. """ - - pod_name = f"{name}-dm" - # Pod manifest pod_body = client.V1Pod( metadata=client.V1ObjectMeta(name=pod_name, namespace=namespace), @@ -548,7 +546,7 @@ def sync_workdir_via_pod( "--", "mkdir", "-p", - workspace_path, + user_workspace_path, ] ) @@ -570,7 +568,7 @@ def sync_workdir_via_pod( f"kubectl exec -i -n {namespace} {pod_name}", "--", # Marks end-of-options for rsync – mandatory when the dest starts with "--:" f"{os.path.abspath(workdir).rstrip(os.sep)}/", - f"--:{workspace_path.rstrip('/')}/", + f"--:{user_workspace_path.rstrip('/')}/", ] ) @@ -578,9 +576,9 @@ def sync_workdir_via_pod( logger.debug("Running rsync command: %s", " ".join(rsync_cmd)) subprocess.check_call(rsync_cmd) + logger.info(f"Workdir synced to PVC at {user_workspace_path} via data-mover pod.") if cleanup: - logger.info("Workdir synced to PVC via data-mover pod. Cleaning up…") core_v1_api.delete_namespaced_pod( name=pod_name, namespace=namespace, body=client.V1DeleteOptions() ) diff --git a/nemo_run/core/frontend/console/api.py b/nemo_run/core/frontend/console/api.py index c389ae4d..1b77951e 100644 --- a/nemo_run/core/frontend/console/api.py +++ b/nemo_run/core/frontend/console/api.py @@ -37,7 +37,7 @@ def configure_logging(level: str): if _is_jupyter(): handlers = None logging.basicConfig( - level=logging.getLevelName(level.upper()), + level=level.upper(), format="%(message)s", datefmt="[%X]", handlers=handlers, diff --git a/nemo_run/run/ray/cluster.py b/nemo_run/run/ray/cluster.py index e165adea..25ec8027 100644 --- a/nemo_run/run/ray/cluster.py +++ b/nemo_run/run/ray/cluster.py @@ -18,6 +18,7 @@ from nemo_run.core.execution.base import Executor from nemo_run.core.execution.slurm import SlurmExecutor +from nemo_run.core.frontend.console.api import configure_logging from nemo_run.run.ray.slurm import SlurmRayCluster # Import guard for Kubernetes dependencies @@ -36,8 +37,10 @@ class RayCluster: name: str executor: Executor + log_level: str = "INFO" def __post_init__(self): + configure_logging(level=self.log_level) backend_map: dict[Type[Executor], Type] = { SlurmExecutor: SlurmRayCluster, } diff --git a/nemo_run/run/ray/job.py b/nemo_run/run/ray/job.py index 2abe6f6c..38d7459c 100644 --- a/nemo_run/run/ray/job.py +++ b/nemo_run/run/ray/job.py @@ -18,6 +18,7 @@ from nemo_run.core.execution.base import Executor from nemo_run.core.execution.slurm import SlurmExecutor +from nemo_run.core.frontend.console.api import configure_logging from nemo_run.run.ray.slurm import SlurmRayJob # Import guard for Kubernetes dependencies @@ -39,8 +40,10 @@ class RayJob: name: str executor: Executor pre_ray_start_commands: Optional[list[str]] = None + log_level: str = "INFO" def __post_init__(self) -> None: # noqa: D401 – simple implementation + configure_logging(level=self.log_level) backend_map: dict[Type[Executor], Type[Any]] = { SlurmExecutor: SlurmRayJob, } diff --git a/nemo_run/run/ray/kuberay.py b/nemo_run/run/ray/kuberay.py index 5f75d151..b720320b 100644 --- a/nemo_run/run/ray/kuberay.py +++ b/nemo_run/run/ray/kuberay.py @@ -14,8 +14,10 @@ # limitations under the License. # Based on https://github.com/ray-project/kuberay/blob/master/clients/python-client/python_client/kuberay_cluster_api.py +import getpass import logging import os +import re import subprocess import time from dataclasses import dataclass @@ -64,6 +66,8 @@ def __post_init__(self) -> None: # noqa: D401 – simple verb is fine # – separating CoreV1 for pods/services from CustomObjects for CRDs. self.api = client.CustomObjectsApi() self.core_v1_api = client.CoreV1Api() + self.user = get_user() + self.cluster_name = f"{self.user}-{self.name}-raycluster" def _get( self, @@ -72,7 +76,7 @@ def _get( ) -> Any: # Return the RayCluster custom object, if present. - name = name or self.name + name = name or self.cluster_name namespace = k8s_namespace or self.executor.namespace or "default" logger.debug(f"Getting Ray cluster '{name}' in namespace '{namespace}'") @@ -106,7 +110,7 @@ def status( """ namespace = self.executor.namespace or "default" - name = self.name + name = self.cluster_name logger.debug( f"Getting Ray cluster status for '{name}' in namespace '{namespace}', " @@ -137,7 +141,7 @@ def status( if resource.get("status"): status_dict = resource["status"] if display: - self._display_banner(status_dict) + self._display_banner(name, status_dict) return status_dict logger.debug(f"Ray cluster '{name}' status not set yet, waiting...") @@ -170,7 +174,7 @@ def wait_until_running( """ namespace = self.executor.namespace or "default" - name = self.name + name = self.cluster_name logger.info( f"Waiting until Ray cluster '{name}' is running in namespace '{namespace}', " @@ -244,7 +248,7 @@ def create( """Create the RayCluster CR (idempotent).""" namespace = self.executor.namespace or "default" - name = self.name + name = self.cluster_name logger.info(f"Creating Ray cluster '{name}' in namespace '{namespace}'") @@ -308,7 +312,7 @@ def delete( • *None* – cluster already absent before the call. """ namespace = self.executor.namespace or "default" - name = self.name + name = self.cluster_name logger.info(f"Deleting Ray cluster '{name}' in namespace '{namespace}'") @@ -424,7 +428,7 @@ def patch( *True* on success, *False* if the API call raised an exception. """ namespace = self.executor.namespace or "default" - name = self.name + name = self.cluster_name logger.info(f"Patching Ray cluster '{name}' in namespace '{namespace}'") try: # we patch the existing raycluster with the new config @@ -472,7 +476,7 @@ def port_forward( import threading import time - name = self.name + name = self.cluster_name executor = self.executor # Get cluster details @@ -671,14 +675,14 @@ def signal_handler(sig, frame): logger.info("Port forwarding stopped") # Helper to print banner - def _display_banner(self, status_dict: Any) -> None: + def _display_banner(self, name: str, status_dict: Any) -> None: namespace = self.executor.namespace or "default" logger.info( - f"""\n\n\033[1;34mRay cluster status (KubeRay) in namespace {namespace}:\033[0m - • \033[1mName\033[0m : {self.name} - • \033[1mState\033[0m : {status_dict.get("state", "UNKNOWN") if isinstance(status_dict, dict) else "UNKNOWN"} - • \033[1mHead svc IP\033[0m: {status_dict.get("head", {}).get("serviceIP") if isinstance(status_dict, dict) else "N/A"} - (use `kubectl get rayclusters {self.name} -n {namespace}` to inspect, `kubectl delete rayclusters {self.name} -n {namespace}` to delete)\n""" + f"""\n\nRay cluster status (KubeRay) in namespace {namespace}: + • Name : {name} + • State : {status_dict.get("state", "UNKNOWN") if isinstance(status_dict, dict) else "UNKNOWN"} + • Head svc IP: {status_dict.get("head", {}).get("serviceIP") if isinstance(status_dict, dict) else "N/A"} + (use `kubectl get rayclusters {name} -n {namespace}` to inspect, `kubectl delete rayclusters {name} -n {namespace}` to delete)\n""" ) @@ -698,10 +702,14 @@ class KubeRayJob: executor: KubeRayExecutor def __post_init__(self): + config.load_kube_config() + # Lazily create K8s API clients if not supplied self.api = client.CustomObjectsApi() self.core_v1_api = client.CoreV1Api() # Ensure backward-compat: if cluster is None we still work (stand-alone usage) + self.user = get_user() + self.job_name = f"{self.user}-{self.name}-rayjob" # ------------------------------------------------------------------ # Public helpers mirroring SlurmRayJob API for downstream symmetry. @@ -709,23 +717,25 @@ def __post_init__(self): def stop(self) -> None: """Delete the RayJob custom resource (equivalent to job cancellation).""" - logger.debug(f"Cancelling RayJob '{self.name}' in namespace '{self.executor.namespace}'") + logger.debug( + f"Cancelling RayJob '{self.job_name}' in namespace '{self.executor.namespace}'" + ) try: self.api.delete_namespaced_custom_object( group="ray.io", version="v1", plural="rayjobs", - name=self.name, + name=self.job_name, namespace=self.executor.namespace, ) - logger.debug(f"RayJob '{self.name}' cancellation requested (CR deleted)") + logger.debug(f"RayJob '{self.job_name}' cancellation requested (CR deleted)") except ApiException as e: if e.status == 404: - logger.warning(f"RayJob '{self.name}' not found – maybe already deleted") + logger.warning(f"RayJob '{self.job_name}' not found – maybe already deleted") else: - logger.error(f"Failed to cancel RayJob '{self.name}': {e}") + logger.error(f"Failed to cancel RayJob '{self.job_name}': {e}") - def logs(self, follow: bool = False, lines: int = 100) -> None: + def logs(self, follow: bool = False, lines: int = 100, timeout: int | None = None) -> None: """Stream or show logs from the RayJob submitter pod. This simply shells out to ``kubectl logs -l job-name=`` which @@ -736,7 +746,7 @@ def logs(self, follow: bool = False, lines: int = 100) -> None: "kubectl", "logs", "-l", - f"job-name={self.name}", + f"job-name={self.job_name}", "-n", self.executor.namespace, ] @@ -752,14 +762,16 @@ def logs(self, follow: bool = False, lines: int = 100) -> None: try: if follow: - subprocess.run(cmd, check=False) + subprocess.run(cmd, check=False, timeout=timeout) else: - output = subprocess.check_output(cmd, text=True) + output = subprocess.check_output(cmd, text=True, timeout=timeout) print(output) except FileNotFoundError: logger.error("kubectl not found in PATH – cannot fetch logs") except subprocess.CalledProcessError as e: logger.error(f"kubectl logs returned non-zero exit status {e.returncode}") + except subprocess.TimeoutExpired: + logger.error(f"kubectl logs timed out after {timeout} seconds") def status(self, display: bool = True) -> Dict[str, Any]: """Return current RayJob status as a lightweight dict and pretty-print it.""" @@ -769,11 +781,11 @@ def status(self, display: bool = True) -> Dict[str, Any]: group="ray.io", version="v1", plural="rayjobs", - name=self.name, + name=self.job_name, namespace=self.executor.namespace, ) except ApiException as e: - logger.error(f"Failed to fetch status for RayJob '{self.name}': {e}") + logger.error(f"Failed to fetch status for RayJob '{self.job_name}': {e}") return {"jobStatus": "ERROR", "jobDeploymentStatus": "ERROR"} status = resource.get("status", {}) if isinstance(resource, dict) else {} @@ -782,11 +794,11 @@ def status(self, display: bool = True) -> Dict[str, Any]: if display: logger.info( - f"""\n\n\033[1;34mRay Job status for KubeRay cluster in namespace {self.executor.namespace}:\033[0m - • \033[1mName\033[0m : {self.name} - • \033[1mJob status\033[0m : {job_status} - • \033[1mDeployment\033[0m : {deployment_status} - (use `kubectl logs -l job-name={self.name} -n {self.executor.namespace} -f` to view logs)\n""" + f"""\n\nRay Job status for KubeRay cluster in namespace {self.executor.namespace}: + • Name : {self.job_name} + • Job status : {job_status} + • Deployment : {deployment_status} + (use `kubectl logs -l job-name={self.job_name} -n {self.executor.namespace} -f` to view logs)\n""" ) return {"jobStatus": job_status, "jobDeploymentStatus": deployment_status} @@ -818,7 +830,7 @@ def follow_logs_until_completion( RUNNING_DEPLOY_STATUS = "Running" while True: - st = self.status(display=True) + st = self.status(display=False) if st.get("jobDeploymentStatus") == RUNNING_DEPLOY_STATUS: break @@ -899,7 +911,6 @@ def start( # ------------------------------------------------------------------ from nemo_run.core.execution.kuberay import sync_workdir_via_pod - name = self.name executor = self.executor namespace = executor.namespace @@ -913,26 +924,31 @@ def start( "exec": {"command": ["/bin/sh", "-c", k8s_pre_cmds]} } + user_workspace_path = None + if workdir: if not executor.volumes or not executor.volume_mounts: raise ValueError( "`workdir` specified but executor has no volumes/volume_mounts to mount it." ) - workspace_path = os.path.join( - executor.volume_mounts[0]["mountPath"], Path(workdir).name + user_workspace_path = os.path.join( + executor.volume_mounts[0]["mountPath"], self.user, "code", Path(workdir).name ) + # Add user-based scoping to pod name and workspace path + pod_name = f"{self.job_name}-data-mover" if not dryrun: sync_workdir_via_pod( - name=name, + pod_name=pod_name, namespace=namespace, + user_workspace_path=user_workspace_path, workdir=workdir, core_v1_api=self.core_v1_api, volumes=executor.volumes, volume_mounts=executor.volume_mounts, - workspace_path=workspace_path, ) + logger.info(f"Synced workdir {workdir} to {user_workspace_path}") # In-place patch of executor.lifecycle_kwargs with *postStart* if needed if pre_ray_start_commands: @@ -943,7 +959,7 @@ def start( # ------------------------------------------------------------------ # 2. Build RayCluster spec (via executor). # ------------------------------------------------------------------ - cluster_name = f"{name}-raycluster" + cluster_name = f"{self.job_name}-raycluster" ray_cluster_body = executor.get_cluster_body(cluster_name) ray_cluster_spec = ray_cluster_body.get("spec", {}) @@ -979,7 +995,7 @@ def _apply_workdir(pod_template: dict): "apiVersion": "ray.io/v1", "kind": "RayJob", "metadata": { - "name": name, + "name": self.job_name, "namespace": namespace, }, "spec": { @@ -1006,5 +1022,17 @@ def _apply_workdir(pod_template: dict): self.status() except ApiException as e: if e.status == 409: - raise RuntimeError(f"RayJob '{name}' already exists: {e.reason}") - raise RuntimeError(f"Error creating RayJob '{name}': {e}") + raise RuntimeError(f"RayJob '{self.job_name}' already exists: {e.reason}") + raise RuntimeError(f"Error creating RayJob '{self.job_name}': {e}") + + +def get_user(): + # Get user for scoping if not provided + try: + user = getpass.getuser() + except Exception: + # Fallback to environment variables if getpass fails + user = os.environ.get("USER") or os.environ.get("LOGNAME") or "unknown" + # Clean user name for use in pod name (k8s resource naming rules) + user = re.sub(r"[^a-z0-9\-]", "-", user.lower()) + return user diff --git a/nemo_run/run/ray/slurm.py b/nemo_run/run/ray/slurm.py index 12dd3bec..2f4d2044 100644 --- a/nemo_run/run/ray/slurm.py +++ b/nemo_run/run/ray/slurm.py @@ -361,13 +361,13 @@ def status( cluster_dir = os.path.join(self.executor.tunnel.job_dir, self.name) logs_dir = os.path.join(cluster_dir, "logs") logger.info( - f"""\n\n\033[1;34mRay cluster status (Slurm) at {self.executor.tunnel.key}:\033[0m - • \033[1mName\033[0m : {self.name} - • \033[1mJob ID\033[0m : {status_dict.get("job_id")} - • \033[1mState\033[0m : {status_dict.get("state")} - • \033[1mRay ready\033[0m : {status_dict.get("ray_ready")} - • \033[1mCluster dir\033[0m: {cluster_dir} - • \033[1mLogs dir\033[0m : {logs_dir} + f"""\n\nRay cluster status (Slurm) at {self.executor.tunnel.key}: + • Name : {self.name} + • Job ID : {status_dict.get("job_id")} + • State : {status_dict.get("state")} + • Ray ready : {status_dict.get("ray_ready")} + • Cluster dir: {cluster_dir} + • Logs dir : {logs_dir} (use `squeue -j {status_dict.get("job_id")}` to check status, `scancel {status_dict.get("job_id")}` to cancel)\n""" ) @@ -809,7 +809,7 @@ def _cleanup(self): self._ssh_process = None # Ensure it's cleared def stop_forwarding(self): - logger.debug("Stopping port forwarding") + logger.info("Stopping port forwarding") self._stop_event.set() # Create and start the forwarding thread @@ -836,7 +836,7 @@ def stop_forwarding(self): original_sigterm_handler = signal.getsignal(signal.SIGTERM) def signal_handler(sig, frame): - logger.debug(f"Received signal {sig} to stop port forwarding") + logger.info(f"Received signal {sig} to stop port forwarding") stop_event.set() # Restore original signal handlers @@ -847,7 +847,7 @@ def signal_handler(sig, frame): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - logger.debug("Port forwarding is active. Press Ctrl+C to stop...") + logger.info("Port forwarding is active. Press Ctrl+C to stop...") while not stop_event.is_set(): if not forward_thread.is_alive(): logger.error( @@ -1036,13 +1036,13 @@ def status(self, display: bool = True) -> dict[str, Any]: logs_dir = os.path.join(self.cluster_dir, "logs") if display: logger.info( - f"""\n\n\033[1;34mRay job status for Slurm cluster at {self.executor.tunnel.key}:\033[0m - • \033[1mJob ID\033[0m : \033[32m{self.job_id}\033[0m - • \033[1mState\033[0m : {status_info.get("state", "UNKNOWN")} - • \033[1mRay ready\033[0m : {status_info.get("ray_ready", False)} - • \033[1mCluster dir\033[0m : {self.cluster_dir} - • \033[1mLogs directory\033[0m : {logs_dir} - • \033[1mSBATCH script\033[0m : {sbatch_script} + f"""\n\nRay job status for Slurm cluster at {self.executor.tunnel.key}: + • Job ID : {self.job_id} + • State : {status_info.get("state", "UNKNOWN")} + • Ray ready : {status_info.get("ray_ready", False)} + • Cluster dir : {self.cluster_dir} + • Logs directory : {logs_dir} + • SBATCH script : {sbatch_script} (use `squeue -j {self.job_id}` to check status, `scancel {self.job_id}` to cancel, `tail -f {self._logs_path()}` to view logs)\n""" ) From 36dea5c07f6ec96016f25ce1fc7c3f0bc785d038 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 27 May 2025 18:34:59 -0700 Subject: [PATCH 2/2] Better logs Signed-off-by: Hemil Desai --- nemo_run/core/frontend/console/api.py | 9 ++- nemo_run/run/ray/kuberay.py | 98 ++++++++++++++++++++++++--- nemo_run/run/ray/slurm.py | 68 ++++++++++++++----- 3 files changed, 146 insertions(+), 29 deletions(-) diff --git a/nemo_run/core/frontend/console/api.py b/nemo_run/core/frontend/console/api.py index 1b77951e..3b951e1f 100644 --- a/nemo_run/core/frontend/console/api.py +++ b/nemo_run/core/frontend/console/api.py @@ -33,7 +33,14 @@ def __repr__(self): def configure_logging(level: str): - handlers = [RichHandler(console=CONSOLE)] + handlers = [ + RichHandler( + console=CONSOLE, + show_time=True, + show_level=True, + show_path=True, + ) + ] if _is_jupyter(): handlers = None logging.basicConfig( diff --git a/nemo_run/run/ray/kuberay.py b/nemo_run/run/ray/kuberay.py index b720320b..45ab1e94 100644 --- a/nemo_run/run/ray/kuberay.py +++ b/nemo_run/run/ray/kuberay.py @@ -573,7 +573,7 @@ def forward_port_daemon(): if process.stderr: stderr_output = process.stderr.read() or "" - logger.error( + logger.debug( f"Port forwarding process exited with code {process.returncode}: {stderr_output}" ) @@ -677,12 +677,42 @@ def signal_handler(sig, frame): # Helper to print banner def _display_banner(self, name: str, status_dict: Any) -> None: namespace = self.executor.namespace or "default" + + pvc_locations = ", ".join( + [vm.get("mountPath", "N/A") for vm in self.executor.volume_mounts] + ) + logger.info( - f"""\n\nRay cluster status (KubeRay) in namespace {namespace}: - • Name : {name} - • State : {status_dict.get("state", "UNKNOWN") if isinstance(status_dict, dict) else "UNKNOWN"} - • Head svc IP: {status_dict.get("head", {}).get("serviceIP") if isinstance(status_dict, dict) else "N/A"} - (use `kubectl get rayclusters {name} -n {namespace}` to inspect, `kubectl delete rayclusters {name} -n {namespace}` to delete)\n""" + f""" +Ray Cluster Status (KubeRay) +============================ + +Namespace: {namespace} +Name: {name} +State: {status_dict.get("state", "UNKNOWN") if isinstance(status_dict, dict) else "UNKNOWN"} +Head IP: {status_dict.get("head", {}).get("serviceIP") if isinstance(status_dict, dict) else "N/A"} +Persistent file paths: {pvc_locations} + +Useful Commands +--------------- + +• Inspect cluster: + kubectl get rayclusters {name} -n {namespace} + +• Delete cluster: + kubectl delete rayclusters {name} -n {namespace} + +• Exec into Ray head pod: + kubectl exec -it -n {namespace} $(kubectl get pods -n {namespace} \\ + -l ray.io/cluster={name},ray.io/node-type=head \\ + -o jsonpath='{{.items[0].metadata.name}}') -- /bin/bash + +• View Ray dashboard: + kubectl port-forward -n {namespace} service/{name}-head-svc 8265:8265 + +• List all pods: + kubectl get pods -n {namespace} -l ray.io/cluster={name} +""" ) @@ -793,12 +823,58 @@ def status(self, display: bool = True) -> Dict[str, Any]: deployment_status = status.get("jobDeploymentStatus", "UNKNOWN") if display: + # Derive related resource names + data_mover_pod = f"{self.job_name}-data-mover" + ray_cluster_name = f"{self.job_name}-raycluster" + pvc_locations = ", ".join( + [vm.get("mountPath", "N/A") for vm in self.executor.volume_mounts] + ) + + # Construct workdir paths based on standard patterns + # Note: These are estimates based on the naming conventions in the code + user_workspace_base = f"{self.executor.volume_mounts[0]['mountPath']}/{self.user}/code" + logger.info( - f"""\n\nRay Job status for KubeRay cluster in namespace {self.executor.namespace}: - • Name : {self.job_name} - • Job status : {job_status} - • Deployment : {deployment_status} - (use `kubectl logs -l job-name={self.job_name} -n {self.executor.namespace} -f` to view logs)\n""" + f""" +Ray Job Status (KubeRay) +======================== + +Namespace: {self.executor.namespace} +Job Name: {self.job_name} +Job Status: {job_status} +Deployment Status: {deployment_status} +Persistent file paths: {pvc_locations} + +Related Resources +----------------- + +• Ray Cluster: {ray_cluster_name} +• Data Mover Pod: {data_mover_pod} + (syncs local workdir to PVC) + +Workdir Locations +----------------- + +• Local code synced to: {user_workspace_base}/ +• Container workdir: {user_workspace_base}/ + +Useful Commands +--------------- + +• View logs: + kubectl logs -l job-name={self.job_name} -n {self.executor.namespace} -f + +• Exec into Ray head pod: + kubectl exec -it -n {self.executor.namespace} $(kubectl get pods -n {self.executor.namespace} \\ + -l ray.io/cluster={ray_cluster_name},ray.io/node-type=head \\ + -o jsonpath='{{.items[0].metadata.name}}') -- /bin/bash + +• Exec into data mover pod: + kubectl exec -it {data_mover_pod} -n {self.executor.namespace} -- /bin/bash + +• Check Ray cluster status: + kubectl get raycluster {ray_cluster_name} -n {self.executor.namespace} +""" ) return {"jobStatus": job_status, "jobDeploymentStatus": deployment_status} diff --git a/nemo_run/run/ray/slurm.py b/nemo_run/run/ray/slurm.py index 2f4d2044..8d49fbeb 100644 --- a/nemo_run/run/ray/slurm.py +++ b/nemo_run/run/ray/slurm.py @@ -361,14 +361,32 @@ def status( cluster_dir = os.path.join(self.executor.tunnel.job_dir, self.name) logs_dir = os.path.join(cluster_dir, "logs") logger.info( - f"""\n\nRay cluster status (Slurm) at {self.executor.tunnel.key}: - • Name : {self.name} - • Job ID : {status_dict.get("job_id")} - • State : {status_dict.get("state")} - • Ray ready : {status_dict.get("ray_ready")} - • Cluster dir: {cluster_dir} - • Logs dir : {logs_dir} - (use `squeue -j {status_dict.get("job_id")}` to check status, `scancel {status_dict.get("job_id")}` to cancel)\n""" + f""" +Ray Cluster Status (Slurm) +========================== + +Host: {self.executor.tunnel.key} +Name: {self.name} +Job ID: {status_dict.get("job_id")} +State: {status_dict.get("state")} +Ray ready: {status_dict.get("ray_ready")} +Cluster dir: {cluster_dir} +Logs dir: {logs_dir} +SBATCH script: {cluster_dir}/ray.sub + +Useful Commands +--------------- + +• Check status: + squeue -j {status_dict.get("job_id")} + +• Cancel job: + scancel {status_dict.get("job_id")} + +• View logs: + tail -f {logs_dir}/ray-*.log + +""" ) return status_dict @@ -1036,15 +1054,31 @@ def status(self, display: bool = True) -> dict[str, Any]: logs_dir = os.path.join(self.cluster_dir, "logs") if display: logger.info( - f"""\n\nRay job status for Slurm cluster at {self.executor.tunnel.key}: - • Job ID : {self.job_id} - • State : {status_info.get("state", "UNKNOWN")} - • Ray ready : {status_info.get("ray_ready", False)} - • Cluster dir : {self.cluster_dir} - • Logs directory : {logs_dir} - • SBATCH script : {sbatch_script} - (use `squeue -j {self.job_id}` to check status, `scancel {self.job_id}` to cancel, - `tail -f {self._logs_path()}` to view logs)\n""" + f""" +Ray Job Status (Slurm) +====================== + +Host: {self.executor.tunnel.key} +Job ID: {self.job_id} +State: {status_info.get("state", "UNKNOWN")} +Ray ready: {status_info.get("ray_ready", False)} +Cluster dir: {self.cluster_dir} +Logs directory: {logs_dir} +SBATCH script: {sbatch_script} + +Useful Commands (to be run on the login node of the Slurm cluster) +------------------------------------------------------------------ + +• Check status: + squeue -j {self.job_id} + +• Cancel job: + scancel {self.job_id} + +• View logs: + tail -f {self._logs_path()} + +""" ) return status_info