11# Copyright (c) 2023 Forschungszentrum Juelich GmbH.
2- # This file is part of LLview.
2+ # This file is part of LLview.
33#
44# This is an open source software distributed under the GPLv3 license. More information see the LICENSE file at the top level.
55#
66# Contributions must follow the Contributor License Agreement. More information see the CONTRIBUTING.md file at the top level.
77#
88# Contributors:
9- # Filipe Guimarães (Forschungszentrum Juelich GmbH)
9+ # Filipe Guimarães (Forschungszentrum Juelich GmbH)
10+ # Matthias Lapu (CEA)
1011
1112import os # OS library (files and folders operations)
1213import sys # system variables for stdout and stderr
3031import itertools
3132import shutil
3233import yaml
33- import datetime
34+ import datetime
3435from pytz import timezone
3536
3637
@@ -57,7 +58,7 @@ def check_shutdown_callback(self):
5758 """
5859 Callback used after every job to check if shutdown file exists
5960 to terminate the jobs
60- """
61+ """
6162 if check_shutdown ():
6263 if email : msg .send_email (semail ,remail ,"Shutdown file found, stopping generation of PDF-job reports" )
6364 log .warning ("Shutdown file found, stopping jobs" )
@@ -68,7 +69,7 @@ def error_handler(e):
6869 """
6970 Error callback to catch any exception raised by some
7071 of the child processes, and send email
71- """
72+ """
7273 if email : msg .send_email (semail ,remail ,f"Error in PDF-Job report:\n { ' ' .join (traceback .format_exception (type (e ), e , e .__traceback__ ))} " )
7374 log .error (f"Error:\n { ' ' .join (traceback .format_exception (type (e ), e , e .__traceback__ ))} " )
7475 global nerrors
@@ -98,7 +99,7 @@ def ProcessReport(njob,total_jobs,job,config):
9899 Wrapper to catch eventual errors in _ProcessReport
99100 """
100101 log = logging .getLogger ('logger' )
101-
102+
102103 try :
103104 _ProcessReport (njob ,total_jobs ,job ,config )
104105 except Exception as e :
@@ -125,7 +126,7 @@ def _ProcessReport(njob,total_jobs,job,config):
125126
126127 # Getting timezonegap
127128 config ['appearance' ]['timezonegap' ] = timezone (config ['appearance' ]['timezone' ]).localize (datetime .datetime .strptime (data ["job" ]["starttime" ],'%Y-%m-%d %H:%M:%S' )).utcoffset ().seconds
128-
129+
129130 # Removing sensitive data in demo mode
130131 if config ['demo' ]:
131132 # folder = "."
@@ -154,7 +155,7 @@ def _ProcessReport(njob,total_jobs,job,config):
154155 except (ValueError ,KeyError ):
155156 data ['job' ]['numgpus' ] = 0
156157 num_gpus = 0
157-
158+
158159 # Escaping job name
159160 data ['job' ]['name' ] = re .escape (data ['job' ]['name' ])
160161
@@ -287,7 +288,7 @@ def _ProcessReport(njob,total_jobs,job,config):
287288 ##################################### Reading data from files #####################################
288289 for fh ,fh_info in files .items ():
289290 # If number of points is less than 2 or if the filename is not given, don't read the file
290- if (int (fh_info ['datapoints' ])< 2 ) or (fh not in data ['files' ]) or (data ['files' ][fh ] in ["" ,0 ,"-" ]):
291+ if (int (fh_info ['datapoints' ])< 2 ) or (fh not in data ['files' ]) or (data ['files' ][fh ] in ["" ,0 ,"-" ]):
291292 continue
292293
293294 # Reading file with information for all nodes and all times
@@ -313,11 +314,11 @@ def _ProcessReport(njob,total_jobs,job,config):
313314 y_x_keys = [key for key in {** y_headers , ** x_headers }.keys ()]
314315
315316 # Dropping duplicated lines
316- df_temp .drop_duplicates (subset = y_x_keys , keep = 'first' , inplace = True )
317+ df_temp .drop_duplicates (subset = y_x_keys , keep = 'first' , inplace = True )
317318
318319 # Dropping rows above ts range
319320 if config ['appearance' ]['maxsec' ]:
320- df_temp .drop (df_temp [df_temp [config ['plots' ]['_x' ]['header' ]] > df_temp [config ['plots' ]['_x' ]['header' ]].min ()+ config ['appearance' ]['maxsec' ]].index , inplace = True )
321+ df_temp .drop (df_temp [df_temp [config ['plots' ]['_x' ]['header' ]] > df_temp [config ['plots' ]['_x' ]['header' ]].min ()+ config ['appearance' ]['maxsec' ]].index , inplace = True )
321322
322323 # Dropping rows with infinity values
323324 df_temp = df_temp [~ df_temp .isin ([np .inf , - np .inf ]).any (axis = 1 )]
@@ -379,7 +380,7 @@ def _ProcessReport(njob,total_jobs,job,config):
379380 # If there are no graphs to plot in this section, skip
380381 if not graphs_to_plot : continue
381382
382- # Getting the file headers in this section
383+ # Getting the file headers in this section
383384 files_in_section = [config_section [_ ]['_file_header' ] for _ in graphs_to_plot ]
384385
385386 ################################# Setting up TOC and graphs_to_plot ####################################
@@ -487,7 +488,7 @@ def _ProcessReport(njob,total_jobs,job,config):
487488 to_plot_extra [section ]['colorplot' ] = []
488489 to_plot_extra [section ]['unified' ] = []
489490 to_plot_extra [section ]['description' ] = []
490- # Looping over graphs defined in the custom section
491+ # Looping over graphs defined in the custom section
491492 for idx ,_ in enumerate (graphs_to_plot ):
492493 # The header on the dat file uses a generic name, and not the real "name" of the graph
493494 to_plot_extra [section ]['headers' ].append (f"value{ idx } " )
@@ -548,7 +549,7 @@ def _ProcessReport(njob,total_jobs,job,config):
548549 df_overview [side ]['legend' ] = []
549550 for (fh ,graphs ),legend in zip (cols .items (),legends ):
550551 # Skipping plot if no data is present
551- if files [fh ]['data' ] is None : continue
552+ if files [fh ]['data' ] is None : continue
552553 if x_header_overview == 'ts' : x_header = 'datetime'
553554 df_temp = files [fh ]['data' ][list (graphs )].groupby ([x_header ], as_index = False ).mean ()
554555 # # Transforming timestamps (with timezone) to datetime
@@ -694,8 +695,8 @@ def _ProcessReport(njob,total_jobs,job,config):
694695 # proj_end = timeline_df['end'].max()
695696 proj_end = datetime .datetime .timestamp (datetime .datetime .strptime (data ['job' ]['updatetime' ], '%Y-%m-%d %H:%M:%S' ))
696697 timeline_df .loc [timeline_df ['end' ]< 0 ,'end' ] = proj_end
697- timeline_df ['start_time' ] = timeline_df ['beg' ].apply (lambda x : datetime .datetime .fromtimestamp (int (x + config ['appearance' ]['timezonegap' ]),datetime .timezone .utc ))
698- timeline_df ['end_time' ] = timeline_df ['end' ].apply (lambda x : datetime .datetime .fromtimestamp (int (x + config ['appearance' ]['timezonegap' ]),datetime .timezone .utc ))
698+ timeline_df ['start_time' ] = timeline_df ['beg' ].apply (lambda x : datetime .datetime .fromtimestamp (int (x + config ['appearance' ]['timezonegap' ]),datetime .timezone .utc ))
699+ timeline_df ['end_time' ] = timeline_df ['end' ].apply (lambda x : datetime .datetime .fromtimestamp (int (x + config ['appearance' ]['timezonegap' ]),datetime .timezone .utc ))
699700 timeline_df ['duration' ] = timeline_df ['end_time' ]- timeline_df ['start_time' ]
700701 timeline_df [['color' ,'edgecolor' ,'colorhtml' ,'edgecolorhtml' ]] = timeline_df ['st' ].apply (lambda x : add_color (x ))
701702 # Escaping job names
@@ -764,7 +765,7 @@ def _ProcessReport(njob,total_jobs,job,config):
764765 # Output files:
765766 # output = f"{folder}/python_{data['files']['pdffile']}"
766767 output_pdf = f"{ config ['outfolder' ]} /{ data ['files' ]['pdffile' ]} "
767- if config ['html' ] or config ['gzip' ]:
768+ if config ['html' ] or config ['gzip' ]:
768769 output_html = f"{ config ['outfolder' ]} /{ data ['files' ]['htmlfile' ]} "
769770
770771 # Getting time range of the job:
@@ -802,17 +803,17 @@ def _ProcessReport(njob,total_jobs,job,config):
802803 timeline_html ,system_report_html = LastPages .LastPages (pdf ,data ,config ,page_num ,timeline_df ,time_range ,error_lines )
803804
804805 ############################################################################
805- if config ['html' ] or config ['gzip' ]:
806+ if config ['html' ] or config ['gzip' ]:
806807 config ['appearance' ]['jobid' ] = data ['job' ]['jobid' ] # Job ID for title and filename
807808 config ['appearance' ]['system' ] = data ['job' ]['system' ].lower ().replace ('_' ,' ' ) # System for filename
808- GenerateHTML .CreateHTML (config ,
809- figs ,
810- navbar = navbar ,
811- first = first_page_html ,
812- overview = overview_fig ,
813- nodelist = nodelist_html ,
809+ GenerateHTML .CreateHTML (config ,
810+ figs ,
811+ navbar = navbar ,
812+ first = first_page_html ,
813+ overview = overview_fig ,
814+ nodelist = nodelist_html ,
814815 timeline = timeline_html ,
815- system_report = system_report_html ,
816+ system_report = system_report_html ,
816817 filename = output_html )
817818 # Moving files to final folder
818819 if config ['move' ]:
@@ -850,7 +851,7 @@ def process_plotlist(config,q):
850851 counter += 1
851852
852853 # Getting list of json files with all running jobs (and finished in the last 30 min) to process
853- # If config['json']=True, all files are already json
854+ # If config['json']=True, all files are already json
854855 if config ['json' ]:
855856 jobs = config ['file' ]
856857 else :
@@ -868,21 +869,21 @@ def process_plotlist(config,q):
868869 njobs = len (jobs )
869870 total_jobs = min (njobs ,config ['maxjobs' ])
870871
871- if total_jobs == 0 :
872+ if total_jobs == 0 :
872873 log .warning (f"No jobs in plotlist file!" )
873874 return
874875
875- # Create pool for dispatching work
876+ # Create pool for dispatching work
876877 global pool
877878 pool = mp .Pool (config ['nprocs' ], worker_init , [q ,config ['logging' ]['level' ]])
878879
879880 log .info (f"Generating report of { total_jobs } jobs" )
880881
881882 njob = 0
882883 for job in jobs :
883- njob += 1 # FOR DEBUG
884- if njob > config ['maxjobs' ]: # FOR DEBUG
885- break # FOR DEBUG
884+ njob += 1 # FOR DEBUG
885+ if njob > config ['maxjobs' ]: # FOR DEBUG
886+ break # FOR DEBUG
886887 pool .apply_async (ProcessReport , [njob ,total_jobs ,job ,config ], callback = check_shutdown_callback , error_callback = error_handler )
887888
888889 pool .close ()
@@ -946,12 +947,12 @@ def __init__(self,fmt,datefmt=""):
946947 logging .ERROR : self .red + self .fmt + self .reset ,
947948 logging .CRITICAL : self .bold_red + self .fmt + self .reset
948949 }
949-
950+
def format(self, record):
    """
    Format a log record using the format string registered for its level.

    The format is looked up in self.FORMATS by record.levelno. Levels that
    have no entry there (e.g. custom numeric levels) fall back to the plain
    self.fmt, so the configured layout is kept instead of being replaced by
    logging's bare "%(message)s" default.

    Parameters:
        record: the logging.LogRecord to be formatted

    Returns:
        The formatted log line as a string
    """
    # Without the fallback, a missing level yields None, and
    # logging.Formatter(fmt=None) silently switches to "%(message)s"
    log_fmt = self.FORMATS.get(record.levelno, self.fmt)
    formatter = logging.Formatter(fmt=log_fmt, datefmt=self.datefmt)
    return formatter.format(record)
954-
955+
955956# Adapted from: https://stackoverflow.com/a/53257669/3142385
956957class _ExcludeErrorsFilter (logging .Filter ):
957958 def filter (self , record ):
@@ -967,7 +968,7 @@ def log_init(config):
967968 log = logging .getLogger ('logger' )
968969 log .setLevel (config ['level' ])
969970
970- # Setup handler: file (when configured) or stdout, stderr
971+ # Setup handler: file (when configured) or stdout, stderr
971972 if 'file' in config :
972973 fh = logging .FileHandler (config ['file' ], mode = config ['filemode' ])
973974 fh .setLevel (config ['level' ])
@@ -1024,7 +1025,7 @@ def main():
10241025
10251026 # Parse arguments
10261027 parser = argparse .ArgumentParser (description = "JuRepTool" )
1027- parser .add_argument ("file" , nargs = "+" , help = "File including list of running and recently-finished jobs or JSON file of a job" )
1028+ parser .add_argument ("file" , nargs = "+" , default = "" , help = "File including list of running and recently-finished jobs or JSON file of a job" )
10281029 parser .add_argument ("--daemon" , default = False , action = "store_true" , help = "Run as a 'daemon', i.e., in an infinite loop" )
10291030 parser .add_argument ("--demo" , default = False , action = "store_true" , help = "Run in 'demo' mode (hide usernames, project id and job names)" )
10301031 parser .add_argument ("--nomove" , default = False , action = "store_true" , help = "Don't copy files to final location" )
@@ -1056,12 +1057,23 @@ def main():
10561057 # config['appearance']['plotly_js'] = args.plotlyjs
10571058
10581059 # Configuration
1059- config ['file' ] = []
1060- for file in args .file :
1061- config ['file' ]+= glob .glob (file )
1062- config ['json' ] = False
1063- if all ([_ .endswith ('json' ) for _ in config ['file' ]]):
1064- config ['json' ] = True
1060+ config ['file' ] = set ()
1061+ # Using a set so that a path collected more than once is kept only once, e.g.:
1062+ # input: fileA folderB(fileA,fileB) -> a list would give config['file'] = fileA, fileA, fileB
1063+
1064+ if args .file :
1065+ for element in args .file :
1066+ if os .path .isfile (element ):
1067+ config ['file' ].update (glob .glob (element ))
1068+ elif os .path .isdir (element ):
1069+ config ['file' ].update (os .path .join (element , fname ) for fname in os .listdir (element ) if fname .endswith ('.json' ))
1070+ else :
1071+ raise FileNotFoundError (f"File not found: { element } " )
1072+ config ['json' ] = all ([_ .endswith ('json' ) for _ in config ['file' ]])
1073+ else :
1074+ parser .print_help ()
1075+ raise FileNotFoundError (f"Config { args .config } does not exist" )
1076+
10651077 config ['demo' ] = args .demo
10661078 config ['html' ] = not args .nohtml
10671079 config ['gzip' ] = args .gzip
0 commit comments