11#! /usr/bin/env python3
22
3-
43import argparse
4+ import atexit
55import concurrent .futures
66import itertools
77import logging
@@ -19,6 +19,7 @@ from types import FrameType
1919from typing import Any , List , TextIO , Tuple , TypeVar , Union
2020
2121from _tool_helpers import (
22+ check_file_exists ,
2223 get_char_with_timeout ,
2324 get_engine_config ,
2425 get_max_futures_workers ,
4142except ModuleNotFoundError :
4243 import json # type: ignore[no-redef]
4344
44- # TODO
45- __version__ = "1.3.0" # See https://www.python.org/dev/peps/pep-0396/
46- __date__ = "2022-11-29"
47- __updated__ = "2023-12-19"
48-
4945LOG_FORMAT = "%(asctime)s - %(levelname)s: %(message)s"
5046LONG_RECORD = 300
5147# Text to substitute in output depending on the mode
@@ -78,7 +74,7 @@ T = TypeVar("T")
7874
7975
8076errors_file = f"{ MODULE_NAME } _errors_{ START_TS } .log"
81- with_info_file = f"{ MODULE_NAME } _withInfo_ { START_TS } .jsonl"
77+ with_info_file = f"{ MODULE_NAME } _with_info_ { START_TS } .jsonl"
8278# If running in a Docker container use /data/, prior file_loader uses this as volume mount for data input/output files
8379if in_docker ():
8480 if pathlib .Path ("/data" ).exists ():
@@ -105,22 +101,22 @@ except IOError as err_outer:
105101shutdown = Event ()
106102
107103
108- class CustomArgumentParser (argparse .ArgumentParser ):
109- """Custom argparse class to remove the errors file if CLI help or usage is displayed"""
104+ def files_clean_up () -> None :
105+ """TODO"""
106+ if check_file_exists (errors_file ) and pathlib .Path (errors_file ).stat ().st_size == 0 :
107+ pathlib .Path (errors_file ).unlink ()
110108
111- def print_help (self , file = None ):
112- pathlib .Path (errors_file ).unlink (missing_ok = True )
113- return super ().print_help (file )
109+ if check_file_exists (with_info_file ) and pathlib .Path (with_info_file ).stat ().st_size == 0 :
110+ pathlib .Path (with_info_file ).unlink ()
114111
115- def print_usage (self , file = None ):
116- pathlib .Path (errors_file ).unlink (missing_ok = True )
117- return super ().print_usage (file )
112+
113+ atexit .register (files_clean_up )
118114
119115
120116def parse_cli_args () -> dict [str , Union [bool , int , list [str ], None , str ]]:
121117 """# TODO"""
122118 # TODO - Change github link
123- arg_parser = CustomArgumentParser (
119+ arg_parser = argparse . ArgumentParser (
124120 allow_abbrev = False ,
125121 description = "Utility to load Senzing JSON records and process redo records" ,
126122 formatter_class = argparse .RawTextHelpFormatter ,
@@ -585,10 +581,9 @@ def startup_info(
585581 time .sleep (5 )
586582
587583
588- # TODO - No type error on -> str when returning None also?
589- def add_record (engine : SzEngine , rec_to_add : str , with_info : bool ) -> str :
584+ def add_record (engine : SzEngine , rec_to_add : str , with_info : bool ) -> Union [None , str ]:
590585 """Add a single record, returning with info details if requested"""
591- # Return None if a blank line was read to prevent blank lines throwing errors when trying to json.loads()
586+ # Return "" if a blank line was read to prevent blank lines throwing errors when trying to json.loads()
592587 rec_to_add = rec_to_add .strip ()
593588 if not rec_to_add :
594589 return None
@@ -630,7 +625,7 @@ def prime_redo_records(engine: SzEngine, quantity: int) -> List[str]:
630625
631626
632627def process_redo_record (engine : SzEngine , record : str , with_info : bool ) -> str :
633- """Process a single redo record, returning with info details if --info or SENZING_WITHINFO was specified """
628+ """Process a single redo record, returning with info details if --info"""
634629 if with_info :
635630 response = engine .process_redo_record (record , SzEngineFlags .SZ_WITH_INFO )
636631 return response
@@ -728,6 +723,7 @@ def load_and_redo(
728723 return False
729724
730725 error_recs = 0
726+ load_blank_lines = 0
731727 load_errors = 0
732728 load_success = 0
733729 load_time = 0.0
@@ -810,7 +806,8 @@ def load_and_redo(
810806 error_recs += 1
811807 logger .info ("" )
812808 logger .error (
813- "%s - Operation: %s - Record: %s" ,
809+ "%s%s - Operation: %s - Record: %s" ,
810+ "" if console_handle .formatter ._fmt == LOG_FORMAT else "ERROR: " ,
814811 err ,
815812 MODE_TEXT [mode .__name__ ]["except_msg" ],
816813 futures [f ][0 ].strip (),
@@ -828,12 +825,17 @@ def load_and_redo(
828825 if add_future and not shutdown .is_set ():
829826 more_recs = add_new_future ()
830827
831- # Write out with info result if it was requested
832- if result :
833- with_info_out .write (f"{ result } \n " )
828+ # If loading and None the line in the source was blank
829+ if mode .__name__ == "add_record" and result is None :
830+ load_blank_lines += 1
831+ else :
832+ # Write out with info result if it was requested
833+ if result :
834+ with_info_out .write (f"{ result } \n " )
835+
836+ success_recs += 1
834837
835- success_recs += 1
836- if success_recs % recs_per_sec_output_frequency == 0 :
838+ if success_recs > 0 and success_recs % recs_per_sec_output_frequency == 0 :
837839 prev_time = record_stats (
838840 success_recs ,
839841 error_recs ,
@@ -900,6 +902,7 @@ def load_and_redo(
900902 "errors_file" : (str (pathlib .Path (errors_file ).resolve ()) if (load_errors + error_recs ) > 0 else None ),
901903 "with_info" : (str (pathlib .Path (with_info_file ).resolve ()) if with_info else None ),
902904 "elapsed_time_total" : load_time + redo_time ,
905+ "blank_lines" : load_blank_lines ,
903906 "load_stats" : {
904907 "success_recs" : load_success ,
905908 "error_recs" : load_errors ,
@@ -932,6 +935,7 @@ def per_result(cli_args: argparse.Namespace, result: dict[str, Any]) -> None:
932935 else :
933936 did_shuf = "Yes" if result ["did_shuff" ] else "No"
934937 logger .info ("Source file shuffled: %s" , did_shuf )
938+ logger .info ("Source file blank lines: %s" , result ["blank_lines" ])
935939 logger .info (
936940 "With info file: %s" ,
937941 result ["with_info" ] if result ["with_info" ] else "Not requested" ,
@@ -973,6 +977,7 @@ def summary_results(
973977 shuffle_no_delete = cli_args .shuffleNoDelete
974978
975979 elapsed_time_total = 0
980+ load_blank_lines_total = 0
976981 load_error_total = 0
977982 load_success_total = 0
978983 load_time_total = 0
@@ -983,6 +988,7 @@ def summary_results(
983988 for result in overall_results .values ():
984989 load_success_total += result ["load_stats" ]["success_recs" ]
985990 load_error_total += result ["load_stats" ]["error_recs" ]
991+ load_blank_lines_total += result ["blank_lines" ]
986992 load_time_total += result ["load_stats" ]["elapsed_time" ]
987993 redo_success_total += result ["redo_stats" ]["success_recs" ]
988994 redo_error_total += result ["redo_stats" ]["error_recs" ]
@@ -994,7 +1000,7 @@ def summary_results(
9941000 logger .info ("---------------" )
9951001 logger .info ("" )
9961002 logger .info ("Files processed: %s" , len (overall_results ))
997-
1003+ logger . info ( "Empty lines: %s" , load_blank_lines_total )
9981004 if shuffle_no_delete :
9991005 logger .info (
10001006 "Files shuffled: %s" ,
@@ -1030,12 +1036,12 @@ def main() -> None:
10301036
10311037 signal .signal (signal .SIGINT , signal_int )
10321038
1033- errors_total = 0
10341039 ingest_file_shuff = None
10351040 overall_results : dict [str , dict [str , Any ]] = {}
10361041
10371042 cli_args = parse_cli_args ()
10381043
1044+ # TODO
10391045 for ingest_file in cli_args .file :
10401046 try :
10411047 with open (ingest_file , "r" , encoding = "utf-8" ) as _ :
@@ -1123,17 +1129,6 @@ def main() -> None:
11231129 if len (overall_results ) > 1 :
11241130 summary_results (cli_args , overall_results )
11251131
1126- # Check if there were any errors for load or redo to determine if errors file should be deleted
1127- for result in overall_results .values ():
1128- errors_total += result ["load_stats" ]["error_recs" ]
1129- errors_total += result ["redo_stats" ]["error_recs" ]
1130-
1131- if not errors_total :
1132- pathlib .Path (errors_file ).unlink (missing_ok = True )
1133-
1134- if not cli_args .withinfo :
1135- pathlib .Path (with_info_file ).unlink (missing_ok = True )
1136-
11371132
11381133if __name__ == "__main__" :
11391134 main ()
0 commit comments