diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index fe70ed5a..ecd9f1b1 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -55,6 +55,8 @@ Deprecated: use ``job_script_prologue`` instead. This parameter will be removed in a future version. job_script_prologue : list Other commands to add to script before launching worker. + job_script_epilogue : list + Commands to add to script which will run after the worker command has exited. header_skip : list Deprecated: use ``job_directives_skip`` instead. This parameter will be removed in a future version. job_directives_skip : list @@ -144,6 +146,7 @@ class Job(ProcessInterface, abc.ABC): %(job_header)s %(job_script_prologue)s %(worker_command)s +%(job_script_epilogue)s """.lstrip() # Following class attributes should be overridden by extending classes. @@ -173,6 +176,7 @@ def __init__( job_extra_directives=None, env_extra=None, job_script_prologue=None, + job_script_epilogue=None, header_skip=None, job_directives_skip=None, log_directory=None, @@ -274,6 +278,10 @@ def __init__( job_script_prologue = dask.config.get( "jobqueue.%s.job-script-prologue" % self.config_name ) + if job_script_epilogue is None: + job_script_epilogue = dask.config.get( + "jobqueue.%s.job-script-epilogue" % self.config_name + ) if env_extra is not None: warn = ( "env_extra has been renamed to job_script_prologue. 
" @@ -341,6 +349,7 @@ def __init__( self.shebang = shebang self._job_script_prologue = job_script_prologue + self._job_script_epilogue = job_script_epilogue # dask-worker command line build dask_worker_command = "%(python)s -m %(worker_command)s" % dict( @@ -393,6 +402,7 @@ def job_script(self): "job_header": self.job_header, "job_script_prologue": "\n".join(filter(None, self._job_script_prologue)), "worker_command": self._command_template, + "job_script_epilogue": "\n".join(filter(None, self._job_script_epilogue)), } return self._script_template % pieces diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index 4ab5c07a..9ab26368 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -62,6 +62,12 @@ def __init__( self._job_script_prologue + [self._command_template] ) + if self._job_script_epilogue is not None: + # Overwrite command template: append commands from job_script_epilogue separated by semicolon. + self._command_template = "; ".join( + [self._command_template] + self._job_script_epilogue + ) + self.job_header_dict = { "MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"', "batch_name": self.name, diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index f9424158..becccd5c 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -23,6 +23,7 @@ jobqueue: walltime: '00:30:00' env-extra: null job-script-prologue: [] + job-script-epilogue: [] resource-spec: null job-extra: null job-extra-directives: [] @@ -57,6 +58,7 @@ jobqueue: walltime: '00:30:00' env-extra: null job-script-prologue: [] + job-script-epilogue: [] resource-spec: null job-extra: null job-extra-directives: [] @@ -90,6 +92,7 @@ jobqueue: walltime: '00:30:00' env-extra: null job-script-prologue: [] + job-script-epilogue: [] job-extra: null job-extra-directives: [] job-directives-skip: [] @@ -123,6 +126,7 @@ jobqueue: walltime: '00:30:00' env-extra: null job-script-prologue: [] + job-script-epilogue: [] job-cpu: null job-mem: 
null job-extra: null @@ -157,6 +161,7 @@ jobqueue: walltime: '00:30:00' env-extra: null job-script-prologue: [] + job-script-epilogue: [] resource-spec: null job-extra: null job-extra-directives: [] @@ -190,6 +195,7 @@ jobqueue: walltime: '00:30' env-extra: null job-script-prologue: [] + job-script-epilogue: [] ncpus: null mem: null job-extra: null @@ -223,6 +229,7 @@ jobqueue: disk: null # Total amount of disk per job env-extra: null job-script-prologue: [] + job-script-epilogue: [] job-extra: null # Extra submit attributes job-extra-directives: {} # Extra submit attributes job-directives-skip: [] @@ -252,6 +259,7 @@ jobqueue: env-extra: null job-script-prologue: [] + job-script-epilogue: [] job-extra: null job-extra-directives: [] job-directives-skip: [] diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 4ee455af..6b07623b 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -33,6 +33,9 @@ def test_job_script(): "cd /some/path/", "source venv/bin/activate", ], + job_script_epilogue=[ + 'echo "Job finished"', + ], job_extra_directives={"+Extra": "True"}, submit_command_extra=["-verbose"], cancel_command_extra=["-forcex"], @@ -64,6 +67,7 @@ def test_job_script(): assert f"--memory-limit {formatted_bytes}" in job_script assert "--nthreads 2" in job_script assert "--nworkers 2" in job_script + assert 'echo ""Job finished""' in job_script @pytest.mark.env("htcondor") @@ -144,6 +148,7 @@ def test_config_name_htcondor_takes_custom_config(): "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], + "job-script-epilogue": [], "log-directory": None, "shebang": "#!/usr/bin/env condor_submit", "local-directory": "/tmp", diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 96fdc601..5f8c7435 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -318,6 +318,7 @@ def test_config_name_lsf_takes_custom_config(): 
"worker-extra-args": [], "env-extra": None, "job-script-prologue": [], + "job-script-epilogue": [], "log-directory": None, "shebang": "#!/usr/bin/env bash", "use-stdin": None, diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 894ffdd4..cd9ff12e 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -96,6 +96,9 @@ def test_job_script(): 'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"', ], + job_script_epilogue=[ + 'echo "Job finished"', + ], ) as cluster: job_script = cluster.job_script() assert "#OAR" in job_script @@ -118,6 +121,7 @@ def test_job_script(): assert "--nthreads 2" in job_script assert "--nworkers 4" in job_script assert f"--memory-limit {formatted_bytes}" in job_script + assert 'echo "Job finished"' in job_script def test_config_name_oar_takes_custom_config(): @@ -141,6 +145,7 @@ def test_config_name_oar_takes_custom_config(): "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], + "job-script-epilogue": [], "log-directory": None, "shebang": "#!/usr/bin/env bash", "job-cpu": None, diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 89db0f68..ee150465 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -355,6 +355,7 @@ def test_config_name_pbs_takes_custom_config(): "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], + "job-script-epilogue": [], "log-directory": None, "shebang": "#!/usr/bin/env bash", "job-cpu": None, diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 9e347ed9..b250c3ee 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -61,6 +61,7 @@ def test_config_name_sge_takes_custom_config(): "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], + "job-script-epilogue": [], "log-directory": None, "shebang": "#!/usr/bin/env bash", "job-cpu": None, @@ -84,6 +85,9 @@ def 
test_job_script(tmpdir): project="my-project", walltime="02:00:00", job_script_prologue=["export MY_VAR=my_var"], + job_script_epilogue=[ + 'echo "Job finished"', + ], job_extra_directives=["-w e", "-m e"], log_directory=log_directory, resource_spec="h_vmem=12G,mem_req=12G", @@ -106,6 +110,7 @@ def test_job_script(tmpdir): "-l h_vmem=12G,mem_req=12G", "#$ -cwd", "#$ -j y", + 'echo "Job finished"', ]: assert each in job_script diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index c06ece50..2f0b1c08 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -103,6 +103,9 @@ def test_job_script(): 'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"', ], + job_script_epilogue=[ + 'echo "Job finished"', + ], ) as cluster: job_script = cluster.job_script() assert "#SBATCH" in job_script @@ -127,6 +130,8 @@ def test_job_script(): assert "--nworkers 4" in job_script assert f"--memory-limit {formatted_bytes}" in job_script + assert 'echo "Job finished"' in job_script + @pytest.mark.env("slurm") def test_basic(loop): @@ -208,6 +213,7 @@ def test_config_name_slurm_takes_custom_config(): "worker-extra-args": [], "env-extra": None, "job-script-prologue": [], + "job-script-epilogue": [], "log-directory": None, "shebang": "#!/usr/bin/env bash", "job-cpu": None, diff --git a/docs/source/clusters-advanced-tips-and-tricks.rst b/docs/source/clusters-advanced-tips-and-tricks.rst index 257ed098..8b1e40ca 100644 --- a/docs/source/clusters-advanced-tips-and-tricks.rst +++ b/docs/source/clusters-advanced-tips-and-tricks.rst @@ -104,6 +104,7 @@ parameter in the submit description file. The relevant lines will look like this For other batch systems (``*Cluster`` classes) the additional commands will be inserted as separate lines in the submission script. +Similarly, if you need to run some commands after the worker has exited, use the ``job_script_epilogue`` parameter. 
How to handle job queueing system walltime killing workers ----------------------------------------------------------