diff --git a/cdisc_rules_engine/services/csv_metadata_reader.py b/cdisc_rules_engine/services/csv_metadata_reader.py index 16427e76c..778a2131d 100644 --- a/cdisc_rules_engine/services/csv_metadata_reader.py +++ b/cdisc_rules_engine/services/csv_metadata_reader.py @@ -19,6 +19,7 @@ def __init__( ): self.file_path = file_path self.file_name = file_name + self.dataset = Path(file_name).stem self.encoding = encoding self.variables_csv_path = ( Path(variables_csv_path) @@ -32,43 +33,76 @@ def __init__( ) def read(self) -> dict: - dataset_name = Path(self.file_name).stem.lower() + metadata = {} + metadata.update(self.__dataset_metadata()) + metadata.update( + { + "dataset_modification_date": datetime.fromtimestamp( + Path(self.file_path).stat().st_mtime + ).isoformat(), + "adam_info": { + "categorization_scheme": {}, + "w_indexes": {}, + "period": {}, + "selection_algorithm": {}, + }, + } + ) + metadata.update(self.__variable_metadata()) + metadata.update(self.__data_metadata()) + return metadata - if not self.variables_csv_path.exists(): - logger = logging.getLogger("validator") - logger.info("No variables file found for %s", dataset_name) - variables_meta = {} - else: - variables_meta = self.__get_variable_metadata( - dataset_name, self.variables_csv_path + def __dataset_metadata(self) -> dict: + logger = logging.getLogger("validator") + + if not self.datasets_csv_path.exists(): + logger.info("No datasets file found for %s", self.dataset) + return {} + + try: + datasets_df = pd.read_csv(self.datasets_csv_path, encoding=self.encoding) + except (UnicodeDecodeError, UnicodeError) as e: + logger.error( + f"\n Error reading CSV from: {self.file_path}" + f"\n Failed to decode with {self.encoding} encoding: {e}" + f"\n Please specify the correct encoding using the -e flag." ) + return {} + except Exception as e: + logger.error("Error reading CSV file %s. %s", self.file_path, e) + return {} - metadata = { - "dataset_name": dataset_name.upper(), - "dataset_modification_date": datetime.fromtimestamp( - Path(self.file_path).stat().st_mtime - ).isoformat(), - "adam_info": { - "categorization_scheme": {}, - "w_indexes": {}, - "period": {}, - "selection_algorithm": {}, - }, + if "Filename" not in datasets_df.columns: + return {} + + match = datasets_df[datasets_df["Filename"] == self.dataset] + + if match.empty or len(match) > 1: + return {} + + single_match = match.iloc[0] + + return { + "dataset_name": ( + single_match["Dataset Name"] + if "Dataset Name" in datasets_df.columns + else str(single_match["Filename"]).upper() + ), + "dataset_label": str(single_match["Label"]), } - metadata.update(variables_meta) - metadata.update(self.__data_meta()) - metadata.update(self.__dataset_label()) - return metadata - def __get_variable_metadata( - self, dataset_name: str, variables_file_path: Path + def __variable_metadata( + self, ) -> dict: logger = logging.getLogger("validator") + if not self.variables_csv_path.exists(): + logger.info("No variables file found for %s", self.dataset) + return {} try: - meta_df = pd.read_csv(variables_file_path, encoding=self.encoding) + meta_df = pd.read_csv(self.variables_csv_path, encoding=self.encoding) except (UnicodeDecodeError, UnicodeError) as e: logger.error( - f"Could not decode CSV file {variables_file_path} with {self.encoding} encoding: {e}. " + f"Could not decode CSV file {self.variables_csv_path} with {self.encoding} encoding: {e}. " f"Please specify the correct encoding using the -e flag." ) return {} @@ -76,15 +110,10 @@ def __get_variable_metadata( logger.error("Error reading CSV file %s. %s", self.file_path, e) return {} - meta_df["dataset"] = meta_df["dataset"].apply( - lambda x: Path(str(x)).stem.lower() - ) - - dataset_meta_df = meta_df[meta_df["dataset"] == dataset_name] + dataset_meta_df = meta_df[meta_df["dataset"] == self.dataset] if dataset_meta_df.empty: - logger = logging.getLogger("validator") - logger.info("No dataset metadata found for %s", dataset_name) + logger.info("No dataset metadata found for %s", self.dataset) return {} variable_names = dataset_meta_df["variable"].tolist() @@ -95,7 +124,11 @@ def __get_variable_metadata( zip(variable_names, dataset_meta_df["type"]) ) variable_name_to_size_map = { - var: (int(length) if pd.notna(length) else None) + var: ( + int(length) + if pd.notna(length) and (isinstance(length, int) or length.isdigit()) + else None + ) for var, length in zip(variable_names, dataset_meta_df["length"]) } return { @@ -108,41 +141,7 @@ def __get_variable_metadata( "number_of_variables": len(variable_names), } - def __dataset_label(self) -> dict: - logger = logging.getLogger("validator") - - if not self.datasets_csv_path.exists(): - return {} - - try: - datasets_df = pd.read_csv(self.datasets_csv_path, encoding=self.encoding) - except (UnicodeDecodeError, UnicodeError) as e: - logger.error( - f"\n Error reading CSV from: {self.file_path}" - f"\n Failed to decode with {self.encoding} encoding: {e}" - f"\n Please specify the correct encoding using the -e flag." - ) - return {} - except Exception as e: - logger.error("Error reading CSV file %s. %s", self.file_path, e) - return {} - - if "Filename" not in datasets_df.columns or "Label" not in datasets_df.columns: - return {} - - datasets_df["dataset"] = datasets_df["Filename"].apply( - lambda x: Path(str(x)).stem.lower() - ) - - current_dataset = Path(self.file_name).stem.lower() - match = datasets_df[datasets_df["dataset"] == current_dataset] - - if match.empty: - return {} - - return {"dataset_label": str(match.iloc[0]["Label"])} - - def __data_meta(self): + def __data_metadata(self): logger = logging.getLogger("validator") result = { "dataset_length": 0,