Skip to content

Commit cd333fa

Browse files
committed
Update to use a lock file when getting a new job, to prevent a job being started twice. Also refactored run_check_outputs and run_remove_outputs so that the user provides the required info to the constructor and therefore doesn't need to provide implementations of these functions.
1 parent 0403e40 commit cd333fa

4 files changed

Lines changed: 139 additions & 65 deletions

File tree

pbprocesstools/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import json
4141

4242
PB_PROCESS_TOOLS_VERSION_MAJOR = 1
43-
PB_PROCESS_TOOLS_VERSION_MINOR = 4
43+
PB_PROCESS_TOOLS_VERSION_MINOR = 5
4444
PB_PROCESS_TOOLS_VERSION_PATCH = 0
4545

4646
PB_PROCESS_TOOLS_VERSION = "{}.{}.{}".format(PB_PROCESS_TOOLS_VERSION_MAJOR,

pbprocesstools/pbpt_q_process.py

Lines changed: 132 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ class PBPTProcessJob(Base):
6161
__tablename__ = "PBPTProcessJob"
6262

6363
PID = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
64-
JobParams = sqlalchemy.Column(sqlalchemy.JSON, nullable=True)
64+
JobParams = sqlalchemy.Column(sqlalchemy.PickleType, nullable=True)
6565
Start = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
6666
End = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
6767
Started = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
6868
Completed = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
6969
Checked = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
7070
Error = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
71-
ErrorInfo = sqlalchemy.Column(sqlalchemy.JSON, nullable=True)
71+
ErrorInfo = sqlalchemy.Column(sqlalchemy.PickleType, nullable=True)
7272

7373

7474
@event.listens_for(Engine, "connect")
@@ -100,6 +100,7 @@ def __init__(self, queue_db_info=None, cmd_name=None, descript=None, params=None
100100
:param params: optionally provide a dict which will be the options for the processing to execute
101101
(e.g., the input and output files).
102102
103+
103104
"""
104105
self.queue_db_info = queue_db_info
105106
self.cmd_name = cmd_name
@@ -354,52 +355,66 @@ def std_run(self, **kwargs):
354355
pbpt_utils = PBPTUtils()
355356
if self.parse_cmds(**kwargs):
356357
if self.debug_job_id is None:
358+
sqlite_db_file = self.queue_db_info['sqlite_db_file']
359+
sqlite_db_dir, sqlite_db_filename = os.path.split(sqlite_db_file)
357360
sqlite_db_conn = self.queue_db_info['sqlite_db_conn']
358361
logger.debug("Database connection info: '{}'.".format(sqlite_db_conn))
359362
found_job = False
360363
# Sleep for a random period of time to minimise clashes between multiple processes so they are offset.
361364
time.sleep(random.randint(1,10))
365+
n_failed_lck = 0
362366
while True:
363-
try:
364-
logger.debug("Creating Database Engine and Session.")
365-
db_engine = sqlalchemy.create_engine(sqlite_db_conn, pool_pre_ping=True)
366-
session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
367-
ses = session_sqlalc()
368-
logger.debug("Created Database Engine and Session.")
369-
370-
logger.debug("Find the next scene to process.")
371-
job_info = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Completed == False,
372-
PBPTProcessJob.Started == False).order_by(
373-
PBPTProcessJob.PID.asc()).first()
374-
if job_info is not None:
375-
found_job = True
376-
self.job_pid = job_info.PID
377-
self.params = job_info.JobParams
378-
job_info.Started = True
379-
job_info.Start = datetime.datetime.now()
380-
ses.commit()
381-
logger.debug("Found the next scene to process. PID: {}".format(self.job_pid))
382-
else:
383-
found_job = False
384-
logger.debug("No job found to process - finishing.")
385-
ses.close()
386-
logger.debug("Closed Database Engine and Session.")
387-
except Exception as e:
388-
logger.debug("Failed to create the database connection: '{}'".format(sqlite_db_conn))
389-
logger.exception(e)
390-
found_job = False
391-
if found_job:
392-
self.check_required_fields(**kwargs)
367+
if pbpt_utils.get_file_lock(sqlite_db_file, sleep_period=1, wait_iters=180,
368+
use_except=False, timeout=300):
369+
n_failed_lck = 0
393370
try:
394-
self.do_processing(**kwargs)
395-
self.completed_processing(**kwargs)
371+
logger.debug("Creating Database Engine and Session.")
372+
db_engine = sqlalchemy.create_engine(sqlite_db_conn, pool_pre_ping=True)
373+
session_sqlalc = sqlalchemy.orm.sessionmaker(bind=db_engine)
374+
ses = session_sqlalc()
375+
logger.debug("Created Database Engine and Session.")
376+
377+
logger.debug("Find the next scene to process.")
378+
job_info = ses.query(PBPTProcessJob).filter(PBPTProcessJob.Completed == False,
379+
PBPTProcessJob.Started == False).order_by(
380+
PBPTProcessJob.PID.asc()).first()
381+
if job_info is not None:
382+
found_job = True
383+
self.job_pid = job_info.PID
384+
self.params = job_info.JobParams
385+
job_info.Started = True
386+
job_info.Start = datetime.datetime.now()
387+
ses.commit()
388+
logger.debug("Found the next scene to process. PID: {}".format(self.job_pid))
389+
else:
390+
found_job = False
391+
logger.debug("No job found to process - finishing.")
392+
ses.close()
393+
logger.debug("Closed Database Engine and Session.")
396394
except Exception as e:
397-
import traceback
398-
err_dict = dict()
399-
err_dict['error'] = str(e)
400-
err_dict['traceback'] = traceback.format_exc()
401-
self.record_process_error(err_dict)
395+
logger.debug("Failed to create the database connection: '{}'".format(sqlite_db_conn))
396+
logger.exception(e)
397+
found_job = False
398+
pbpt_utils.release_file_lock(sqlite_db_file, timeout=300)
399+
if found_job:
400+
self.check_required_fields(**kwargs)
401+
try:
402+
self.do_processing(**kwargs)
403+
self.completed_processing(**kwargs)
404+
except Exception as e:
405+
import traceback
406+
err_dict = dict()
407+
err_dict['error'] = str(e)
408+
err_dict['traceback'] = traceback.format_exc()
409+
self.record_process_error(err_dict)
410+
else:
411+
break
402412
else:
413+
n_failed_lck = n_failed_lck + 1
414+
pbpt_utils.clean_file_locks(sqlite_db_dir, timeout=300)
415+
416+
if n_failed_lck > 10:
417+
pbpt_utils.clean_file_locks(sqlite_db_dir, timeout=300)
403418
break
404419
else:
405420
sqlite_db_conn = self.queue_db_info['sqlite_db_conn']
@@ -439,18 +454,28 @@ def std_run(self, **kwargs):
439454

440455
class PBPTGenQProcessToolCmds(PBPTProcessToolsBase):
441456

442-
def __init__(self, cmd, sqlite_db_file, uid_len=6):
457+
def __init__(self, cmd, sqlite_db_file, uid_len=6, process_tools_path=None,
458+
process_tools_mod=None, process_tools_cls=None):
443459
"""
444460
A class to implement the generation of commands for batch processing data analysis.
445461
446462
:param cmd: the command to be executed (e.g., python run_analysis.py).
447463
:param sqlite_db_file: The file path of the SQLite database used for the job queue; it is stored as an absolute path and used to build the sqlite_db_conn connection string.
464+
:param process_tools_path: The path (if not already in path; i.e., same directory) to find the
465+
PBPTQProcessTool implementation used within this class
466+
:param process_tools_mod: The module name (i.e., python file name 'xxxx.py' containing the PBPTQProcessTool
467+
implementation).
468+
:param process_tools_cls: The name of the class implementing PBPTQProcessTool.
448469
449470
"""
450471
self.params = []
451472
self.cmd = cmd
452473
self.sqlite_db_file = os.path.abspath(sqlite_db_file)
453474
self.sqlite_db_conn = "sqlite:///{}".format(self.sqlite_db_file)
475+
self.process_tools_path = process_tools_path
476+
self.process_tools_mod = process_tools_mod
477+
self.process_tools_cls = process_tools_cls
478+
454479
super().__init__(uid_len)
455480

456481
@abstractmethod
@@ -470,19 +495,23 @@ def gen_command_info(self, **kwargs):
470495
"""
471496
pass
472497

473-
def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_file, out_err_info_file, **kwargs):
498+
def check_job_outputs(self, out_err_pid_file, out_err_info_file, process_tools_path=None,
499+
process_tools_mod=None, process_tools_cls=None, **kwargs):
474500
"""
475501
A function which following the completion of all the processing for a job tests whether all the output
476502
files were created (i.e., the job successfully completed).
477503
478-
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
479-
of the PBPTProcessTool class used for the processing to be checked.
480-
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
481-
for the processing to be checked.
482504
:param out_err_pid_file: the output file name and path for the list of database PIDs which have not
483505
been successfully processed.
484506
:param out_err_info_file: the output file name and path for the output error report from this function
485507
where processing might not have fully completed.
508+
:param process_tools_path: the path containing the implementation of the PBPTProcessTool class
509+
used for the processing to be checked. If None then class value passed to
510+
constructor will be used.
511+
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
512+
of the PBPTProcessTool class used for the processing to be checked.
513+
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
514+
for the processing to be checked.
486515
:param kwargs: allows the user to pass custom variables to the function (e.g., obj.gen_command_info(input='')),
487516
these will be passed to the process_tools_mod outputs_present function.
488517
@@ -493,6 +522,22 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
493522
queue_db_info['sqlite_db_file'] = self.sqlite_db_file
494523
queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn
495524

525+
if process_tools_path is None:
526+
process_tools_path = self.process_tools_path
527+
528+
if process_tools_mod is None:
529+
if self.process_tools_mod is None:
530+
raise Exception("A PBPTProcessTool implementation module has not been provided to the constructor.")
531+
process_tools_mod = self.process_tools_mod
532+
533+
if process_tools_cls is None:
534+
if self.process_tools_cls is None:
535+
raise Exception("A PBPTProcessTool implementation class has not been provided to the constructor.")
536+
process_tools_cls = self.process_tools_cls
537+
538+
if process_tools_path is not None:
539+
sys.path.insert(0, process_tools_path)
540+
496541
process_tools_mod_inst = importlib.import_module(process_tools_mod)
497542
if process_tools_mod_inst is None:
498543
raise Exception("Could not load the module: '{}'".format(process_tools_mod))
@@ -544,19 +589,22 @@ def check_job_outputs(self, process_tools_mod, process_tools_cls, out_err_pid_fi
544589
pathlib.Path(out_err_pid_file).touch()
545590
pathlib.Path(out_err_info_file).touch()
546591

547-
def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=False, error_jobs=False, **kwargs):
592+
def remove_job_outputs(self, all_jobs=False, error_jobs=False, process_tools_path=None, process_tools_mod=None,
593+
process_tools_cls=None, **kwargs):
548594
"""
549595
A function which following the completion of all the processing for a job tests whether all the output
550596
files were created (i.e., the job successfully completed).
551597
598+
:param all_jobs: boolean specifying that outputs should be removed for all jobs.
599+
:param error_jobs: boolean specifying that outputs should be removed for error jobs - either
600+
logged an error or started but not finished.
601+
:param process_tools_path: the path containing the implementation of the PBPTProcessTool class
602+
used for the processing to be checked. If None then class value passed to
603+
constructor will be used.
552604
:param process_tools_mod: the module (i.e., path to python script) containing the implementation
553605
of the PBPTProcessTool class used for the processing to be checked.
554606
:param process_tools_cls: the name of the class implementing the PBPTProcessTool class used
555607
for the processing to be checked.
556-
:param all_jobs: boolean specifying that outputs should be removed for all jobs.
557-
:param error_jobs: boolean specifying that outputs should be removed for error jobs - either
558-
logged an error or started but not finished.
559-
:param job_id: int for the job id for which the outputs will be removed.
560608
:param kwargs: allows the user to pass custom variables to the function (e.g., obj.gen_command_info(input='')),
561609
these will be passed to the process_tools_mod outputs_present function.
562610
@@ -570,6 +618,22 @@ def remove_job_outputs(self, process_tools_mod, process_tools_cls, all_jobs=Fals
570618
queue_db_info['sqlite_db_file'] = self.sqlite_db_file
571619
queue_db_info['sqlite_db_conn'] = self.sqlite_db_conn
572620

621+
if process_tools_path is None:
622+
process_tools_path = self.process_tools_path
623+
624+
if process_tools_mod is None:
625+
if self.process_tools_mod is None:
626+
raise Exception("A PBPTProcessTool implementation module has not been provided to the constructor.")
627+
process_tools_mod = self.process_tools_mod
628+
629+
if process_tools_cls is None:
630+
if self.process_tools_cls is None:
631+
raise Exception("A PBPTProcessTool implementation class has not been provided to the constructor.")
632+
process_tools_cls = self.process_tools_cls
633+
634+
if process_tools_path is not None:
635+
sys.path.insert(0, process_tools_path)
636+
573637
process_tools_mod_inst = importlib.import_module(process_tools_mod)
574638
if process_tools_mod_inst is None:
575639
raise Exception("Could not load the module: '{}'".format(process_tools_mod))
@@ -844,31 +908,37 @@ def run_gen_commands(self):
844908
"""
845909
pass
846910

847-
@abstractmethod
848911
def run_check_outputs(self):
849912
"""
850-
An abstract function which needs to be implemented with the functions and inputs
851-
you want run to check the outputs of the processing have been successfully completed.
913+
A function which runs to check the outputs of the processing have been successfully completed.
914+
This function is executed when the user provides the --check option on the terminal.
915+
This function will by default output two files:
852916
853-
You will presumably want to call:
917+
* processing_errs_scns_yyyymmdd.txt
918+
* non_complete_errs_yyyymmdd.txt
854919
855-
* self.check_job_outputs
920+
To change the output file names you will probably want to create your own version of this function
921+
calling the self.check_job_outputs function.
856922
857923
"""
858-
pass
924+
time_sample_str = self.generate_readable_timestamp_str()
925+
out_err_pid_file = 'processing_errs_scns_{}.txt'.format(time_sample_str)
926+
out_err_info_file = 'non_complete_errs_{}.txt'.format(time_sample_str)
927+
self.check_job_outputs(out_err_pid_file, out_err_info_file)
859928

860-
@abstractmethod
861929
def run_remove_outputs(self, all_jobs=False, error_jobs=False):
862930
"""
863-
An abstract function which needs to be implemented with the functions and inputs
864-
you want run to check the outputs of the processing have been successfully completed.
931+
A function which removes the system output files, resetting the jobs to be rerun.
932+
This function is executed when the user provides the --rmouts option (with either --all or --error).
865933
866-
You will presumably want to call:
934+
If you want some different functionality then you may want to create your own version of this function.
867935
868-
* self.remove_job_outputs
936+
:param all_jobs: remove the outputs for all jobs regardless of whether they have successfully completed or not.
937+
:param error_jobs: only remove the outputs (which may or may not be present) from jobs which have resulted in
938+
an error.
869939
870940
"""
871-
pass
941+
self.remove_job_outputs(all_jobs, error_jobs)
872942

873943
def parse_cmds(self, argv=None):
874944
"""

pbprocesstools/pbpt_utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
class PBPTUtils(object):
4444

45-
def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=False):
45+
def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=False, timeout=3600):
4646
"""
4747
A function which gets a lock on a file.
4848
@@ -60,6 +60,7 @@ def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=F
6060
available. If False (default) False will be returned if the lock is
6161
not successful.
6262
:return: boolean. True: lock was successfully gained. False: lock was not gained.
63+
:param timeout: the time (in seconds) for the lock file timeout. Default: 3600 (1 hour).
6364
6465
"""
6566
file_path, file_name = os.path.split(input_file)
@@ -80,7 +81,10 @@ def get_file_lock(self, input_file, sleep_period=1, wait_iters=120, use_except=F
8081
f.flush()
8182
f.close()
8283
elif use_except:
84+
self.clean_file_locks(file_path, timeout)
8385
raise Exception("Lock could not be gained for file: {}".format(input_file))
86+
else:
87+
self.clean_file_locks(file_path, timeout)
8488

8589
return got_lock
8690

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import os
3838

3939
setuptools.setup(name='pb_process_tools',
40-
version='1.4.0',
40+
version='1.5.0',
4141
description='Tools for batch processing data, including on HPC cluster with slurm.',
4242
author='Pete Bunting',
4343
author_email='petebunting@mac.com',

0 commit comments

Comments
 (0)