|
13 | 13 | import pandas as pd |
14 | 14 | from pandera import Column, Check, DataFrameSchema |
15 | 15 | from pandera.errors import SchemaErrors |
| 16 | +from fuzzywuzzy import fuzz |
16 | 17 |
|
17 | 18 |
|
18 | 19 | def validate_file_reader(filename: str, allowed_schema: list[str]) -> dict[str, Any]: |
@@ -58,6 +59,56 @@ def load_json(path: str) -> Any: |
58 | 59 | raise FileNotFoundError(f"Failed to load JSON schema at {path}: {e}") |
59 | 60 |
|
60 | 61 |
|
| 62 | +def rename_columns_to_match_schema( |
| 63 | + df: pd.DataFrame, |
| 64 | + canon_to_aliases: Dict[str, List[str]], |
| 65 | + threshold: int = 90, |
| 66 | +) -> pd.DataFrame: |
| 67 | + """ |
| 68 | + Rename incoming columns using fuzzy match against schema-defined column names and aliases. |
| 69 | +
|
| 70 | + Args: |
| 71 | + df: Incoming dataframe |
| 72 | + canon_to_aliases: Mapping from canonical column names to list of aliases (including the canonical name itself) |
| 73 | + threshold: Fuzzy match score threshold to rename |
| 74 | +
|
| 75 | + Returns: |
| 76 | + A new DataFrame with renamed columns |
| 77 | + """ |
| 78 | + from collections import defaultdict |
| 79 | + |
| 80 | + new_column_names = {} |
| 81 | + log_info = defaultdict(list) |
| 82 | + |
| 83 | + schema_names = [] |
| 84 | + for canon, aliases in canon_to_aliases.items(): |
| 85 | + for name in aliases: |
| 86 | + schema_names.append((name, canon)) # (alias_or_name, canonical_name) |
| 87 | + |
| 88 | + for incoming_col in df.columns: |
| 89 | + best_score = 0 |
| 90 | + best_match = None |
| 91 | + best_canon = None |
| 92 | + |
| 93 | + for schema_col, canon in schema_names: |
| 94 | + score = fuzz.ratio(incoming_col.lower(), schema_col.lower()) |
| 95 | + if score > best_score: |
| 96 | + best_score = score |
| 97 | + best_match = schema_col |
| 98 | + best_canon = canon |
| 99 | + |
| 100 | + if best_score >= threshold and incoming_col != best_canon: |
| 101 | + new_column_names[incoming_col] = best_canon |
| 102 | + log_info[incoming_col].append( |
| 103 | + f"Renamed '{incoming_col}' -> '{best_canon}' (matched on '{best_match}', score={best_score})" |
| 104 | + ) |
| 105 | + |
| 106 | + for k, v in log_info.items(): |
| 107 | + logging.info(" | ".join(v)) |
| 108 | + |
| 109 | + return df.rename(columns=new_column_names) |
| 110 | + |
| 111 | + |
61 | 112 | def merge_model_columns( |
62 | 113 | base_schema: dict, |
63 | 114 | extension_schema: Any, |
@@ -134,6 +185,14 @@ def validate_dataset( |
134 | 185 | specs = merge_model_columns(base_schema, ext_schema, institution_id, m.lower()) |
135 | 186 | merged_specs.update(specs) |
136 | 187 |
|
| 188 | + canon_to_aliases = { |
| 189 | + canon: [normalize_col(alias) for alias in [canon] + spec.get("aliases", [])] |
| 190 | + for canon, spec in merged_specs.items() |
| 191 | + } |
| 192 | + df = rename_columns_to_match_schema(df, canon_to_aliases) |
| 193 | + |
| 194 | + incoming = set(df.columns) |
| 195 | + |
137 | 196 | # 3) build canon → set(normalized names) |
138 | 197 | canon_to_norms: Dict[str, set] = { |
139 | 198 | canon: {normalize_col(alias) for alias in [canon] + spec.get("aliases", [])} |
|
0 commit comments