Skip to content

Commit ea00e6c

Browse files
Fix Custodian only killing MPI parent process but leaving VASP ghost processes blocking nodes (#414)
* fix(vasp): kill entire process group when terminating VASP jobs Use os.killpg() to send SIGTERM/SIGKILL to the entire process group instead of just the main process. This ensures all MPI child processes are properly terminated when custodian needs to kill a VASP job. - Import signal module - Use os.killpg() with SIGTERM for graceful termination - Use os.killpg() with SIGKILL for force kill after timeout - Handle ProcessLookupError when process group is already dead * test(vasp): refactor VaspJob.terminate() tests for process group killing Consolidate 13 tests across 3 classes into 9 focused tests in 1 class, reducing ~180 lines to ~95 lines while maintaining full coverage. Key changes: - Use pytest-style class with fixture instead of unittest setUp/tearDown - Use SimpleNamespace for cleaner mock access (mocks.process vs mocks["process"]) - Mock os.killpg and os.getpgid to test new process group termination logic Tests now cover all code paths in terminate(): 1. test_terminate_process_already_finished - early return when poll() != None 2. test_terminate_graceful_success - SIGTERM succeeds, verifies wait(timeout=10) 3. test_terminate_force_kill_after_timeout - SIGTERM times out, SIGKILL + kill() 4. test_terminate_oserror_during_sigterm - OSError propagates from killpg 5. test_terminate_oserror_during_wait - OSError propagates from final wait() 6. test_terminate_process_lookup_error_during_sigterm - graceful handling 7. test_terminate_process_lookup_error_during_sigkill - falls back to kill() 8. test_terminate_multiple_calls - second call returns early 9. test_terminate_integration_with_real_process - verifies real process group killed Removed duplicate tests: - test_terminate_wait_with_different_timeout_behavior (same as force_kill test) - test_terminate_kills_process_group_with_sigterm (same as graceful_success) - test_terminate_force_kills_process_group_with_sigkill (same as force_kill) - Redundant integration test (kept the one verifying process group is dead) * Add configurable terminate_timeout and remove unused function - Add terminate_timeout parameter to VaspJob (default 10s) - Large MPI jobs can use longer timeouts before SIGKILL escalation - Log message now includes timeout value for debugging - Remove unused copy_contcar_to_poscar_if_valid() from utils.py - Add test_custom_timeout to verify configurable timeout works * Skip terminate tests on Windows (os.killpg/getpgid are POSIX-only) - Code now falls back to process.terminate() on Windows - Tests use create=True to mock POSIX-only functions on Windows - Only integration test with real subprocess skipped on Windows * apply @Andrew-S-Rosen's suggested `terminate` changes and update test expectations * try fix windows CI * VaspJob.terminate() with signal escalation + added back wait confirmation - Add wait after SIGTERM to confirm process death before returning - Use try/except/else pattern instead of sigterm_sent flag for cleaner flow - Skip wait and go straight to SIGKILL if SIGTERM raises OSError - Use terminate_timeout consistently (remove hardcoded 5s for SIGKILL) - Catch OSError specifically instead of broad Exception - Add ProcessLookupError handling for race conditions - Fall back to parent process kill on Windows or if process group methods fail - Add type hint to directory parameter and document it as unused - Distinguish "killed with SIGKILL" from fallback "killed" in logs - Patch custodian.vasp.jobs.os.* in tests to ensure POSIX path is tested on all platforms - Add test for SIGTERM OSError skipping wait and going to SIGKILL Flow: SIGTERM → wait(timeout) → SIGKILL → wait(timeout) → fallback terminate/kill * try fix flaky CI * return on ProcessLookupError after SIGKILL * revert flaky test fix causing slow CI * try fix windows CI (again) * Update logging for process termination signals Clarify the logging when a `ProcessLookupError` is hit * Update warning messages for process group handling --------- Co-authored-by: Andrew S. Rosen <asrosen93@gmail.com>
1 parent 5767f84 commit ea00e6c

3 files changed

Lines changed: 206 additions & 166 deletions

File tree

src/custodian/vasp/jobs.py

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import math
55
import os
66
import shutil
7+
import signal
78
import subprocess
89
from shutil import which
910

@@ -84,6 +85,7 @@ def __init__(
8485
copy_magmom=False,
8586
auto_continue=False,
8687
update_incar=False,
88+
terminate_timeout: float = 10.0,
8789
) -> None:
8890
"""
8991
This constructor is necessarily complex due to the need for
@@ -142,6 +144,9 @@ def __init__(
142144
already present in the INCAR will be updated, i.e., no new parameters will be
143145
added even if they are in the final vasprun.xml. Note that settings_override take
144146
precedence over updated params.
147+
terminate_timeout (float): Timeout in seconds to wait for graceful
148+
termination (SIGTERM) before escalating to SIGKILL. Large MPI
149+
jobs may need longer timeouts. Defaults to 10.0 seconds.
145150
"""
146151
self.vasp_cmd = tuple(vasp_cmd)
147152
self.output_file = output_file
@@ -156,6 +161,7 @@ def __init__(
156161
self.copy_magmom = copy_magmom
157162
self.auto_continue = auto_continue
158163
self.update_incar = update_incar
164+
self.terminate_timeout = terminate_timeout
159165

160166
if SENTRY_DSN:
161167
# if using Sentry logging, add specific VASP executable to scope
@@ -703,20 +709,78 @@ def constrained_opt_run(
703709
for key in sorted(energies):
704710
file.write(f"{key} {energies[key]}\n")
705711

706-
def terminate(self, directory="./") -> None:
707-
"""Kill all VASP processes associated with the current job."""
712+
def terminate(self, directory: str = "./") -> None:
713+
"""Kill all VASP processes associated with the current job.
714+
715+
Tries to kill the entire process group (safest for MPI jobs), then waits
716+
to confirm termination. Escalates SIGTERM → SIGKILL → parent process fallback.
717+
718+
Note: The parent process fallback may leave behind ghost MPI child processes
719+
(less likely with srun since SLURM purportedly cleans up process trees).
720+
721+
Args:
722+
directory: Unused, kept for API compatibility with base class.
723+
"""
724+
pid = self._vasp_process.pid
725+
708726
if self._vasp_process.poll() is not None:
709-
logger.warning("The process was already done!")
727+
logger.warning(f"Process {pid} already terminated")
710728
return
711729

730+
if os.name != "nt":
731+
# Look up process group ID
732+
try:
733+
pgid = os.getpgid(pid)
734+
except ProcessLookupError:
735+
logger.warning(f"Process group for {pid} not found")
736+
return
737+
738+
# Send SIGTERM to the entire process group
739+
logger.info(f"Sending SIGTERM to process group {pgid}")
740+
try:
741+
os.killpg(pgid, signal.SIGTERM)
742+
except ProcessLookupError:
743+
logger.warning(f"Process group {pgid} not found")
744+
return
745+
except OSError as exc:
746+
logger.warning(f"SIGTERM to process group {pgid} failed: {exc}")
747+
else:
748+
# Wait for graceful termination (only if SIGTERM was sent)
749+
try:
750+
self._vasp_process.wait(timeout=self.terminate_timeout)
751+
logger.info(f"Process {pid} terminated gracefully")
752+
return
753+
except subprocess.TimeoutExpired:
754+
logger.warning(f"SIGTERM timeout ({self.terminate_timeout}s), sending SIGKILL")
755+
756+
# Escalate to SIGKILL
757+
logger.info(f"Sending SIGKILL to process group {pgid}")
758+
try:
759+
os.killpg(pgid, signal.SIGKILL)
760+
except ProcessLookupError:
761+
logger.warning(f"Process group {pgid} not found")
762+
return
763+
except OSError as exc:
764+
logger.warning(f"SIGKILL to process group {pgid} failed: {exc}")
765+
else:
766+
# Wait for process to die (only if SIGKILL was sent)
767+
try:
768+
self._vasp_process.wait(timeout=self.terminate_timeout)
769+
logger.info(f"Process {pid} killed with SIGKILL")
770+
return
771+
except subprocess.TimeoutExpired:
772+
pass # Fall through to parent process fallback
773+
774+
# Fall back to killing the parent launcher process (Windows or if above failed)
775+
logger.warning(f"Falling back to killing parent process {pid}")
712776
try:
713-
logger.info(f"Killing PID {self._vasp_process.pid}")
714777
self._vasp_process.terminate()
715-
self._vasp_process.wait(timeout=10)
778+
self._vasp_process.wait(timeout=self.terminate_timeout)
779+
logger.info(f"Process {pid} terminated")
716780
except subprocess.TimeoutExpired:
717-
logger.warning(f"Graceful termination did not work. Force killing PID {self._vasp_process.pid}")
718781
self._vasp_process.kill()
719782
self._vasp_process.wait()
783+
logger.info(f"Process {pid} killed")
720784

721785

722786
class VaspNEBJob(VaspJob):

tests/test_custodian.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,15 @@
77
import pytest
88
from ruamel.yaml import YAML
99

10-
from custodian.custodian import (
10+
11+
@pytest.fixture(autouse=True)
12+
def _seed_rng() -> None:
13+
"""Seed RNG for deterministic ExampleJob.run() behavior across all tests.
14+
Needed for ExampleJob.run() with random.uniform(0, 1) to not be flaky."""
15+
random.seed(42)
16+
17+
18+
from custodian.custodian import ( # noqa: E402
1119
Custodian,
1220
ErrorHandler,
1321
Job,
@@ -47,7 +55,7 @@ def setup(self, directory="./") -> None:
4755
self.params["total"] = 0
4856

4957
def run(self, directory="./") -> None:
50-
sequence = [random.uniform(0, 1) for i in range(100)]
58+
sequence = [random.uniform(0, 1) for _ in range(100)]
5159
self.params["total"] = self.params["initial"] + sum(sequence)
5260

5361
def postprocess(self, directory="./") -> None:

0 commit comments

Comments
 (0)