Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 54 additions & 19 deletions nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,41 +202,76 @@ def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float =
resp.text,
)

def create_distributed_job(self, token: str, project_id: str, cluster_id: str, name: str):
def create_training_job(
    self, token: str, project_id: str, cluster_id: str, name: str
) -> requests.Response:
    """
    Creates a training job on DGX Cloud using the provided project/cluster IDs.

    When ``self.nodes`` is greater than 1 the job is posted to the
    ``/workloads/distributed`` endpoint with the replica/worker fields set to
    the node count. Otherwise it is posted to ``/workloads/trainings`` as a
    single-node training with no distributed-specific fields.

    Args:
        token: Authentication token for DGX Cloud API
        project_id: ID of the project to create the job in
        cluster_id: ID of the cluster to create the job on
        name: Name for the job

    Returns:
        Response object from the API request

    Raises:
        ValueError: If token, project_id or cluster_id is empty, or if
            ``self.nodes`` is less than 1.
    """
    # Validate inputs before building any payload or touching the network.
    if not token or not project_id or not cluster_id:
        raise ValueError("Token, project ID, and cluster ID are required")

    if self.nodes < 1:
        raise ValueError("Node count must be at least 1")

    is_distributed = self.nodes > 1

    # Spec elements shared by single-node and multi-node workloads.
    spec = {
        "command": f"/bin/bash {self.pvc_job_dir}/launch_script.sh",
        "image": self.container_image,
        "compute": {"gpuDevicesRequest": self.gpus_per_node},
        "storage": {"pvc": self.pvcs},
        "environmentVariables": [
            {"name": key, "value": value} for key, value in self.env_vars.items()
        ],
    }

    if is_distributed:
        url = f"{self.base_url}/workloads/distributed"
        # Distributed-specific parameters: every replica/worker count is
        # pinned to the requested node count.
        spec.update(
            {
                "distributedFramework": self.distributed_framework,
                "minReplicas": self.nodes,
                "maxReplicas": self.nodes,
                "numWorkers": self.nodes,
            }
        )
    else:
        url = f"{self.base_url}/workloads/trainings"

    # custom_spec is applied last so user-supplied overrides take precedence
    # over every generated key, including the distributed-specific ones.
    spec.update(self.custom_spec)

    payload = {
        "name": name,
        "useGivenNameAsPrefix": True,
        "projectId": project_id,
        "clusterId": cluster_id,
        "spec": spec,
    }

    headers = self._default_headers(token=token)
    response = requests.post(url, json=payload, headers=headers)

    logger.debug(
        "Created %s job; response code=%s, content=%s",
        "distributed" if is_distributed else "training",
        response.status_code,
        response.text.strip(),
    )

    return response

def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
Expand All @@ -262,8 +297,8 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
logger.info("Creating data movement workload")
self.move_data(token, project_id, cluster_id)

logger.info("Creating distributed workload")
resp = self.create_distributed_job(token, project_id, cluster_id, name)
logger.info("Creating training workload")
resp = self.create_training_job(token, project_id, cluster_id, name)
if resp.status_code not in [200, 202]:
raise RuntimeError(f"Failed to create job, status_code={resp.status_code}")

Expand Down
149 changes: 138 additions & 11 deletions test/core/execution/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,65 @@ def test_move_data_failed(self, mock_status, mock_create, mock_sleep):
mock_status.assert_called()

@patch("requests.post")
def test_create_distributed_job(self, mock_post):
def test_create_training_job_single_node(self, mock_post):
    """Single-node jobs must be posted to the trainings endpoint without distributed fields."""
    fake_response = MagicMock(status_code=200, text='{"status": "submitted"}')
    mock_post.return_value = fake_response

    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        nodes=1,
        gpus_per_node=8,
        pvc_nemo_run_dir="/workspace/nemo_run",
        pvcs=[{"path": "workspace", "claimName": "test-claim"}],
    )
    executor.pvc_job_dir = "/workspace/nemo_run/job_dir"
    executor.env_vars = {"TEST_VAR": "test_value"}

    result = executor.create_training_job(
        token="test_token",
        project_id="proj_id",
        cluster_id="cluster_id",
        name="test_job",
    )

    # The executor hands back whatever requests.post returned.
    assert result == fake_response

    # Exactly one HTTP call, aimed at the single-node endpoint.
    mock_post.assert_called_once()
    positional, keyword = mock_post.call_args
    assert positional[0] == "https://dgxapi.example.com/workloads/trainings"

    # Top-level payload fields.
    payload = keyword["json"]
    assert payload["name"] == "test_job"
    assert payload["projectId"] == "proj_id"
    assert payload["clusterId"] == "cluster_id"

    # Spec fields common to every workload type.
    spec = payload["spec"]
    assert spec["image"] == "nvcr.io/nvidia/test:latest"
    assert spec["command"] == "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh"
    assert spec["compute"]["gpuDevicesRequest"] == 8

    # Distributed-only keys must be absent from a single-node spec.
    for distributed_key in (
        "distributedFramework",
        "minReplicas",
        "maxReplicas",
        "numWorkers",
    ):
        assert distributed_key not in spec

    assert keyword["headers"] == executor._default_headers(token="test_token")

@patch("requests.post")
def test_create_training_job_multi_node(self, mock_post):
"""Test that multi-node jobs use the correct distributed endpoint and payload structure."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = '{"status": "submitted"}'
Expand All @@ -348,13 +406,14 @@ def test_create_distributed_job(self, mock_post):
container_image="nvcr.io/nvidia/test:latest",
nodes=2,
gpus_per_node=8,
distributed_framework="PyTorch",
pvc_nemo_run_dir="/workspace/nemo_run",
pvcs=[{"path": "workspace", "claimName": "test-claim"}],
)
executor.pvc_job_dir = "/workspace/nemo_run/job_dir"
executor.env_vars = {"TEST_VAR": "test_value"}

response = executor.create_distributed_job(
response = executor.create_training_job(
token="test_token",
project_id="proj_id",
cluster_id="cluster_id",
Expand All @@ -363,10 +422,14 @@ def test_create_distributed_job(self, mock_post):

assert response == mock_response

# Check if the API call is made correctly
# Check if the API call is made correctly for multi-node
mock_post.assert_called_once()
# The URL is the first argument to post
args, kwargs = mock_post.call_args

# Verify multi-node endpoint
assert args[0] == "https://dgxapi.example.com/workloads/distributed"

# Verify payload structure for multi-node job
assert kwargs["json"]["name"] == "test_job"
assert kwargs["json"]["projectId"] == "proj_id"
assert kwargs["json"]["clusterId"] == "cluster_id"
Expand All @@ -375,18 +438,24 @@ def test_create_distributed_job(self, mock_post):
kwargs["json"]["spec"]["command"]
== "/bin/bash /workspace/nemo_run/job_dir/launch_script.sh"
)
assert kwargs["json"]["spec"]["numWorkers"] == 2
assert kwargs["json"]["spec"]["compute"]["gpuDevicesRequest"] == 8
assert kwargs["json"]["spec"]["environmentVariables"] == [
{"name": "TEST_VAR", "value": "test_value"}
]

# Verify distributed-specific fields
assert kwargs["json"]["spec"]["distributedFramework"] == "PyTorch"
assert kwargs["json"]["spec"]["minReplicas"] == 2
assert kwargs["json"]["spec"]["maxReplicas"] == 2
assert kwargs["json"]["spec"]["numWorkers"] == 2

assert kwargs["headers"] == executor._default_headers(token="test_token")

@patch.object(DGXCloudExecutor, "get_auth_token")
@patch.object(DGXCloudExecutor, "get_project_and_cluster_id")
@patch.object(DGXCloudExecutor, "move_data")
@patch.object(DGXCloudExecutor, "create_distributed_job")
def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token):
@patch.object(DGXCloudExecutor, "create_training_job")
def test_launch_single_node(
self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token
):
"""Test that launch correctly handles single-node job submission."""
mock_get_token.return_value = "test_token"
mock_get_ids.return_value = ("proj_id", "cluster_id")

Expand All @@ -402,7 +471,10 @@ def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, moc
app_secret="test_app_secret",
project_name="test_project",
container_image="nvcr.io/nvidia/test:latest",
nodes=1, # Single node
gpus_per_node=8, # 8 GPUs per node
pvc_nemo_run_dir="/workspace/nemo_run",
pvcs=[{"path": "/workspace", "claimName": "test-claim"}],
)
executor.job_dir = tmp_dir

Expand All @@ -411,13 +483,68 @@ def test_launch_success(self, mock_create_job, mock_move_data, mock_get_ids, moc
assert job_id == "job123"
assert status == "Pending"
assert os.path.exists(os.path.join(tmp_dir, "launch_script.sh"))

# Verify launch script contents for single node
with open(os.path.join(tmp_dir, "launch_script.sh"), "r") as f:
script = f.read()
assert "python train.py" in script

mock_get_token.assert_called_once()
mock_get_ids.assert_called_once_with("test_token")
mock_move_data.assert_called_once_with("test_token", "proj_id", "cluster_id")
mock_create_job.assert_called_once_with(
"test_token", "proj_id", "cluster_id", "test-job"
)

@patch.object(DGXCloudExecutor, "get_auth_token")
@patch.object(DGXCloudExecutor, "get_project_and_cluster_id")
@patch.object(DGXCloudExecutor, "move_data")
@patch.object(DGXCloudExecutor, "create_training_job")
def test_launch_multi_node(self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token):
    """Launching a two-node executor writes the launch script and submits one job."""
    mock_get_token.return_value = "test_token"
    mock_get_ids.return_value = ("proj_id", "cluster_id")

    # Stand-in for the HTTP response returned by create_training_job.
    job_response = MagicMock(status_code=200)
    job_response.json.return_value = {"workloadId": "job456", "actualPhase": "Pending"}
    mock_create_job.return_value = job_response

    with tempfile.TemporaryDirectory() as tmp_dir:
        executor = DGXCloudExecutor(
            base_url="https://dgxapi.example.com",
            app_id="test_app_id",
            app_secret="test_app_secret",
            project_name="test_project",
            container_image="nvcr.io/nvidia/test:latest",
            nodes=2,  # Multi-node
            gpus_per_node=8,
            distributed_framework="PyTorch",
            pvc_nemo_run_dir="/workspace/nemo_run",
            pvcs=[{"path": "/workspace", "claimName": "test-claim"}],
        )
        executor.job_dir = tmp_dir

        command = ["python", "-m", "torch.distributed.run", "train.py"]
        job_id, status = executor.launch("test_multi_job", command)

        assert job_id == "job456"
        assert status == "Pending"

        # The launch script must exist in the job dir and contain the joined command.
        script_path = os.path.join(tmp_dir, "launch_script.sh")
        assert os.path.exists(script_path)
        with open(script_path, "r") as script_file:
            script = script_file.read()
        assert "python -m torch.distributed.run train.py" in script

        # Each collaborator is invoked exactly once with the expected arguments.
        mock_get_token.assert_called_once()
        mock_get_ids.assert_called_once_with("test_token")
        mock_move_data.assert_called_once_with("test_token", "proj_id", "cluster_id")
        mock_create_job.assert_called_once_with(
            "test_token", "proj_id", "cluster_id", "test-multi-job"
        )

@patch.object(DGXCloudExecutor, "get_auth_token")
def test_launch_no_token(self, mock_get_token):
mock_get_token.return_value = None
Expand Down Expand Up @@ -455,7 +582,7 @@ def test_launch_no_project_id(self, mock_get_ids, mock_get_token):
@patch.object(DGXCloudExecutor, "get_auth_token")
@patch.object(DGXCloudExecutor, "get_project_and_cluster_id")
@patch.object(DGXCloudExecutor, "move_data")
@patch.object(DGXCloudExecutor, "create_distributed_job")
@patch.object(DGXCloudExecutor, "create_training_job")
def test_launch_job_creation_failed(
self, mock_create_job, mock_move_data, mock_get_ids, mock_get_token
):
Expand Down
Loading