Skip to content

Commit 525d68b

Browse files
svcnvidia-nemo-ciko3n1gclaude
authored
fix: treat DGXCloud UNKNOWN/transient status as PENDING to avoid false failures (#458) (#459)
* fix: treat DGXCloud UNKNOWN/transient status as PENDING to avoid false failures When a job is submitted to DGXCloud, the API may transiently return a non-200 response or an "Unknown" phase before the workload is fully registered. Previously this was mapped to AppState.FAILED, causing wait_and_exit() to treat the job as terminated immediately while the pod was still starting up on the cluster. - DGXCloudState.UNKNOWN now maps to AppState.PENDING in DGX_STATES - executor.status() returns None (instead of DGXCloudState.UNKNOWN) on non-200 HTTP responses so transient API errors don't look like a real "Unknown" phase reported by the scheduler - describe() fallback for unknown keys in DGX_STATES changed to PENDING - Tests updated and added to cover all three code paths * fix: use type-specific endpoint for DGXCloud workload status The status() method was calling GET /workloads/{job_id} (generic endpoint) which returns 403 for distributed and training workloads. The correct endpoints match the create paths: /workloads/distributed/{job_id} for multi-node jobs and /workloads/trainings/{job_id} for single-node jobs. This is consistent with how cancel() already uses /workloads/distributed/. Adds test_status_distributed to verify the correct URL is used for multi-node executors. * fix: read actualPhase from type-specific workload endpoints The /workloads/distributed/{id} and /workloads/trainings/{id} endpoints return actualPhase, not phase (which was the field on the generic /workloads/{id} endpoint). This caused a KeyError crash immediately after the 403 fix landed. Now reads actualPhase first, falls back to phase for compatibility, and returns None (PENDING) if neither field is present. * add tests * fix: store job_id explicitly to avoid separator collision in app_id parsing When a role name ends with '_', the app_id string looks like: experiment___role_name____job_id Splitting on '___' produces job_id = '_job_id' (spurious leading '_'), causing the status/cancel/log_iter calls to use a wrong ID and get 404. Fix: _save_job_dir now stores the actual job_id in the JSON record. describe(), log_iter(), and _cancel_existing() all read job_id from the stored record, falling back to app_id.split('___')[-1] for backwards compatibility with existing saved jobs. * format --------- Signed-off-by: oliver könig <okoenig@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: oliver könig <okoenig@nvidia.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent cffe0d4 commit 525d68b

4 files changed

Lines changed: 193 additions & 17 deletions

File tree

nemo_run/core/execution/dgxcloud.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ def nproc_per_node(self) -> int:
360360
return 1
361361

362362
def status(self, job_id: str) -> Optional[DGXCloudState]:
363-
url = f"{self.base_url}/workloads/{job_id}"
363+
workload_type = "distributed" if self.nodes > 1 else "trainings"
364+
url = f"{self.base_url}/workloads/{workload_type}/{job_id}"
364365
token = self.get_auth_token()
365366
if not token:
366367
logger.error("Failed to retrieve auth token for status request.")
@@ -369,10 +370,18 @@ def status(self, job_id: str) -> Optional[DGXCloudState]:
369370
headers = self._default_headers(token=token)
370371
response = requests.get(url, headers=headers)
371372
if response.status_code != 200:
372-
return DGXCloudState("Unknown")
373+
logger.warning(
374+
f"Failed to get status for job {job_id}, "
375+
f"status_code={response.status_code}. Treating as transient."
376+
)
377+
return None
373378

374379
r_json = response.json()
375-
return DGXCloudState(r_json["phase"])
380+
phase = r_json.get("actualPhase") or r_json.get("phase")
381+
if not phase:
382+
logger.warning(f"No phase field in status response for job {job_id}: {r_json}")
383+
return None
384+
return DGXCloudState(phase)
376385

377386
def fetch_logs(
378387
self,

nemo_run/run/torchx_backend/schedulers/dgxcloud.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
DGXCloudState.FAILED: AppState.FAILED,
6060
DGXCloudState.COMPLETED: AppState.SUCCEEDED,
6161
DGXCloudState.TERMINATING: AppState.RUNNING,
62-
DGXCloudState.UNKNOWN: AppState.FAILED,
62+
DGXCloudState.UNKNOWN: AppState.PENDING,
6363
}
6464

6565
log = logging.getLogger(__name__)
@@ -161,7 +161,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str:
161161

162162
# Store a status entry or logs path if available
163163
# Currently, the DGXExecutor status is placeholder, but we keep the pattern
164-
_save_job_dir(app_id, job_status=status, executor=executor)
164+
_save_job_dir(app_id, job_status=status, executor=executor, job_id=job_id)
165165

166166
return app_id
167167

@@ -173,7 +173,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
173173
# We split out the stored values from the JSON file
174174
stored_data = _get_job_dirs()
175175
job_info = stored_data.get(app_id)
176-
_, role_name, job_id = app_id.split("___")
176+
parts = app_id.split("___")
177+
role_name = parts[1] if len(parts) > 1 else app_id
177178
roles = [Role(name=role_name, image="", num_replicas=1)]
178179
roles_statuses = [
179180
RoleStatus(
@@ -191,8 +192,9 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
191192
if not executor:
192193
return None
193194

195+
job_id = job_info.get("job_id") or parts[-1]
194196
dgx_state = executor.status(job_id) or DGXCloudState.UNKNOWN
195-
app_state = DGX_STATES.get(dgx_state, AppState.UNKNOWN)
197+
app_state = DGX_STATES.get(dgx_state, AppState.PENDING)
196198
roles_statuses[0].replicas[0].state = app_state
197199

198200
return DescribeAppResponse(
@@ -217,7 +219,7 @@ def log_iter(
217219
) -> Iterable[str]:
218220
stored_data = _get_job_dirs()
219221
job_info = stored_data.get(app_id)
220-
_, _, job_id = app_id.split("___")
222+
job_id = job_info.get("job_id") or app_id.split("___")[-1]
221223
executor: Optional[DGXCloudExecutor] = job_info.get("executor", None) # type: ignore
222224
if not executor:
223225
return [""]
@@ -240,7 +242,7 @@ def _cancel_existing(self, app_id: str) -> None:
240242
"""
241243
stored_data = _get_job_dirs()
242244
job_info = stored_data.get(app_id)
243-
_, _, job_id = app_id.split("___")
245+
job_id = job_info.get("job_id") or app_id.split("___")[-1]
244246
executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore
245247
if not executor:
246248
return None
@@ -257,7 +259,9 @@ def create_scheduler(session_name: str, **kwargs: Any) -> DGXCloudScheduler:
257259
return DGXCloudScheduler(session_name=session_name)
258260

259261

260-
def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> None:
262+
def _save_job_dir(
263+
app_id: str, job_status: str, executor: DGXCloudExecutor, job_id: str = ""
264+
) -> None:
261265
"""
262266
Saves or updates local record of job status in JSON for demonstration.
263267
"""
@@ -276,6 +280,7 @@ def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> N
276280

277281
app = {
278282
"job_status": job_status,
283+
"job_id": job_id,
279284
"executor": serializer.serialize(
280285
fdl_dc.convert_dataclasses_to_configs(executor, allow_post_init=True)
281286
),

test/core/execution/test_dgxcloud.py

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -877,7 +877,7 @@ def test_nproc_per_node_default(self):
877877
def test_status(self, mock_get):
878878
mock_response = MagicMock()
879879
mock_response.status_code = 200
880-
mock_response.json.return_value = {"phase": "Running"}
880+
mock_response.json.return_value = {"actualPhase": "Running"}
881881
mock_get.return_value = mock_response
882882

883883
with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
@@ -895,10 +895,81 @@ def test_status(self, mock_get):
895895

896896
assert status == DGXCloudState.RUNNING
897897
mock_get.assert_called_once_with(
898-
"https://dgxapi.example.com/workloads/job123",
898+
"https://dgxapi.example.com/workloads/trainings/job123",
899899
headers=executor._default_headers(token="test_token"),
900900
)
901901

902+
@patch("requests.get")
903+
def test_status_distributed(self, mock_get):
904+
mock_response = MagicMock()
905+
mock_response.status_code = 200
906+
mock_response.json.return_value = {"actualPhase": "Running"}
907+
mock_get.return_value = mock_response
908+
909+
with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
910+
executor = DGXCloudExecutor(
911+
base_url="https://dgxapi.example.com",
912+
kube_apiserver_url="https://127.0.0.1:443",
913+
app_id="test_app_id",
914+
app_secret="test_app_secret",
915+
project_name="test_project",
916+
container_image="nvcr.io/nvidia/test:latest",
917+
pvc_nemo_run_dir="/workspace/nemo_run",
918+
nodes=8,
919+
)
920+
921+
status = executor.status("job123")
922+
923+
assert status == DGXCloudState.RUNNING
924+
mock_get.assert_called_once_with(
925+
"https://dgxapi.example.com/workloads/distributed/job123",
926+
headers=executor._default_headers(token="test_token"),
927+
)
928+
929+
@patch("requests.get")
930+
def test_status_falls_back_to_phase_field(self, mock_get):
931+
mock_response = MagicMock()
932+
mock_response.status_code = 200
933+
mock_response.json.return_value = {"phase": "Running"}
934+
mock_get.return_value = mock_response
935+
936+
with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
937+
executor = DGXCloudExecutor(
938+
base_url="https://dgxapi.example.com",
939+
kube_apiserver_url="https://127.0.0.1:443",
940+
app_id="test_app_id",
941+
app_secret="test_app_secret",
942+
project_name="test_project",
943+
container_image="nvcr.io/nvidia/test:latest",
944+
pvc_nemo_run_dir="/workspace/nemo_run",
945+
)
946+
947+
status = executor.status("job123")
948+
949+
assert status == DGXCloudState.RUNNING
950+
951+
@patch("requests.get")
952+
def test_status_returns_none_when_no_phase(self, mock_get):
953+
mock_response = MagicMock()
954+
mock_response.status_code = 200
955+
mock_response.json.return_value = {"someOtherField": "value"}
956+
mock_get.return_value = mock_response
957+
958+
with patch.object(DGXCloudExecutor, "get_auth_token", return_value="test_token"):
959+
executor = DGXCloudExecutor(
960+
base_url="https://dgxapi.example.com",
961+
kube_apiserver_url="https://127.0.0.1:443",
962+
app_id="test_app_id",
963+
app_secret="test_app_secret",
964+
project_name="test_project",
965+
container_image="nvcr.io/nvidia/test:latest",
966+
pvc_nemo_run_dir="/workspace/nemo_run",
967+
)
968+
969+
status = executor.status("job123")
970+
971+
assert status is None
972+
902973
@patch("requests.get")
903974
def test_status_no_token(self, mock_get):
904975
with patch.object(DGXCloudExecutor, "get_auth_token", return_value=None):
@@ -936,7 +1007,7 @@ def test_status_error_response(self, mock_get):
9361007

9371008
status = executor.status("job123")
9381009

939-
assert status == DGXCloudState.UNKNOWN
1010+
assert status is None
9401011

9411012
@patch("requests.get")
9421013
def test_cancel(self, mock_get):

test/run/torchx_backend/schedulers/test_dgxcloud.py

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919

2020
import pytest
2121
from torchx.schedulers.api import AppDryRunInfo
22-
from torchx.specs import AppDef, Role
22+
from torchx.specs import AppDef, AppState, Role
2323

24-
from nemo_run.core.execution.dgxcloud import DGXCloudExecutor
25-
from nemo_run.run.torchx_backend.schedulers.dgxcloud import DGXCloudScheduler, create_scheduler
24+
from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState
25+
from nemo_run.run.torchx_backend.schedulers.dgxcloud import (
26+
DGX_STATES,
27+
DGXCloudScheduler,
28+
create_scheduler,
29+
)
2630

2731

2832
@pytest.fixture
@@ -106,6 +110,7 @@ def test_describe(dgx_cloud_scheduler, dgx_cloud_executor):
106110
mock_get_job_dirs.return_value = {
107111
"test_experiment___test_role___test_job_id": {
108112
"job_status": "RUNNING",
113+
"job_id": "test_job_id",
109114
"executor": dgx_cloud_executor,
110115
}
111116
}
@@ -128,6 +133,7 @@ def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor):
128133
mock_get_job_dirs.return_value = {
129134
"test_experiment___test_role___test_job_id": {
130135
"job_status": "RUNNING",
136+
"job_id": "test_job_id",
131137
"executor": dgx_cloud_executor,
132138
}
133139
}
@@ -155,10 +161,11 @@ def test_save_and_get_job_dirs():
155161
pvc_nemo_run_dir="/workspace/nemo_run",
156162
)
157163

158-
_save_job_dir("test_app_id", "RUNNING", executor)
164+
_save_job_dir("test_app_id", "RUNNING", executor, job_id="actual_job_id")
159165
job_dirs = _get_job_dirs()
160166

161167
assert "test_app_id" in job_dirs
168+
assert job_dirs["test_app_id"]["job_id"] == "actual_job_id"
162169
assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor)
163170

164171

@@ -169,6 +176,7 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor):
169176
mock_get_job_dirs.return_value = {
170177
"test_session___test_role___test_container_id": {
171178
"job_status": "RUNNING",
179+
"job_id": "test_job_id",
172180
"executor": dgx_cloud_executor,
173181
}
174182
}
@@ -184,13 +192,96 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor):
184192
assert logs == ["log2", "log3"]
185193

186194

195+
def test_describe_uses_stored_job_id_not_split(dgx_cloud_scheduler, dgx_cloud_executor):
196+
# Regression test: when a role name ends with '_', splitting app_id on '___'
197+
# produces a job_id with a spurious leading '_' (e.g. role 'W-foo_' + sep '___'
198+
# gives '____' which splits into 'role_' and '_job_id'). describe() must use
199+
# the job_id stored at schedule time, not re-derive it from the app_id string.
200+
real_job_id = "48db46d2-ae56-4c9d-9abd-ba0d873e50eb"
201+
# role name ending with '_' triggers the collision
202+
app_id = f"experiment___role_name___{real_job_id}"
203+
204+
with (
205+
mock.patch(
206+
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
207+
) as mock_get_job_dirs,
208+
mock.patch.object(
209+
DGXCloudExecutor, "status", return_value=DGXCloudState.RUNNING
210+
) as mock_status,
211+
):
212+
mock_get_job_dirs.return_value = {
213+
app_id: {
214+
"job_status": "RUNNING",
215+
"job_id": real_job_id,
216+
"executor": dgx_cloud_executor,
217+
}
218+
}
219+
220+
response = dgx_cloud_scheduler.describe(app_id)
221+
assert response is not None
222+
mock_status.assert_called_once_with(real_job_id)
223+
224+
225+
def test_unknown_state_maps_to_pending_not_failed():
226+
# DGXCloudState.UNKNOWN must map to PENDING so transient API errors during
227+
# job startup do not cause wait_and_exit() to treat the job as terminal.
228+
assert DGX_STATES[DGXCloudState.UNKNOWN] == AppState.PENDING
229+
230+
231+
def test_describe_returns_pending_when_status_is_none(dgx_cloud_scheduler, dgx_cloud_executor):
232+
# Regression test: executor.status() returns None when the auth token is
233+
# missing or the API call fails transiently right after job submission.
234+
# describe() must return PENDING so the wait loop keeps polling.
235+
with (
236+
mock.patch(
237+
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
238+
) as mock_get_job_dirs,
239+
mock.patch.object(DGXCloudExecutor, "status", return_value=None),
240+
):
241+
mock_get_job_dirs.return_value = {
242+
"test_experiment___test_role___test_job_id": {
243+
"job_status": "RUNNING",
244+
"job_id": "test_job_id",
245+
"executor": dgx_cloud_executor,
246+
}
247+
}
248+
249+
response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id")
250+
assert response is not None
251+
assert response.state == AppState.PENDING
252+
253+
254+
def test_describe_returns_pending_when_status_is_unknown(dgx_cloud_scheduler, dgx_cloud_executor):
255+
# Regression test: the DGXCloud API transiently returns "Unknown" before a
256+
# job is visible (e.g. HTTP 404 right after submission). describe() must
257+
# return PENDING so the wait loop keeps polling instead of failing.
258+
with (
259+
mock.patch(
260+
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
261+
) as mock_get_job_dirs,
262+
mock.patch.object(DGXCloudExecutor, "status", return_value=DGXCloudState.UNKNOWN),
263+
):
264+
mock_get_job_dirs.return_value = {
265+
"test_experiment___test_role___test_job_id": {
266+
"job_status": "RUNNING",
267+
"job_id": "test_job_id",
268+
"executor": dgx_cloud_executor,
269+
}
270+
}
271+
272+
response = dgx_cloud_scheduler.describe("test_experiment___test_role___test_job_id")
273+
assert response is not None
274+
assert response.state == AppState.PENDING
275+
276+
187277
def test_log_iter_str(dgx_cloud_scheduler, dgx_cloud_executor):
188278
with mock.patch(
189279
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
190280
) as mock_get_job_dirs:
191281
mock_get_job_dirs.return_value = {
192282
"test_session___test_role___test_container_id": {
193283
"job_status": "RUNNING",
284+
"job_id": "test_job_id",
194285
"executor": dgx_cloud_executor,
195286
}
196287
}

0 commit comments

Comments
 (0)