Skip to content

Commit 5c50f88

Browse files
authored
TS Edge Case & Pipe QUEUE bug (#862)
# Two changes: --- ## Change 1: In the Scheduler, only break after conformer troubleshooting if new jobs (conf_opt or conf_sp) are actually running for that species. This ensures the scheduler correctly falls through to the "all conformers done" check if troubleshooting was attempted but failed to launch new tasks. Essentially, this was a race-to-completion bug: if, for example, 3 TS guesses were being troubleshooted and 2 out of the 3 finished troubleshooting, then if the last one failed and exhausted all attempts, ARC would incorrectly declare that no TS guess converged. ## Change 2: There is a bug in ARC when Pipe is active. When a batch job is submitted with, say, 20 jobs in it — so 20 workers are needed — and only 10 of those are picked up by workers while the other 10 remain in Q (queued) state, ARC misinterprets this and attempts to resubmit those 10, thinking they were never assigned workers. ARC will continue to do this ad infinitum.
2 parents d96a9d2 + 61a711a commit 5c50f88

5 files changed

Lines changed: 82 additions & 42 deletions

File tree

arc/checks/common.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,21 @@
77

88
from typing import List, Optional
99

10+
CONFORMER_JOB_TYPES = ('conf_opt', 'conf_sp')
11+
12+
13+
def is_conformer_job(job_name: str) -> bool:
14+
"""
15+
Check whether a job name represents a conformer job.
16+
17+
Args:
18+
job_name (str): The job name, e.g., 'conf_opt_3' or 'conf_sp_0'.
19+
20+
Returns:
21+
bool: ``True`` if the job name starts with a conformer job type prefix.
22+
"""
23+
return job_name.startswith(CONFORMER_JOB_TYPES)
24+
1025

1126
def sum_time_delta(timedelta_list: List[datetime.timedelta]) -> datetime.timedelta:
1227
"""
@@ -36,10 +51,10 @@ def get_i_from_job_name(job_name: str) -> Optional[int]:
3651
Optional[int]: The corresponding conformer or tsg index.
3752
"""
3853
i = None
39-
if 'conf_opt' in job_name:
40-
i = int(job_name[9:])
41-
elif 'conf_sp' in job_name:
42-
i = int(job_name[8:])
43-
elif 'tsg' in job_name:
54+
for prefix in CONFORMER_JOB_TYPES:
55+
if job_name.startswith(prefix):
56+
i = int(job_name[len(prefix) + 1:]) # +1 for the '_' separator
57+
return i
58+
if job_name.startswith('tsg'):
4459
i = int(job_name[3:])
4560
return i

arc/job/pipe/pipe_run.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -363,20 +363,11 @@ def reconcile(self) -> Dict[str, int]:
363363
logger.debug(f'Could not promote task {task_id} to FAILED_TERMINAL '
364364
f'(lock contention or concurrent state change): {e}')
365365

366-
# Only flag resubmission for genuinely retried tasks (attempt_index > 0).
367-
# Fresh PENDING tasks (attempt_index == 0) are waiting for the initial
368-
# submission's workers to start — don't resubmit for those.
369-
# After a resubmission, allow a grace period for workers to start before
370-
# flagging again (prevents duplicate submissions).
371-
active_after_retry = counts[TaskState.CLAIMED.value] + counts[TaskState.RUNNING.value]
372-
resubmit_grace = 120 # seconds
373-
time_since_submit = (now - self.submitted_at) if self.submitted_at else float('inf')
374-
if retried_pending > 0 and active_after_retry == 0 and time_since_submit > resubmit_grace:
375-
self._needs_resubmission = True
376-
logger.info(f'Pipe run {self.run_id}: {retried_pending} retried tasks '
377-
f'need workers. Resubmission needed.')
378-
else:
379-
self._needs_resubmission = False
366+
# Never resubmit a new scheduler job for retried tasks.
367+
# Workers still in the scheduler queue (PBS Q state) will claim
368+
# retried PENDING tasks when they start. If the scheduler job
369+
# was killed, that is a manual intervention issue.
370+
self._needs_resubmission = False
380371

381372
terminal = (counts[TaskState.COMPLETED.value]
382373
+ counts[TaskState.FAILED_ESS.value]

arc/job/pipe/pipe_run_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,44 @@ def test_terminal_run_not_regressed(self):
267267
self.assertEqual(run.status, PipeRunState.COMPLETED)
268268

269269

270+
class TestPipeRunNoResubmission(unittest.TestCase):
271+
"""Pipe runs must never flag resubmission — Q-state workers handle retried tasks."""
272+
273+
def setUp(self):
274+
self.tmpdir = tempfile.mkdtemp(prefix='pipe_run_resub_')
275+
276+
def tearDown(self):
277+
shutil.rmtree(self.tmpdir, ignore_errors=True)
278+
279+
def test_never_resubmit_even_with_retried_tasks_and_no_workers(self):
280+
"""Even when all workers are done and retried tasks remain,
281+
needs_resubmission must stay False — no automatic resubmission."""
282+
tasks = [_make_spec(f't{i}') for i in range(3)]
283+
run = PipeRun(project_directory=self.tmpdir, run_id='resub',
284+
tasks=tasks, cluster_software='slurm', max_attempts=3)
285+
run.stage()
286+
run.submitted_at = time.time() - 300
287+
run.status = PipeRunState.SUBMITTED
288+
# All 3 workers started: t0 completed, t1 failed, t2 completed
289+
now = time.time()
290+
for tid in ('t0', 't2'):
291+
update_task_state(run.pipe_root, tid, new_status=TaskState.CLAIMED,
292+
claimed_by='w', claim_token='tok', claimed_at=now,
293+
lease_expires_at=now + 300)
294+
update_task_state(run.pipe_root, tid, new_status=TaskState.RUNNING, started_at=now)
295+
update_task_state(run.pipe_root, tid, new_status=TaskState.COMPLETED, ended_at=now)
296+
update_task_state(run.pipe_root, 't1', new_status=TaskState.CLAIMED,
297+
claimed_by='w', claim_token='tok', claimed_at=now,
298+
lease_expires_at=now + 300)
299+
update_task_state(run.pipe_root, 't1', new_status=TaskState.RUNNING, started_at=now)
300+
update_task_state(run.pipe_root, 't1', new_status=TaskState.FAILED_RETRYABLE,
301+
ended_at=now + 1, failure_class='timeout')
302+
303+
run.reconcile()
304+
self.assertFalse(run.needs_resubmission,
305+
'Should never resubmit — Q-state workers or manual intervention handle retries')
306+
307+
270308
class TestPipeRunHomogeneity(unittest.TestCase):
271309
"""Tests for PipeRun homogeneity validation."""
272310

arc/scheduler.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import arc.parser.parser as parser
1717
from arc import plotter
18-
from arc.checks.common import get_i_from_job_name, sum_time_delta
18+
from arc.checks.common import get_i_from_job_name, is_conformer_job, sum_time_delta
1919
from arc.checks.ts import check_imaginary_frequencies, check_ts, check_irc_species_and_rxn
2020
from arc.common import (extremum_list,
2121
get_angle_in_180_range,
@@ -630,7 +630,15 @@ def schedule_jobs(self):
630630
# Accumulate for deferred pipe batching of conf_sp.
631631
self._pending_pipe_conf_sp.setdefault(label, set()).add(i)
632632
if troubleshooting_conformer:
633-
break
633+
# Only break if other conformer jobs are still in flight.
634+
# When the last conformer exhausts troubleshooting without
635+
# converging, we must fall through to the "all done" check
636+
# below so it can call determine_most_likely_ts_conformer
637+
# on the conformers that already succeeded — otherwise ARC
638+
# mistakenly concludes no TS guess converged.
639+
if any(is_conformer_job(j)
640+
for j in self.running_jobs.get(label, [])):
641+
break
634642
# Just terminated a conformer job.
635643
# Are there additional conformer jobs currently running for this species?
636644
# Note: end_job already removed the current job from running_jobs,
@@ -3791,7 +3799,7 @@ def save_restart_dict(self):
37913799
self.restart_dict['running_jobs'][spc.label] = \
37923800
[self.job_dict[spc.label][job_name.rsplit('_', 1)[0]][job_name].as_dict()
37933801
for job_name in self.running_jobs[spc.label]
3794-
if all(x not in job_name for x in ['conf_opt', 'conf_sp', 'tsg'])] \
3802+
if not is_conformer_job(job_name) and 'tsg' not in job_name] \
37953803
+ [self.job_dict[spc.label]['conf_opt'][get_i_from_job_name(job_name)].as_dict()
37963804
for job_name in self.running_jobs[spc.label] if 'conf_opt' in job_name] \
37973805
+ [self.job_dict[spc.label]['conf_sp'][get_i_from_job_name(job_name)].as_dict()

arc/scheduler_pipe_test.py

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,8 +1011,8 @@ def test_scan_not_mixed_with_other_families(self):
10111011
run.stage()
10121012

10131013

1014-
class TestResubmissionLifecycle(unittest.TestCase):
1015-
"""Tests for #1: resubmission sets SUBMITTED status and clears flag."""
1014+
class TestNoResubmissionLifecycle(unittest.TestCase):
1015+
"""Pipe runs must never resubmit — Q-state workers handle retried tasks."""
10161016

10171017
def setUp(self):
10181018
self.tmpdir = tempfile.mkdtemp(prefix='pipe_resub_test_')
@@ -1021,11 +1021,11 @@ def setUp(self):
10211021
def tearDown(self):
10221022
shutil.rmtree(self.tmpdir, ignore_errors=True)
10231023

1024-
def test_resubmission_sets_submitted_status(self):
1025-
"""After successful resubmission, pipe status should be SUBMITTED."""
1024+
def test_no_resubmission_even_with_retried_tasks(self):
1025+
"""Even when all workers are done and retried tasks remain,
1026+
poll_pipes must not resubmit a new scheduler job."""
10261027
tasks = [_make_task_spec(f'task_{i}') for i in range(3)]
10271028
pipe = self.sched.pipe_coordinator.submit_pipe_run('resub_test', tasks)
1028-
# Simulate retried tasks (attempt_index > 0) so reconcile flags resubmission
10291029
for task_id in ['task_0', 'task_1', 'task_2']:
10301030
now = time.time()
10311031
update_task_state(pipe.pipe_root, task_id, new_status=TaskState.CLAIMED,
@@ -1038,22 +1038,10 @@ def test_resubmission_sets_submitted_status(self):
10381038
claimed_at=None, lease_expires_at=None,
10391039
started_at=None, ended_at=None, failure_class=None)
10401040
pipe.status = PipeRunState.RECONCILING
1041-
# Mock submit_to_scheduler to succeed
1042-
with patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '12345')):
1041+
with patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '12345')) as mock_submit:
10431042
self.sched.pipe_coordinator.poll_pipes()
1044-
self.assertEqual(pipe.status, PipeRunState.SUBMITTED)
1045-
self.assertEqual(pipe.scheduler_job_id, '12345')
1046-
self.assertFalse(pipe._needs_resubmission)
1047-
1048-
def test_resubmission_clears_flag_on_failure(self):
1049-
"""After failed resubmission, flag should still be cleared to avoid infinite loops."""
1050-
tasks = [_make_task_spec(f'task_{i}') for i in range(3)]
1051-
pipe = self.sched.pipe_coordinator.submit_pipe_run('resub_fail', tasks)
1052-
pipe._needs_resubmission = True
1053-
pipe.status = PipeRunState.RECONCILING
1054-
with patch.object(pipe, 'submit_to_scheduler', return_value=('errored', None)):
1055-
self.sched.pipe_coordinator.poll_pipes()
1056-
self.assertFalse(pipe._needs_resubmission)
1043+
mock_submit.assert_not_called()
1044+
self.assertFalse(pipe.needs_resubmission)
10571045

10581046

10591047
class TestShouldUsePipeOwnerType(unittest.TestCase):

0 commit comments

Comments
 (0)