diff --git a/MANIFEST.in b/MANIFEST.in index 3da9075..a83136b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -17,5 +17,5 @@ prune docs # Exclude project config files exclude mkdocs.yml -exclude .pre-commit.yml +exclude .pre-commit-config.yaml exclude .gitignore diff --git a/README.md b/README.md index 15763a9..8329042 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,8 @@ Results from both file- and dataset-level checks are aggregated, summarized, and ### Currently supported checkers While `esgf-qa` has been primarily developed for workflows assessing compliance with WCRP project data specifications -(e.g., CMIP, CORDEX), it can also be used for general CF-compliance testing and easily extended to support any -`cc-plugin` and projects following CORDEX- or CMIP-style CMOR table conventions. +(e.g., CMIP, CORDEX), it can also be used for general CF-compliance testing and generally supports any +`cc-plugin`. It can be easily extended to support any project following CORDEX- or CMIP-style CMOR table conventions. | Standard | Checker Name | | ---------------------------------------------------------------------------------------------------- | ------------ | @@ -79,15 +79,16 @@ esgvoc status ## Usage ```shell -$ esgqa [-h] [-o ] [-t ] [-O OPTION] [-i ] [-r] [-C] +$ esgqa [-h] [-P ] [-o ] [-t ] [-O OPTION] [-i ] [-r] [-C] ``` - positional arguments: - `parent_dir`: Parent directory to scan for netCDF-files to check - options: - `-h, --help`: show this help message and exit + - `-P, --parallel_processes`: Specify the maximum number of parallel processes. Default: 0 (= no additional limit). In any case, at most the number of cores minus 4 (but at least 1) processes are used. - `-o, --output_dir OUTPUT_DIR`: Directory to store QA results. Needs to be non-existing or empty or from previous QA run. If not specified, will store results in `./cc-qa-check-results/YYYYMMDD-HHmm_`.
- - `-t, --test TEST`: The test to run (`'wcrp_cmip6:latest'`, `'wcrp_cordex_cmip6:latest'` or `'cf:'`, can be specified multiple times, eg.: `'-t wcrp_cmip6:latest -t cf:1.7'`) - default: running latest CF checks `'cf:latest'`. + - `-t, --test TEST`: The test to run (`'wcrp_cmip6:latest'`, `'wcrp_cordex_cmip6:latest'` or `'cf:'`, can be specified multiple times, e.g. `'-t wcrp_cmip6:latest -t cf:1.7'`) - default: running latest CF checks `'cf:latest'`. If the version is omitted, `latest` will be used. - `-O, --option OPTION`: Additional options to be passed to the checkers. Format: `':[:]'`. Multiple invocations possible. - `-i, --info INFO`: Information used to tag the QA results, eg. the simulation id to identify the checked run. Suggested is the original experiment-id you gave the run. - `-r, --resume`: Specify to continue a previous QC run. Requires the `` argument to be set. @@ -96,7 +97,7 @@ $ esgqa [-h] [-o ] [-t ] [-O OPTION] [-i ] [-r] [-C] ` diff --git a/esgf_qa/_constants.py b/esgf_qa/_constants.py index b4ba0ba..c58f0b3 100644 --- a/esgf_qa/_constants.py +++ b/esgf_qa/_constants.py @@ -1,15 +1,14 @@ from datetime import timedelta -# Collection of supported checkers +# Mapping of checker names to project names for better readability checker_dict = { "cc6": "CORDEX-CMIP6", "cf": "CF-Conventions", "mip": "MIP", - "plugin_cmip6": "CMIP6", # "wcrp-cmip5": "CMIP5", "wcrp_cmip6": "CMIP6", - # "wcrp_cmip7": "CMIP7-AFT", - # "wcrp_cmip7": "CMIP7", + # "wcrp_cmip7aft": "CMIP7-AFT", + "wcrp_cmip7": "CMIP7", # "wcrp_cordex": "CORDEX", "wcrp_cordex_cmip6": "CORDEX-CMIP6", # "obs4mips": "Obs4MIPs", @@ -24,17 +23,39 @@ } checker_release_versions = {} -# DRS parent directory names -DRS_path_parent = { - "CMIP5": "CMIP5", - "CMIP6": "CMIP6", - "CMIP7": "CMIP7", - "CMIP7-AFT": "CMIP7", - "CORDEX": "CORDEX", - "CORDEX-CMIP6": "CORDEX-CMIP6", - "Obs4MIPs": "Obs4MIPs", - "Input4MIPs": "Input4MIPs", -} +# Checkers for which consistency checks should be run
+checker_supporting_consistency_checks = [ + "wcrp_cmip7", + "wcrp_cmip6", + "wcrp_cordex_cmip6", + "cc6", + "mip", +] + +# DRS parent directory names (for identifying project root and building dataset id) +supported_project_ids = [ + "cmip7", + "cmip6plus", + "cmip6", + "cmip5", + "cordex", + "cordex-cmip6", + "cordex-fpsconv", + "obs4mips", + "input4mips", + "c3scordex", + "c3scmip5", + "c3scmip6", + "c3s-ipcc-ar6-atlas", + "c3satlas", + "c3s-cica-atlas", + "c3satlas_v1", + "c3s-atlas-dataset", + "c3satlas_v2", + "eerie", + "happi", + "cosmo-rea", +] # Definition of maximum permitted deviations from the given frequency deltdic = {} diff --git a/esgf_qa/cluster_results.py b/esgf_qa/cluster_results.py index 58f5bb5..fff4810 100644 --- a/esgf_qa/cluster_results.py +++ b/esgf_qa/cluster_results.py @@ -101,7 +101,7 @@ def update(self, result_dict, dsid, file_name): "errors" ].items(): self.summary["error"][ - f"[{checker_dict[checker]}] " + function_name + f"[{checker_dict.get(checker, checker)}] " + function_name ][error_msg][dsid].append(file_name) else: score, max_score = result_dict[checker][test]["value"] @@ -110,7 +110,7 @@ def update(self, result_dict, dsid, file_name): if score < max_score: # test outcome: fail for msg in msgs: self.summary["fail"][weight][ - f"[{checker_dict[checker]}] " + test + f"[{checker_dict.get(checker, checker)}] " + test ][msg][dsid].append(file_name) def update_ds(self, result_dict, dsid): @@ -132,7 +132,8 @@ def update_ds(self, result_dict, dsid): ].items(): for file_name in errdict["files"]: self.summary["error"][ - f"[{checker_dict_ext[checker]}] " + function_name + f"[{checker_dict_ext.get(checker, checker)}] " + + function_name ][errdict["msg"]][dsid].append(file_name) else: weight = result_dict[checker][test].get("weight", 3) @@ -140,7 +141,7 @@ def update_ds(self, result_dict, dsid): for msg, file_names in fails.items(): for file_name in file_names: self.summary["fail"][weight][ - f"[{checker_dict_ext[checker]}] " + test + 
f"[{checker_dict_ext.get(checker, checker)}] " + test ][msg][dsid].append(file_name) def sort(self): diff --git a/esgf_qa/run_qa.py b/esgf_qa/run_qa.py index 3aef217..dc9d8ef 100644 --- a/esgf_qa/run_qa.py +++ b/esgf_qa/run_qa.py @@ -12,12 +12,14 @@ from compliance_checker import __version__ as cc_version from compliance_checker.runner import CheckSuite +from packaging import version as pversion from esgf_qa._constants import ( - DRS_path_parent, checker_dict, checker_dict_ext, checker_release_versions, + checker_supporting_consistency_checks, + supported_project_ids, ) from esgf_qa._version import version from esgf_qa.cluster_results import QAResultAggregator @@ -53,7 +55,7 @@ def get_default_result_dir(): ) -def get_dsid(files_to_check_dict, dataset_files_map_ext, file_path, project_id): +def get_dsid(files_to_check_dict, dataset_files_map_ext, file_path, project_ids): """ Get the dataset id for a file. @@ -65,8 +67,8 @@ def get_dsid(files_to_check_dict, dataset_files_map_ext, file_path, project_id): Dictionary of dataset files. file_path : str Path to the file. - project_id : str - Project id. + project_ids: list of str + List of supported project_ids Returns ------- @@ -75,16 +77,46 @@ def get_dsid(files_to_check_dict, dataset_files_map_ext, file_path, project_id): """ dir_id = files_to_check_dict[file_path]["id_dir"].split("/") fn_id = files_to_check_dict[file_path]["id_fn"].split("_") - if project_id in dir_id: - last_index = len(dir_id) - 1 - dir_id[::-1].index(project_id) - dsid = ".".join(dir_id[last_index:]) - else: - dsid = ".".join(dir_id) + dsid = ".".join(dir_id) + dir_id_lower = [el.lower() for el in dir_id] + for project_id in project_ids: + if project_id in dir_id_lower: + last_index = len(dir_id_lower) - 1 - dir_id_lower[::-1].index(project_id) + dsid = ".".join(dir_id[last_index:]) + break if len(dataset_files_map_ext[files_to_check_dict[file_path]["id_dir"]].keys()) > 1: dsid += "." 
+ ".".join(fn_id) return dsid +def get_installed_checker_versions(): + """ + Get all available versions of installed cc-plugins. + + Returns + ------- + dict + A dictionary of {checker_name: [version1, version2, latest], ...}. + """ + check_suite = CheckSuite() + check_suite.load_all_available_checkers() + installed_versions = {} + for checker in check_suite.checkers: + try: + name, version = checker.split(":") + except ValueError: + name, version = checker, "latest" + if version == "latest": + continue + if name not in installed_versions: + installed_versions[name] = [] + installed_versions[name].append(version) + for name, versions in installed_versions.items(): + installed_versions[name] = sorted(versions, key=pversion.parse) + ["latest"] + + return installed_versions + + def get_checker_release_versions(checkers, checker_options={}): """ Get the release versions of the checkers. @@ -117,6 +149,12 @@ def get_checker_release_versions(checkers, checker_options={}): ) elif checker.split(":")[0] in checker_dict_ext: checker_release_versions[checker.split(":")[0]] = version + else: + checker_release_versions[checker.split(":")[0]] = ( + check_suite.checkers.get( + checker, "unknown version" + )._cc_spec_version + ) def run_compliance_checker(file_path, checkers, checker_options={}): @@ -166,8 +204,13 @@ def run_compliance_checker(file_path, checkers, checker_options={}): ds, [checker], include_checks=None, skip_checks=[] ) ) + if hasattr(ds, "close"): + ds.close() return results - return check_suite.run_all(ds, checkers, include_checks=None, skip_checks=[]) + results = check_suite.run_all(ds, checkers, include_checks=None, skip_checks=[]) + if hasattr(ds, "close"): + ds.close() + return results def track_checked_datasets(checked_datasets_file, checked_datasets): @@ -263,14 +306,6 @@ def process_file( checker = checkerv.split(":")[0] check_results[checker] = dict() check_results[checker]["errors"] = {} - # print() - # print("name",result[checker][0][0].name) - # 
print("weight", result[checker][0][0].weight) - # print("value", result[checker][0][0].value) - # print("msgs", result[checker][0][0].msgs) - # print("method", result[checker][0][0].check_method) - # print("children", result[checker][0][0].children) - # quit() for check in result[checkerv][0]: check_results[checker][check.name] = {} check_results[checker][check.name]["weight"] = check.weight @@ -507,6 +542,13 @@ def main(): action="store_true", help="Include basic consistency and continuity checks. Default: False.", ) + parser.add_argument( + "-P", + "--parallel_processes", + type=int, + default=0, + help="Specify the maximum number of parallel processes. Default: 0 (= number of cores).", + ) args = parser.parse_args() result_dir = os.path.abspath(args.output_dir) @@ -518,6 +560,7 @@ def main(): args.include_consistency_checks if args.include_consistency_checks else False ) cl_checker_options = parse_options(args.option) + parallel_processes = args.parallel_processes # Progress file to track already checked files progress_file = Path(result_dir, "progress.txt") @@ -527,15 +570,15 @@ def main(): # Resume information stored in a json file resume_info_file = Path(result_dir, ".resume_info") - # Do not allow arguments other than -o/--output_dir, -i/--info and -r/--resume if resuming previous QA run + # Do not allow any but certain arguments if resuming previous QA run if resume: - allowed_with_resume = {"output_dir", "info", "resume"} + allowed_with_resume = {"output_dir", "info", "resume", "parallel_processes"} # Convert Namespace to dict for easier checking set_args = {k for k, v in vars(args).items() if v not in (None, False, [], "")} invalid_args = set_args - allowed_with_resume if invalid_args: parser.error( - f"When using -r/--resume, only -o/--output_dir and -i/--info can be set. 
Invalid: {', '.join(invalid_args)}" + f"When using -r/--resume, the following arguments are not allowed: {', '.join(invalid_args)}" ) # Deal with result_dir @@ -622,12 +665,13 @@ def main(): checker_options = defaultdict(dict) else: # Require versions to be specified: - # test_regex = re.compile(r"^[a-z0-9_]+:(latest|[0-9]+(\.[0-9]+)*)$") + # test_regex = re.compile(r"^[a-zA-Z0-9_-]+:(latest|[0-9]+(\.[0-9]+)*)$") # Allow versions to be ommitted: - test_regex = re.compile(r"^[a-z0-9_]+(?::(latest|[0-9]+(?:\.[0-9]+)*))?$") + test_regex = re.compile(r"^[a-zA-Z0-9_-]+(?::(latest|[0-9]+(?:\.[0-9]+)*))?$") + # Check format of specified checkers and separate checker, version, options if not all([test_regex.match(test) for test in tests]): raise Exception( - f"Invalid test(s) specified. Please specify tests in the format 'checker_name' or'checker_name:version'. Currently supported are: {', '.join(list(checker_dict.keys()))}, eerie." + "Invalid test(s) specified. Please specify tests in the format 'checker_name' or'checker_name:version'." ) checkers = [test.split(":")[0] for test in tests] if sorted(checkers) != sorted(list(set(checkers))): @@ -641,6 +685,29 @@ def main(): for test in tests } checker_options = defaultdict(dict) + # Check if specified checkers (or their requested versions) exist / are currently installed + cc_checker_versions = get_installed_checker_versions() + invalid_checkers = [] + invalid_checkers_versions = [] + invalid_checkers_errmsg = "" + for checker_i, checker_iv in checkers_versions.items(): + if checker_i not in cc_checker_versions and checker_i != "eerie": + invalid_checkers.append(checker_i) + elif checker_i == "eerie": + pass + elif checker_iv not in cc_checker_versions[checker_i] and checker_i not in [ + "cc6", + "mip", + ]: + invalid_checkers_versions.append(checker_i) + if invalid_checkers: + invalid_checkers_errmsg = f"ERROR: Invalid test(s) specified. The following checkers are not supported or installed: {', '.join(invalid_checkers)}. 
" + for checker_i in invalid_checkers_versions: + if not invalid_checkers_errmsg: + invalid_checkers_errmsg = "ERROR: Invalid test(s) specified. " + invalid_checkers_errmsg += f"For checker {checker_i} only the following versions are currently supported / installed: {', '.join(cc_checker_versions[checker_i])}. " + if invalid_checkers_errmsg: + raise ValueError(invalid_checkers_errmsg) if "cc6" in checkers_versions and checkers_versions["cc6"] != "latest": checkers_versions["cc6"] = "latest" warnings.warn("Version of checker 'cc6' must be 'latest'. Using 'latest'.") @@ -665,10 +732,6 @@ def main(): raise Exception( "ERROR: Cannot run both 'cc6' and 'mip' checkers at the same time." ) - if any(test not in checker_dict.keys() for test in checkers_versions): - raise Exception( - f"Invalid test(s) specified. Supported are: {', '.join(checker_dict.keys())}" - ) # Combine checkers and versions # (checker_options are hardcoded) @@ -709,15 +772,6 @@ def main(): progress_file.touch() dataset_file.touch() - DRS_parent = "CORDEX-CMIP6" - for cname in checkers: - DRS_parent_tmp = DRS_path_parent.get( - checker_dict.get(cname.split(":")[0], ""), "" - ) - if DRS_parent_tmp: - DRS_parent = DRS_parent_tmp - break - # Check if progress files exist and read already processed files/datasets processed_files = set() with open(progress_file) as file: @@ -816,7 +870,7 @@ def main(): files_to_check = sorted(files_to_check) for file_path in files_to_check: files_to_check_dict[file_path]["id"] = get_dsid( - files_to_check_dict, dataset_files_map_ext, file_path, DRS_parent + files_to_check_dict, dataset_files_map_ext, file_path, supported_project_ids ) files_to_check_dict[file_path]["result_file_ds"] = ( result_dir @@ -884,22 +938,27 @@ def main(): raise Exception("No files found to check.") else: print( - f"Found {len(files_to_check)} files (organized in {len(dataset_files_map)} datasets) to check." 
+ f"\nFound {len(files_to_check)} files (organized in {len(dataset_files_map)} datasets) to check." ) - print() - print("Files to check:") - print(json.dumps(files_to_check, indent=4)) - print() - print("Dataset - Files mapping (extended):") - print(json.dumps(dataset_files_map_ext, indent=4)) - print() - print("Dataset - Files mapping:") - print(json.dumps(dataset_files_map, indent=4)) - print() - print("Files to check dict:") - print(json.dumps(files_to_check_dict, indent=4)) - print() + # Save dictionaries to disk for information + with open(os.path.join(result_dir, "files_to_check.json"), "w") as f: + json.dump(files_to_check, f, indent=4) + with open(os.path.join(result_dir, "files_to_check_dict.json"), "w") as f: + json.dump(files_to_check_dict, f, indent=4) + with open(os.path.join(result_dir, "dataset_files_map.json"), "w") as f: + json.dump(dataset_files_map, f, indent=4) + with open(os.path.join(result_dir, "dataset_files_map_ext.json"), "w") as f: + json.dump(dataset_files_map_ext, f, indent=4) + print( + "Information on which files have been found and how these are organized into datasets was saved to disk:" + ) + print( + f" - {os.path.join(result_dir, 'files_to_check.json')}\n" + f" - {os.path.join(result_dir, 'files_to_check_dict.json')}\n" + f" - {os.path.join(result_dir, 'dataset_files_map.json')}\n" + f" - {os.path.join(result_dir, 'dataset_files_map_ext.json')}" + ) ######################################################### # QA Part 1 - Run all compliance-checker checks @@ -917,6 +976,8 @@ def main(): # Calculate the number of processes num_processes = max(multiprocessing.cpu_count() - 4, 1) + if parallel_processes > 0: + num_processes = min(num_processes, parallel_processes) print(f"Using {num_processes} parallel processes for cc checks.") print() @@ -963,13 +1024,8 @@ def main(): # Skip continuity and consistency checks if no cc6/mip checks were run # (and thus no consistency output file was created) - if ( - "cc6:latest" in checkers - or 
"mip:latest" in checkers - or "wcrp_cmip6:1.0" in checkers - or "wcrp_cmip6:latest" in checkers - or "wcrp_cordex_cmip6:1.0" in checkers - or "wcrp_cordex_cmip6:latest" in checkers + if any( + ch.split(":", 1)[0] in checker_supporting_consistency_checks for ch in checkers ): ######################################################### # QA Part 2 - Run all consistency & continuity checks @@ -996,6 +1052,8 @@ def main(): # Limit the number of processes for consistency checks since a lot # of files will be opened at the same time num_processes = min(num_processes, 10) + if parallel_processes > 0: + num_processes = min(num_processes, parallel_processes) print(f"Using {num_processes} parallel processes for dataset checks.") print() @@ -1046,7 +1104,9 @@ def main(): else: print() warnings.warn( - "Continuity & Consistency checks skipped since no cc6 checks were run." + "Continuity & consistency checks skipped since no appropriate checkers were run." + " The following checkers support the continuity & consistency checks: " + f"{', '.join(checker_supporting_consistency_checks)}" ) ######################################################### @@ -1074,7 +1134,7 @@ def main(): "cc_version": cc_version, "checkers": ", ".join( [ - f"{checker_dict.get(checker.split(':')[0], '')} {checker.split(':')[0]}:{checker_release_versions[checker.split(':')[0]]}" + f"{checker_dict.get(checker.split(':')[0], '')} {checker.split(':')[0]}:{checker_release_versions[checker.split(':')[0]]}".strip() for checker in checkers ] ), diff --git a/pyproject.toml b/pyproject.toml index a7fbc96..2106e44 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "compliance-checker>=5.3.0", "dask", "netCDF4", + "packaging", "pandas", "textual", "xarray", diff --git a/tests/test_cli.py b/tests/test_cli.py index 9081d2b..ffaca9c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -230,7 +230,7 @@ def test_cli_resume_functionality(self): ), ( ["-r", "-t", "cf:latest", "-o", 
"some_dir"], - "When using -r/--resume, only -o/--output_dir and -i/--info can be set", + "When using -r/--resume, the following arguments are not allowed", ), ], ) @@ -251,8 +251,12 @@ def test_cli_produces_valid_json(self): ["-t", "cf:latest", "-o", str(output_dir), str(self.cmip6_dir)] ) json_files = list(output_dir.glob("*.json")) - assert len(json_files) == 2 - with open(json_files[0]) as f: + assert len(json_files) == 6 + json_result_files = [ + f for f in json_files if f.name.startswith("qa_result_") + ] + assert len(json_result_files) == 2 + with open(json_result_files[0]) as f: data = json.load(f) # "info" is the only required field assert "info" in data diff --git a/tests/test_run_qa.py b/tests/test_run_qa.py index 0e1b067..a7eb43c 100644 --- a/tests/test_run_qa.py +++ b/tests/test_run_qa.py @@ -61,7 +61,7 @@ def test_get_dsid(): }, } file_path = f"/path/to/{project_id}/drs/elements/until/file1_1950-1960.nc" - dsid = get_dsid(files_to_check_dict, dataset_files_map_ext, file_path, project_id) + dsid = get_dsid(files_to_check_dict, dataset_files_map_ext, file_path, [project_id]) assert dsid == "my_project.drs.elements.until"