|
13 | 13 | import pandas as pd |
14 | 14 | from pandera import Column, Check, DataFrameSchema |
15 | 15 | from pandera.errors import SchemaErrors |
| 16 | +from fuzzywuzzy import fuzz |
16 | 17 |
|
17 | 18 |
|
18 | 19 | def validate_file_reader(filename: str, allowed_schema: list[str]) -> dict[str, Any]: |
@@ -58,6 +59,56 @@ def load_json(path: str) -> Any: |
58 | 59 | raise FileNotFoundError(f"Failed to load JSON schema at {path}: {e}") |
59 | 60 |
|
60 | 61 |
|
def rename_columns_to_match_schema(
    df: pd.DataFrame,
    canon_to_aliases: Dict[str, List[str]],
    threshold: int = 90,
) -> pd.DataFrame:
    """
    Rename incoming columns using fuzzy match against schema-defined column names and aliases.

    Every incoming column is compared case-insensitively (``fuzz.ratio``) with
    every alias. The best-scoring alias decides which canonical name the column
    is renamed to, provided the score reaches ``threshold``. On equal scores the
    alias listed first in ``canon_to_aliases`` wins.

    A rename is skipped (with a warning) when its canonical name is already a
    column of ``df`` or was already claimed by a higher-scoring column, so the
    result never gains duplicate column labels.

    Args:
        df: Incoming dataframe
        canon_to_aliases: Mapping from canonical column names to list of aliases (including the canonical name itself)
        threshold: Fuzzy match score threshold to rename

    Returns:
        A new DataFrame with renamed columns
    """
    # Flatten once to (alias, lowercased alias, canonical) so the inner loop
    # does not lowercase the same alias for every incoming column.
    candidates = [
        (alias, alias.lower(), canon)
        for canon, aliases in canon_to_aliases.items()
        for alias in aliases
    ]

    # One proposal per column whose best match clears the threshold:
    # (score, column position, incoming column, matched alias, canonical name)
    proposals = []
    for position, incoming_col in enumerate(df.columns):
        # str() keeps non-string labels (e.g. integer headers) from raising.
        incoming_key = str(incoming_col).lower()
        best_score = 0
        best_match = None
        best_canon = None

        for alias, alias_key, canon in candidates:
            score = fuzz.ratio(incoming_key, alias_key)
            if score > best_score:
                best_score, best_match, best_canon = score, alias, canon

        # best_canon is None when nothing scored above 0; without this guard a
        # threshold <= 0 would rename the column to None.
        if best_canon is None or best_score < threshold:
            continue
        if incoming_col == best_canon:
            continue
        proposals.append((best_score, position, incoming_col, best_match, best_canon))

    # Names that must not be produced by a rename. Original column names stay
    # reserved even if that column is itself renamed (conservative on purpose).
    taken = set(df.columns)
    new_column_names = {}

    # Strongest matches claim their canonical name first; ties go to the
    # left-most column.
    for score, _, incoming_col, matched, canon in sorted(
        proposals, key=lambda p: (-p[0], p[1])
    ):
        if canon in taken:
            logging.warning(
                "Skipped renaming '%s' -> '%s' (matched on '%s', score=%s): "
                "target column name is already in use",
                incoming_col,
                canon,
                matched,
                score,
            )
            continue
        taken.add(canon)
        new_column_names[incoming_col] = canon
        logging.info(
            "Renamed '%s' -> '%s' (matched on '%s', score=%s)",
            incoming_col,
            canon,
            matched,
            score,
        )

    return df.rename(columns=new_column_names)
| 110 | + |
| 111 | + |
61 | 112 | def merge_model_columns( |
62 | 113 | base_schema: dict, |
63 | 114 | extension_schema: Any, |
@@ -107,6 +158,13 @@ def validate_dataset( |
107 | 158 | ) -> Dict[str, Any]: |
108 | 159 | df = pd.read_csv(filename) |
109 | 160 | df = df.rename(columns={c: normalize_col(c) for c in df.columns}) |
| 161 | + |
| 162 | + canon_to_aliases = { |
| 163 | + canon: [normalize_col(alias) for alias in [canon] + spec.get("aliases", [])] |
| 164 | + for canon, spec in merged_specs.items() |
| 165 | + } |
| 166 | + df = rename_columns_to_match_schema(df, canon_to_aliases) |
| 167 | + |
110 | 168 | incoming = set(df.columns) |
111 | 169 |
|
112 | 170 | # 1) load schemas |
|
0 commit comments