Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ def nproc_per_node(self) -> int:
return 1

def status(self, job_id: str) -> Optional[DGXCloudState]:
url = f"{self.base_url}/workloads/{job_id}"
workload_type = "distributed" if self.nodes > 1 else "trainings"
url = f"{self.base_url}/workloads/{workload_type}/{job_id}"
token = self.get_auth_token()
if not token:
logger.error("Failed to retrieve auth token for status request.")
Expand All @@ -369,10 +370,18 @@ def status(self, job_id: str) -> Optional[DGXCloudState]:
headers = self._default_headers(token=token)
response = requests.get(url, headers=headers)
if response.status_code != 200:
return DGXCloudState("Unknown")
logger.warning(
f"Failed to get status for job {job_id}, "
f"status_code={response.status_code}. Treating as transient."
)
return None

r_json = response.json()
return DGXCloudState(r_json["phase"])
phase = r_json.get("actualPhase") or r_json.get("phase")
if not phase:
logger.warning(f"No phase field in status response for job {job_id}: {r_json}")
return None
return DGXCloudState(phase)

def fetch_logs(
self,
Expand Down
19 changes: 12 additions & 7 deletions nemo_run/run/torchx_backend/schedulers/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
DGXCloudState.FAILED: AppState.FAILED,
DGXCloudState.COMPLETED: AppState.SUCCEEDED,
DGXCloudState.TERMINATING: AppState.RUNNING,
DGXCloudState.UNKNOWN: AppState.FAILED,
DGXCloudState.UNKNOWN: AppState.PENDING,
}

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -161,7 +161,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str:

# Store a status entry or logs path if available
# Currently, the DGXExecutor status is placeholder, but we keep the pattern
_save_job_dir(app_id, job_status=status, executor=executor)
_save_job_dir(app_id, job_status=status, executor=executor, job_id=job_id)

return app_id

Expand All @@ -173,7 +173,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
# We split out the stored values from the JSON file
stored_data = _get_job_dirs()
job_info = stored_data.get(app_id)
_, role_name, job_id = app_id.split("___")
parts = app_id.split("___")
role_name = parts[1] if len(parts) > 1 else app_id
roles = [Role(name=role_name, image="", num_replicas=1)]
roles_statuses = [
RoleStatus(
Expand All @@ -191,8 +192,9 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
if not executor:
return None

job_id = job_info.get("job_id") or parts[-1]
dgx_state = executor.status(job_id) or DGXCloudState.UNKNOWN
app_state = DGX_STATES.get(dgx_state, AppState.UNKNOWN)
app_state = DGX_STATES.get(dgx_state, AppState.PENDING)
roles_statuses[0].replicas[0].state = app_state

return DescribeAppResponse(
Expand All @@ -217,7 +219,7 @@ def log_iter(
) -> Iterable[str]:
stored_data = _get_job_dirs()
job_info = stored_data.get(app_id)
_, _, job_id = app_id.split("___")
job_id = job_info.get("job_id") or app_id.split("___")[-1]
executor: Optional[DGXCloudExecutor] = job_info.get("executor", None) # type: ignore
if not executor:
return [""]
Expand All @@ -240,7 +242,7 @@ def _cancel_existing(self, app_id: str) -> None:
"""
stored_data = _get_job_dirs()
job_info = stored_data.get(app_id)
_, _, job_id = app_id.split("___")
job_id = job_info.get("job_id") or app_id.split("___")[-1]
executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore
if not executor:
return None
Expand All @@ -257,7 +259,9 @@ def create_scheduler(session_name: str, **kwargs: Any) -> DGXCloudScheduler:
return DGXCloudScheduler(session_name=session_name)


def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> None:
def _save_job_dir(
app_id: str, job_status: str, executor: DGXCloudExecutor, job_id: str = ""
) -> None:
"""
Saves or updates local record of job status in JSON for demonstration.
"""
Expand All @@ -276,6 +280,7 @@ def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> N

app = {
"job_status": job_status,
"job_id": job_id,
"executor": serializer.serialize(
fdl_dc.convert_dataclasses_to_configs(executor, allow_post_init=True)
),
Expand Down
77 changes: 74 additions & 3 deletions test/core/execution/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ def test_nproc_per_node_default(self):
def test_status(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"phase": "Running"}
mock_response.json.return_value = {"actualPhase": "Running"}
mock_get.return_value = mock_response

with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
Expand All @@ -895,10 +895,81 @@ def test_status(self, mock_get):

assert status == DGXCloudState.RUNNING
mock_get.assert_called_once_with(
"https://dgxapi.example.com/workloads/job123",
"https://dgxapi.example.com/workloads/trainings/job123",
headers=executor._default_headers(token="test_token"),
)

@patch("requests.get")
def test_status_distributed(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"actualPhase": "Running"}
mock_get.return_value = mock_response

with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
executor = DGXCloudExecutor(
base_url="https://dgxapi.example.com",
kube_apiserver_url="https://127.0.0.1:443",
app_id="test_app_id",
app_secret="test_app_secret",
project_name="test_project",
container_image="nvcr.io/nvidia/test:latest",
pvc_nemo_run_dir="/workspace/nemo_run",
nodes=8,
)

status = executor.status("job123")

assert status == DGXCloudState.RUNNING
mock_get.assert_called_once_with(
"https://dgxapi.example.com/workloads/distributed/job123",
headers=executor._default_headers(token="test_token"),
)

@patch("requests.get")
def test_status_falls_back_to_phase_field(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"phase": "Running"}
mock_get.return_value = mock_response

with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
executor = DGXCloudExecutor(
base_url="https://dgxapi.example.com",
kube_apiserver_url="https://127.0.0.1:443",
app_id="test_app_id",
app_secret="test_app_secret",
project_name="test_project",
container_image="nvcr.io/nvidia/test:latest",
pvc_nemo_run_dir="/workspace/nemo_run",
)

status = executor.status("job123")

assert status == DGXCloudState.RUNNING

@patch("requests.get")
def test_status_returns_none_when_no_phase(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"someOtherField": "value"}
mock_get.return_value = mock_response

with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
executor = DGXCloudExecutor(
base_url="https://dgxapi.example.com",
kube_apiserver_url="https://127.0.0.1:443",
app_id="test_app_id",
app_secret="test_app_secret",
project_name="test_project",
container_image="nvcr.io/nvidia/test:latest",
pvc_nemo_run_dir="/workspace/nemo_run",
)

status = executor.status("job123")

assert status is None

@patch("requests.get")
def test_status_no_token(self, mock_get):
with patch.object(DGXCloudExecutor, "get_auth_token", return_value=None):
Expand Down Expand Up @@ -936,7 +1007,7 @@ def test_status_error_response(self, mock_get):

status = executor.status("job123")

assert status == DGXCloudState.UNKNOWN
assert status is None

@patch("requests.get")
def test_cancel(self, mock_get):
Expand Down
99 changes: 95 additions & 4 deletions test/run/torchx_backend/schedulers/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import pytest
from torchx.schedulers.api import AppDryRunInfo
from torchx.specs import AppDef, Role
from torchx.specs import AppDef, AppState, Role

from nemo_run.core.execution.dgxcloud import DGXCloudExecutor
from nemo_run.run.torchx_backend.schedulers.dgxcloud import DGXCloudScheduler, create_scheduler
from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState
from nemo_run.run.torchx_backend.schedulers.dgxcloud import (
DGX_STATES,
DGXCloudScheduler,
create_scheduler,
)


@pytest.fixture
Expand Down Expand Up @@ -106,6 +110,7 @@ def test_describe(dgx_cloud_scheduler, dgx_cloud_executor):
mock_get_job_dirs.return_value = {
"test_experiment___test_role___test_job_id": {
"job_status": "RUNNING",
"job_id": "test_job_id",
"executor": dgx_cloud_executor,
}
}
Expand All @@ -128,6 +133,7 @@ def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor):
mock_get_job_dirs.return_value = {
"test_experiment___test_role___test_job_id": {
"job_status": "RUNNING",
"job_id": "test_job_id",
"executor": dgx_cloud_executor,
}
}
Expand Down Expand Up @@ -155,10 +161,11 @@ def test_save_and_get_job_dirs():
pvc_nemo_run_dir="/workspace/nemo_run",
)

_save_job_dir("test_app_id", "RUNNING", executor)
_save_job_dir("test_app_id", "RUNNING", executor, job_id="actual_job_id")
job_dirs = _get_job_dirs()

assert "test_app_id" in job_dirs
assert job_dirs["test_app_id"]["job_id"] == "actual_job_id"
assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor)


Expand All @@ -169,6 +176,7 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor):
mock_get_job_dirs.return_value = {
"test_session___test_role___test_container_id": {
"job_status": "RUNNING",
"job_id": "test_job_id",
"executor": dgx_cloud_executor,
}
}
Expand All @@ -184,13 +192,96 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor):
assert logs == ["log2", "log3"]


def test_describe_uses_stored_job_id_not_split(dgx_cloud_scheduler, dgx_cloud_executor):
# Regression test: when a role name ends with '_', splitting app_id on '___'
# produces a job_id with a spurious leading '_' (e.g. role 'W-foo_' + sep '___'
# gives '____' which splits into 'role_' and '_job_id'). describe() must use
# the job_id stored at schedule time, not re-derive it from the app_id string.
real_job_id = "48db46d2-ae56-4c9d-9abd-ba0d873e50eb"
# role name ending with '_' triggers the collision
app_id = f"experiment___role_name___{real_job_id}"

with (
mock.patch(
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
) as mock_get_job_dirs,
mock.patch.object(
DGXCloudExecutor, "status", return_value=DGXCloudState.RUNNING
) as mock_status,
):
mock_get_job_dirs.return_value = {
app_id: {
"job_status": "RUNNING",
"job_id": real_job_id,
"executor": dgx_cloud_executor,
}
}

response = dgx_cloud_scheduler.describe(app_id)
assert response is not None
mock_status.assert_called_once_with(real_job_id)


def test_unknown_state_maps_to_pending_not_failed():
# DGXCloudState.UNKNOWN must map to PENDING so transient API errors during
# job startup do not cause wait_and_exit() to treat the job as terminal.
assert DGX_STATES[DGXCloudState.UNKNOWN] == AppState.PENDING


def test_describe_returns_pending_when_status_is_none(dgx_cloud_scheduler, dgx_cloud_executor):
# Regression test: executor.status() returns None when the auth token is
# missing or the API call fails transiently right after job submission.
# describe() must return PENDING so the wait loop keeps polling.
with (
mock.patch(
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
) as mock_get_job_dirs,
mock.patch.object(DGXCloudExecutor, "status", return_value=None),
):
mock_get_job_dirs.return_value = {
"test_experiment___test_role___test_job_id": {
"job_status": "RUNNING",
"job_id": "test_job_id",
"executor": dgx_cloud_executor,
}
}

response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id")
assert response is not None
assert response.state == AppState.PENDING


def test_describe_returns_pending_when_status_is_unknown(dgx_cloud_scheduler, dgx_cloud_executor):
# Regression test: the DGXCloud API transiently returns "Unknown" before a
# job is visible (e.g. HTTP 404 right after submission). describe() must
# return PENDING so the wait loop keeps polling instead of failing.
with (
mock.patch(
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
) as mock_get_job_dirs,
mock.patch.object(DGXCloudExecutor, "status", return_value=DGXCloudState.UNKNOWN),
):
mock_get_job_dirs.return_value = {
"test_experiment___test_role___test_job_id": {
"job_status": "RUNNING",
"job_id": "test_job_id",
"executor": dgx_cloud_executor,
}
}

response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id")
assert response is not None
assert response.state == AppState.PENDING


def test_log_iter_str(dgx_cloud_scheduler, dgx_cloud_executor):
with mock.patch(
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
) as mock_get_job_dirs:
mock_get_job_dirs.return_value = {
"test_session___test_role___test_container_id": {
"job_status": "RUNNING",
"job_id": "test_job_id",
"executor": dgx_cloud_executor,
}
}
Expand Down
Loading