Skip to content

Commit fce82de

Browse files
committed
fix: recover orphaned running jobs
1 parent 388b2e8 commit fce82de

File tree

5 files changed

+76
-0
lines changed

5 files changed

+76
-0
lines changed

app/backends/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ async def monitor(
111111
"""
112112
...
113113

114+
@abstractmethod
115+
async def check_job_status(self, job_id: str, work_dir: str) -> int | None:
116+
"""Check if a job process has finished without blocking.
117+
118+
Returns:
119+
Exit code if the process has exited, None if still running.
120+
"""
121+
...
122+
114123
@abstractmethod
115124
async def list_workflow_files(
116125
self,

app/backends/local.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,27 @@ async def _drain_log(off: int) -> int:
198198

199199
await asyncio.sleep(self._config.poll_interval)
200200

201+
async def check_job_status(self, job_id: str, work_dir: str) -> int | None:
202+
"""Check if a job process has finished without blocking."""
203+
wd = Path(work_dir)
204+
exitcode_path = wd / ".exitcode"
205+
if exitcode_path.exists():
206+
try:
207+
return int(exitcode_path.read_text(encoding="utf-8").strip())
208+
except (ValueError, OSError):
209+
return None
210+
pid_path = wd / ".pid"
211+
if pid_path.exists():
212+
try:
213+
pid = int(pid_path.read_text(encoding="utf-8").strip())
214+
os.kill(pid, 0)
215+
return None # still running
216+
except (ProcessLookupError, PermissionError):
217+
return -1 # dead without exitcode
218+
except (ValueError, OSError):
219+
return None
220+
return None
221+
201222
async def list_workflow_files(
202223
self,
203224
job_id: str,

app/backends/slurm_ssh.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,26 @@ async def monitor(
393393

394394
await asyncio.sleep(self._config.poll_interval)
395395

396+
async def check_job_status(self, job_id: str, work_dir: str) -> int | None:
397+
"""Check if a job process has finished without blocking."""
398+
wd = shlex.quote(work_dir)
399+
cmd = (
400+
f"test -f {wd}/.exitcode && cat {wd}/.exitcode "
401+
f"|| (kill -0 $(cat {wd}/.pid 2>/dev/null) 2>/dev/null "
402+
f"&& echo {MONITOR_RUNNING_SENTINEL} "
403+
f"|| echo {MONITOR_DEAD_SENTINEL})"
404+
)
405+
result = await self._run_ssh(cmd, check=False)
406+
status = (result.stdout or "").strip()
407+
if status == MONITOR_RUNNING_SENTINEL:
408+
return None
409+
if status == MONITOR_DEAD_SENTINEL:
410+
return -1
411+
try:
412+
return int(status)
413+
except ValueError:
414+
return None
415+
396416
async def check_connectivity(self) -> bool:
397417
"""Check SSH connectivity and scratch filesystem health."""
398418
try:

app/tasks.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,29 @@ async def sync_job_data_loop(
223223
continue
224224
if not record.work_dir:
225225
continue
226+
227+
# Recover orphaned RUNNING jobs (no active monitor task)
228+
if record.task is None or record.task.done():
229+
try:
230+
exit_code = await backend.check_job_status(
231+
job_id, record.work_dir
232+
)
233+
if exit_code is not None:
234+
logger.warning(
235+
"Recovering orphaned job %s (exit code %d)",
236+
job_id,
237+
exit_code,
238+
)
239+
store.mark_finished(job_id, exit_code)
240+
store.persist(record)
241+
continue
242+
except Exception:
243+
logger.warning(
244+
"Failed to check orphaned job %s",
245+
job_id,
246+
exc_info=True,
247+
)
248+
226249
_flush_logs(store, job_id)
227250
snkmt_db_path = store.get_snkmt_db_path(job_id)
228251
if snkmt_db_path:

tests/test_base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ async def save_cache(self, job_id, work_dir, cache_key, cache_dirs):
4848
async def sync_snkmt_db(self, job_id, work_dir, local_path):
4949
pass
5050

51+
async def check_job_status(self, job_id, work_dir):
52+
return None
53+
5154
async def check_connectivity(self):
5255
return True
5356

0 commit comments

Comments
 (0)