Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions nemo_run/core/execution/kuberay.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class KubeRayExecutor(Executor):
image: str = "" # Will be set in __post_init__ if empty
head_cpu: str = "1"
head_memory: str = "2Gi"
ray_start_params: dict[str, Any] = field(default_factory=dict)
ray_head_start_params: dict[str, Any] = field(default_factory=dict)
ray_worker_start_params: dict[str, Any] = field(default_factory=dict)
worker_groups: list[KubeRayWorkerGroup] = field(default_factory=list)
labels: dict[str, Any] = field(default_factory=dict)
service_type: str = "ClusterIP"
Expand Down Expand Up @@ -122,7 +123,7 @@ def get_cluster_body(self, name: str) -> dict[str, Any]:
memory_requests=self.head_memory,
cpu_limits=self.head_cpu,
memory_limits=self.head_memory,
ray_start_params=self.ray_start_params,
ray_start_params=self.ray_head_start_params,
head_ports=self.head_ports,
env_vars=self.env_vars,
volumes=self.volumes,
Expand All @@ -144,7 +145,7 @@ def get_cluster_body(self, name: str) -> dict[str, Any]:
replicas=worker_group.replicas,
min_replicas=worker_group.min_replicas or worker_group.replicas,
max_replicas=worker_group.max_replicas or worker_group.replicas,
ray_start_params=self.ray_start_params,
ray_start_params=self.ray_worker_start_params,
volume_mounts=worker_group.volume_mounts,
volumes=worker_group.volumes,
labels=worker_group.labels,
Expand Down Expand Up @@ -459,13 +460,13 @@ def is_valid_label(name: str) -> bool:

def sync_workdir_via_pod(
*,
name: str,
pod_name: str,
namespace: str,
user_workspace_path: str,
workdir: str,
core_v1_api: CoreV1Api,
volumes: list[dict[str, object]],
volume_mounts: list[dict[str, object]],
workspace_path: str = "/workspace",
image: str = "alpine:3.19",
cleanup: bool = False,
cleanup_timeout: int = 5,
Expand All @@ -477,9 +478,6 @@ def sync_workdir_via_pod(
Requires that the *kubectl* binary is available in PATH and can access
the same cluster context as the Kubernetes Python client.
"""

pod_name = f"{name}-dm"

# Pod manifest
pod_body = client.V1Pod(
metadata=client.V1ObjectMeta(name=pod_name, namespace=namespace),
Expand Down Expand Up @@ -548,7 +546,7 @@ def sync_workdir_via_pod(
"--",
"mkdir",
"-p",
workspace_path,
user_workspace_path,
]
)

Expand All @@ -570,17 +568,17 @@ def sync_workdir_via_pod(
f"kubectl exec -i -n {namespace} {pod_name}",
"--", # Marks end-of-options for rsync – mandatory when the dest starts with "--:"
f"{os.path.abspath(workdir).rstrip(os.sep)}/",
f"--:{workspace_path.rstrip('/')}/",
f"--:{user_workspace_path.rstrip('/')}/",
]
)

# Emit the full command for easier troubleshooting
logger.debug("Running rsync command: %s", " ".join(rsync_cmd))

subprocess.check_call(rsync_cmd)
logger.info(f"Workdir synced to PVC at {user_workspace_path} via data-mover pod.")

if cleanup:
logger.info("Workdir synced to PVC via data-mover pod. Cleaning up…")
core_v1_api.delete_namespaced_pod(
name=pod_name, namespace=namespace, body=client.V1DeleteOptions()
)
Expand Down
11 changes: 9 additions & 2 deletions nemo_run/core/frontend/console/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ def __repr__(self):


def configure_logging(level: str):
handlers = [RichHandler(console=CONSOLE)]
handlers = [
RichHandler(
console=CONSOLE,
show_time=True,
show_level=True,
show_path=True,
)
]
if _is_jupyter():
handlers = None
logging.basicConfig(
level=logging.getLevelName(level.upper()),
level=level.upper(),
format="%(message)s",
datefmt="[%X]",
handlers=handlers,
Expand Down
3 changes: 3 additions & 0 deletions nemo_run/run/ray/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from nemo_run.core.execution.base import Executor
from nemo_run.core.execution.slurm import SlurmExecutor
from nemo_run.core.frontend.console.api import configure_logging
from nemo_run.run.ray.slurm import SlurmRayCluster

# Import guard for Kubernetes dependencies
Expand All @@ -36,8 +37,10 @@
class RayCluster:
name: str
executor: Executor
log_level: str = "INFO"

def __post_init__(self):
configure_logging(level=self.log_level)
backend_map: dict[Type[Executor], Type] = {
SlurmExecutor: SlurmRayCluster,
}
Expand Down
3 changes: 3 additions & 0 deletions nemo_run/run/ray/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from nemo_run.core.execution.base import Executor
from nemo_run.core.execution.slurm import SlurmExecutor
from nemo_run.core.frontend.console.api import configure_logging
from nemo_run.run.ray.slurm import SlurmRayJob

# Import guard for Kubernetes dependencies
Expand All @@ -39,8 +40,10 @@ class RayJob:
name: str
executor: Executor
pre_ray_start_commands: Optional[list[str]] = None
log_level: str = "INFO"

def __post_init__(self) -> None: # noqa: D401 – simple implementation
configure_logging(level=self.log_level)
backend_map: dict[Type[Executor], Type[Any]] = {
SlurmExecutor: SlurmRayJob,
}
Expand Down
Loading
Loading