Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 69 additions & 70 deletions cdisc_rules_engine/services/csv_metadata_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(
):
self.file_path = file_path
self.file_name = file_name
self.dataset = Path(file_name).stem
self.encoding = encoding
self.variables_csv_path = (
Path(variables_csv_path)
Expand All @@ -32,59 +33,87 @@ def __init__(
)

def read(self) -> dict:
    """Collect all metadata available for the dataset file.

    The result is built by merging, in this order:

    1. dataset-level metadata (``__dataset_metadata``),
    2. the file modification date and an ``adam_info`` skeleton whose
       four sections are empty dicts,
    3. variable-level metadata (``__variable_metadata``),
    4. data-level metadata (``__data_metadata``).

    Later sources overwrite keys set by earlier ones.

    Returns:
        dict: the merged metadata.
    """
    metadata = {}
    metadata.update(self.__dataset_metadata())

    # The modification time of the data file itself, as an ISO-8601 string.
    modified_at = datetime.fromtimestamp(Path(self.file_path).stat().st_mtime)
    metadata["dataset_modification_date"] = modified_at.isoformat()
    metadata["adam_info"] = {
        "categorization_scheme": {},
        "w_indexes": {},
        "period": {},
        "selection_algorithm": {},
    }

    metadata.update(self.__variable_metadata())
    metadata.update(self.__data_metadata())
    return metadata

def __dataset_metadata(self) -> dict:
    """Look up this dataset's name and label in the datasets CSV.

    The datasets CSV (``self.datasets_csv_path``) is expected to have a
    ``Filename`` column and, optionally, ``Dataset Name`` and ``Label``
    columns. A row is selected when its ``Filename`` equals
    ``self.dataset`` exactly; if no row matches exactly, the comparison
    falls back to the lower-cased file stem, so ``ae.csv`` or ``AE.xpt``
    still identify dataset ``ae``.

    Returns:
        dict: ``dataset_name`` (the ``Dataset Name`` cell when that column
        exists, otherwise the upper-cased dataset identifier) and, when
        the ``Label`` column exists, ``dataset_label``. An empty dict is
        returned when the file is missing or unreadable, when it has no
        ``Filename`` column, or when zero or several rows match.
    """
    logger = logging.getLogger("validator")
    datasets_path = self.datasets_csv_path

    if not datasets_path.exists():
        logger.info("No datasets file found for %s", self.dataset)
        return {}

    try:
        datasets_df = pd.read_csv(datasets_path, encoding=self.encoding)
    except UnicodeError as e:  # UnicodeDecodeError is a subclass
        # Report the file that actually failed to decode.
        logger.error(
            f"\n Error reading CSV from: {datasets_path}"
            f"\n Failed to decode with {self.encoding} encoding: {e}"
            f"\n Please specify the correct encoding using the -e flag."
        )
        return {}
    except Exception as e:
        logger.error("Error reading CSV file %s. %s", datasets_path, e)
        return {}

    if "Filename" not in datasets_df.columns:
        return {}

    filenames = datasets_df["Filename"]
    match = datasets_df[filenames == self.dataset]

    if match.empty:
        # No exact hit: compare on the lower-cased stem instead, so that
        # case differences and file extensions do not prevent a match.
        wanted = str(self.dataset).lower()
        stem_hits = pd.Series(
            [
                bool(pd.notna(value))
                and Path(str(value)).stem.lower() == wanted
                for value in filenames
            ],
            index=datasets_df.index,
            dtype=bool,
        )
        match = datasets_df[stem_hits]

    # Ambiguous (several rows) and missing entries are both treated as
    # "no metadata available".
    if len(match) != 1:
        return {}

    single_match = match.iloc[0]

    if "Dataset Name" in datasets_df.columns:
        dataset_name = single_match["Dataset Name"]
    else:
        dataset_name = str(self.dataset).upper()

    result = {"dataset_name": dataset_name}
    # The label is optional; omit the key rather than fail on a missing column.
    if "Label" in datasets_df.columns:
        result["dataset_label"] = str(single_match["Label"])
    return result

def __get_variable_metadata(
self, dataset_name: str, variables_file_path: Path
def __variable_metadata(
self,
) -> dict:
logger = logging.getLogger("validator")
if not self.variables_csv_path.exists():
logger.info("No variables file found for %s", self.dataset)
return {}
try:
meta_df = pd.read_csv(variables_file_path, encoding=self.encoding)
meta_df = pd.read_csv(self.variables_csv_path, encoding=self.encoding)
except (UnicodeDecodeError, UnicodeError) as e:
logger.error(
f"Could not decode CSV file {variables_file_path} with {self.encoding} encoding: {e}. "
f"Could not decode CSV file {self.variables_csv_path} with {self.encoding} encoding: {e}. "
f"Please specify the correct encoding using the -e flag."
)
return {}
except Exception as e:
logger.error("Error reading CSV file %s. %s", self.file_path, e)
return {}

meta_df["dataset"] = meta_df["dataset"].apply(
lambda x: Path(str(x)).stem.lower()
)

dataset_meta_df = meta_df[meta_df["dataset"] == dataset_name]
dataset_meta_df = meta_df[meta_df["dataset"] == self.dataset]

if dataset_meta_df.empty:
logger = logging.getLogger("validator")
logger.info("No dataset metadata found for %s", dataset_name)
logger.info("No dataset metadata found for %s", self.dataset)
return {}

variable_names = dataset_meta_df["variable"].tolist()
Expand All @@ -95,7 +124,11 @@ def __get_variable_metadata(
zip(variable_names, dataset_meta_df["type"])
)
variable_name_to_size_map = {
var: (int(length) if pd.notna(length) else None)
var: (
int(length)
if pd.notna(length) and (isinstance(length, int) or length.isdigit())
else None
)
for var, length in zip(variable_names, dataset_meta_df["length"])
}
return {
Expand All @@ -108,41 +141,7 @@ def __get_variable_metadata(
"number_of_variables": len(variable_names),
}

def __dataset_label(self) -> dict:
    """Return the label recorded for this dataset in the datasets CSV.

    Rows of ``self.datasets_csv_path`` are matched against the data file
    by comparing the lower-cased stem of the ``Filename`` column with the
    lower-cased stem of ``self.file_name``. The first matching row wins.

    Returns:
        dict: ``{"dataset_label": <label as str>}`` for the first matching
        row, or an empty dict when the datasets CSV is missing or
        unreadable, lacks a ``Filename`` or ``Label`` column, or contains
        no matching row.
    """
    logger = logging.getLogger("validator")
    datasets_path = self.datasets_csv_path

    if not datasets_path.exists():
        return {}

    try:
        datasets_df = pd.read_csv(datasets_path, encoding=self.encoding)
    except UnicodeError as e:  # UnicodeDecodeError is a subclass
        # Report the file that actually failed to decode.
        logger.error(
            f"\n Error reading CSV from: {datasets_path}"
            f"\n Failed to decode with {self.encoding} encoding: {e}"
            f"\n Please specify the correct encoding using the -e flag."
        )
        return {}
    except Exception as e:
        logger.error("Error reading CSV file %s. %s", datasets_path, e)
        return {}

    if not {"Filename", "Label"}.issubset(datasets_df.columns):
        return {}

    current_dataset = Path(self.file_name).stem.lower()
    stems = datasets_df["Filename"].map(lambda x: Path(str(x)).stem.lower())
    match = datasets_df[stems == current_dataset]

    if match.empty:
        return {}

    return {"dataset_label": str(match.iloc[0]["Label"])}

def __data_meta(self):
def __data_metadata(self):
logger = logging.getLogger("validator")
result = {
"dataset_length": 0,
Expand Down
Loading