Skip to content

Commit db4756f

Browse files
ko3n1g and claude committed
fix: store job_id explicitly to avoid separator collision in app_id parsing
When a role name ends with '_', the app_id string looks like:

    experiment___role_name____job_id

Splitting on '___' produces job_id = '_job_id' (spurious leading '_'), causing the status/cancel/log_iter calls to use a wrong ID and get 404.

Fix: _save_job_dir now stores the actual job_id in the JSON record. describe(), log_iter(), and _cancel_existing() all read job_id from the stored record, falling back to app_id.split('___')[-1] for backwards compatibility with existing saved jobs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent c89ba4e commit db4756f

2 files changed

Lines changed: 46 additions & 6 deletions

File tree

nemo_run/run/torchx_backend/schedulers/dgxcloud.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str:
161161

162162
# Store a status entry or logs path if available
163163
# Currently, the DGXExecutor status is placeholder, but we keep the pattern
164-
_save_job_dir(app_id, job_status=status, executor=executor)
164+
_save_job_dir(app_id, job_status=status, executor=executor, job_id=job_id)
165165

166166
return app_id
167167

@@ -173,7 +173,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
173173
# We split out the stored values from the JSON file
174174
stored_data = _get_job_dirs()
175175
job_info = stored_data.get(app_id)
176-
_, role_name, job_id = app_id.split("___")
176+
parts = app_id.split("___")
177+
role_name = parts[1] if len(parts) > 1 else app_id
177178
roles = [Role(name=role_name, image="", num_replicas=1)]
178179
roles_statuses = [
179180
RoleStatus(
@@ -191,6 +192,7 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
191192
if not executor:
192193
return None
193194

195+
job_id = job_info.get("job_id") or parts[-1]
194196
dgx_state = executor.status(job_id) or DGXCloudState.UNKNOWN
195197
app_state = DGX_STATES.get(dgx_state, AppState.PENDING)
196198
roles_statuses[0].replicas[0].state = app_state
@@ -217,7 +219,7 @@ def log_iter(
217219
) -> Iterable[str]:
218220
stored_data = _get_job_dirs()
219221
job_info = stored_data.get(app_id)
220-
_, _, job_id = app_id.split("___")
222+
job_id = job_info.get("job_id") or app_id.split("___")[-1]
221223
executor: Optional[DGXCloudExecutor] = job_info.get("executor", None) # type: ignore
222224
if not executor:
223225
return [""]
@@ -240,7 +242,7 @@ def _cancel_existing(self, app_id: str) -> None:
240242
"""
241243
stored_data = _get_job_dirs()
242244
job_info = stored_data.get(app_id)
243-
_, _, job_id = app_id.split("___")
245+
job_id = job_info.get("job_id") or app_id.split("___")[-1]
244246
executor: DGXCloudExecutor = job_info.get("executor", None) # type: ignore
245247
if not executor:
246248
return None
@@ -257,7 +259,9 @@ def create_scheduler(session_name: str, **kwargs: Any) -> DGXCloudScheduler:
257259
return DGXCloudScheduler(session_name=session_name)
258260

259261

260-
def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> None:
262+
def _save_job_dir(
263+
app_id: str, job_status: str, executor: DGXCloudExecutor, job_id: str = ""
264+
) -> None:
261265
"""
262266
Saves or updates local record of job status in JSON for demonstration.
263267
"""
@@ -276,6 +280,7 @@ def _save_job_dir(app_id: str, job_status: str, executor: DGXCloudExecutor) -> N
276280

277281
app = {
278282
"job_status": job_status,
283+
"job_id": job_id,
279284
"executor": serializer.serialize(
280285
fdl_dc.convert_dataclasses_to_configs(executor, allow_post_init=True)
281286
),

test/run/torchx_backend/schedulers/test_dgxcloud.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def test_describe(dgx_cloud_scheduler, dgx_cloud_executor):
110110
mock_get_job_dirs.return_value = {
111111
"test_experiment___test_role___test_job_id": {
112112
"job_status": "RUNNING",
113+
"job_id": "test_job_id",
113114
"executor": dgx_cloud_executor,
114115
}
115116
}
@@ -132,6 +133,7 @@ def test_cancel_existing(dgx_cloud_scheduler, dgx_cloud_executor):
132133
mock_get_job_dirs.return_value = {
133134
"test_experiment___test_role___test_job_id": {
134135
"job_status": "RUNNING",
136+
"job_id": "test_job_id",
135137
"executor": dgx_cloud_executor,
136138
}
137139
}
@@ -159,10 +161,11 @@ def test_save_and_get_job_dirs():
159161
pvc_nemo_run_dir="/workspace/nemo_run",
160162
)
161163

162-
_save_job_dir("test_app_id", "RUNNING", executor)
164+
_save_job_dir("test_app_id", "RUNNING", executor, job_id="actual_job_id")
163165
job_dirs = _get_job_dirs()
164166

165167
assert "test_app_id" in job_dirs
168+
assert job_dirs["test_app_id"]["job_id"] == "actual_job_id"
166169
assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor)
167170

168171

@@ -173,6 +176,7 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor):
173176
mock_get_job_dirs.return_value = {
174177
"test_session___test_role___test_container_id": {
175178
"job_status": "RUNNING",
179+
"job_id": "test_job_id",
176180
"executor": dgx_cloud_executor,
177181
}
178182
}
@@ -188,6 +192,34 @@ def test_log_iter(dgx_cloud_scheduler, dgx_cloud_executor):
188192
assert logs == ["log2", "log3"]
189193

190194

195+
def test_describe_uses_stored_job_id_not_split(dgx_cloud_scheduler, dgx_cloud_executor):
196+
# Regression test: when a role name ends with '_', splitting app_id on '___'
197+
# produces a job_id with a spurious leading '_' (e.g. role 'W-foo_' + sep '___'
198+
# gives '____' which splits into 'role_' and '_job_id'). describe() must use
199+
# the job_id stored at schedule time, not re-derive it from the app_id string.
200+
real_job_id = "48db46d2-ae56-4c9d-9abd-ba0d873e50eb"
201+
# role name ending with '_' triggers the collision
202+
app_id = f"experiment___role_name___{real_job_id}"
203+
204+
with (
205+
mock.patch(
206+
"nemo_run.run.torchx_backend.schedulers.dgxcloud._get_job_dirs"
207+
) as mock_get_job_dirs,
208+
mock.patch.object(DGXCloudExecutor, "status", return_value=DGXCloudState.RUNNING) as mock_status,
209+
):
210+
mock_get_job_dirs.return_value = {
211+
app_id: {
212+
"job_status": "RUNNING",
213+
"job_id": real_job_id,
214+
"executor": dgx_cloud_executor,
215+
}
216+
}
217+
218+
response = dgx_cloud_scheduler.describe(app_id)
219+
assert response is not None
220+
mock_status.assert_called_once_with(real_job_id)
221+
222+
191223
def test_unknown_state_maps_to_pending_not_failed():
192224
# DGXCloudState.UNKNOWN must map to PENDING so transient API errors during
193225
# job startup do not cause wait_and_exit() to treat the job as terminal.
@@ -207,6 +239,7 @@ def test_describe_returns_pending_when_status_is_none(dgx_cloud_scheduler, dgx_c
207239
mock_get_job_dirs.return_value = {
208240
"test_experiment___test_role___test_job_id": {
209241
"job_status": "RUNNING",
242+
"job_id": "test_job_id",
210243
"executor": dgx_cloud_executor,
211244
}
212245
}
@@ -229,6 +262,7 @@ def test_describe_returns_pending_when_status_is_unknown(dgx_cloud_scheduler, dg
229262
mock_get_job_dirs.return_value = {
230263
"test_experiment___test_role___test_job_id": {
231264
"job_status": "RUNNING",
265+
"job_id": "test_job_id",
232266
"executor": dgx_cloud_executor,
233267
}
234268
}
@@ -245,6 +279,7 @@ def test_log_iter_str(dgx_cloud_scheduler, dgx_cloud_executor):
245279
mock_get_job_dirs.return_value = {
246280
"test_session___test_role___test_container_id": {
247281
"job_status": "RUNNING",
282+
"job_id": "test_job_id",
248283
"executor": dgx_cloud_executor,
249284
}
250285
}

0 commit comments

Comments
 (0)