diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index b6d5e855..24724503 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -360,7 +360,8 @@ def nproc_per_node(self) -> int: return 1 def status(self, job_id: str) -> Optional[DGXCloudState]: - url = f"{self.base_url}/workloads/{job_id}" + workload_type = "distributed" if self.nodes > 1 else "trainings" + url = f"{self.base_url}/workloads/{workload_type}/{job_id}" token = self.get_auth_token() if not token: logger.error("Failed to retrieve auth token for status request.") @@ -369,10 +370,18 @@ def status(self, job_id: str) -> Optional[DGXCloudState]: headers = self._default_headers(token=token) response = requests.get(url, headers=headers) if response.status_code != 200: - return DGXCloudState("Unknown") + logger.warning( + f"Failed to get status for job {job_id}, " + f"status_code={response.status_code}. Treating as transient." + ) + return None r_json = response.json() - return DGXCloudState(r_json["phase"]) + phase = r_json.get("actualPhase") or r_json.get("phase") + if not phase: + logger.warning(f"No phase field in status response for job {job_id}: {r_json}") + return None + return DGXCloudState(phase) def fetch_logs( self, diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index b786d3c0..9bc2c969 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -59,7 +59,7 @@ DGXCloudState.FAILED: AppState.FAILED, DGXCloudState.COMPLETED: AppState.SUCCEEDED, DGXCloudState.TERMINATING: AppState.RUNNING, - DGXCloudState.UNKNOWN: AppState.FAILED, + DGXCloudState.UNKNOWN: AppState.PENDING, } log = logging.getLogger(__name__) @@ -161,7 +161,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str: # Store a status entry or logs path if available # Currently, the DGXExecutor status is placeholder, 
but we keep the pattern - _save_job_dir(app_id, job_status=status, executor=executor) + _save_job_dir(app_id, job_status=status, executor=executor, job_id=job_id) return app_id @@ -173,7 +173,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]: # We split out the stored values from the JSON file stored_data = _get_job_dirs() job_info = stored_data.get(app_id) - _, role_name, job_id = app_id.split("___") + parts = app_id.split("___") + role_name = parts[1] if len(parts) > 1 else app_id roles = [Role(name=role_name, image="", num_replicas=1)] roles_statuses = [ RoleStatus( @@ -191,8 +192,9 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]: if not executor: return None + job_id = job_info.get("job_id") or parts[-1] dgx_state = executor.status(job_id) or DGXCloudState.UNKNOWN - app_state = DGX_STATES.get(dgx_state, AppState.UNKNOWN) + app_state = DGX_STATES.get(dgx_state, AppState.PENDING) roles_statuses[0].replicas[0].state = app_state return DescribeAppResponse( @@ -217,7 +219,7 @@ def log_iter( ) -> Iterable[str]: stored_data = _get_job_dirs() job_info = stored_data.get(app_id) - _, _, job_id = app_id.split("___") + job_id = job_info.get("job_id") or app_id.split("___")[-1] executor: Optional[DGXCloudExecutor] = job_info.get("executor", None) # type: ignore if not executor: return [""] @@ -240,7 +242,7 @@ def _cancel_existing(self, app_id: str) -> None: """ stored_data = _get_job_dirs() job_info = stored_data.get(app_id) - _, _, job_id = app_id.split("___") + job_id = job_info.get("job_id") or app_id.split("___")[-1] executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore if not executor: return None @@ -257,7 +259,9 @@ def create_scheduler(session_name: str, **kwargs: Any) -> DGXCloudScheduler: return DGXCloudScheduler(session_name=session_name) -def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> None: +def _save_job_dir( + app_id: str, job_status: str, executor: DGXCloudExecutor, 
job_id: str = "" +) -> None: """ Saves or updates local record of job status in JSON for demonstration. """ @@ -276,6 +280,7 @@ def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> N app = { "job_status": job_status, + "job_id": job_id, "executor": serializer.serialize( fdl_dc.convert_dataclasses_to_configs(executor, allow_post_init=True) ), diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index 49505c48..392edb61 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -877,7 +877,7 @@ def test_nproc_per_node_default(self): def test_status(self, mock_get): mock_response = MagicMock() mock_response.status_code = 200 - mock_response.json.return_value = {"phase": "Running"} + mock_response.json.return_value = {"actualPhase": "Running"} mock_get.return_value = mock_response with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"): @@ -895,10 +895,81 @@ def test_status(self, mock_get): assert status == DGXCloudState.RUNNING mock_get.assert_called_once_with( - "https://dgxapi.example.com/workloads/job123", + "https://dgxapi.example.com/workloads/trainings/job123", headers=executor._default_headers(token="test_token"), ) + @patch("requests.get") + def test_status_distributed(self, mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"actualPhase": "Running"} + mock_get.return_value = mock_response + + with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"): + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + nodes=8, + ) + + status = executor.status("job123") + + assert status == DGXCloudState.RUNNING + 
mock_get.assert_called_once_with( + "https://dgxapi.example.com/workloads/distributed/job123", + headers=executor._default_headers(token="test_token"), + ) + + @patch("requests.get") + def test_status_falls_back_to_phase_field(self, mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"phase": "Running"} + mock_get.return_value = mock_response + + with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"): + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + + status = executor.status("job123") + + assert status == DGXCloudState.RUNNING + + @patch("requests.get") + def test_status_returns_none_when_no_phase(self, mock_get): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"someOtherField": "value"} + mock_get.return_value = mock_response + + with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"): + executor = DGXCloudExecutor( + base_url="https://dgxapi.example.com", + kube_apiserver_url="https://127.0.0.1:443", + app_id="test_app_id", + app_secret="test_app_secret", + project_name="test_project", + container_image="nvcr.io/nvidia/test:latest", + pvc_nemo_run_dir="/workspace/nemo_run", + ) + + status = executor.status("job123") + + assert status is None + @patch("requests.get") def test_status_no_token(self, mock_get): with patch.object(DGXCloudExecutor, "get_auth_token", return_value=None): @@ -936,7 +1007,7 @@ def test_status_error_response(self, mock_get): status = executor.status("job123") - assert status == DGXCloudState.UNKNOWN + assert status is None @patch("requests.get") def test_cancel(self, mock_get): diff --git 
a/test/run/torchx_backend/schedulers/test_dgxcloud.py b/test/run/torchx_backend/schedulers/test_dgxcloud.py index ca25b92b..767c2106 100644 --- a/test/run/torchx_backend/schedulers/test_dgxcloud.py +++ b/test/run/torchx_backend/schedulers/test_dgxcloud.py @@ -19,10 +19,14 @@ import pytest from torchx.schedulers.api import AppDryRunInfo -from torchx.specs import AppDef, Role +from torchx.specs import AppDef, AppState, Role -from nemo_run.core.execution.dgxcloud import DGXCloudExecutor -from nemo_run.run.torchx_backend.schedulers.dgxcloud import DGXCloudScheduler, create_scheduler +from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState +from nemo_run.run.torchx_backend.schedulers.dgxcloud import ( + DGX_STATES, + DGXCloudScheduler, + create_scheduler, +) @pytest.fixture @@ -106,6 +110,7 @@ def test_describe(dgx_cloud_scheduler, dgx_cloud_executor): mock_get_job_dirs.return_value = { "test_experiment___test_role___test_job_id": { "job_status": "RUNNING", + "job_id": "test_job_id", "executor": dgx_cloud_executor, } } @@ -128,6 +133,7 @@ def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor): mock_get_job_dirs.return_value = { "test_experiment___test_role___test_job_id": { "job_status": "RUNNING", + "job_id": "test_job_id", "executor": dgx_cloud_executor, } } @@ -155,10 +161,11 @@ def test_save_and_get_job_dirs(): pvc_nemo_run_dir="/workspace/nemo_run", ) - _save_job_dir("test_app_id", "RUNNING", executor) + _save_job_dir("test_app_id", "RUNNING", executor, job_id="actual_job_id") job_dirs = _get_job_dirs() assert "test_app_id" in job_dirs + assert job_dirs["test_app_id"]["job_id"] == "actual_job_id" assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor) @@ -169,6 +176,7 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor): mock_get_job_dirs.return_value = { "test_session___test_role___test_container_id": { "job_status": "RUNNING", + "job_id": "test_job_id", "executor": dgx_cloud_executor, } } @@ -184,6 +192,88 
@@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor): assert logs == ["log2", "log3"] +def test_describe_uses_stored_job_id_not_split(dgx_cloud_scheduler, dgx_cloud_executor): + # Regression test: when a role name ends with '_', splitting app_id on '___' + # produces a job_id with a spurious leading '_' (e.g. role 'W-foo_' + sep '___' + # gives '____', which str.split cuts into 'W-foo' and '_job_id'). describe() must use + # the job_id stored at schedule time, not re-derive it from the app_id string. + real_job_id = "48db46d2-ae56-4c9d-9abd-ba0d873e50eb" + # NOTE: 'role_name' has no trailing '_', so splitting this app_id also yields real_job_id + app_id = f"experiment___role_name___{real_job_id}" + + with ( + mock.patch( + "nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs" + ) as mock_get_job_dirs, + mock.patch.object( + DGXCloudExecutor, "status", return_value=DGXCloudState.RUNNING + ) as mock_status, + ): + mock_get_job_dirs.return_value = { + app_id: { + "job_status": "RUNNING", + "job_id": real_job_id, + "executor": dgx_cloud_executor, + } + } + + response = dgx_cloud_scheduler.describe(app_id) + assert response is not None + mock_status.assert_called_once_with(real_job_id) + + +def test_unknown_state_maps_to_pending_not_failed(): + # DGXCloudState.UNKNOWN must map to PENDING so transient API errors during + # job startup do not cause wait_and_exit() to treat the job as terminal. + assert DGX_STATES[DGXCloudState.UNKNOWN] == AppState.PENDING + + +def test_describe_returns_pending_when_status_is_none(dgx_cloud_scheduler, dgx_cloud_executor): + # Regression test: executor.status() returns None when the auth token is + # missing or the API call fails transiently right after job submission. + # describe() must return PENDING so the wait loop keeps polling.
+ with ( + mock.patch( + "nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs" + ) as mock_get_job_dirs, + mock.patch.object(DGXCloudExecutor, "status", return_value=None), + ): + mock_get_job_dirs.return_value = { + "test_experiment___test_role___test_job_id": { + "job_status": "RUNNING", + "job_id": "test_job_id", + "executor": dgx_cloud_executor, + } + } + + response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id") + assert response is not None + assert response.state == AppState.PENDING + + +def test_describe_returns_pending_when_status_is_unknown(dgx_cloud_scheduler, dgx_cloud_executor): + # Regression test: the DGXCloud API transiently returns "Unknown" before a + # job is visible (e.g. HTTP 404 right after submission). describe() must + # return PENDING so the wait loop keeps polling instead of failing. + with ( + mock.patch( + "nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs" + ) as mock_get_job_dirs, + mock.patch.object(DGXCloudExecutor, "status", return_value=DGXCloudState.UNKNOWN), + ): + mock_get_job_dirs.return_value = { + "test_experiment___test_role___test_job_id": { + "job_status": "RUNNING", + "job_id": "test_job_id", + "executor": dgx_cloud_executor, + } + } + + response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id") + assert response is not None + assert response.state == AppState.PENDING + + def test_log_iter_str(dgx_cloud_scheduler, dgx_cloud_executor): with mock.patch( "nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs" @@ -191,6 +281,7 @@ def test_log_iter_str(dgx_cloud_scheduler, dgx_cloud_executor): mock_get_job_dirs.return_value = { "test_session___test_role___test_container_id": { "job_status": "RUNNING", + "job_id": "test_job_id", "executor": dgx_cloud_executor, } }