Skip to content

Add pipe execution mode for distributed HPC job orchestration#854

Merged
alongd merged 26 commits intomainfrom
pipe
Apr 9, 2026
Merged

Add pipe execution mode for distributed HPC job orchestration#854
alongd merged 26 commits intomainfrom
pipe

Conversation

@alongd
Copy link
Copy Markdown
Member

@alongd alongd commented Apr 1, 2026

Implements a new "pipe" execution mode that orchestrates hundreds of subjobs within a single SLURM/PBS/SGE/HTCondor array allocation using a
distributed, lease-based state machine backed by the filesystem.

Architecture:

  • pipe_state.py — Task/run state machines, data models (TaskSpec, TaskStateRecord), file-locked atomic I/O, claim tokens for ownership verification
  • pipe_run.py — PipeRun orchestrator: staging, submit-script generation, reconciliation with orphan detection, retry budgets, run.json persistence,
    from_dir() reconstruction
  • pipe_worker.py — Standalone worker script that loops claiming PENDING tasks, dispatches by task family, writes result.json, verifies ownership
    before terminal writes
  • scheduler.py — Pipe API on Scheduler: eligibility checks, pipe routing for conformer/TS/species-side/scan jobs, family-based ingestion dispatch,
    polling loop integration

Supported task families: conf_opt, conf_sp, ts_guess_batch_method, ts_opt, species_sp, species_freq, irc, rotor_scan_1d

Key design rules:

  • Pipe executes only ready leaf jobs — all QA, troubleshooting, and downstream branching stays in mother ARC
  • One family / one engine / one level per PipeRun (homogeneity enforced at staging)
  • Ingestion happens only after full PipeRun completion
  • Workers verify ownership via claimed_by + claim_token before writing terminal state

Legacy cleanup: Removed the old HDF5-based DataPoint/write_hdf5/determine_job_array_parameters infrastructure from JobAdapter. Updated pipe_submit
templates in settings/submit.py from the old pipe.py design to the new pipe_worker design.

Comment thread arc/job/pipe_state_test.py Fixed
Comment thread arc/job/pipe_run.py Fixed
Comment thread arc/job/pipe_run.py Fixed
Comment thread arc/job/pipe_state.py Fixed
Comment thread arc/job/pipe_state_test.py Fixed
Comment thread arc/scheduler.py Fixed
Comment thread arc/scheduler_pipe_test.py Fixed
Comment thread arc/scheduler_pipe_test.py Fixed
Comment thread arc/scripts/pipe_worker.py Fixed
Comment thread arc/scripts/pipe_worker.py Fixed
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new filesystem-backed “pipe” execution mode to orchestrate many homogeneous leaf tasks inside a single scheduler array allocation, with a dedicated PipeRun orchestrator + pipe_worker consumer, and removes the legacy HDF5/array infrastructure from JobAdapter.

Changes:

  • Introduces pipe task/run state machines and atomic, file-locked state updates (pipe_state.py) plus a run orchestrator with staging, submit-script generation, and reconciliation (pipe_run.py).
  • Adds a standalone worker loop that claims PENDING tasks, runs adapters in incore, persists per-attempt outputs/results, and uses claim tokens to protect terminal writes (pipe_worker.py).
  • Integrates pipe routing/polling/ingestion into Scheduler, and removes legacy job-array/HDF5 machinery from adapters/tests.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
arc/settings/submit.py Adds scheduler-type keyed templates for pipe_worker array submission.
arc/scripts/pipe_worker.py Implements the pipe worker claim/execute loop and result persistence.
arc/scripts/pipe_worker_test.py Unit tests for claiming, execution, dispatch routing, ownership, cleanup, and loop behavior.
arc/scripts/init.py Adjusts scripts package import to use absolute module path.
arc/scheduler.py Adds pipe routing helpers, active pipe polling loop integration, and ingestion hooks.
arc/scheduler_pipe_test.py Extensive tests for Scheduler pipe eligibility, submission, polling, routing, and ingestion behavior.
arc/job/pipe_state.py Defines pipe task/run state machines, models, and locked atomic state updates.
arc/job/pipe_state_test.py Tests for state transitions, spec validation, locking semantics, and persistence helpers.
arc/job/pipe_run.py Implements PipeRun staging, submit script generation, reconcile/orphan/retry logic, and from_dir restore.
arc/job/pipe_run_test.py Tests for PipeRun staging/restore, submit script content, reconcile behavior, and homogeneity rules.
arc/job/adapters/psi_4.py Removes legacy array/HDF5 initialization hook.
arc/job/adapters/common.py Removes legacy job-array parameter determination call during adapter init.
arc/job/adapter.py Removes legacy HDF5/array infrastructure and makes adapter-level pipe execution explicitly unsupported.
arc/job/adapter_test.py Removes DataPoint/HDF5-related tests and pandas dependency usage.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread arc/settings/submit.py Outdated
Comment thread arc/settings/submit.py Outdated
Comment thread arc/settings/submit.py
Comment thread arc/scripts/pipe_worker.py Outdated
Comment thread arc/scripts/pipe_worker.py
Comment thread arc/job/pipe/pipe_state.py
Comment thread arc/scheduler.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/job/pipe_state_test.py Fixed
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread arc/settings/submit.py
Comment thread arc/scripts/pipe_worker.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/scheduler.py Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread arc/settings/submit.py
Comment thread arc/settings/submit.py
Comment thread arc/scripts/pipe_worker.py Outdated
Comment thread arc/job/pipe/pipe_run.py
Comment thread arc/job/pipe_run.py Outdated
Comment thread arc/job/pipe_run.py Outdated
Comment thread arc/settings/submit.py Outdated
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 2, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 60.09%. Comparing base (828ebef) to head (700524e).
⚠️ Report is 27 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #854      +/-   ##
==========================================
+ Coverage   59.21%   60.09%   +0.88%     
==========================================
  Files          98      102       +4     
  Lines       30086    31009     +923     
  Branches     7951     8074     +123     
==========================================
+ Hits        17815    18636     +821     
- Misses      10029    10064      +35     
- Partials     2242     2309      +67     
Flag Coverage Δ
functionaltests 60.09% <ø> (+0.88%) ⬆️
unittests 60.09% <ø> (+0.88%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread arc/scripts/pipe_worker.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/job/pipe_run.py Fixed
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread arc/scheduler.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/job/pipe_run.py Outdated
Comment thread arc/settings/submit.py Outdated
Comment thread arc/job/pipe/pipe_run.py
Comment thread arc/settings/settings.py
Comment thread arc/job/pipe_run.py Outdated
Comment thread arc/job/pipe/pipe_coordinator.py Dismissed
Comment thread arc/job/pipe/pipe_planner.py Dismissed
Comment thread arc/job/pipe/pipe_planner.py Dismissed
Comment thread arc/scheduler.py Dismissed
Comment thread arc/scheduler.py Fixed
Comment thread arc/job/pipe/pipe_coordinator.py Fixed
Comment thread arc/scheduler_pipe_test.py Fixed
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread arc/scripts/pipe_worker.py Outdated
Comment thread arc/job/pipe/pipe_run.py Outdated
Comment thread arc/scheduler.py Outdated
Comment thread arc/job/pipe/pipe_coordinator.py Fixed
Comment thread arc/job/pipe/pipe_planner.py Fixed
Comment thread arc/job/pipe/pipe_planner.py Fixed
Comment thread arc/scheduler.py Fixed
Comment thread arc/scheduler.py Fixed
Comment thread arc/scheduler.py Dismissed
alongd and others added 26 commits April 8, 2026 10:44
with state, coordinate, planner and run for pipe
- #PBS -t → -J (PBSPro array syntax)
- $PBS_ARRAYID → $PBS_ARRAY_INDEX (PBSPro env variable)
- Added {queue} directive to PBS/SLURM/SGE templates, sourced from
  servers['local']['queues'] first entry
- Added {env_setup} placeholder for engine-specific shell setup
  commands (e.g., 'source /usr/local/g09/setup.sh'), configured via
  pipe_settings['env_setup'] keyed by engine name
- Added pipe_settings['scratch_base'] for configurable worker scratch
  directory (exported as TMPDIR in submit script)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Gaussian adapter (gaussian.py):
- Reversed self.command to ['g16', 'g09', 'g03'] to prefer latest
- execute_incore() now uses the actual binary found by which() instead
  of hardcoded g16 in incore_commands. If g09 is found, runs g09.

Bug fixes in common.py which():
- Added missing os.pathsep between PATH and os.path.dirname(sys.executable)
  on line 518. Without the separator, the last PATH entry gets merged with
  the Python executable directory, corrupting the lookup.
- Added missing os.pathsep between PATH and PYTHONPATH on line 528.
  Same concatenation bug causing spurious 'command not found' errors on
  nodes with non-empty PYTHONPATH.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- os.chdir(job.local_path) before job.execute() so ESS commands (e.g.,
  g09 < input.gjf) run in the directory where input files were created,
  not the process CWD.
- Output file existence check after execute() — raises RuntimeError if
  the expected output is missing, preventing false COMPLETED status.
- ESS error detection: calls determine_ess_status() on output after
  execution, even when no Python exception was raised. Detects
  'completed but unconverged' cases.
- Classifies failures as deterministic ('ess_error' — SCF, MaxOptCycles,
  InternalCoordinateError) vs transient ('transient_ess' — NoOutput,
  ServerTimeLimit, DiskSpace). Deterministic errors are ejected to the
  Scheduler for troubleshooting; transient errors are retried by pipe.
- Saves error diagnostics (keywords, error, line) to result.json
  parser_summary field for downstream decisions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ence checks

Resubmission race condition:
- Fresh PENDING tasks (attempt_index=0) never trigger resubmission —
  workers just haven't started yet.
- Only retried PENDING tasks (attempt_index>0) trigger resubmission,
  with a 120s cooldown after last submission to prevent duplicates.

ESS error handling in retry logic:
- Deterministic ESS errors (failure_class='ess_error') skip blind retry
  and go straight to FAILED_TERMINAL for Scheduler ejection.
- Transient errors (node crash, NoOutput) still get retried in pipe.

Convergence checking:
- Added _check_ess_convergence() gate to ALL ingestion functions.
  Calls determine_ess_status() and skips non-converged results.

Status mismatch fix:
- local.py submit_job() returns 'running' on success; coordinator now
  accepts both 'submitted' and 'running'.

Log deduplication:
- Pipe status only logged when counts change.

Scratch directory fix:
- PBS array job IDs contain brackets (e.g., 4012600[26].zeus-master).
  Strip brackets via bash parameter expansion and append array index
  for clean per-worker scratch paths.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Post-ingestion workflow (closes the pipe→Scheduler loop):
- _post_ingest_pipe_run() dispatches by task_family after all per-task
  ingestion completes.
- _post_ingest_ts_opt(): updates individual TSGuess objects (opt_xyz,
  energy, index) by conformer_index lookup, then calls
  determine_most_likely_ts_conformer() and run_opt_job().
- _post_ingest_conf_opt/conf_sp(): same pattern for non-TS conformers.
- Post-ingestion deferred when tasks are ejected to Scheduler.

Failed task ejection:
- FAILED_TERMINAL tasks with failure_class='ess_error' ejected via
  _eject_to_scheduler() → Scheduler.run_job().
- Uses family_to_sched_job_type mapping (ts_opt→conf_opt, not opt).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- cluster_tsgs() logs at INFO when clustering reduces guess count
- TS guess report shows merged method sources when clustered
- Hourly status report skips empty running_jobs dict
- 'Cannot compare coords' log demoted from warning to debug

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- test_ts_opt_ingestion: uses TS species with TSGuess objects
- test_resubmission: uses attempt_index>0 for legitimate triggers
- All ingestion tests mock determine_ess_status and post-ingestion
  Scheduler methods

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Archives ALL old pipe_* directories from runs/ at coordinator startup
(in __init__), before any pipe run is created. This prevents
FileExistsError when stage() encounters stale directories from
previous ARC runs.

Runs as a single startup sweep in _archive_old_pipe_dirs() rather than
per-run. This ensures cleanup happens immediately regardless of which
task family triggers pipe mode first.

Follows existing ARC archive convention (log_and_restart_archive/
with HHMMSS_MonDD_YYYY timestamps).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Added FAILED_ESS as a distinct TaskState for deterministic ESS
convergence errors (SCF, MaxOptCycles, InternalCoordinateError).
Previously these were lumped into FAILED_RETRYABLE with a side-channel
failure_class field. Now the state itself carries the meaning:

- FAILED_RETRYABLE: transient (node crash, NoOutput) — pipe retries
- FAILED_ESS: deterministic ESS error — ejected to Scheduler
- FAILED_TERMINAL: exhausted retries — no further action

Reverted log output to original clean format using state names directly
(e.g., COMPLETED: 30, FAILED_ESS: 2, RUNNING: 8).

Updated docs/source/advanced.rst with full task state documentation
and pipe_settings env_setup/scratch_base configuration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Two independent bugs caused the Scheduler to prematurely declare a TS
failed and switch to the next guess while other conformers were still
being troubleshot:

Bug 1 (lines 637, 668): The for/else check for remaining conformer/tsg
jobs used `spec_jobs != job_name` to skip the current job. But end_job
already removed it from running_jobs before this check runs. When
troubleshooting resubmitted a job with the same name, the filter
incorrectly skipped it, causing the 'all done' branch to fire early.
Fix: removed the unnecessary `!= job_name` filter from both the
conformer check (line 637) and tsg check (line 668).

Bug 2 (line 3607): troubleshoot_ess called switch_ts (which deletes
ALL running jobs for the species) when a single conformer exhausted
troubleshooting. But other conformers might still be running.
Fix: added `and conformer is None` guard so switch_ts only fires for
full TS optimization failures, not individual conformer search failures.
Failed conformers are now abandoned gracefully while waiting for the
others to finish.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pipe output now follows ARC's calcs/ directory convention:
- Per-species TS: calcs/TSs/<label>/pipe_<family>_<N>/
- Per-species non-TS: calcs/Species/<label>/pipe_<family>_<N>/
- Cross-species batches: calcs/batches/pipe_<run_id>_<N>/

The index <N> auto-increments (0, 1, 2...) when a previous run exists,
eliminating the need for archiving old pipe directories.

Changes:
- PipeRun.__init__ accepts optional pipe_root parameter. The coordinator
  computes the path via _compute_pipe_root() which checks species_dict
  to determine TSs/ vs Species/ folder. Falls back to calcs/pipe_<run_id>
  when pipe_root is not provided (tests, direct construction).
- project_directory is now stored in run.json so from_dir() can recover
  it regardless of directory depth (no more dirname(dirname()) heuristic).
- Removed _archive_old_pipe_dirs — auto-indexing replaces archiving.
- Added _next_indexed_dir static helper for finding next available index.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Added tree diagram showing the calcs/ layout for pipe runs:
per-species (TSs/, Species/), cross-species batches, task/attempt
hierarchy, and auto-incrementing index convention.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Generates a task_summary.txt in the pipe root at ingestion time,
mapping each task to its PBS worker slot, final status, and failure
class. Makes it easy to cross-reference PBS output files (named by
array index) with specific tasks (claimed by work-stealing).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nd routing

Coordinator tests (pipe_coordinator_test.py):
- TestComputePipeRoot: TSs/ vs Species/ vs batches/ routing, auto-increment
- TestNextIndexedDir: empty/nonexistent parent, sequential increment, non-matching ignored

Scheduler pipe tests (scheduler_pipe_test.py):
- TestDeterministicEssError: 10 cases including mixed transient+deterministic
- TestFailedEssState: valid transitions, terminal (no outgoing), counts toward
  completion, not retried by reconcile, transition out raises ValueError
- TestEjectToSchedulerJobType: ts_opt→conf_opt, species_sp→sp, unknown species
- TestPipeRoutingIntegration: 15 TS guesses routed to pipe via mockter adapter,
  staged under calcs/TSs/H2O/, correct task count, spec/state.json on disk,
  submit script generated, below-threshold not piped

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nd routing

Tests placed in their correct files:

pipe_state_test.py:
- TestFailedEssState: valid transitions, terminal (no outgoing), transition
  out raises ValueError, not retried by reconcile, counts toward completion

pipe_coordinator_test.py:
- TestComputePipeRoot: TSs/ vs Species/ vs batches/ routing, auto-increment
- TestNextIndexedDir: empty/nonexistent parent, sequential increment, non-matching ignored

scheduler_pipe_test.py:
- TestDeterministicEssError: 10 cases including mixed transient+deterministic
- TestEjectToSchedulerJobType: ts_opt→conf_opt, species_sp→sp, unknown species
- TestPipeRoutingIntegration: 15 TS guesses routed via mockter, staged under
  calcs/TSs/, correct task count, spec/state.json on disk, submit script
  generated, below-threshold not piped

Removed duplicate test classes left by parallel agent worktrees.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When a species' conformer jobs are routed to a pipe run, the species
has no entries in running_jobs. The main loop's check at line 806
saw an empty job_list and called check_all_done(), declaring the
species failed before the pipe had even started.

The has_pending_pipe_work guard now also checks active_pipes for any
pipe run containing tasks owned by the species. This prevents
check_all_done from firing while pipe work is in progress.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
MockAdapter writes YAML output (not real ESS logs), so
determine_ess_status falsely reports convergence failure.
Patching _parse_ess_error to return None in TestRunTask and
TestWorkerLoop skips the ESS check for these mockter-based tests.
cluster_tsgs() merges methods into method_sources list, not into the
method string. Updated test_cluster_tsgs to expect the representative's
original method and execution_time. Updated test_as_dict and
test_from_dict to include method_sources in expected output.
@alongd
Copy link
Copy Markdown
Member Author

alongd commented Apr 9, 2026

Thanks for the fixes, updates, and improvements, @calvinp0!

@alongd alongd merged commit e782431 into main Apr 9, 2026
8 checks passed
@alongd alongd deleted the pipe branch April 9, 2026 01:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants