Pull request overview
Adds a new filesystem-backed “pipe” execution mode to orchestrate many homogeneous leaf tasks inside a single scheduler array allocation, with a dedicated PipeRun orchestrator + pipe_worker consumer, and removes the legacy HDF5/array infrastructure from JobAdapter.
Changes:
- Introduces pipe task/run state machines and atomic, file-locked state updates (pipe_state.py) plus a run orchestrator with staging, submit-script generation, and reconciliation (pipe_run.py).
- Adds a standalone worker loop that claims PENDING tasks, runs adapters in incore, persists per-attempt outputs/results, and uses claim tokens to protect terminal writes (pipe_worker.py).
- Integrates pipe routing/polling/ingestion into Scheduler, and removes legacy job-array/HDF5 machinery from adapters/tests.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| arc/settings/submit.py | Adds scheduler-type keyed templates for pipe_worker array submission. |
| arc/scripts/pipe_worker.py | Implements the pipe worker claim/execute loop and result persistence. |
| arc/scripts/pipe_worker_test.py | Unit tests for claiming, execution, dispatch routing, ownership, cleanup, and loop behavior. |
| arc/scripts/__init__.py | Adjusts scripts package import to use absolute module path. |
| arc/scheduler.py | Adds pipe routing helpers, active pipe polling loop integration, and ingestion hooks. |
| arc/scheduler_pipe_test.py | Extensive tests for Scheduler pipe eligibility, submission, polling, routing, and ingestion behavior. |
| arc/job/pipe_state.py | Defines pipe task/run state machines, models, and locked atomic state updates. |
| arc/job/pipe_state_test.py | Tests for state transitions, spec validation, locking semantics, and persistence helpers. |
| arc/job/pipe_run.py | Implements PipeRun staging, submit script generation, reconcile/orphan/retry logic, and from_dir restore. |
| arc/job/pipe_run_test.py | Tests for PipeRun staging/restore, submit script content, reconcile behavior, and homogeneity rules. |
| arc/job/adapters/psi_4.py | Removes legacy array/HDF5 initialization hook. |
| arc/job/adapters/common.py | Removes legacy job-array parameter determination call during adapter init. |
| arc/job/adapter.py | Removes legacy HDF5/array infrastructure and makes adapter-level pipe execution explicitly unsupported. |
| arc/job/adapter_test.py | Removes DataPoint/HDF5-related tests and pandas dependency usage. |
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 7 comments.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #854 +/- ##
==========================================
+ Coverage 59.21% 60.09% +0.88%
==========================================
Files 98 102 +4
Lines 30086 31009 +923
Branches 7951 8074 +123
==========================================
+ Hits 17815 18636 +821
- Misses 10029 10064 +35
- Partials 2242 2309 +67
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.
with state, coordinate, planner and run for pipe
- #PBS -t → -J (PBSPro array syntax)
- $PBS_ARRAYID → $PBS_ARRAY_INDEX (PBSPro env variable)
- Added {queue} directive to PBS/SLURM/SGE templates, sourced from
servers['local']['queues'] first entry
- Added {env_setup} placeholder for engine-specific shell setup
commands (e.g., 'source /usr/local/g09/setup.sh'), configured via
pipe_settings['env_setup'] keyed by engine name
- Added pipe_settings['scratch_base'] for configurable worker scratch
directory (exported as TMPDIR in submit script)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Gaussian adapter (gaussian.py):
- Reversed self.command to ['g16', 'g09', 'g03'] to prefer latest
- execute_incore() now uses the actual binary found by which() instead of hardcoded g16 in incore_commands. If g09 is found, runs g09.
Bug fixes in common.py which():
- Added missing os.pathsep between PATH and os.path.dirname(sys.executable) on line 518. Without the separator, the last PATH entry gets merged with the Python executable directory, corrupting the lookup.
- Added missing os.pathsep between PATH and PYTHONPATH on line 528. Same concatenation bug causing spurious 'command not found' errors on nodes with non-empty PYTHONPATH.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
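The separator bug described in this commit is easy to reproduce in isolation. The snippet below is a standalone illustration, not the code from common.py; `build_search_path` and the example directories are hypothetical.

```python
import os


def build_search_path(path, extra_dir):
    """Append one directory to a PATH-style string using the platform separator."""
    return path + os.pathsep + extra_dir


# Without the separator, the last PATH entry and the extra directory fuse
# into a single, nonexistent entry.
buggy = '/usr/bin' + '/opt/python/bin'

# With os.pathsep, both entries survive a split.
fixed = build_search_path('/usr/bin', '/opt/python/bin')

buggy_entries = buggy.split(os.pathsep)
fixed_entries = fixed.split(os.pathsep)
```

Here `buggy_entries` holds the single merged entry `/usr/bin/opt/python/bin`, while `fixed_entries` holds the two original directories, which is why lookups failed before the fix.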
- os.chdir(job.local_path) before job.execute() so ESS commands (e.g.,
g09 < input.gjf) run in the directory where input files were created,
not the process CWD.
- Output file existence check after execute() — raises RuntimeError if
the expected output is missing, preventing false COMPLETED status.
- ESS error detection: calls determine_ess_status() on output after
execution, even when no Python exception was raised. Detects
'completed but unconverged' cases.
- Classifies failures as deterministic ('ess_error' — SCF, MaxOptCycles,
InternalCoordinateError) vs transient ('transient_ess' — NoOutput,
ServerTimeLimit, DiskSpace). Deterministic errors are ejected to the
Scheduler for troubleshooting; transient errors are retried by pipe.
- Saves error diagnostics (keywords, error, line) to result.json
parser_summary field for downstream decisions.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
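The deterministic-versus-transient split described in this commit can be sketched as a small classifier. The keyword sets are taken from the commit message; the function name `classify_failure` is hypothetical, and letting a transient keyword win when both kinds are present is an assumption, since the commit does not say how mixed cases are resolved.

```python
# Keyword sets as listed in the commit message.
DETERMINISTIC_KEYWORDS = frozenset({'SCF', 'MaxOptCycles', 'InternalCoordinateError'})
TRANSIENT_KEYWORDS = frozenset({'NoOutput', 'ServerTimeLimit', 'DiskSpace'})


def classify_failure(keywords):
    """Map ESS error keywords to a failure class, or None if none are recognized.

    Assumption: when both kinds of keywords are present, the transient class
    wins, so the task is retried rather than ejected.
    """
    found = set(keywords)
    if found & TRANSIENT_KEYWORDS:
        return 'transient_ess'
    if found & DETERMINISTIC_KEYWORDS:
        return 'ess_error'
    return None
```

A caller would then retry `'transient_ess'` tasks inside the pipe and hand `'ess_error'` tasks back to the Scheduler for troubleshooting.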
…ence checks
Resubmission race condition:
- Fresh PENDING tasks (attempt_index=0) never trigger resubmission; workers just haven't started yet.
- Only retried PENDING tasks (attempt_index>0) trigger resubmission, with a 120s cooldown after last submission to prevent duplicates.
ESS error handling in retry logic:
- Deterministic ESS errors (failure_class='ess_error') skip blind retry and go straight to FAILED_TERMINAL for Scheduler ejection.
- Transient errors (node crash, NoOutput) still get retried in pipe.
Convergence checking:
- Added _check_ess_convergence() gate to ALL ingestion functions. Calls determine_ess_status() and skips non-converged results.
Status mismatch fix:
- local.py submit_job() returns 'running' on success; coordinator now accepts both 'submitted' and 'running'.
Log deduplication:
- Pipe status only logged when counts change.
Scratch directory fix:
- PBS array job IDs contain brackets (e.g., 4012600[26].zeus-master). Strip brackets via bash parameter expansion and append array index for clean per-worker scratch paths.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
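The resubmission rule in this commit reduces to two conditions, sketched below. The function name, its parameters, and passing the pending tasks as a list of attempt indices are assumptions for illustration; only the attempt_index>0 trigger and the 120 s cooldown come from the commit message.

```python
def needs_resubmission(pending_attempt_indices, last_submit_time, now, cooldown=120.0):
    """Decide whether to resubmit workers for PENDING tasks.

    pending_attempt_indices: attempt_index of every task currently PENDING.
    last_submit_time, now: timestamps in seconds on the same clock.
    """
    # Fresh tasks (attempt_index == 0) are just waiting for workers to start.
    has_retried_pending = any(index > 0 for index in pending_attempt_indices)
    if not has_retried_pending:
        return False
    # Retried tasks trigger resubmission only once the cooldown has elapsed.
    return (now - last_submit_time) >= cooldown
```

Keeping the check pure (timestamps passed in rather than read from the clock) makes the cooldown easy to unit-test.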
Post-ingestion workflow (closes the pipe→Scheduler loop):
- _post_ingest_pipe_run() dispatches by task_family after all per-task ingestion completes.
- _post_ingest_ts_opt(): updates individual TSGuess objects (opt_xyz, energy, index) by conformer_index lookup, then calls determine_most_likely_ts_conformer() and run_opt_job().
- _post_ingest_conf_opt/conf_sp(): same pattern for non-TS conformers.
- Post-ingestion deferred when tasks are ejected to Scheduler.
Failed task ejection:
- FAILED_TERMINAL tasks with failure_class='ess_error' ejected via _eject_to_scheduler() → Scheduler.run_job().
- Uses family_to_sched_job_type mapping (ts_opt→conf_opt, not opt).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- cluster_tsgs() logs at INFO when clustering reduces guess count
- TS guess report shows merged method sources when clustered
- Hourly status report skips empty running_jobs dict
- 'Cannot compare coords' log demoted from warning to debug
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- test_ts_opt_ingestion: uses TS species with TSGuess objects
- test_resubmission: uses attempt_index>0 for legitimate triggers
- All ingestion tests mock determine_ess_status and post-ingestion Scheduler methods
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Archives ALL old pipe_* directories from runs/ at coordinator startup (in __init__), before any pipe run is created. This prevents FileExistsError when stage() encounters stale directories from previous ARC runs.
Runs as a single startup sweep in _archive_old_pipe_dirs() rather than per-run. This ensures cleanup happens immediately regardless of which task family triggers pipe mode first.
Follows existing ARC archive convention (log_and_restart_archive/ with HHMMSS_MonDD_YYYY timestamps).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Added FAILED_ESS as a distinct TaskState for deterministic ESS convergence errors (SCF, MaxOptCycles, InternalCoordinateError). Previously these were lumped into FAILED_RETRYABLE with a side-channel failure_class field. Now the state itself carries the meaning:
- FAILED_RETRYABLE: transient (node crash, NoOutput); pipe retries
- FAILED_ESS: deterministic ESS error; ejected to Scheduler
- FAILED_TERMINAL: exhausted retries; no further action
Reverted log output to original clean format using state names directly (e.g., COMPLETED: 30, FAILED_ESS: 2, RUNNING: 8).
Updated docs/source/advanced.rst with full task state documentation and pipe_settings env_setup/scratch_base configuration.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
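A state machine with a terminal FAILED_ESS state might look roughly like the sketch below. The state names appear in this PR, but the transition table itself is an assumption made for illustration (the real one lives in pipe_state.py and may include additional states); the only behavior taken from the PR is that FAILED_ESS has no outgoing transitions and that leaving it raises ValueError.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    COMPLETED = 'COMPLETED'
    FAILED_RETRYABLE = 'FAILED_RETRYABLE'
    FAILED_ESS = 'FAILED_ESS'
    FAILED_TERMINAL = 'FAILED_TERMINAL'


# Assumed transition table; terminal states map to an empty set.
ALLOWED_TRANSITIONS = {
    TaskState.PENDING: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.COMPLETED, TaskState.FAILED_RETRYABLE, TaskState.FAILED_ESS},
    TaskState.FAILED_RETRYABLE: {TaskState.PENDING, TaskState.FAILED_TERMINAL},
    TaskState.COMPLETED: set(),
    TaskState.FAILED_ESS: set(),
    TaskState.FAILED_TERMINAL: set(),
}


def transition(current, new):
    """Return `new` if the move is allowed, otherwise raise ValueError."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f'illegal transition {current.name} -> {new.name}')
    return new


def is_terminal(state):
    """A state is terminal when it has no outgoing transitions."""
    return not ALLOWED_TRANSITIONS[state]
```

Encoding the failure kind in the state, rather than in a side-channel field, lets reconcile logic decide what to do from the state name alone.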
Two independent bugs caused the Scheduler to prematurely declare a TS failed and switch to the next guess while other conformers were still being troubleshot:
Bug 1 (lines 637, 668): The for/else check for remaining conformer/tsg jobs used `spec_jobs != job_name` to skip the current job. But end_job already removed it from running_jobs before this check runs. When troubleshooting resubmitted a job with the same name, the filter incorrectly skipped it, causing the 'all done' branch to fire early.
Fix: removed the unnecessary `!= job_name` filter from both the conformer check (line 637) and tsg check (line 668).
Bug 2 (line 3607): troubleshoot_ess called switch_ts (which deletes ALL running jobs for the species) when a single conformer exhausted troubleshooting. But other conformers might still be running.
Fix: added `and conformer is None` guard so switch_ts only fires for full TS optimization failures, not individual conformer search failures. Failed conformers are now abandoned gracefully while waiting for the others to finish.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pipe output now follows ARC's calcs/ directory convention:
- Per-species TS: calcs/TSs/<label>/pipe_<family>_<N>/
- Per-species non-TS: calcs/Species/<label>/pipe_<family>_<N>/
- Cross-species batches: calcs/batches/pipe_<run_id>_<N>/
The index <N> auto-increments (0, 1, 2...) when a previous run exists, eliminating the need for archiving old pipe directories.
Changes:
- PipeRun.__init__ accepts optional pipe_root parameter. The coordinator computes the path via _compute_pipe_root() which checks species_dict to determine TSs/ vs Species/ folder. Falls back to calcs/pipe_<run_id> when pipe_root is not provided (tests, direct construction).
- project_directory is now stored in run.json so from_dir() can recover it regardless of directory depth (no more dirname(dirname()) heuristic).
- Removed _archive_old_pipe_dirs; auto-indexing replaces archiving.
- Added _next_indexed_dir static helper for finding next available index.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
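One way to implement the auto-incrementing index is sketched below. This is not the actual _next_indexed_dir helper: the signature, and the choice to continue from the highest existing index rather than fill gaps, are assumptions for illustration.

```python
import os
import re


def next_indexed_dir(parent, prefix):
    """Return the path `<parent>/<prefix>_<N>` for the next unused index N.

    N starts at 0 and is one more than the highest existing index.
    A missing parent directory is treated as empty, and names that do
    not match `<prefix>_<digits>` are ignored.
    """
    pattern = re.compile(rf'^{re.escape(prefix)}_(\d+)$')
    indices = []
    if os.path.isdir(parent):
        for name in os.listdir(parent):
            match = pattern.match(name)
            if match:
                indices.append(int(match.group(1)))
    next_index = max(indices) + 1 if indices else 0
    return os.path.join(parent, f'{prefix}_{next_index}')
```

For example, with `pipe_ts_opt_0` and `pipe_ts_opt_1` already present under a species folder, the helper returns the path ending in `pipe_ts_opt_2`.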
Added tree diagram showing the calcs/ layout for pipe runs: per-species (TSs/, Species/), cross-species batches, task/attempt hierarchy, and auto-incrementing index convention.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Generates a task_summary.txt in the pipe root at ingestion time, mapping each task to its PBS worker slot, final status, and failure class. Makes it easy to cross-reference PBS output files (named by array index) with specific tasks (claimed by work-stealing).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nd routing
Coordinator tests (pipe_coordinator_test.py):
- TestComputePipeRoot: TSs/ vs Species/ vs batches/ routing, auto-increment
- TestNextIndexedDir: empty/nonexistent parent, sequential increment, non-matching ignored
Scheduler pipe tests (scheduler_pipe_test.py):
- TestDeterministicEssError: 10 cases including mixed transient+deterministic
- TestFailedEssState: valid transitions, terminal (no outgoing), counts toward completion, not retried by reconcile, transition out raises ValueError
- TestEjectToSchedulerJobType: ts_opt→conf_opt, species_sp→sp, unknown species
- TestPipeRoutingIntegration: 15 TS guesses routed to pipe via mockter adapter, staged under calcs/TSs/H2O/, correct task count, spec/state.json on disk, submit script generated, below-threshold not piped
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nd routing
Tests placed in their correct files:
pipe_state_test.py:
- TestFailedEssState: valid transitions, terminal (no outgoing), transition out raises ValueError, not retried by reconcile, counts toward completion
pipe_coordinator_test.py:
- TestComputePipeRoot: TSs/ vs Species/ vs batches/ routing, auto-increment
- TestNextIndexedDir: empty/nonexistent parent, sequential increment, non-matching ignored
scheduler_pipe_test.py:
- TestDeterministicEssError: 10 cases including mixed transient+deterministic
- TestEjectToSchedulerJobType: ts_opt→conf_opt, species_sp→sp, unknown species
- TestPipeRoutingIntegration: 15 TS guesses routed via mockter, staged under calcs/TSs/, correct task count, spec/state.json on disk, submit script generated, below-threshold not piped
Removed duplicate test classes left by parallel agent worktrees.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When a species' conformer jobs are routed to a pipe run, the species has no entries in running_jobs. The main loop's check at line 806 saw an empty job_list and called check_all_done(), declaring the species failed before the pipe had even started.
The has_pending_pipe_work guard now also checks active_pipes for any pipe run containing tasks owned by the species. This prevents check_all_done from firing while pipe work is in progress.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
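The guard described in this commit can be pictured with the simplified sketch below. The function name and both data shapes are assumptions for illustration: `running_jobs` is taken to map a species label to a list of job names, and `active_pipes` to map a run ID to the owner labels of that run's unfinished tasks. The real Scheduler structures are richer.

```python
def species_has_pending_work(label, running_jobs, active_pipes):
    """Return True if the species still has regular jobs or pipe tasks in flight.

    running_jobs: dict mapping species label -> list of running job names.
    active_pipes: dict mapping pipe run ID -> iterable of owner labels,
                  one per unfinished task in that run (hypothetical shape).
    """
    if running_jobs.get(label):
        return True
    # An empty job list is not enough to declare the species done:
    # its tasks may live inside an active pipe run instead.
    return any(label in owners for owners in active_pipes.values())
```

Only when this returns False would it be safe for the main loop to proceed to its "all done" check for the species.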
MockAdapter writes YAML output (not real ESS logs), so determine_ess_status falsely reports convergence failure. Patching _parse_ess_error to return None in TestRunTask and TestWorkerLoop skips the ESS check for these mockter-based tests.
cluster_tsgs() merges methods into method_sources list, not into the method string. Updated test_cluster_tsgs to expect the representative's original method and execution_time. Updated test_as_dict and test_from_dict to include method_sources in expected output.
Thanks for the fixes, updates, and improvements, @calvinp0!
Implements a new "pipe" execution mode that orchestrates hundreds of subjobs within a single SLURM/PBS/SGE/HTCondor array allocation using a
distributed, lease-based state machine backed by the filesystem.
Architecture:
from_dir() reconstruction
before terminal writes
polling loop integration
Supported task families: conf_opt, conf_sp, ts_guess_batch_method, ts_opt, species_sp, species_freq, irc, rotor_scan_1d
Key design rules:
Legacy cleanup: Removed the old HDF5-based DataPoint/write_hdf5/determine_job_array_parameters infrastructure from JobAdapter. Updated pipe_submit
templates in settings/submit.py from the old pipe.py design to the new pipe_worker design.