Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,22 @@ def _select_logdir(workflow):
# Required:
# Implementation of your executor
class Executor(RemoteExecutor):
def __init__(self, workflow, logger):
# Must clean SLURM environment BEFORE super().__init__() starts
# the wait_thread, otherwise the thread races with __post_init__
# and silently dies, causing the pipeline to hang forever.
if "SLURM_JOB_ID" in os.environ:
logger.warning(
"You are running snakemake in a SLURM job context. "
"This is not recommended, as it may lead to unexpected "
"behavior. "
"If possible, please run Snakemake directly on the "
"login node."
)
delete_slurm_environment()
super().__init__(workflow, logger)

def __post_init__(self, test_mode: bool = False):
# run check whether we are running in a SLURM job context
self.warn_on_jobcontext()
self.test_mode = test_mode

self.run_uuid = str(uuid.uuid4())
Expand Down Expand Up @@ -617,20 +630,6 @@ def clean_old_logs(self) -> None:
f"Could not delete empty directories in {self.slurm_logdir}: {e}"
)

def warn_on_jobcontext(self, done=None):
if not done:
if "SLURM_JOB_ID" in os.environ:
self.logger.warning(
"You are running snakemake in a SLURM job context. "
"This is not recommended, as it may lead to unexpected "
"behavior. "
"If possible, please run Snakemake directly on the "
"login node."
)
time.sleep(5)
delete_slurm_environment()
done = True

def additional_general_args(self):
"""
This function defines additional arguments to be
Expand Down Expand Up @@ -1451,14 +1450,12 @@ async def check_active_jobs(
)
elif status == "PREEMPTED" and not self._preemption_warning:
self._preemption_warning = True
self.logger.warning(
"""
self.logger.warning("""
===== A Job preemption occured! =====
Leave Snakemake running, if possible. Otherwise Snakemake
needs to restart this job upon a Snakemake restart.

We leave it to SLURM to resume your job(s)"""
)
We leave it to SLURM to resume your job(s)""")
yield j
elif status == "UNKNOWN":
# the job probably does not exist anymore, but 'sacct' did not work
Expand Down
16 changes: 7 additions & 9 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,17 @@ def _make_executor(jobname_prefix: str):
def test_jobname_prefix_applied():
executor = _make_executor("testprefix")

with patch.object(Executor, "warn_on_jobcontext", return_value=None):
with patch(
"snakemake_executor_plugin_slurm.uuid.uuid4",
return_value=uuid.UUID("00000000-0000-0000-0000-000000000000"),
):
executor.__post_init__(test_mode=True)
with patch(
"snakemake_executor_plugin_slurm.uuid.uuid4",
return_value=uuid.UUID("00000000-0000-0000-0000-000000000000"),
):
executor.__post_init__(test_mode=True)

assert executor.run_uuid == "testprefix_00000000-0000-0000-0000-000000000000"


def test_jobname_prefix_validation():
executor = _make_executor("bad!prefix")

with patch.object(Executor, "warn_on_jobcontext", return_value=None):
with pytest.raises(WorkflowError, match="jobname_prefix"):
executor.__post_init__(test_mode=True)
with pytest.raises(WorkflowError, match="jobname_prefix"):
executor.__post_init__(test_mode=True)
Loading