diff --git a/setup.cfg b/setup.cfg index 34599d0e..2c2f2bef 100644 --- a/setup.cfg +++ b/setup.cfg @@ -6,4 +6,5 @@ extend-ignore = # See https://github.com/PyCQA/pycodestyle/issues/373 E203, per-file-ignores = - tests/test_scontrol_parsing.py:E501 \ No newline at end of file + tests/test_scontrol_parsing.py:E501 + tests/testcases/slurm_signal/handle_signal.py:F821 \ No newline at end of file diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 13be6ed3..b3668bdf 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -47,6 +47,7 @@ pending_jobs_for_rule, delete_slurm_environment, delete_empty_dirs, + get_slurm_signal_arg, set_gres_string, ) from .job_status_query import ( @@ -352,6 +353,20 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) + signal: Optional[str] = field( + default=None, + metadata={ + "help": "Send signal to jobs before wall time (SLURM format). " + "Format: --slurm-signal=RULESIGNAL@TIME. " + "SIGNAL: name (SIGTERM) or number (15). TIME: seconds before wall time. " + "Use RULE='all' for all rules. Examples: " + "--slurm-signal=rule1:SIGTERM@30 --slurm-signal=rule2:SIGUSR1@60 " + "--slurm-signal=all:15@45", + "env_var": False, + "required": False, + }, + ) + qos: Optional[str] = field( default=None, metadata={ @@ -636,9 +651,14 @@ def additional_general_args(self): passed to `exec_job`. 
""" general_args = "--executor slurm-jobstep --jobs 1" - # need to pass + # need to pass, if passing as script is required if self.workflow.executor_settings.pass_command_as_script: general_args += " --slurm-jobstep-pass-command-as-script" + # need to pass, if signal settings are defined + if self.workflow.executor_settings.signal: + general_args += " --slurm-jobstep-signal " + shlex.quote( + self.workflow.executor_settings.signal + ) return general_args def run_jobs(self, jobs: List[JobExecutorInterface]): @@ -1088,6 +1108,13 @@ def run_job(self, job: JobExecutorInterface): failed_nodes=self._failed_nodes, ) + call += get_slurm_signal_arg( + self.workflow.executor_settings.signal, + job.name, + ) + + # we exclude failed nodes from further job submissions, to avoid + # repeated failures. if self._failed_nodes: self.logger.debug( "Excluding failed nodes from job submission: " diff --git a/snakemake_executor_plugin_slurm/utils.py b/snakemake_executor_plugin_slurm/utils.py index 4de29a4b..5b7c41e7 100644 --- a/snakemake_executor_plugin_slurm/utils.py +++ b/snakemake_executor_plugin_slurm/utils.py @@ -6,8 +6,9 @@ import shlex import subprocess import re +from functools import lru_cache from pathlib import Path -from typing import Union +from typing import Optional, Union from snakemake_interface_executor_plugins.dag import DAGExecutorInterface from snakemake_interface_executor_plugins.jobs import ( @@ -336,6 +337,138 @@ def delete_empty_dirs(path: Path) -> None: raise OSError(f"Failed to remove empty directory {path}: {e}") from e +# only run this parser once per unique input string +# cache the results for efficiency. +@lru_cache(maxsize=None) +def parse_slurm_signal_settings(signal_settings: Optional[str]) -> dict[str, str]: + """ + Parse rule-specific SLURM signal settings (format: rule:SIGNAL@TIME). 
+ + - rule: rule name or 'all' for all rules + - SIGNAL: signal name (SIGTERM, SIGUSR1) or number (15, 10) + - TIME: seconds before wall time (must be >= 1) + + Examples: + - rule1:SIGTERM@30 → SIGTERM at 30 secs + - rule2:SIGUSR1@60 → SIGUSR1 at 60 secs + - all:15@45 → all rules, signal 15 at 45 secs + + Returns dict mapping rule names to SLURM signal specs: @