Skip to content

Commit dd827e2

Browse files
committed
Decouple species stage completion checks in the Scheduler
Refactor the scheduling logic to separate job termination processing from stage transition decisions. By moving conformer, TS guess, and species completion checks into dedicated methods called at the end of the polling loop, the state machine becomes more robust and ensures transitions are evaluated consistently across different execution modes.
1 parent d7b20a3 commit dd827e2

1 file changed

Lines changed: 101 additions & 48 deletions

File tree

arc/scheduler.py

Lines changed: 101 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -622,55 +622,21 @@ def schedule_jobs(self):
622622
job = self.job_dict[label]['conf_opt'][i] if 'conf_opt' in job_name \
623623
else self.job_dict[label]['conf_sp'][i]
624624
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
625-
# this is a completed conformer job
626625
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
627626
if successful_server_termination:
628627
troubleshooting_conformer = self.parse_conformer(job=job, label=label, i=i)
629628
if 'conf_opt' in job_name and self.job_types['conf_sp'] and not troubleshooting_conformer:
630629
# Accumulate for deferred pipe batching of conf_sp.
631630
self._pending_pipe_conf_sp.setdefault(label, set()).add(i)
632-
if troubleshooting_conformer:
633-
break
634-
# Just terminated a conformer job.
635-
# Are there additional conformer jobs currently running for this species?
636-
# Note: end_job already removed the current job from running_jobs,
637-
# so we don't need to exclude job_name.
638-
for spec_jobs in job_list:
639-
if 'conf_opt' in spec_jobs or 'conf_sp' in spec_jobs:
640-
break
641-
else:
642-
# All conformer jobs terminated.
643-
# Check isomorphism and run opt on most stable conformer geometry.
644-
logger.info(f'\nConformer jobs for {label} successfully terminated.\n')
645-
if self.species_dict[label].is_ts:
646-
self.determine_most_likely_ts_conformer(label)
647-
else:
648-
self.determine_most_stable_conformer(label, sp_flag=True if self.job_types['conf_sp'] else False) # also checks isomorphism
649-
if self.species_dict[label].initial_xyz is not None:
650-
# if initial_xyz is None, then we're probably troubleshooting conformers, don't opt
651-
if not self.composite_method:
652-
self.run_opt_job(label, fine=self.fine_only)
653-
else:
654-
self.run_composite_job(label)
655631
self.timer = False
656632
break
657633
if 'tsg' in job_name:
658634
job = self.job_dict[label]['tsg'][get_i_from_job_name(job_name)]
659635
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
660-
# This is a successfully completed tsg job. It may have resulted in several TSGuesses.
661636
self.end_job(job=job, label=label, job_name=job_name)
662637
if job.local_path_to_output_file.endswith('.yml') or job.local_path_to_output_file.endswith('.log'):
663638
for rxn in job.reactions:
664639
rxn.ts_species.process_completed_tsg_queue_jobs(path=job.local_path_to_output_file)
665-
# Just terminated a tsg job.
666-
# Are there additional tsg jobs currently running for this species?
667-
for spec_jobs in job_list:
668-
if 'tsg' in spec_jobs:
669-
break
670-
else:
671-
# All tsg jobs terminated. Spawn confs.
672-
logger.info(f'\nTS guess jobs for {label} successfully terminated.\n')
673-
self.run_conformer_jobs(labels=[label])
674640
self.timer = False
675641
break
676642
elif 'opt' in job_name and 'conf_opt' not in job_name:
@@ -803,20 +769,12 @@ def schedule_jobs(self):
803769
self.timer = False
804770
break
805771

806-
if not len(job_list):
807-
has_pending_pipe_work = (
808-
label in self._pending_pipe_sp
809-
or label in self._pending_pipe_freq
810-
or any(lbl == label for lbl, _ in self._pending_pipe_irc)
811-
or label in self._pending_pipe_conf_sp
812-
or any(label in {t.owner_key for t in p.tasks}
813-
for p in self.active_pipes.values())
814-
)
815-
if not has_pending_pipe_work:
816-
self.check_all_done(label)
817-
if not self.running_jobs[label]:
818-
# Delete the label only if it represents an empty entry.
819-
del self.running_jobs[label]
772+
for label in list(self.unique_species_labels):
773+
if label in self.output and self.output[label]['convergence'] is False:
774+
continue
775+
self._check_conformer_stage_complete(label)
776+
self._check_tsg_stage_complete(label)
777+
self._check_species_complete(label)
820778

821779
# Poll active pipe runs (per-run failures are handled inside poll_pipes).
822780
if self.active_pipes:
@@ -840,6 +798,101 @@ def schedule_jobs(self):
840798
# Generate a TS report:
841799
self.generate_final_ts_guess_report()
842800

801+
def _check_conformer_stage_complete(self, label: str) -> None:
802+
"""
803+
Check whether all conformer jobs (conf_opt/conf_sp) for a species have
804+
finished. If so, select the best conformer and spawn the next job.
805+
806+
Called unconditionally after job event processing so that no break
807+
in the job-processing loop can skip the conformer-to-opt transition.
808+
"""
809+
if 'conf_opt' not in self.job_dict.get(label, {}):
810+
return
811+
if any('conf_opt' in j or 'conf_sp' in j
812+
for j in self.running_jobs.get(label, [])):
813+
return
814+
if label in self._pending_pipe_conf_sp:
815+
return
816+
if any(label in {t.owner_key for t in p.tasks}
817+
for p in self.active_pipes.values()
818+
if any(t.task_family in ('conf_opt', 'conf_sp', 'ts_opt') for t in p.tasks)):
819+
return
820+
if self.species_dict[label].initial_xyz is not None:
821+
return
822+
if self.output[label].get('job_types', {}).get('conf_opt'):
823+
return
824+
if self.species_dict[label].is_ts and self.species_dict[label].ts_guesses_exhausted:
825+
return
826+
827+
if self.species_dict[label].is_ts:
828+
has_successful_conformer = any(
829+
tsg.energy is not None for tsg in self.species_dict[label].ts_guesses)
830+
else:
831+
has_successful_conformer = any(
832+
e is not None for e in self.species_dict[label].conformer_energies)
833+
834+
if not has_successful_conformer:
835+
logger.error(f'All conformer jobs for {label} failed. '
836+
f'No conformer has a valid energy.')
837+
if self.species_dict[label].is_ts:
838+
self.species_dict[label].ts_guesses_exhausted = True
839+
return
840+
841+
logger.info(f'\nConformer jobs for {label} successfully terminated.\n')
842+
if self.species_dict[label].is_ts:
843+
self.determine_most_likely_ts_conformer(label)
844+
else:
845+
self.determine_most_stable_conformer(
846+
label, sp_flag=True if self.job_types.get('conf_sp') else False)
847+
if self.species_dict[label].initial_xyz is not None:
848+
if not self.composite_method:
849+
self.run_opt_job(label, fine=self.fine_only)
850+
else:
851+
self.run_composite_job(label)
852+
elif not any('conf_opt' in j or 'conf_sp' in j
853+
for j in self.running_jobs.get(label, [])):
854+
self.output[label]['job_types']['conf_opt'] = True
855+
856+
def _check_tsg_stage_complete(self, label: str) -> None:
857+
"""
858+
Check whether all TS guess jobs for a species have finished.
859+
If so, spawn conformer jobs for the TS.
860+
"""
861+
if 'tsg' not in self.job_dict.get(label, {}):
862+
return
863+
if any('tsg' in j for j in self.running_jobs.get(label, [])):
864+
return
865+
if not self.species_dict[label].is_ts:
866+
return
867+
if self.species_dict[label].ts_conf_spawned:
868+
return
869+
if not all(tsg.success is not None for tsg in self.species_dict[label].ts_guesses):
870+
return
871+
872+
logger.info(f'\nTS guess jobs for {label} successfully terminated.\n')
873+
self.run_conformer_jobs(labels=[label])
874+
875+
def _check_species_complete(self, label: str) -> None:
876+
"""
877+
Check whether all jobs for a species are complete and call
878+
check_all_done if so. Clean up empty running_jobs entries.
879+
"""
880+
running = self.running_jobs.get(label, [])
881+
if running:
882+
return
883+
has_pending_pipe_work = (
884+
label in self._pending_pipe_sp
885+
or label in self._pending_pipe_freq
886+
or any(lbl == label for lbl, _ in self._pending_pipe_irc)
887+
or label in self._pending_pipe_conf_sp
888+
or any(label in {t.owner_key for t in p.tasks}
889+
for p in self.active_pipes.values())
890+
)
891+
if not has_pending_pipe_work:
892+
self.check_all_done(label)
893+
if label in self.running_jobs and not self.running_jobs[label]:
894+
del self.running_jobs[label]
895+
843896
def run_job(self,
844897
job_type: str,
845898
conformer: Optional[int] = None,

0 commit comments

Comments
 (0)