Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ef477d0
proposed fix for https://github.com/snakemake/snakemake-executor-plug…
Mar 6, 2026
8d033ce
Merge branch 'main' of github.com:snakemake/snakemake-executor-plugin…
cmeesters Mar 9, 2026
ed1e50d
Merge branch 'main' of github.com:snakemake/snakemake-executor-plugin…
cmeesters Mar 9, 2026
677c66c
Merge branch 'main' of github.com:snakemake/snakemake-executor-plugin…
cmeesters Mar 18, 2026
5051af7
feat: (untested) job signalling raw code
cmeesters Mar 23, 2026
9ae3aab
fix: removed unused import - was from an attempt to modularize further
cmeesters Mar 23, 2026
9be9b97
fix: removed unused imports - got refactored
cmeesters Mar 23, 2026
8015c17
fix: shortend lines
cmeesters Mar 23, 2026
f95510c
Merge branch 'main' of github.com:snakemake/snakemake-executor-plugin…
cmeesters Mar 31, 2026
59b7a5c
fix: merge conflict
cmeesters Mar 31, 2026
9635787
fix: formatting
cmeesters Mar 31, 2026
a66a37b
fix: exception for test 821 of flake8 - snakemake imports of test cas…
cmeesters Mar 31, 2026
b3f49ba
tests: added tests for job signalling
cmeesters Mar 31, 2026
c94023d
feat: added validator for singals
cmeesters Mar 31, 2026
de4611a
refactor: changed to only signalling, no distinction between scopes
cmeesters Apr 1, 2026
8f85dcd
refactor: changed to only signalling, no distinction between scopes
cmeesters Apr 1, 2026
d220820
refactor: changed to only signalling, no distinction between scopes
cmeesters Apr 1, 2026
35865fd
refactor: changed to only signalling, no distinction between scopes
cmeesters Apr 1, 2026
792bc00
fix: formatting
cmeesters Apr 1, 2026
decf35d
Merge branch 'main' into feat/slurm_signals
cmeesters Apr 16, 2026
4f38aa8
Merge branch 'main' into feat/slurm_signals
cmeesters Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ extend-ignore =
# See https://github.com/PyCQA/pycodestyle/issues/373
E203,
per-file-ignores =
tests/test_scontrol_parsing.py:E501
tests/test_scontrol_parsing.py:E501
tests/testcases/slurm_signal/handle_signal.py:F821
29 changes: 28 additions & 1 deletion snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
pending_jobs_for_rule,
delete_slurm_environment,
delete_empty_dirs,
get_slurm_signal_arg,
set_gres_string,
)
from .job_status_query import (
Expand Down Expand Up @@ -352,6 +353,20 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

signal: Optional[str] = field(
default=None,
metadata={
"help": "Send signal to jobs before wall time (SLURM format). "
"Format: --slurm-signal=RULESIGNAL@TIME. "
"SIGNAL: name (SIGTERM) or number (15). TIME: seconds before wall time. "
"Use RULE='all' for all rules. Examples: "
"--slurm-signal=rule1:SIGTERM@30 --slurm-signal=rule2:SIGUSR1@60 "
"--slurm-signal=all:15@45",
"env_var": False,
"required": False,
},
)

qos: Optional[str] = field(
default=None,
metadata={
Expand Down Expand Up @@ -636,9 +651,14 @@ def additional_general_args(self):
passed to `exec_job`.
"""
general_args = "--executor slurm-jobstep --jobs 1"
# need to pass
# need to pass, if passing as script is required
if self.workflow.executor_settings.pass_command_as_script:
general_args += " --slurm-jobstep-pass-command-as-script"
# need to pass, if signal settings are defined
if self.workflow.executor_settings.signal:
general_args += " --slurm-jobstep-signal " + shlex.quote(
self.workflow.executor_settings.signal
)
return general_args

def run_jobs(self, jobs: List[JobExecutorInterface]):
Expand Down Expand Up @@ -1088,6 +1108,13 @@ def run_job(self, job: JobExecutorInterface):
failed_nodes=self._failed_nodes,
)

call += get_slurm_signal_arg(
self.workflow.executor_settings.signal,
job.name,
)

# we exclude failed nodes from further job submissions, to avoid
# repeated failures.
if self._failed_nodes:
self.logger.debug(
"Excluding failed nodes from job submission: "
Expand Down
135 changes: 134 additions & 1 deletion snakemake_executor_plugin_slurm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import shlex
import subprocess
import re
from functools import lru_cache
from pathlib import Path
from typing import Union
from typing import Optional, Union

from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
from snakemake_interface_executor_plugins.jobs import (
Expand Down Expand Up @@ -336,6 +337,138 @@ def delete_empty_dirs(path: Path) -> None:
raise OSError(f"Failed to remove empty directory {path}: {e}") from e


# only run this parser once per unique input string
# chache the results for efficiency.
@lru_cache(maxsize=None)
def parse_slurm_signal_settings(signal_settings: Optional[str]) -> dict[str, str]:
"""
Parse rule-specific SLURM signal settings (format: rule:SIGNAL@TIME).

- rule: rule name or 'all' for all rules
- SIGNAL: signal name (SIGTERM, SIGUSR1) or number (15, 10)
- TIME: seconds before wall time (must be >= 1)

Examples:
- rule1:SIGTERM@30 → SIGTERM at 30 secs
- rule2:SIGUSR1@60 → SIGUSR1 at 60 secs
- all:15@45 → all rules, signal 15 at 45 secs

Returns dict mapping rule names to SLURM signal specs: <signal>@<time>
"""
if not signal_settings:
return {}

parsed_settings = {}
# Map signal names to numbers for SLURM
signal_map = {
"SIGHUP": 1,
"SIGINT": 2,
"SIGQUIT": 3,
"SIGILL": 4,
"SIGTRAP": 5,
"SIGABRT": 6,
"SIGBUS": 7,
"SIGFPE": 8,
"SIGKILL": 9,
"SIGUSR1": 10,
"SIGSEGV": 11,
"SIGUSR2": 12,
"SIGPIPE": 13,
"SIGALRM": 14,
"SIGTERM": 15,
"SIGCHLD": 17,
"SIGCONT": 18,
"SIGSTOP": 19,
"SIGTSTP": 20,
"SIGTTIN": 21,
"SIGTTOU": 22,
"SIGURG": 23,
"SIGXCPU": 24,
"SIGXFSZ": 25,
"SIGVTALRM": 26,
"SIGPROF": 27,
"SIGWINCH": 28,
"SIGIO": 29,
"SIGPWR": 30,
"SIGSYS": 31,
}

# Pattern: rule:SIGNAL@TIME
pattern = re.compile(r"^(?P<rule>\w+):(?P<signal>\w+)@(?P<seconds>\d+)$")

for raw_entry in signal_settings.split(","):
entry = raw_entry.strip()
if not entry:
raise WorkflowError(
"Invalid signal specification: empty entry found. "
"Expected 'rule:SIGNAL@TIME' (e.g., 'rule1:SIGTERM@30')."
)

match = pattern.fullmatch(entry)
if match is None:
raise WorkflowError(
f"Invalid signal specification '{entry}'. "
"Expected 'rule:SIGNAL@TIME' (e.g., 'rule1:SIGTERM@30', "
"'rule2:15@60', 'all:SIGUSR1@45')."
)

rule_name = match.group("rule")
if rule_name != "all" and rule_name in parsed_settings:
raise WorkflowError(
f"Duplicate signal specification for rule '{rule_name}'."
)

signal_str = match.group("signal")
# Validate signal name or number.
if signal_str.isdigit():
signal_num = int(signal_str)
if signal_num < 1 or signal_num > 31:
raise WorkflowError(
f"Invalid signal number '{signal_num}'. Must be between 1 and 31."
)
else:
if signal_str not in signal_map:
raise WorkflowError(
f"Invalid signal name '{signal_str}'. "
f"Valid signals: {', '.join(sorted(signal_map.keys()))}"
)
signal_num = signal_map[signal_str]

seconds = int(match.group("seconds"))
if seconds < 1:
raise WorkflowError("Signal lead time must be at least 1 second.")

parsed_settings[rule_name] = f"{signal_str}@{seconds}"

return parsed_settings


def get_slurm_signal_arg(signal_settings: Optional[str], rule_name: str) -> str:
"""Return the sbatch --signal argument for the given rule, if configured.

Checks for rule-specific signal first, then falls back to 'all' if set.
"""
# we need to check the signal settings, first.
if not signal_settings:
return ""

signal_settings_pattern = r"^\s*\w+:\w+@\d+\s*(,\s*\w+:\w+@\d+\s*)*$"
if not re.match(signal_settings_pattern, signal_settings):
raise WorkflowError(
f"Invalid signal specification: '{signal_settings}'. "
"Expected format: 'rule:SIGNAL@TIME' "
"(e.g., 'rule1:SIGTERM@30', 'rule2:15@60', "
"'all:SIGUSR1@45')."
)

parsed = parse_slurm_signal_settings(signal_settings)
# Check rule-specific first, then fall back to 'all'
signal_value = parsed.get(rule_name) or parsed.get("all")
if not signal_value:
return ""
return f" --signal={signal_value}"


def set_gres_string(job: JobExecutorInterface) -> str:
"""
Function to set the gres string for the SLURM job
Expand Down
5 changes: 5 additions & 0 deletions snakemake_executor_plugin_slurm/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from snakemake_interface_common.exceptions import WorkflowError
from .job_status_query import get_min_job_age, is_query_tool_available
from .utils import parse_slurm_signal_settings


def validate_or_get_slurm_job_id(job_id, output):
Expand Down Expand Up @@ -99,6 +100,7 @@ def get_forbidden_slurm_options():
r"--qos[=\s]": "quality of service",
r"--nodes[=\s]|-N\s": "number of nodes",
r"--clusters[=\s]": "cluster specification",
r"--signal[=\s]": "signal",
# GPU options
r"--gres[=\s]": "generic resources (GRES)",
r"--gpus[=\s]": "GPU allocation",
Expand Down Expand Up @@ -218,6 +220,9 @@ def validate_executor_settings(settings, logger=None):
if settings.delete_logfiles_older_than is not None:
if not isinstance(settings.delete_logfiles_older_than, int):
raise WorkflowError("delete-logfiles-older-than must be an integer (days).")
# signal
parse_slurm_signal_settings(settings.signal)

# status_command warnings (optional logger)
if logger:
validate_status_command_settings(settings, logger)
Loading
Loading