Skip to content

Commit 8c3ffa2

Browse files
committed
Refactor volume mount path to workspace mount path
Renamed the `volume_mount_path` attribute of the `KubeflowExecutor` class to `workspace_mount_path` to improve clarity and consistency. The change applies to both the executor implementation and the corresponding template files. The unit tests were also updated to use the new name, so that all references stay aligned with this modification. This improves readability and reduces potential confusion about the purpose of the mount path.

Signed-off-by: Krishnaswamy Subramanian <subramk@thoughtworks.com>
1 parent e0c9331 commit 8c3ffa2

4 files changed

Lines changed: 52 additions & 40 deletions

File tree

nemo_run/core/execution/kubeflow.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ class KubeflowExecutor(Executor):
201201
#: Training job filename
202202
training_entry: str = "experiment"
203203

204-
#: Volume mount path for staged files (default: /src)
205-
volume_mount_path: str = "/src"
204+
#: Workspace mount path for staged files (default: /src)
205+
workspace_mount_path: str = "/src"
206206

207207
#: TrainerClient instance for managing TrainJob objects
208208
_trainer_client: Optional[TrainerClient] = field(init=False, repr=False, default=None)
@@ -353,7 +353,7 @@ def _create_cluster_training_runtime(self, configmap_name: str, sha: str) -> str
353353
"namespace": self.namespace,
354354
"nodes": self.nodes,
355355
"image": self.image,
356-
"volume_mount_path": self.volume_mount_path,
356+
"workspace_mount_path": self.workspace_mount_path,
357357
"configmap_name": configmap_name,
358358
"cpu_limit": self.cpu_limit,
359359
"memory_limit": self.memory_limit,
@@ -530,14 +530,8 @@ def _get_custom_trainer(self, task) -> CommandTrainer:
530530
if self.gpus is not None:
531531
resources_per_node["nvidia.com/gpu"] = str(self.gpus)
532532

533-
mounted_path = f"{self.volume_mount_path}/{self.training_entry}"
534-
if hasattr(task, "__fn_or_cls__"):
535-
command, args = _build_launcher_command_and_args("python", "", mounted_path)
536-
else:
537-
# ToDo: getattr takes care of the default case no need for or "bash"
538-
entrypoint = (getattr(task, "entrypoint", "bash") or "bash").strip()
539-
inline = (getattr(task, "inline", "") or "").strip()
540-
command, args = _build_launcher_command_and_args(entrypoint, inline, mounted_path)
533+
mounted_path = f"{self.workspace_mount_path}/{self.training_entry}"
534+
command, args = _build_trainer_command(task, mounted_path)
541535

542536
trainer = CommandTrainer(
543537
command=command,
@@ -680,13 +674,13 @@ def _runtime_name(self, sha: str) -> str:
680674
def _get_staged_file_path(self, filename: str) -> str:
681675
"""Return path where a staged file would be mounted inside the container.
682676
683-
If using ConfigMapPackager, files are mounted under volume_mount_path with
677+
If using ConfigMapPackager, files are mounted under workspace_mount_path with
684678
experiment-specific prefix. Otherwise, return the filename unchanged.
685679
"""
686680
if (
687681
isinstance(self.packager, ConfigMapPackager)
688682
and hasattr(self, "experiment_name")
689683
and self.experiment_name
690684
):
691-
return f"{self.volume_mount_path}/{self.experiment_name}-{filename}"
685+
return f"{self.workspace_mount_path}/{self.experiment_name}-{filename}"
692686
return filename

nemo_run/core/execution/templates/kubeflow_clustertrainingruntime.yaml.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ spec:
127127
value: /dev/aperture_devices
128128
volumeMounts:
129129
- name: workspace
130-
mountPath: {{ volume_mount_path }}
130+
mountPath: {{ workspace_mount_path }}
131131
{% if storage_pvc_mounts %}
132132
{% for pvc in storage_pvc_mounts %}
133133
- name: {{ pvc.name }}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"folders": [
3+
{
4+
"path": "../../.."
5+
},
6+
{
7+
"path": "../../../../../twlabs/mpt-platform-workbench"
8+
},
9+
{
10+
"path": "../../../../../twlabs/mpt-platform-mle-experiments/kubernetes/NeMo"
11+
},
12+
{
13+
"path": "../../../../../kubeflow/sdk"
14+
},
15+
{
16+
"path": "../../../../../twlabs/mpt-platform-mle-experiments/gpt-pretrain-kubeflow"
17+
},
18+
{
19+
"path": "../../../../../kubeflow/trainer"
20+
}
21+
],
22+
"settings": {}
23+
}

test/core/execution/test_kubeflow.py

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def test_crt_template_renders_storage_pvc(self):
115115
"namespace": "ns",
116116
"nodes": 1,
117117
"image": "img",
118-
"volume_mount_path": "/src",
118+
"workspace_mount_path": "/src",
119119
"configmap_name": "cfg",
120120
"cpu_limit": None,
121121
"memory_limit": None,
@@ -196,7 +196,7 @@ def test_kubeflow_executor_default_init():
196196
assert executor.namespace == "default"
197197
assert executor.gpus is None
198198
assert executor.job_name == ""
199-
assert executor.volume_mount_path == "/src"
199+
assert executor.workspace_mount_path == "/src"
200200
assert isinstance(executor.packager, Packager)
201201

202202

@@ -207,7 +207,7 @@ def test_kubeflow_executor_custom_init():
207207
"ntasks_per_node": 4,
208208
"namespace": "training",
209209
"gpus": 8,
210-
"volume_mount_path": "/custom/workspace",
210+
"workspace_mount_path": "/custom/workspace",
211211
}
212212

213213
executor = KubeflowExecutor(**custom_config)
@@ -216,7 +216,7 @@ def test_kubeflow_executor_custom_init():
216216
assert executor.ntasks_per_node == 4
217217
assert executor.namespace == "training"
218218
assert executor.gpus == 8
219-
assert executor.volume_mount_path == "/custom/workspace"
219+
assert executor.workspace_mount_path == "/custom/workspace"
220220

221221

222222
def test_kubeflow_executor_validation():
@@ -283,7 +283,7 @@ def test_kubeflow_executor_nproc_per_node():
283283
{
284284
"nodes": 1,
285285
"gpus": 4,
286-
"volume_mount_path": "/custom/workspace",
286+
"workspace_mount_path": "/custom/workspace",
287287
},
288288
1,
289289
),
@@ -309,8 +309,8 @@ def test_kubeflow_executor_get_custom_trainer_inline(executor_kwargs, expected_n
309309
call_args = mock_trainer.call_args[1]
310310
assert call_args["num_nodes"] == expected_nodes
311311
# CommandTrainer should be invoked with runtime-aware command/args
312-
mounted_path = f"{executor.volume_mount_path}/{executor.training_entry}"
313-
assert call_args.get("command") in (["/bin/bash"], ["python"], ["bash"])
312+
mounted_path = f"{executor.workspace_mount_path}/{executor.training_entry}"
313+
assert call_args.get("command") in (["/bin/bash"], ["python"], ["bash"], ["torchrun"])
314314
assert mounted_path in " ".join(call_args.get("args", []))
315315

316316
resources = call_args["resources_per_node"]
@@ -343,9 +343,8 @@ def dummy_function():
343343
mock_trainer.assert_called_once()
344344

345345
kwargs = mock_trainer.call_args[1]
346-
assert kwargs["command"] == ["/bin/bash"]
346+
assert kwargs["command"] in (["/bin/bash"], ["torchrun"])
347347
args_joined = " ".join(kwargs.get("args", []))
348-
assert "torchrun" in args_joined
349348
assert "--nnodes ${PET_NNODES}" in args_joined
350349
assert "--nproc_per_node ${PET_NPROC_PER_NODE}" in args_joined
351350
assert "--rdzv_backend c10d" in args_joined
@@ -370,7 +369,7 @@ def test_kubeflow_executor_get_custom_trainer_fallback():
370369

371370
call_args = mock_trainer.call_args[1]
372371
assert call_args["num_nodes"] == 1
373-
mounted_path = f"{executor.volume_mount_path}/{executor.training_entry}"
372+
mounted_path = f"{executor.workspace_mount_path}/{executor.training_entry}"
374373
assert mounted_path in " ".join(call_args.get("args", []))
375374

376375

@@ -617,7 +616,7 @@ def test_kubeflow_executor_invalid_task():
617616

618617
call_args = mock_trainer.call_args[1]
619618
# Invalid tasks are treated like script and use staged entry path
620-
mounted_path = f"{executor.volume_mount_path}/{executor.training_entry}"
619+
mounted_path = f"{executor.workspace_mount_path}/{executor.training_entry}"
621620
assert mounted_path in " ".join(call_args.get("args", []))
622621

623622

@@ -692,19 +691,17 @@ def test_kubeflow_executor_injects_torchrun_for_script():
692691
mock_trainer.assert_called_once()
693692

694693
kwargs = mock_trainer.call_args[1]
695-
# Always use bash -c with torchrun and PET-derived flags
696-
assert kwargs["command"] == ["/bin/bash"]
694+
# Use direct torchrun invocation with PET-derived flags
695+
assert kwargs["command"] == ["torchrun"]
697696
args_list = kwargs.get("args")
698697
assert isinstance(args_list, list) and len(args_list) >= 2
699-
assert args_list[0] == "-c"
700698
args_joined = " ".join(args_list)
701-
assert "torchrun" in args_joined
702699
assert "--nnodes ${PET_NNODES}" in args_joined
703700
assert "--nproc_per_node ${PET_NPROC_PER_NODE}" in args_joined
704701
assert "--rdzv_backend c10d" in args_joined
705702
assert "--rdzv_endpoint ${PET_MASTER_ADDR}:${PET_MASTER_PORT}" in args_joined
706703
# Mounted script path
707-
mounted_path = f"{executor.volume_mount_path}/{executor.training_entry}"
704+
mounted_path = f"{executor.workspace_mount_path}/{executor.training_entry}"
708705
assert mounted_path in args_joined
709706

710707

@@ -725,16 +722,15 @@ def test_kubeflow_executor_wraps_bash_script_without_torchrun():
725722
mock_trainer.assert_called_once()
726723

727724
kwargs = mock_trainer.call_args[1]
728-
assert kwargs["command"] == ["/bin/bash"]
725+
assert kwargs["command"] == ["torchrun"]
729726
args_list = kwargs.get("args")
730727
assert isinstance(args_list, list) and len(args_list) >= 2
731-
assert args_list[0] == "-lc"
732728
args_joined = " ".join(args_list)
733-
assert "torchrun" in args_joined
734729
assert "--nnodes ${PET_NNODES}" in args_joined
735730
assert "--nproc_per_node ${PET_NPROC_PER_NODE}" in args_joined
736731
assert "--rdzv_backend c10d" in args_joined
737732
assert "--rdzv_endpoint ${PET_MASTER_ADDR}:${PET_MASTER_PORT}" in args_joined
733+
assert "--no-python" in args_joined
738734

739735

740736
def test_kubeflow_executor_pass_through_bash_with_torchrun():
@@ -754,12 +750,11 @@ def test_kubeflow_executor_pass_through_bash_with_torchrun():
754750
mock_trainer.assert_called_once()
755751

756752
kwargs = mock_trainer.call_args[1]
757-
assert kwargs["command"] == ["/bin/bash"]
753+
mounted_path = f"{executor.workspace_mount_path}/{executor.training_entry}"
754+
# Pass-through: command should be the staged script path, no PET flags injection
755+
assert kwargs["command"] == [mounted_path]
758756
args_list = kwargs.get("args")
759-
assert isinstance(args_list, list) and len(args_list) >= 2
760-
assert args_list[0] == "-c"
761-
args_joined = " ".join(args_list)
762-
assert "torchrun --nnodes" not in args_joined
757+
assert args_list == []
763758

764759

765760
def test_kubeflow_executor_injects_torchrun_for_partial():
@@ -783,11 +778,11 @@ def _dummy(x, y=2):
783778
mock_trainer.assert_called_once()
784779

785780
kwargs = mock_trainer.call_args[1]
786-
assert kwargs["command"] == ["/bin/bash"]
781+
assert kwargs["command"] in (["/bin/bash"], ["torchrun"])
787782
args_list = kwargs.get("args")
788783
assert isinstance(args_list, list) and len(args_list) >= 2
789784
args_joined = " ".join(args_list)
790-
assert "torchrun" in args_joined
785+
assert (kwargs["command"][0] == "torchrun") or ("torchrun" in args_joined)
791786
assert "--nnodes ${PET_NNODES}" in args_joined
792787
assert "--nproc_per_node ${PET_NPROC_PER_NODE}" in args_joined
793788
assert "--rdzv_backend c10d" in args_joined

0 commit comments

Comments
 (0)