def create_training_job(
    self, token: str, project_id: str, cluster_id: str, name: str
) -> requests.Response:
    """
    Creates a training job on DGX Cloud using the provided project/cluster IDs.

    When ``self.nodes`` is greater than 1 the job is posted to the
    ``/workloads/distributed`` endpoint with the replica/worker fields set to
    ``self.nodes``; otherwise it is posted to ``/workloads/trainings`` without
    any distributed-specific fields.

    Args:
        token: Authentication token for the DGX Cloud API.
        project_id: ID of the project to create the job in.
        cluster_id: ID of the cluster to create the job on.
        name: Name for the job (sent with ``useGivenNameAsPrefix`` enabled).

    Returns:
        The response object of the POST request. The HTTP status is not
        checked here; the caller is expected to inspect ``status_code``.

    Raises:
        ValueError: If ``token``, ``project_id`` or ``cluster_id`` is empty,
            or if ``self.nodes`` is less than 1.
    """
    # Fail fast on unusable inputs before any request is built.
    if not token or not project_id or not cluster_id:
        raise ValueError("Token, project ID, and cluster ID are required")

    if self.nodes < 1:
        raise ValueError("Node count must be at least 1")

    is_distributed = self.nodes > 1

    # Spec fields shared by both workload kinds.
    spec = {
        "command": f"/bin/bash {self.pvc_job_dir}/launch_script.sh",
        "image": self.container_image,
        "compute": {"gpuDevicesRequest": self.gpus_per_node},
        "storage": {"pvc": self.pvcs},
        "environmentVariables": [
            {"name": key, "value": value} for key, value in self.env_vars.items()
        ],
    }

    if is_distributed:
        url = f"{self.base_url}/workloads/distributed"
        spec.update(
            {
                "distributedFramework": self.distributed_framework,
                "minReplicas": self.nodes,
                "maxReplicas": self.nodes,
                "numWorkers": self.nodes,
            }
        )
    else:
        url = f"{self.base_url}/workloads/trainings"

    # User-supplied overrides are merged last so they take precedence over
    # every generated field, including the distributed-specific ones.
    spec.update(self.custom_spec)

    payload = {
        "name": name,
        "useGivenNameAsPrefix": True,
        "projectId": project_id,
        "clusterId": cluster_id,
        "spec": spec,
    }

    headers = self._default_headers(token=token)
    response = requests.post(url, json=payload, headers=headers)

    logger.debug(
        "Created %s job; response code=%s, content=%s",
        "distributed" if is_distributed else "training",
        response.status_code,
        response.text.strip(),
    )

    return response
@patch("requests.post")
def test_create_training_job_single_node(self, mock_post):
    """Test that single node jobs use the training endpoint and payload structure.

    Builds an executor with ``nodes=1``, calls ``create_training_job`` with
    ``requests.post`` patched, and checks the URL, payload, and headers that
    were handed to ``requests.post``.
    """
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.text = '{"status": "submitted"}'
    mock_post.return_value = mock_response

    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        nodes=1,
        gpus_per_node=8,
        pvc_nemo_run_dir="/workspace/nemo_run",
        pvcs=[{"path": "workspace", "claimName": "test-claim"}],
    )
    executor.pvc_job_dir = "/workspace/nemo_run/job_dir"
    executor.env_vars = {"TEST_VAR": "test_value"}

    response = executor.create_training_job(
        token="test_token",
        project_id="proj_id",
        cluster_id="cluster_id",
        name="test_job",
    )

    # The method returns whatever requests.post returned.
    assert response == mock_response

    # Check if the API call is made correctly for single node
    mock_post.assert_called_once()
    args, kwargs = mock_post.call_args
    payload = kwargs["json"]
    spec = payload["spec"]

    # Verify single node endpoint
    assert args[0] == "https://dgxapi.example.com/workloads/trainings"

    # Verify payload structure for single node job
    assert payload["name"] == "test_job"
    assert payload["projectId"] == "proj_id"
    assert payload["clusterId"] == "cluster_id"
    assert spec["image"] == "nvcr.io/nvidia/test:latest"
    assert spec["command"] == "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh"
    assert spec["compute"]["gpuDevicesRequest"] == 8

    # env_vars set on the executor must be serialised as name/value pairs;
    # without this check the env_vars assignment above is never verified.
    assert spec["environmentVariables"] == [{"name": "TEST_VAR", "value": "test_value"}]

    # Verify distributed-specific fields are NOT present
    for distributed_key in ("distributedFramework", "minReplicas", "maxReplicas", "numWorkers"):
        assert distributed_key not in spec

    assert kwargs["headers"] == executor._default_headers(token="test_token")
"PyTorch" + assert kwargs["json"]["spec"]["minReplicas"] == 2 + assert kwargs["json"]["spec"]["maxReplicas"] == 2 + assert kwargs["json"]["spec"]["numWorkers"] == 2 + assert kwargs["headers"] == executor._default_headers(token="test_token") @patch.object(DGXCloudExecutor, "get_auth_token") @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") @patch.object(DGXCloudExecutor, "move_data") - @patch.object(DGXCloudExecutor, "create_distributed_job") - def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token): + @patch.object(DGXCloudExecutor, "create_training_job") + def test_launch_single_node( + self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token + ): + """Test that launch correctly handles single-node job submission.""" mock_get_token.return_value = "test_token" mock_get_ids.return_value = ("proj_id", "cluster_id") @@ -402,7 +471,10 @@ def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, moc app_secret="test_app_secret", project_name="test_project", container_image="nvcr.io/nvidia/test:latest", + nodes=1, # Single node + gpus_per_node=8, # 8 GPUs per node pvc_nemo_run_dir="/workspace/nemo_run", + pvcs=[{"path": "/workspace", "claimName": "test-claim"}], ) executor.job_dir = tmp_dir @@ -411,6 +483,12 @@ def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, moc assert job_id == "job123" assert status == "Pending" assert os.path.exists(os.path.join(tmp_dir, "launch_script.sh")) + + # Verify launch script contents for single node + with open(os.path.join(tmp_dir, "launch_script.sh"), "r") as f: + script = f.read() + assert "python train.py" in script + mock_get_token.assert_called_once() mock_get_ids.assert_called_once_with("test_token") mock_move_data.assert_called_once_with("test_token", "proj_id", "cluster_id") @@ -418,6 +496,55 @@ def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, moc "test_token", "proj_id", "cluster_id", "test-job" ) 
@patch.object(DGXCloudExecutor, "get_auth_token")
@patch.object(DGXCloudExecutor, "get_project_and_cluster_id")
@patch.object(DGXCloudExecutor, "move_data")
@patch.object(DGXCloudExecutor, "create_training_job")
def test_launch_multi_node(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token):
    """Test that launch correctly handles multi-node job submission.

    All remote interactions are patched. The test checks the returned job
    id/status, the generated launch script, and the arguments forwarded to
    each patched collaborator (the job name is expected as "test-multi-job").
    """
    mock_get_token.return_value = "test_token"
    mock_get_ids.return_value = ("proj_id", "cluster_id")

    # Canned successful job-creation response.
    job_response = MagicMock()
    job_response.status_code = 200
    job_response.json.return_value = {"workloadId": "job456", "actualPhase": "Pending"}
    mock_create_job.return_value = job_response

    launch_cmd = ["python", "-m", "torch.distributed.run", "train.py"]

    with tempfile.TemporaryDirectory() as tmp_dir:
        executor = DGXCloudExecutor(
            base_url="https://dgxapi.example.com",
            app_id="test_app_id",
            app_secret="test_app_secret",
            project_name="test_project",
            container_image="nvcr.io/nvidia/test:latest",
            nodes=2,  # Multi-node
            gpus_per_node=8,
            distributed_framework="PyTorch",
            pvc_nemo_run_dir="/workspace/nemo_run",
            pvcs=[{"path": "/workspace", "claimName": "test-claim"}],
        )
        executor.job_dir = tmp_dir

        job_id, status = executor.launch("test_multi_job", launch_cmd)

        assert job_id == "job456"
        assert status == "Pending"

        # The launch script must exist in the job dir and contain the command.
        script_path = os.path.join(tmp_dir, "launch_script.sh")
        assert os.path.exists(script_path)
        with open(script_path, "r") as script_file:
            script = script_file.read()
        assert "python -m torch.distributed.run train.py" in script

        # Each collaborator is invoked exactly once with the expected arguments.
        mock_get_token.assert_called_once()
        mock_get_ids.assert_called_once_with("test_token")
        mock_move_data.assert_called_once_with("test_token", "proj_id", "cluster_id")
        mock_create_job.assert_called_once_with(
            "test_token", "proj_id", "cluster_id", "test-multi-job"
        )
mock_get_token.return_value = None @@ -455,7 +582,7 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token): @patch.object(DGXCloudExecutor, "get_auth_token") @patch.object(DGXCloudExecutor, "get_project_and_cluster_id") @patch.object(DGXCloudExecutor, "move_data") - @patch.object(DGXCloudExecutor, "create_distributed_job") + @patch.object(DGXCloudExecutor, "create_training_job") def test_launch_job_creation_failed( self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token ):