@@ -1149,6 +1149,167 @@ def test_kuberay_worker_group_post_init_with_custom_replicas(self):
11491149 assert worker_group .max_replicas == 5
11501150
11511151
1152+ class TestKubeRayExecutorWorkdir :
1153+ """Unit tests for KubeRayExecutor workdir fields and code_dir property."""
1154+
1155+ _WORKDIR_VOLUME = {
1156+ "name" : "work-vol" ,
1157+ "persistentVolumeClaim" : {"claimName" : "my-pvc" },
1158+ }
1159+ _WORKDIR_MOUNT = {"name" : "work-vol" , "mountPath" : "/workspace" , "subPath" : "team-a" }
1160+
1161+ def test_code_dir_appends_workdir_subdir (self ):
1162+ """code_dir is mountPath/workdir_subdir when subdir is set."""
1163+ with patch ("nemo_run.core.execution.kuberay.getpass.getuser" , return_value = "testuser" ):
1164+ e = KubeRayExecutor (
1165+ volumes = [dict (self ._WORKDIR_VOLUME )],
1166+ workdir_volume_mount = dict (self ._WORKDIR_MOUNT ),
1167+ )
1168+ assert e .code_dir == "/workspace/testuser"
1169+
1170+ def test_code_dir_returns_mount_path_when_subdir_is_none (self ):
1171+ """code_dir is exactly mountPath when workdir_subdir is None."""
1172+ e = KubeRayExecutor (
1173+ volumes = [dict (self ._WORKDIR_VOLUME )],
1174+ workdir_volume_mount = dict (self ._WORKDIR_MOUNT ),
1175+ workdir_subdir = None ,
1176+ )
1177+ assert e .code_dir == "/workspace"
1178+
1179+ def test_code_dir_returns_mount_path_when_subdir_is_empty (self ):
1180+ """code_dir is exactly mountPath when workdir_subdir is ''."""
1181+ e = KubeRayExecutor (
1182+ volumes = [dict (self ._WORKDIR_VOLUME )],
1183+ workdir_volume_mount = dict (self ._WORKDIR_MOUNT ),
1184+ workdir_subdir = "" ,
1185+ )
1186+ assert e .code_dir == "/workspace"
1187+
1188+ def test_code_dir_raises_without_workdir_volume_mount (self ):
1189+ """code_dir raises ValueError when workdir_volume_mount is not configured."""
1190+ e = KubeRayExecutor ()
1191+ with pytest .raises (ValueError , match = "workdir_volume_mount is not set" ):
1192+ _ = e .code_dir
1193+
1194+ def test_get_volume_spec_copy_by_name_success (self ):
1195+ """_get_volume_spec_copy_by_name returns a deep copy of the named volume."""
1196+ e = KubeRayExecutor (volumes = [dict (self ._WORKDIR_VOLUME )])
1197+ spec = e ._get_volume_spec_copy_by_name ("work-vol" )
1198+ assert spec == self ._WORKDIR_VOLUME
1199+ # Mutating the copy must not affect the original
1200+ spec ["extra" ] = "modified"
1201+ assert "extra" not in e .volumes [0 ]
1202+
1203+ def test_get_volume_spec_copy_by_name_missing_raises (self ):
1204+ """_get_volume_spec_copy_by_name raises ValueError for unknown volume name."""
1205+ e = KubeRayExecutor (volumes = [dict (self ._WORKDIR_VOLUME )])
1206+ with pytest .raises (ValueError , match = "nonexistent" ):
1207+ e ._get_volume_spec_copy_by_name ("nonexistent" )
1208+
1209+
1210+ class TestKubeRayJobWorkdirPaths :
1211+ """Tests that KubeRayJob.start() computes sync and container paths correctly."""
1212+
1213+ @pytest .fixture
1214+ def mock_k8s_clients (self ):
1215+ with patch ("nemo_run.run.ray.kuberay.config.load_kube_config" ):
1216+ with patch ("nemo_run.run.ray.kuberay.client.CustomObjectsApi" ) as mock_api :
1217+ with patch ("nemo_run.run.ray.kuberay.client.CoreV1Api" ) as mock_core_api :
1218+ yield mock_api .return_value , mock_core_api .return_value
1219+
1220+ @pytest .fixture
1221+ def workdir_executor (self ):
1222+ return KubeRayExecutor (
1223+ namespace = "test-namespace" ,
1224+ volumes = [
1225+ {"name" : "workspace" , "persistentVolumeClaim" : {"claimName" : "workspace-pvc" }}
1226+ ],
1227+ volume_mounts = [{"name" : "workspace" , "mountPath" : "/workspace" }],
1228+ workdir_volume_mount = {"name" : "workspace" , "mountPath" : "/workspace" },
1229+ workdir_subdir = "testuser" ,
1230+ )
1231+
1232+ @pytest .fixture
1233+ def job (self , workdir_executor , mock_k8s_clients ):
1234+ with patch ("nemo_run.run.ray.kuberay.get_user" , return_value = "testuser" ):
1235+ return KubeRayJob (name = "test-job" , executor = workdir_executor )
1236+
1237+ def _run_start_with_workdir (self , job , mock_core_api , workdir = "/local/src" ):
1238+ mock_core_api .create_namespaced_pod .return_value = None
1239+ with patch ("nemo_run.core.execution.kuberay.watch.Watch" ) as mock_watch_cls :
1240+ mock_watch_cls .return_value .stream .return_value = [
1241+ {"object" : Mock (status = Mock (phase = "Running" ))}
1242+ ]
1243+ with patch ("nemo_run.core.execution.kuberay.subprocess.check_call" ):
1244+ with patch ("nemo_run.run.ray.kuberay.client.CustomObjectsApi" ):
1245+ job .start (command = "python train.py" , workdir = workdir )
1246+
1247+ def test_data_mover_mounts_only_workdir_volume (self , job , mock_k8s_clients ):
1248+ """Data-mover pod is created with only the workdir volume/mount, not all volumes."""
1249+ _ , mock_core_api = mock_k8s_clients
1250+ self ._run_start_with_workdir (job , mock_core_api )
1251+
1252+ pod_body = mock_core_api .create_namespaced_pod .call_args [1 ]["body" ]
1253+ mover_volumes = pod_body .spec .volumes
1254+ mover_mounts = pod_body .spec .containers [0 ].volume_mounts
1255+ assert len (mover_volumes ) == 1
1256+ assert len (mover_mounts ) == 1
1257+
1258+ def test_sync_destination_is_code_dir_plus_workdir_name (self , job , mock_k8s_clients ):
1259+ """user_workspace_path passed to sync is executor.code_dir + workdir basename."""
1260+ _ , mock_core_api = mock_k8s_clients
1261+ mock_core_api .create_namespaced_pod .return_value = None
1262+
1263+ with patch ("nemo_run.core.execution.kuberay.watch.Watch" ) as mock_watch_cls :
1264+ mock_watch_cls .return_value .stream .return_value = [
1265+ {"object" : Mock (status = Mock (phase = "Running" ))}
1266+ ]
1267+ with patch ("nemo_run.core.execution.kuberay.subprocess.check_call" ) as mock_check :
1268+ job .start (command = "python train.py" , workdir = "/local/src" )
1269+
1270+ # First check_call is mkdir -p <user_workspace_path>
1271+ mkdir_args = mock_check .call_args_list [0 ][0 ][0 ]
1272+ assert mkdir_args [- 1 ] == "/workspace/testuser/src"
1273+
1274+ def test_container_workdir_matches_sync_path (self , job , mock_k8s_clients ):
1275+ """workingDir set on containers equals the sync destination path."""
1276+ mock_api , mock_core_api = mock_k8s_clients
1277+ self ._run_start_with_workdir (job , mock_core_api )
1278+
1279+ body = mock_api .create_namespaced_custom_object .call_args .kwargs ["body" ]
1280+ head_containers = body ["spec" ]["rayClusterSpec" ]["headGroupSpec" ]["template" ]["spec" ][
1281+ "containers"
1282+ ]
1283+ expected_workdir = "/workspace/testuser/src"
1284+ assert all (c .get ("workingDir" ) == expected_workdir for c in head_containers )
1285+
1286+ def test_workdir_subdir_none_uses_mountpath_directly (self , mock_k8s_clients ):
1287+ """When workdir_subdir is None the sync path is mountPath/workdir_name (no user prefix)."""
1288+ _ , mock_core_api = mock_k8s_clients
1289+ executor = KubeRayExecutor (
1290+ namespace = "test-namespace" ,
1291+ volumes = [
1292+ {"name" : "workspace" , "persistentVolumeClaim" : {"claimName" : "workspace-pvc" }}
1293+ ],
1294+ volume_mounts = [{"name" : "workspace" , "mountPath" : "/workspace" }],
1295+ workdir_volume_mount = {"name" : "workspace" , "mountPath" : "/workspace" },
1296+ workdir_subdir = None ,
1297+ )
1298+ with patch ("nemo_run.run.ray.kuberay.get_user" , return_value = "testuser" ):
1299+ job = KubeRayJob (name = "test-job" , executor = executor )
1300+
1301+ mock_core_api .create_namespaced_pod .return_value = None
1302+ with patch ("nemo_run.core.execution.kuberay.watch.Watch" ) as mock_watch_cls :
1303+ mock_watch_cls .return_value .stream .return_value = [
1304+ {"object" : Mock (status = Mock (phase = "Running" ))}
1305+ ]
1306+ with patch ("nemo_run.core.execution.kuberay.subprocess.check_call" ) as mock_check :
1307+ job .start (command = "python train.py" , workdir = "/local/src" )
1308+
1309+ mkdir_args = mock_check .call_args_list [0 ][0 ][0 ]
1310+ assert mkdir_args [- 1 ] == "/workspace/src"
1311+
1312+
11521313class TestSyncWorkdirViaPod :
11531314 """Test sync_workdir_via_pod function and related error paths."""
11541315
0 commit comments