diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 251746e..13be6ed 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -428,9 +428,22 @@ def _select_logdir(workflow): # Required: # Implementation of your executor class Executor(RemoteExecutor): + def __init__(self, workflow, logger): + # Must clean SLURM environment BEFORE super().__init__() starts + # the wait_thread, otherwise the thread races with __post_init__ + # and silently dies, causing the pipeline to hang forever. + if "SLURM_JOB_ID" in os.environ: + logger.warning( + "You are running snakemake in a SLURM job context. " + "This is not recommended, as it may lead to unexpected " + "behavior. " + "If possible, please run Snakemake directly on the " + "login node." + ) + delete_slurm_environment() + super().__init__(workflow, logger) + def __post_init__(self, test_mode: bool = False): - # run check whether we are running in a SLURM job context - self.warn_on_jobcontext() self.test_mode = test_mode self.run_uuid = str(uuid.uuid4()) @@ -617,20 +630,6 @@ def clean_old_logs(self) -> None: f"Could not delete empty directories in {self.slurm_logdir}: {e}" ) - def warn_on_jobcontext(self, done=None): - if not done: - if "SLURM_JOB_ID" in os.environ: - self.logger.warning( - "You are running snakemake in a SLURM job context. " - "This is not recommended, as it may lead to unexpected " - "behavior. " - "If possible, please run Snakemake directly on the " - "login node." - ) - time.sleep(5) - delete_slurm_environment() - done = True - def additional_general_args(self): """ This function defines additional arguments to be @@ -1451,14 +1450,12 @@ async def check_active_jobs( ) elif status == "PREEMPTED" and not self._preemption_warning: self._preemption_warning = True - self.logger.warning( - """ + self.logger.warning(""" ===== A Job preemption occured! ===== Leave Snakemake running, if possible. Otherwise Snakemake needs to restart this job upon a Snakemake restart. -We leave it to SLURM to resume your job(s)""" - ) +We leave it to SLURM to resume your job(s)""") yield j elif status == "UNKNOWN": # the job probably does not exist anymore, but 'sacct' did not work diff --git a/tests/test_cli.py b/tests/test_cli.py index 7e2f65b..5cad238 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -34,12 +34,11 @@ def _make_executor(jobname_prefix: str): def test_jobname_prefix_applied(): executor = _make_executor("testprefix") - with patch.object(Executor, "warn_on_jobcontext", return_value=None): - with patch( - "snakemake_executor_plugin_slurm.uuid.uuid4", - return_value=uuid.UUID("00000000-0000-0000-0000-000000000000"), - ): - executor.__post_init__(test_mode=True) + with patch( + "snakemake_executor_plugin_slurm.uuid.uuid4", + return_value=uuid.UUID("00000000-0000-0000-0000-000000000000"), + ): + executor.__post_init__(test_mode=True) assert executor.run_uuid == "testprefix_00000000-0000-0000-0000-000000000000" @@ -47,6 +46,5 @@ def test_jobname_prefix_applied(): def test_jobname_prefix_validation(): executor = _make_executor("bad!prefix") - with patch.object(Executor, "warn_on_jobcontext", return_value=None): - with pytest.raises(WorkflowError, match="jobname_prefix"): - executor.__post_init__(test_mode=True) + with pytest.raises(WorkflowError, match="jobname_prefix"): + executor.__post_init__(test_mode=True)