Commit 0c556ca

ko3n1g and claude authored
fix: catch transient sacct exceptions in SlurmTunnelScheduler.describe() (#460)
* fix: treat DGXCloud UNKNOWN/transient status as PENDING to avoid false failures

  When a job is submitted to DGXCloud, the API may transiently return a non-200 response or an "Unknown" phase before the workload is fully registered. Previously this was mapped to AppState.FAILED, causing wait_and_exit() to treat the job as terminated immediately while the pod was still starting up on the cluster.

  - DGXCloudState.UNKNOWN now maps to AppState.PENDING in DGX_STATES
  - executor.status() returns None (instead of DGXCloudState.UNKNOWN) on non-200 HTTP responses so transient API errors don't look like a real "Unknown" phase reported by the scheduler
  - describe() fallback for unknown keys in DGX_STATES changed to PENDING
  - Tests updated and added to cover all three code paths

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* fix: use type-specific endpoint for DGXCloud workload status

  The status() method was calling GET /workloads/{job_id} (generic endpoint) which returns 403 for distributed and training workloads. The correct endpoints match the create paths: /workloads/distributed/{job_id} for multi-node jobs and /workloads/trainings/{job_id} for single-node jobs. This is consistent with how cancel() already uses /workloads/distributed/.

  Adds test_status_distributed to verify the correct URL is used for multi-node executors.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* fix: read actualPhase from type-specific workload endpoints

  The /workloads/distributed/{id} and /workloads/trainings/{id} endpoints return actualPhase, not phase (which was the field on the generic /workloads/{id} endpoint). This caused a KeyError crash immediately after the 403 fix landed.

  Now reads actualPhase first, falls back to phase for compatibility, and returns None (PENDING) if neither field is present.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* add tests

  Signed-off-by: oliver könig <okoenig@nvidia.com>

* fix: store job_id explicitly to avoid separator collision in app_id parsing

  When a role name ends with '_', the app_id string looks like:

      experiment___role_name____job_id

  Splitting on '___' produces job_id = '_job_id' (spurious leading '_'), causing the status/cancel/log_iter calls to use a wrong ID and get 404.

  Fix: _save_job_dir now stores the actual job_id in the JSON record. describe(), log_iter(), and _cancel_existing() all read job_id from the stored record, falling back to app_id.split('___')[-1] for backwards compatibility with existing saved jobs.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* format

  Signed-off-by: oliver könig <okoenig@nvidia.com>

* fix: catch transient sacct exceptions in SlurmTunnelScheduler.describe()

  After long-running jobs (hours), a transient sacct failure (daemon hiccup, invoke.UnexpectedExit from non-zero exit code, etc.) would propagate uncaught through describe() → runner.wait() → wait_and_exit(), killing the wait loop and reporting EXIT_CODE_TRAINING=1 even though the Slurm job was still running.

  Wrap the sacct call in a try/except and return AppState.UNKNOWN on failure. UNKNOWN is non-terminal in torchx so polling continues.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* ruff

  Signed-off-by: oliver könig <okoenig@nvidia.com>

* implement max-retry//cancel after final attempt

  Signed-off-by: oliver könig <okoenig@nvidia.com>

* add tests

  Signed-off-by: oliver könig <okoenig@nvidia.com>

---------

Signed-off-by: oliver könig <okoenig@nvidia.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
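The separator collision described in the commit message can be reproduced in isolation. The following is a minimal sketch, not the project's code: `build_app_id`, `parse_job_id`, `resolve_job_id`, and the shape of the record dict are hypothetical names chosen for illustration; only the `'___'` separator and the `split('___')[-1]` fallback come from the message.

```python
SEPARATOR = "___"  # separator named in the commit message


def build_app_id(experiment: str, role: str, job_id: str) -> str:
    """Hypothetical helper: join the three parts the way the message describes."""
    return SEPARATOR.join([experiment, role, job_id])


def parse_job_id(app_id: str) -> str:
    """Legacy behaviour: take whatever follows the last separator match."""
    return app_id.split(SEPARATOR)[-1]


def resolve_job_id(app_id: str, record: dict) -> str:
    """Prefer the stored job_id; fall back to parsing for older records."""
    return record.get("job_id") or parse_job_id(app_id)


# A role name ending in "_" yields four underscores before the job ID.
app_id = build_app_id("experiment", "role_name_", "12345")
legacy = parse_job_id(app_id)                                # spurious leading "_"
fixed = resolve_job_id(app_id, {"job_id": "12345"})          # read from the record
fallback = resolve_job_id("experiment___role___12345", {})   # old record, clean name
```

Because `str.split` matches left to right, the first three of the four underscores are consumed as the separator and the fourth stays attached to the job ID. Storing the ID explicitly sidesteps the ambiguity without changing the app_id format.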
1 parent b725412 commit 0c556ca

5 files changed

Lines changed: 122 additions & 5 deletions

nemo_run/exceptions.py

Lines changed: 3 additions & 0 deletions
@@ -18,3 +18,6 @@ class SetValueError(ValueError): ...
 
 
 class UnknownStatusError(Exception): ...
+
+
+class PersistentSacctFailure(Exception): ...

nemo_run/run/torchx_backend/launcher.py

Lines changed: 7 additions & 1 deletion
@@ -27,7 +27,7 @@
 
 from nemo_run.core.execution.base import Executor
 from nemo_run.core.frontend.console.api import CONSOLE
-from nemo_run.exceptions import UnknownStatusError
+from nemo_run.exceptions import PersistentSacctFailure, UnknownStatusError
 from nemo_run.run.logs import get_logs
 from nemo_run.run.torchx_backend.runner import Runner, get_runner
 
@@ -158,6 +158,12 @@ def wait_and_exit(
     while tries < timeout:
         try:
             status = runner.wait(app_handle, wait_interval=2)
+        except PersistentSacctFailure as e:
+            logger.error(
+                f"sacct has been unreachable for too long for job {app_id}, cancelling: {e}"
+            )
+            runner.cancel(app_handle)
+            raise UnknownStatusError(str(e)) from e
         except RuntimeError as e:
             if "can't start new thread" in str(e) and thread_retries < 5:
                 thread_retries += 1
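The cancel-then-re-raise handling added to wait_and_exit() can be exercised without torchx. This is a rough sketch under stated assumptions: `FakeRunner` and `wait_or_cancel` are hypothetical stand-ins, and the two exception classes are redefined locally so the block runs on its own; it is not the launcher's actual loop.

```python
class PersistentSacctFailure(Exception): ...


class UnknownStatusError(Exception): ...


class FakeRunner:
    """Hypothetical stand-in for the runner: replays scripted wait() outcomes."""

    def __init__(self, outcomes):
        self._outcomes = list(outcomes)
        self.cancelled = []

    def wait(self, app_handle):
        outcome = self._outcomes.pop(0)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    def cancel(self, app_handle):
        self.cancelled.append(app_handle)


def wait_or_cancel(runner, app_handle, max_polls=10):
    """Poll until a status arrives; cancel the job if polling is declared hopeless."""
    for _ in range(max_polls):
        try:
            status = runner.wait(app_handle)
        except PersistentSacctFailure as e:
            # The scheduler gave up on sacct: stop the job, then surface the error.
            runner.cancel(app_handle)
            raise UnknownStatusError(str(e)) from e
        if status is not None:
            return status
    raise UnknownStatusError(f"no status for {app_handle} after {max_polls} polls")


runner = FakeRunner([None, PersistentSacctFailure("sacct failed 30 consecutive times")])
caught = None
try:
    wait_or_cancel(runner, "dummy://nemo_run/my-test-run")
except UnknownStatusError as err:
    caught = err
```

Cancelling before re-raising means a job whose state can no longer be observed is not left running unattended, and chaining with `from e` keeps the original sacct error available on `__cause__`.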

nemo_run/run/torchx_backend/schedulers/slurm.py

Lines changed: 21 additions & 3 deletions
@@ -59,10 +59,13 @@
 from nemo_run.core.execution.base import Executor
 from nemo_run.core.execution.slurm import SlurmBatchRequest, SlurmExecutor, SlurmJobDetails
 from nemo_run.core.tunnel.client import LocalTunnel, PackagingJob, SSHTunnel, Tunnel
+from nemo_run.exceptions import PersistentSacctFailure
 from nemo_run.run import experiment as run_experiment
 from nemo_run.run.ray.slurm import SlurmRayRequest
 from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin
 
+MAX_CONSECUTIVE_SACCT_FAILURES = 30
+
 log: logging.Logger = logging.getLogger(__name__)
 SLURM_JOB_DIRS = os.path.join(get_nemorun_home(), ".slurm_jobs")
 
@@ -74,6 +77,7 @@ def __init__(
         self.tunnel: Optional[Tunnel] = None
         super().__init__(session_name)
         self.experiment = experiment
+        self._consecutive_sacct_failures: dict[str, int] = {}
 
     # TODO: Move this into the SlurmExecutor
     def _initialize_tunnel(self, tunnel: SSHTunnel | LocalTunnel):
@@ -240,9 +244,23 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
             return None
 
         assert self.tunnel, "Tunnel is None."
-        p = self.tunnel.run(
-            f"sacct --parsable2 -j {app_id}",
-        )
+        try:
+            p = self.tunnel.run(
+                f"sacct --parsable2 -j {app_id}",
+            )
+        except Exception as e:
+            count = self._consecutive_sacct_failures.get(app_id, 0) + 1
+            self._consecutive_sacct_failures[app_id] = count
+            if count >= MAX_CONSECUTIVE_SACCT_FAILURES:
+                raise PersistentSacctFailure(
+                    f"sacct failed {count} consecutive times for job {app_id}: {e}"
+                ) from e
+            log.warning(
+                f"Failed to query sacct for job {app_id} ({count}/{MAX_CONSECUTIVE_SACCT_FAILURES}): "
+                f"{e}. Treating as transient."
+            )
+            return DescribeAppResponse(app_id=app_id, state=AppState.UNKNOWN)
+        self._consecutive_sacct_failures.pop(app_id, None)
         output = p.stdout.strip().split("\n")
 
         if len(output) <= 1:
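The per-job consecutive-failure counter used in describe() is a small pattern that can be shown on its own. The sketch below assumes a hypothetical `FailureCounter` class and a generic `PersistentFailure` exception, with a threshold of 3 instead of 30 to keep the walk-through short; it mirrors the dict-based bookkeeping in the diff but is not the scheduler's code.

```python
class PersistentFailure(Exception): ...


class FailureCounter:
    """Hypothetical helper: track consecutive failures per key, raise at a threshold."""

    def __init__(self, threshold: int):
        self.threshold = threshold
        self._counts: dict[str, int] = {}

    def record_failure(self, key: str, error: Exception) -> int:
        """Increment the streak for key; raise once it reaches the threshold."""
        count = self._counts.get(key, 0) + 1
        self._counts[key] = count
        if count >= self.threshold:
            raise PersistentFailure(f"{count} consecutive failures for {key}") from error
        return count

    def record_success(self, key: str) -> None:
        """Any success ends the streak, so the next failure counts from 1 again."""
        self._counts.pop(key, None)


counter = FailureCounter(threshold=3)
boom = RuntimeError("sacct: command failed")

first = counter.record_failure("12345", boom)    # streak of 1
second = counter.record_failure("12345", boom)   # streak of 2
counter.record_success("12345")                  # streak cleared
after_reset = counter.record_failure("12345", boom)

gave_up = False
try:
    counter.record_failure("12345", boom)        # streak of 2
    counter.record_failure("12345", boom)        # streak of 3: threshold reached
except PersistentFailure:
    gave_up = True
```

Counting consecutive failures rather than total failures lets a job that polls for hours absorb occasional hiccups, while a sustained outage still ends in a bounded number of attempts.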

test/run/torchx_backend/schedulers/test_slurm.py

Lines changed: 79 additions & 0 deletions
@@ -26,7 +26,9 @@
 
 from nemo_run.core.execution.slurm import SlurmBatchRequest, SlurmExecutor
 from nemo_run.core.tunnel.client import LocalTunnel
+from nemo_run.exceptions import PersistentSacctFailure
 from nemo_run.run.torchx_backend.schedulers.slurm import (
+    MAX_CONSECUTIVE_SACCT_FAILURES,
     SlurmTunnelScheduler,
     TunnelLogIterator,
     _get_job_dirs,
@@ -380,6 +382,83 @@ def test_describe_returns_unknown_on_persistent_permission_error(slurm_scheduler
     assert result.state == AppState.UNKNOWN
 
 
+def test_describe_returns_unknown_on_sacct_exception(slurm_scheduler, mocker):
+    """Regression: transient sacct failure (e.g. after hours of polling) must not
+    propagate an exception and kill the wait loop. describe() should return UNKNOWN
+    (non-terminal) so polling continues until the job completes."""
+    from torchx.specs import AppState
+
+    job_dirs = {"12345": ("/path/to/job", LocalTunnel(job_dir="/path/to/tunnel"), "log*")}
+    mocker.patch(
+        "nemo_run.run.torchx_backend.schedulers.slurm._get_job_dirs",
+        return_value=job_dirs,
+    )
+    mocker.patch.object(SlurmTunnelScheduler, "_initialize_tunnel")
+
+    slurm_scheduler.tunnel = mock.MagicMock()
+    slurm_scheduler.tunnel.run.side_effect = Exception("sacct: command failed")
+
+    result = slurm_scheduler.describe("12345")
+    assert result is not None
+    assert result.state == AppState.UNKNOWN
+
+
+def test_describe_raises_persistent_sacct_failure_after_threshold(slurm_scheduler, mocker):
+    """After MAX_CONSECUTIVE_SACCT_FAILURES consecutive sacct exceptions, describe() must
+    raise PersistentSacctFailure so the caller can cancel the job instead of spinning forever."""
+    job_dirs = {"12345": ("/path/to/job", LocalTunnel(job_dir="/path/to/tunnel"), "log*")}
+    mocker.patch(
+        "nemo_run.run.torchx_backend.schedulers.slurm._get_job_dirs",
+        return_value=job_dirs,
+    )
+    mocker.patch.object(SlurmTunnelScheduler, "_initialize_tunnel")
+
+    slurm_scheduler.tunnel = mock.MagicMock()
+    slurm_scheduler.tunnel.run.side_effect = Exception("sacct: command failed")
+
+    for _ in range(MAX_CONSECUTIVE_SACCT_FAILURES - 1):
+        result = slurm_scheduler.describe("12345")
+        assert result.state == AppState.UNKNOWN
+
+    with pytest.raises(PersistentSacctFailure, match="12345"):
+        slurm_scheduler.describe("12345")
+
+
+def test_describe_resets_sacct_failure_counter_on_success(slurm_scheduler, mocker):
+    """A successful sacct call must reset the consecutive failure counter so that
+    subsequent transient failures start fresh."""
+    job_dirs = {"12345": ("/path/to/job", LocalTunnel(job_dir="/path/to/tunnel"), "log*")}
+    mocker.patch(
+        "nemo_run.run.torchx_backend.schedulers.slurm._get_job_dirs",
+        return_value=job_dirs,
+    )
+    mocker.patch.object(SlurmTunnelScheduler, "_initialize_tunnel")
+
+    slurm_scheduler.tunnel = mock.MagicMock()
+
+    # Fail just below the threshold
+    slurm_scheduler.tunnel.run.side_effect = Exception("sacct: command failed")
+    for _ in range(MAX_CONSECUTIVE_SACCT_FAILURES - 1):
+        slurm_scheduler.describe("12345")
+
+    # Recover — sacct returns valid output
+    header = "JobID|JobName|State|ExitCode"
+    row = "12345|exp.master|RUNNING|0:0"
+    success_result = mock.MagicMock()
+    success_result.stdout = f"{header}\n{row}"
+    slurm_scheduler.tunnel.run.side_effect = None
+    slurm_scheduler.tunnel.run.return_value = success_result
+    slurm_scheduler.describe("12345")
+
+    assert slurm_scheduler._consecutive_sacct_failures.get("12345", 0) == 0
+
+    # Fail again — counter should restart from 1, not trigger threshold immediately
+    slurm_scheduler.tunnel.run.side_effect = Exception("sacct: command failed")
+    result = slurm_scheduler.describe("12345")
+    assert result.state == AppState.UNKNOWN
+    assert slurm_scheduler._consecutive_sacct_failures["12345"] == 1
+
+
 def test_schedule_with_dependencies(slurm_scheduler, slurm_executor):
     mock_request = mock.MagicMock()
     mock_request.cmd = ["sbatch", "--requeue", "--parsable"]
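The counter-reset test relies on a detail of `unittest.mock`: while `side_effect` is set to an exception the mock raises it, and assigning `side_effect = None` lets `return_value` take effect again. A small standalone sketch of that switch follows; the `tunnel` object here is a bare `MagicMock`, not the project's tunnel class, and the sacct output string is the same sample the test uses.

```python
from unittest import mock

tunnel = mock.MagicMock()

# Phase 1: every call raises, simulating a failing sacct query.
tunnel.run.side_effect = Exception("sacct: command failed")
raised = False
try:
    tunnel.run("sacct --parsable2 -j 12345")
except Exception:
    raised = True

# Phase 2: clear side_effect so the configured return_value is used instead.
success_result = mock.MagicMock()
success_result.stdout = "JobID|JobName|State|ExitCode\n12345|exp.master|RUNNING|0:0"
tunnel.run.side_effect = None
tunnel.run.return_value = success_result

# Same parsing step the scheduler applies to sacct output.
output = tunnel.run("sacct --parsable2 -j 12345").stdout.strip().split("\n")
```

Reusing one mock for both phases keeps the failure and recovery calls on the same object, which is what allows a test to observe state carried across calls.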

test/run/torchx_backend/test_launcher.py

Lines changed: 12 additions & 1 deletion
@@ -23,7 +23,7 @@
 from torchx.specs import AppDef, AppStatus
 
 from nemo_run.core.execution.base import Executor
-from nemo_run.exceptions import UnknownStatusError
+from nemo_run.exceptions import PersistentSacctFailure, UnknownStatusError
 from nemo_run.run.logs import get_logs
 from nemo_run.run.torchx_backend.launcher import ContextThread, launch, wait_and_exit
 
@@ -231,6 +231,17 @@ def test_wait_and_exit_other_runtime_error_propagates(mock_runner):
         wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)
 
 
+def test_wait_and_exit_cancels_job_on_persistent_sacct_failure(mock_runner):
+    """PersistentSacctFailure must cancel the job and raise UnknownStatusError."""
+    mock_app_handle = "dummy://nemo_run/my-test-run"
+    mock_runner.wait.side_effect = PersistentSacctFailure("sacct failed 30 times for 12345")
+
+    with pytest.raises(UnknownStatusError):
+        wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)
+
+    mock_runner.cancel.assert_called_once_with(mock_app_handle)
+
+
 @patch("threading.Thread.run")
 def test_context_thread_run(mocked_run, setup_and_teardown):
     def test_function():
