Skip to content

Commit 7a6d9d9

Browse files
committed
Update to provide an option to remove output files and print job parameters.
1 parent 99be28b commit 7a6d9d9

3 files changed

Lines changed: 117 additions & 6 deletions

File tree

pbprocesstools/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
import json
4141

4242
PB_PROCESS_TOOLS_VERSION_MAJOR = 1
43-
PB_PROCESS_TOOLS_VERSION_MINOR = 2
44-
PB_PROCESS_TOOLS_VERSION_PATCH = 4
43+
PB_PROCESS_TOOLS_VERSION_MINOR = 3
44+
PB_PROCESS_TOOLS_VERSION_PATCH = 0
4545

4646
PB_PROCESS_TOOLS_VERSION = str(PB_PROCESS_TOOLS_VERSION_MAJOR) + "." + str(PB_PROCESS_TOOLS_VERSION_MINOR) + "." + str(PB_PROCESS_TOOLS_VERSION_PATCH)
4747
PB_PROCESS_TOOLS_VERSION_OBJ = LooseVersion(PB_PROCESS_TOOLS_VERSION)

pbprocesstools/pbpt_q_process.py

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ def __init__(self, queue_db_info=None, cmd_name=None, descript=None, params=None
107107
self.descript = descript
108108
self.params = params
109109
self.debug_job_id = None
110+
self.debug_job_id_params = False
111+
self.debug_job_id_rmouts = False
110112
super().__init__(uid_len)
111113

112114
def set_params(self, params):
@@ -260,6 +262,18 @@ def outputs_present(self, **kwargs):
260262
"""
261263
pass
262264

265+
@abstractmethod
266+
def remove_outputs(self, **kwargs):
267+
"""
268+
An abstract function which checks if an output is present and removes them. This can be useful
269+
if an job failed part way through processing and needs to be reset.
270+
271+
:param kwargs: allows the user to pass custom variables to the function
272+
(e.q., obj.remove_outputs(mod_version=True)).
273+
274+
"""
275+
pass
276+
263277
def check_required_fields(self, **kwargs):
264278
"""
265279
A function which checks that the required fields are present within the
@@ -317,10 +331,18 @@ def parse_cmds(self, argv=None, **kwargs):
317331
"therefore the database will not be "
318332
"updated and the job will be run "
319333
"regardless of the database status.")
334+
parser.add_argument("-p", "--params", action='store_true', default=False,
335+
help="If a job is specified then rather than running the parameters will be "
336+
"printed to the console.")
337+
parser.add_argument("-r", "--rmouts", action='store_true', default=False,
338+
help="If a job is specified then rather than running the job the outputs will be "
339+
"removed.")
320340
if argv is None:
321341
argv = sys.argv[1:]
322342
args = parser.parse_args(argv)
323343
self.debug_job_id = args.job
344+
self.debug_job_id_params = args.params
345+
self.debug_job_id_rmouts = args.rmouts
324346
with open(args.dbinfo) as f:
325347
self.queue_db_info = json.load(f)
326348
self.check_db_info()
@@ -425,8 +447,15 @@ def std_run(self, **kwargs):
425447
self.params = job_info.JobParams
426448
if job_info is not None:
427449
logger.debug("Found the job to process, PID: {}".format(self.job_pid))
428-
self.check_required_fields(**kwargs)
429-
self.do_processing(**kwargs)
450+
if self.debug_job_id_params:
451+
import pprint
452+
pprint.pprint(self.params)
453+
elif self.debug_job_id_rmouts:
454+
self.check_required_fields(**kwargs)
455+
self.remove_outputs(**kwargs)
456+
else:
457+
self.check_required_fields(**kwargs)
458+
self.do_processing(**kwargs)
430459

431460
logger.debug("Finished processing the job, PID: {}".format(self.job_pid))
432461
else:
@@ -545,6 +574,69 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
545574
pathlib.Path(out_err_pid_file).touch()
546575
pathlib.Path(out_err_info_file).touch()
547576

577+
def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=False, error_jobs=False, **kwargs):
578+
"""
579+
A function which following the completion of all the processing for a job tests whether all the output
580+
files where created (i.e., the job successfully completed).
581+
582+
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
583+
of the PBPTProcessTool class used for the processing to be checked.
584+
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
585+
for the processing to be checked.
586+
:param all_jobs: boolean specifying that outputs should be removed for all jobs.
587+
:param error_jobs: boolean specifying that outputs should be removed for error jobs - either
588+
logged an error or started but not finished.
589+
:param job_id: int for the job id for which the outputs will be removed.
590+
:param kwargs: allows the user to pass custom variables to the function (e.q., obj.gen_command_info(input='')),
591+
these will be passed to the process_tools_mod outputs_present function.
592+
593+
"""
594+
import importlib
595+
596+
if (not all_jobs) and (not error_jobs):
597+
raise Exception("Must specify for either all or only error jobs to have the outputs removed.")
598+
599+
queue_db_info = dict()
600+
queue_db_info['sqlite_db_file'] = self.sqlite_db_file
601+
queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn
602+
603+
process_tools_mod_inst = importlib.import_module(process_tools_mod)
604+
if process_tools_mod_inst is None:
605+
raise Exception("Could not load the module: '{}'".format(process_tools_mod))
606+
607+
process_tools_cls_inst = getattr(process_tools_mod_inst, process_tools_cls)()
608+
if process_tools_cls_inst is None:
609+
raise Exception("Could not create instance of '{}'".format(process_tools_cls))
610+
process_tools_cls_inst.set_queue_db_info(queue_db_info)
611+
612+
pbpt_utils = PBPTUtils()
613+
614+
if pbpt_utils.get_file_lock(self.sqlite_db_file, sleep_period=1, wait_iters=180, use_except=False):
615+
try:
616+
logger.debug("Creating Database Engine and Session.")
617+
db_engine = sqlalchemy.create_engine(self.sqlite_db_conn, pool_pre_ping=True)
618+
session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
619+
ses = session_sqlalc()
620+
logger.debug("Created Database Engine and Session.")
621+
622+
if all_jobs:
623+
jobs = ses.query(PBPTProcessJob).filter().all()
624+
elif error_jobs:
625+
jobs = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Started == True,
626+
PBPTProcessJob.Completed == False).all()
627+
else:
628+
raise Exception("Must specify for either all or only error jobs to have the outputs removed.")
629+
630+
if jobs is not None:
631+
for job_info in tqdm.tqdm(jobs):
632+
process_tools_cls_inst.set_params(job_info.JobParams)
633+
process_tools_cls_inst.remove_outputs(**kwargs)
634+
ses.close()
635+
pbpt_utils.release_file_lock(self.sqlite_db_file)
636+
except:
637+
pbpt_utils.release_file_lock(self.sqlite_db_file)
638+
639+
548640
def create_jobs_report(self, out_report_file=None):
549641
"""
550642
A function which generates a JSON report which can either
@@ -579,9 +671,9 @@ def create_jobs_report(self, out_report_file=None):
579671
n_started += 1
580672
n_ended += 1
581673
else:
582-
err_info[job_info.PID] = dict()
583674
if job_info.Error:
584675
n_errs += 1
676+
err_info[job_info.PID] = dict()
585677
err_info[job_info.PID]['info'] = job_info.ErrorInfo
586678

587679
if job_info.Started:
@@ -804,6 +896,19 @@ def run_check_outputs(self):
804896
"""
805897
pass
806898

899+
@abstractmethod
900+
def run_remove_outputs(self, all_jobs=False, error_jobs=False):
901+
"""
902+
An abstract function which needs to be implemented with the functions and inputs
903+
you want run to check the outputs of the processing have been successfully completed.
904+
905+
You will presumably want to call:
906+
907+
* self.remove_job_outputs
908+
909+
"""
910+
pass
911+
807912
def parse_cmds(self, argv=None):
808913
"""
809914
A function to parse the command line arguments to retrieve the
@@ -816,6 +921,9 @@ def parse_cmds(self, argv=None):
816921
parser.add_argument("--gen", action='store_true', default=False, help="Execute run_gen_commands() function.")
817922
parser.add_argument("--check", action='store_true', default=False, help="Execute run_check_outputs() function.")
818923
parser.add_argument("--report", action='store_true', default=False, help="Execute create_jobs_report() function.")
924+
parser.add_argument("--rmouts", action='store_true', default=False, help="Execute run_remove_outputs() function.")
925+
parser.add_argument("--all", action='store_true', default=False, help="Remove outputs for all jobs.")
926+
parser.add_argument("--error", action='store_true', default=False, help="Remove outputs for jobs with errors.")
819927
parser.add_argument("-o", "--output", type=str, required=False, help="Specify a report output JSON file. If not provided then report written to console.")
820928
if argv is None:
821929
argv = sys.argv[1:]
@@ -827,6 +935,9 @@ def parse_cmds(self, argv=None):
827935
self.run_check_outputs()
828936
elif args.report:
829937
self.create_jobs_report(args.output)
938+
elif args.rmouts:
939+
self.run_remove_outputs(all_jobs=args.all, error_jobs=args.error)
940+
830941

831942

832943

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import os
3838

3939
setuptools.setup(name='pb_process_tools',
40-
version='1.2.4',
40+
version='1.3.0',
4141
description='Tools for batch processing data, including on HPC cluster with slurm.',
4242
author='Pete Bunting',
4343
author_email='petebunting@mac.com',

0 commit comments

Comments
 (0)