diff --git a/nuh_helper/date_shift/validation.py b/nuh_helper/date_shift/validation.py
new file mode 100644
index 0000000..9d30c84
--- /dev/null
+++ b/nuh_helper/date_shift/validation.py
@@ -0,0 +1,168 @@
+"""Inspect spreadsheets for data that sits outside of the expected table layout.
+
+This contains an inspection function and error record types to determine if a
+spreadsheet has data in abnormal places. It is meant to check for "little notes" which
+are outside of the CDM and may have undocumented patient data.
+"""
+
+from dataclasses import dataclass
+from pathlib import Path
+
+from openpyxl.cell.cell import Cell
+from openpyxl.worksheet.worksheet import Worksheet
+
+
+class Error:
+    """base class for the errors. the concrete errors are dataclasses, so they compare
+    by value, which is what makes `assert error in list` work"""
+
+    # every concrete error records the sheet it was found on; it is declared here so
+    # that code handling a plain `Error` (such as `format_errors`) can rely on it
+    sheet_name: str
+
+
+@dataclass
+class ExcessRows(Error):
+    """error indicating that there are extra rows in a spreadsheet that don't have a
+    patient id and won't be shifted"""
+
+    sheet_name: str
+    excess: list[int]
+
+
+@dataclass
+class UnlabeledColumns(Error):
+    """indication that there are columns with data but no header; probably notes in the
+    margin about missing tests or (previously) dates related to patient's treatment to
+    explain the data in the spreadsheet."""
+
+    sheet_name: str
+    columns: list[int]
+
+
+@dataclass
+class PatientColumnMissing(Error):
+    """used to indicate that the patient column wasn't found in the spreadsheet"""
+
+    sheet_name: str
+    label: str
+
+
+def format_errors(errors: list[Error]) -> str:
+    """formats a collection of error objects into a human digestible string. errors are
+    grouped by sheet, in the order in which each sheet first appears in `errors`"""
+    # group the errors by sheet name; dicts preserve insertion order
+    by_sheet: dict[str, list[Error]] = {}
+    for error in errors:
+        by_sheet.setdefault(error.sheet_name, []).append(error)
+
+    lines: list[str] = []
+    for sheet_name, sheet_errors in by_sheet.items():
+        lines.append(f"on sheet {sheet_name=} ...\n")
+        for error in sheet_errors:
+            match error:
+                case ExcessRows():
+                    lines.append(
+                        f"\tthere were {len(error.excess)} rows with data but no "
+                        + "patient ID\n"
+                    )
+                    lines.append(f"\t\t{error.excess}\n")
+                case UnlabeledColumns():
+                    lines.append(
+                        f"\tthere were {len(error.columns)} columns with no data "
+                        + "in their label\n"
+                    )
+                    lines.append(f"\t\t{error.columns}\n")
+                case PatientColumnMissing():
+                    label = error.label
+                    lines.append(f"\tthere was no patient column {label=}\n")
+    return "".join(lines)
+
+
+def inspect(sheet_file: Path, sheet_configs: dict) -> list[Error]:
+    """Find data that's out of bounds in the spreadsheet. Uses the date-shifting
+    sheet_configs structure. Rather than throw exceptions, this returns a list of Error
+    objects that can be inspected or tested for. Sheets without an entry in
+    sheet_configs are skipped. Row and column numbers in the errors are zero-based."""
+
+    from openpyxl import load_workbook
+
+    errors: list[Error] = []
+
+    workbook = load_workbook(sheet_file, read_only=True, rich_text=False)
+    try:
+        for sheet_name in workbook.sheetnames:
+            if sheet_name not in sheet_configs:
+                print(f"skipping sheet {sheet_name=} since there's no config for it")
+                continue
+
+            sheet = workbook[sheet_name]
+            errors.extend(_inspect_sheet(sheet, sheet_name, sheet_configs[sheet_name]))
+    finally:
+        # a read-only workbook keeps the file open until it is explicitly closed
+        workbook.close()
+
+    return errors
+
+
+def _inspect_sheet(sheet: Worksheet, sheet_name: str, config: dict) -> list[Error]:
+    """checks a single worksheet against its config and returns the errors found"""
+    errors: list[Error] = []
+
+    # scan the header row to find out what the bounds of the spreadsheet should be
+    header_row = config["header_row"]
+    patient_id_col_text = config["patient_id_col"]
+    skip_rows = config["skip_rows_after_header"]
+
+    # we'll want to use the index in later checks
+    patient_id_col_index: None | int = None
+
+    # record the "blank" columns in the header row
+    blanks: list[int] = []
+
+    # check each cell of the header
+    for col in range(0, sheet.max_column):
+        cell = sheet.cell(header_row + 1, col + 1)
+        if blank_cell(cell):
+            blanks.append(col)
+        elif cell.value == patient_id_col_text:
+            patient_id_col_index = col
+
+    if blanks:
+        errors.append(UnlabeledColumns(sheet_name, blanks))
+
+    # we can't do any further checks without the patient_id_col_index
+    if patient_id_col_index is None:
+        errors.append(PatientColumnMissing(sheet_name, patient_id_col_text))
+        return errors
+
+    # find any rows with data but no patient id
+    excess = []
+    for row in range(0, sheet.max_row):
+        if row in skip_rows or row == header_row:
+            continue
+
+        # we will allow "blank" rows
+        # ... such as empty rows between groups of patients
+        should_be_blank = blank_cell(sheet.cell(row + 1, patient_id_col_index + 1))
+
+        # to allow "whitespace rows" we only check rows without a patient id
+        if should_be_blank and not blank_row(sheet, row):
+            excess.append(row)
+
+    if excess:
+        errors.append(ExcessRows(sheet_name, excess))
+
+    return errors
+
+
+def blank_cell(cell: Cell) -> bool:
+    """tests if a cell value is blank (None, empty or only whitespace)"""
+    return cell.value is None or str(cell.value).strip() == ""
+
+
+def blank_row(sheet: Worksheet, row: int) -> bool:
+    """tests if a (zero-based) row of a Worksheet is blank"""
+    return all(
+        blank_cell(sheet.cell(row + 1, c + 1)) for c in range(0, sheet.max_column)
+    )
diff --git a/tests/data/patients2with-extra-data.xlsx b/tests/data/patients2with-extra-data.xlsx
new file mode 100644
index 0000000..01fb960
Binary files /dev/null and b/tests/data/patients2with-extra-data.xlsx differ
diff --git a/tests/test_date_shift.py b/tests/test_date_shift.py
index d624a16..c395d0b 100644
--- a/tests/test_date_shift.py
+++ b/tests/test_date_shift.py
@@ -55,7 +55,7 @@ def test_empty_string_returns_none(self) -> None:
 
     @pytest.mark.parametrize(
         "placeholder",
-        ["unknown", "Unknown", "unk", "unkown", "n/a", "none", "null"],
+        ["unknown", "Unknown", "unk", "unknown", "n/a", "none", "null"],
     )
     def test_placeholder_strings_return_none(self, placeholder: str) -> None:
         assert _parse_date_value(placeholder) is None
diff --git a/tests/test_inspect.py b/tests/test_inspect.py
new file mode 100644
index 0000000..654418b
--- /dev/null
+++ b/tests/test_inspect.py
@@ -0,0 +1,60 @@
+from pathlib import Path
+
+from nuh_helper.date_shift.validation import (
+    ExcessRows,
+    UnlabeledColumns,
+    format_errors,
+    inspect,
+)
+
+
+def test_inspect() -> None:
+    """inspecting the sample workbook reports its stray row and unlabeled columns
+
+    https://github.com/Health-Informatics-UoN/nuh-helper/issues/78
+
+    https://github.com/Health-Informatics-UoN/nuh-helper/issues/8
+    """
+
+    patients_src = Path(__file__).parent / "data/patients2with-extra-data.xlsx"
+
+    errors = inspect(patients_src, sheet_configs)
+
+    message = format_errors(errors)
+    print(">>>")
+    print(message)
+    print("<<<")
+
+    assert ExcessRows("measurements", [14]) in errors
+    assert UnlabeledColumns("measurements", [3, 4]) in errors
+
+    assert len(errors) == 2
+
+
+sheet_configs = {
+    "patients": {
+        "patient_id_col": "patient_id",
+        "header_row": 0,
+        "skip_rows_after_header": [],
+        "date_columns": [
+            "dob",
+            "last_alive",
+        ],
+    },
+    "results": {
+        "patient_id_col": "patient_id",
+        "header_row": 0,
+        "skip_rows_after_header": [],
+        "date_columns": [
+            "date_result",
+        ],
+    },
+    "measurements": {
+        "patient_id_col": "p_id",
+        "header_row": 1,
+        "skip_rows_after_header": [2, 3],
+        "date_columns": [
+            "date8061",
+        ],
+    },
+}