From aced7c8f610df10ba798a79d9a622467f044160c Mon Sep 17 00:00:00 2001 From: LinklyLuck <83734371+LinklyLuck@users.noreply.github.com> Date: Wed, 29 Oct 2025 15:44:13 -0400 Subject: [PATCH] Optimize cleaning providers for faster type fixes --- app.py | 111 ++++++++-- cleaning_providers/__init__.py | 2 +- cleaning_providers/type_cleaner.py | 323 ++++++++++++++++++----------- the_pipeline_v2.py | 121 +++++++++++ 4 files changed, 419 insertions(+), 138 deletions(-) diff --git a/app.py b/app.py index ece087e..bae5be1 100644 --- a/app.py +++ b/app.py @@ -31,15 +31,31 @@ omdb_key = st.sidebar.text_input("OMDb API Key (OMDB_API_KEY)", type="password", value=os.getenv("OMDB_API_KEY", "")) tmdb_key = st.sidebar.text_input("TMDb API Key (TMDB_API_KEY)", type="password", value=os.getenv("TMDB_API_KEY", "")) -st.sidebar.divider() -st.sidebar.header("🧩 ER / MVI Options") -st.sidebar.caption("ER=Entity Resolution (LLM-driven); MVI=Missing Value Imputation (API-driven)") -enable_er = st.sidebar.checkbox("Enable ER (Entity Resolution - deduplicate entities)", value=True) -enable_mvi = st.sidebar.checkbox("Enable MVI (Missing Value Imputation - Wikipedia/TMDb/OMDb/Kaggle)", value=True) -er_sample_rows = st.sidebar.slider("ER/MVI Sample Rows (to limit external requests)", min_value=50, max_value=1000, - value=200, - step=50) -st.sidebar.caption("👉 Without API keys, only Wikipedia (no key required) will be used.") +# ---------------------- Sidebar: Cleaning & Validation ---------------------- +st.sidebar.header("🧽 Cleaning & Validation") +perform_cleaning = st.sidebar.checkbox("Enable data cleaning & external validation", value=False) +cleaning_mode = st.sidebar.selectbox( + "Cleaning mode", + options=["comprehensive", "llm", "type", "minimal"], + index=0, + disabled=not perform_cleaning +) +validation_source = st.sidebar.selectbox( + "Validation source", + options=["wikipedia", "tmdb", "omdb", "comprehensive"], + index=0, + disabled=not perform_cleaning +) +validation_cols_text = st.sidebar.text_input( + "Columns to validate (comma separated, optional)", + value="", + disabled=not perform_cleaning +) + +validation_columns = None +if perform_cleaning: + cols = [c.strip() for c in validation_cols_text.split(",") if c.strip()] + validation_columns = cols if cols else None # ---------------------- Queries Input ---------------------- st.subheader("📝 Natural Language Queries (one per line)") @@ -119,8 +135,26 @@ def run_one_query(idx: int, question: str) -> dict: "max_concurrency": int(max_conc), "api_base": api_base, "api_key": api_key or os.getenv("GPTNB_API_KEY", ""), - "er_mvi_sample_rows": int(er_sample_rows), } + if perform_cleaning: + cfg.update({ + "enable_cleaning": True, + "cleaning_mode": cleaning_mode, + "enable_validation": True, + "validation_source": validation_source, + "validation_columns": validation_columns, + "validation_api_keys": { + k: v for k, v in { + "tmdb": tmdb_key, + "omdb": omdb_key, + }.items() if v + } + }) + else: + cfg.update({ + "enable_cleaning": False, + "enable_validation": False, + }) cfg_path.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8") # Build subprocess environment: LLM + external enhancement @@ -132,11 +166,6 @@ def run_one_query(idx: int, question: str) -> dict: if omdb_key: env_for_child["OMDB_API_KEY"] = omdb_key if tmdb_key: env_for_child["TMDB_API_KEY"] = tmdb_key - # Separate ER and MVI switches - env_for_child["ER_ENABLED"] = "1" if enable_er else "0" - env_for_child["MVI_ENABLED"] = "1" if enable_mvi else "0" - env_for_child["ER_MVI_SAMPLE_ROWS"] = str(er_sample_rows) - # Run main pipeline t0 = time.time() proc = subprocess.run( @@ -168,6 +197,10 @@ def run_one_query(idx: int, question: str) -> dict: "validation_score": None, "validation_stdout": "", "validation_stderr": "", + "cleaning_report": "", + "cleaning_summary": {}, + "validation_provider_report": "", + "validation_summary": {}, } # Read entity_report from meta (if exists) @@ -181,6 +214,24 @@ def run_one_query(idx: int, question: str) -> dict: erp_path = out_csv.with_name(out_csv.stem + "_entity_report.csv") if erp_path.exists(): summary["entity_report"] = str(erp_path) + clean_report = meta.get("cleaning_report", "") + if clean_report: + clean_path = Path(clean_report) + if not clean_path.is_absolute() and out_csv_exists: + clean_path = out_csv.with_name(out_csv.stem + "_cleaning_report.json") + if clean_path.exists(): + summary["cleaning_report"] = str(clean_path) + validation_provider_rep = meta.get("validation_report", "") + if validation_provider_rep: + vpr_path = Path(validation_provider_rep) + if not vpr_path.is_absolute() and out_csv_exists: + vpr_path = out_csv.with_name(out_csv.stem + "_validation_provider_report.json") + if vpr_path.exists(): + summary["validation_provider_report"] = str(vpr_path) + if isinstance(meta.get("cleaning"), dict): + summary["cleaning_summary"] = meta["cleaning"] + if isinstance(meta.get("validation"), dict): + summary["validation_summary"] = meta["validation"] except Exception: pass @@ -296,6 +347,36 @@ def run_one_query(idx: int, question: str) -> dict: mime="text/csv" ) + # Cleaning stage report (if enabled) + if res.get("cleaning_report") or res.get("cleaning_summary"): + with st.expander("🧽 Cleaning Summary", expanded=False): + if res.get("cleaning_summary"): + st.json(res["cleaning_summary"]) + if res.get("cleaning_report"): + clean_path = Path(res["cleaning_report"]) + if clean_path.exists(): + st.download_button( + "⬇️ Download Cleaning Report (JSON)", + data=clean_path.read_bytes(), + file_name=clean_path.name, + mime="application/json" + ) + + # External validation provider report (post-cleaning) + if res.get("validation_provider_report") or res.get("validation_summary"): + with st.expander("✅ External Validation Summary", expanded=False): + if res.get("validation_summary"): + st.json(res["validation_summary"]) + if res.get("validation_provider_report"): + vpr = Path(res["validation_provider_report"]) + if vpr.exists(): + st.download_button( + "⬇️ Download External Validation Report (JSON)", + data=vpr.read_bytes(), + file_name=vpr.name, + mime="application/json" + ) + # ═══════════════════════════════════════════════════════════ # 🎓 VLDB Validation Report (NEW) # ═══════════════════════════════════════════════════════════ diff --git a/cleaning_providers/__init__.py b/cleaning_providers/__init__.py index 7f8c03f..0dff3aa 100644 --- a/cleaning_providers/__init__.py +++ b/cleaning_providers/__init__.py @@ -123,4 +123,4 @@ async def clean_data( # 执行清洗 cleaned_df, report = await cleaner.clean(df, primary_keys, rules) - return cleaned_df, report \ No newline at end of file + return cleaned_df, report diff --git a/cleaning_providers/type_cleaner.py b/cleaning_providers/type_cleaner.py index b385042..79b8dba 100644 --- a/cleaning_providers/type_cleaner.py +++ b/cleaning_providers/type_cleaner.py @@ -8,7 +8,6 @@ - 布尔值标准化 """ from __future__ import annotations -import re from typing import List, Dict, Optional, Any, Tuple import polars as pl from .base import CleaningProvider, CleaningReport @@ -47,23 +46,20 @@ async def clean( # 数值列清洗 if self._is_numeric_column(df, col): - col_changes = self._clean_numeric_column(cleaned_df, col, primary_keys) + cleaned_df, col_changes = self._clean_numeric_column(cleaned_df, col, primary_keys) if col_changes: - cleaned_df = self._apply_numeric_cleaning(cleaned_df, col, col_changes) changes.extend(col_changes) # 日期列清洗 elif self._is_date_column(df, col): - col_changes = self._clean_date_column(cleaned_df, col, primary_keys) + cleaned_df, col_changes = self._clean_date_column(cleaned_df, col, primary_keys) if col_changes: - cleaned_df = self._apply_date_cleaning(cleaned_df, col, col_changes) changes.extend(col_changes) # 布尔列清洗 elif self._is_boolean_column(df, col): - col_changes = self._clean_boolean_column(cleaned_df, col, primary_keys) + cleaned_df, col_changes = self._clean_boolean_column(cleaned_df, col, primary_keys) if col_changes: - cleaned_df = self._apply_boolean_cleaning(cleaned_df, col, col_changes) changes.extend(col_changes) report = self._create_report(changes) @@ -99,160 +95,243 @@ def _clean_numeric_column( df: pl.DataFrame, col: str, primary_keys: List[str] - ) -> List[Dict[str, Any]]: - """清洗数值列""" - changes = [] + ) -> Tuple[pl.DataFrame, List[Dict[str, Any]]]: + """清洗数值列(向量化实现)""" + if df.height == 0: + return df, [] + + original = pl.col(col) + original_str = original.cast(pl.Utf8, strict=False) + + sanitized = original_str.str.strip() + sanitized = sanitized.str.replace_all(",", "") + sanitized = sanitized.str.replace_all(r"[$€£¥]", "") + sanitized = pl.when( + sanitized.is_null() | (sanitized.str.len_chars() == 0) + ).then(None).otherwise(sanitized) + + is_numeric = sanitized.str.match(r"^-?\d+(?:\.\d+)?$").fill_null(False) + has_decimal = sanitized.str.contains(r"\.").fill_null(False) + + as_float = sanitized.cast(pl.Float64, strict=False) + as_int = sanitized.cast(pl.Int64, strict=False) + + converted_numeric = pl.when(is_numeric & has_decimal).then(as_float).when(is_numeric).then( + as_int.cast(pl.Float64, strict=False) + ).otherwise(None) + converted_str = pl.when(is_numeric & has_decimal).then( + as_float.cast(pl.Utf8, strict=False) + ).when(is_numeric).then( + as_int.cast(pl.Utf8, strict=False) + ).otherwise(None) + + invalid_mask = original.is_not_null() & sanitized.is_not_null() & (~is_numeric) + converted_change = (converted_str != original_str).fill_null(False) & is_numeric + needs_update = converted_change | invalid_mask + + report_df = df.select([ + pl.arange(0, pl.count()).alias("_row_index"), + original.alias("_original"), + converted_numeric.alias("_converted_numeric"), + converted_str.alias("_converted_str"), + invalid_mask.alias("_invalid"), + converted_change.alias("_converted_change"), + needs_update.alias("_needs_update") + ]) - for i in range(len(df)): - old_val = df[col][i] - if old_val is None: - continue + if not report_df["_needs_update"].any(): + return df, [] - # 尝试转换为数值 - try: - str_val = str(old_val).strip() - # 移除千分位逗号 - str_val = str_val.replace(",", "") - # 移除货币符号 - str_val = re.sub(r"[$€£¥]", "", str_val) - - # 尝试转换 - if "." in str_val: - new_val = float(str_val) - else: - new_val = int(str_val) - - if str(old_val) != str(new_val): - pk_info = {k: df[k][i] for k in primary_keys if k in df.columns} - changes.append({ - "row_index": i, - **pk_info, - "column": col, - "before": old_val, - "after": new_val, - "reason": "Type fix: converted to numeric" - }) - except: - # 无法转换,标记为None - pk_info = {k: df[k][i] for k in primary_keys if k in df.columns} + changes: List[Dict[str, Any]] = [] + for row in report_df.filter(pl.col("_needs_update")).iter_rows(named=True): + idx = row["_row_index"] + pk_info = {k: df[k][idx] for k in primary_keys if k in df.columns} + if row["_invalid"]: changes.append({ - "row_index": i, + "row_index": idx, **pk_info, "column": col, - "before": old_val, + "before": row["_original"], "after": None, "reason": "Type fix: invalid numeric value" }) + else: + changes.append({ + "row_index": idx, + **pk_info, + "column": col, + "before": row["_original"], + "after": row["_converted_numeric"], + "reason": "Type fix: converted to numeric" + }) - return changes + updated_df = df.with_columns([ + pl.when(is_numeric & has_decimal) + .then(as_float) + .when(is_numeric) + .then(as_int.cast(pl.Float64, strict=False)) + .when(invalid_mask) + .then(pl.lit(None, dtype=pl.Float64)) + .otherwise(original.cast(pl.Float64, strict=False)) + .alias(col) + ]) + + return updated_df, changes def _clean_date_column( self, df: pl.DataFrame, col: str, primary_keys: List[str] - ) -> List[Dict[str, Any]]: - """清洗日期列""" - changes = [] - - # 常见日期格式 - date_patterns = [ - r"(\d{4})-(\d{1,2})-(\d{1,2})", # YYYY-MM-DD - r"(\d{1,2})/(\d{1,2})/(\d{4})", # MM/DD/YYYY - r"(\d{4})(\d{2})(\d{2})", # YYYYMMDD - ] + ) -> Tuple[pl.DataFrame, List[Dict[str, Any]]]: + """清洗日期列(向量化实现)""" + if df.height == 0: + return df, [] + + # 已经是日期/日期时间列则跳过 + if df.schema[col] in (pl.Date, pl.Datetime): + return df, [] + + original = pl.col(col) + original_str = original.cast(pl.Utf8, strict=False) + stripped = original_str.str.strip() + stripped = pl.when( + stripped.is_null() | (stripped.str.len_chars() == 0) + ).then(None).otherwise(stripped) + + parsed = pl.coalesce( + stripped.str.strptime(pl.Date, "%Y-%m-%d", strict=False), + stripped.str.strptime(pl.Date, "%m/%d/%Y", strict=False), + stripped.str.strptime(pl.Date, "%Y%m%d", strict=False) + ) + normalized = parsed.dt.strftime("%Y-%m-%d") + + invalid_mask = original.is_not_null() & stripped.is_not_null() & parsed.is_null() + changed_mask = (normalized != original_str).fill_null(False) & normalized.is_not_null() + needs_update = changed_mask | invalid_mask + + report_df = df.select([ + pl.arange(0, pl.count()).alias("_row_index"), + original.alias("_original"), + normalized.alias("_normalized"), + invalid_mask.alias("_invalid"), + changed_mask.alias("_changed"), + needs_update.alias("_needs_update") + ]) - for i in range(len(df)): - old_val = df[col][i] - if old_val is None: - continue + if not report_df["_needs_update"].any(): + return df, [] - str_val = str(old_val).strip() - - # 尝试匹配并标准化为 YYYY-MM-DD - matched = False - for pattern in date_patterns: - match = re.match(pattern, str_val) - if match: - groups = match.groups() - - # 判断格式并重组 - if len(groups[0]) == 4: # YYYY-... - year, month, day = groups - else: # MM/DD/YYYY - month, day, year = groups - - try: - new_val = f"{int(year)}-{int(month):02d}-{int(day):02d}" - - if str_val != new_val: - pk_info = {k: df[k][i] for k in primary_keys if k in df.columns} - changes.append({ - "row_index": i, - **pk_info, - "column": col, - "before": old_val, - "after": new_val, - "reason": "Type fix: date format standardized" - }) - - matched = True - break - except: - continue - - if not matched and old_val not in (None, ""): - # 无法解析的日期,标记为None - pk_info = {k: df[k][i] for k in primary_keys if k in df.columns} + changes: List[Dict[str, Any]] = [] + for row in report_df.filter(pl.col("_needs_update")).iter_rows(named=True): + idx = row["_row_index"] + pk_info = {k: df[k][idx] for k in primary_keys if k in df.columns} + if row["_invalid"]: changes.append({ - "row_index": i, + "row_index": idx, **pk_info, "column": col, - "before": old_val, + "before": row["_original"], "after": None, "reason": "Type fix: invalid date format" }) + else: + changes.append({ + "row_index": idx, + **pk_info, + "column": col, + "before": row["_original"], + "after": row["_normalized"], + "reason": "Type fix: date format standardized" + }) - return changes + updated_df = df.with_columns([ + pl.when(normalized.is_not_null()) + .then(normalized) + .when(invalid_mask) + .then(pl.lit(None, dtype=pl.Utf8)) + .otherwise(original_str) + .alias(col) + ]) + + return updated_df, changes def _clean_boolean_column( self, df: pl.DataFrame, col: str, primary_keys: List[str] - ) -> List[Dict[str, Any]]: - """清洗布尔列""" - changes = [] - - true_values = {"true", "yes", "1", "t", "y"} - false_values = {"false", "no", "0", "f", "n"} - - for i in range(len(df)): - old_val = df[col][i] - if old_val is None: - continue + ) -> Tuple[pl.DataFrame, List[Dict[str, Any]]]: + """清洗布尔列(向量化实现)""" + if df.height == 0: + return df, [] + + if df.schema[col] == pl.Boolean: + return df, [] + + original = pl.col(col) + original_str = original.cast(pl.Utf8, strict=False) + lowered = original_str.str.to_lowercase().str.strip() + lowered = pl.when( + lowered.is_null() | (lowered.str.len_chars() == 0) + ).then(None).otherwise(lowered) + + true_values = ["true", "yes", "1", "t", "y"] + false_values = ["false", "no", "0", "f", "n"] + + converted = pl.when(lowered.is_in(true_values)).then(pl.lit(True)).when( + lowered.is_in(false_values) + ).then(pl.lit(False)).otherwise(None) + + invalid_mask = original.is_not_null() & lowered.is_not_null() & converted.is_null() + changed_mask = (converted.cast(pl.Utf8, strict=False) != original_str).fill_null(False) & converted.is_not_null() + needs_update = changed_mask | invalid_mask + + report_df = df.select([ + pl.arange(0, pl.count()).alias("_row_index"), + original.alias("_original"), + converted.alias("_converted"), + invalid_mask.alias("_invalid"), + changed_mask.alias("_changed"), + needs_update.alias("_needs_update") + ]) - str_val = str(old_val).lower().strip() + if not report_df["_needs_update"].any(): + return df, [] - if str_val in true_values: - new_val = True - elif str_val in false_values: - new_val = False + changes: List[Dict[str, Any]] = [] + for row in report_df.filter(pl.col("_needs_update")).iter_rows(named=True): + idx = row["_row_index"] + pk_info = {k: df[k][idx] for k in primary_keys if k in df.columns} + if row["_invalid"]: + changes.append({ + "row_index": idx, + **pk_info, + "column": col, + "before": row["_original"], + "after": None, + "reason": "Type fix: boolean standardized" + }) else: - new_val = None - - if old_val != new_val: - pk_info = {k: df[k][i] for k in primary_keys if k in df.columns} changes.append({ - "row_index": i, + "row_index": idx, **pk_info, "column": col, - "before": old_val, - "after": new_val, + "before": row["_original"], + "after": row["_converted"], "reason": "Type fix: boolean standardized" }) - return changes + updated_df = df.with_columns([ + pl.when(converted.is_not_null()) + .then(converted) + .when(invalid_mask) + .then(pl.lit(None, dtype=pl.Boolean)) + .otherwise(original) + .alias(col) + ]) + + return updated_df, changes def _apply_numeric_cleaning( self, diff --git a/the_pipeline_v2.py b/the_pipeline_v2.py index 80d9153..4c640d4 100644 --- a/the_pipeline_v2.py +++ b/the_pipeline_v2.py @@ -177,6 +177,22 @@ def _expand_globs_mixed(patterns: List[str]) -> List[Path]: _HAS_MVI_PROVIDERS = False print("[INFO] mvi_providers not found, MVI will be skipped") +try: + from cleaning_providers import clean_data + + _HAS_CLEANING_PROVIDERS = True +except ImportError: + _HAS_CLEANING_PROVIDERS = False + print("[INFO] cleaning_providers not found, cleaning stage will be skipped") + +try: + from validation_providers import validate_data + + _HAS_VALIDATION_PROVIDERS = True +except ImportError: + _HAS_VALIDATION_PROVIDERS = False + print("[INFO] validation_providers not found, validation stage will be skipped") + try: import yaml # optional for .yaml config except ImportError: @@ -351,6 +367,15 @@ class Config: max_rows_per_file: int = 200_000 # 每个文件最多读取多少行(超出截断) filter_by_query_keywords: bool = True # 根据 query 关键词粗过滤文件名 + # —— 新增:数据清洗与外部验证(可选) —— + enable_cleaning: bool = False + cleaning_mode: str = "comprehensive" + cleaning_rules: dict | None = None + enable_validation: bool = False + validation_source: str = "wikipedia" + validation_columns: Optional[List[str]] = None + validation_api_keys: Optional[Dict[str, str]] = None + # 👉 默认扫描"用户上传 datas/ + 本地 datalake/" DEFAULT_CONFIG = Config( @@ -1748,6 +1773,10 @@ def _enough(col: str) -> bool: entity_report_path = None enrich_summary_path = None er_report_path = None + cleaning_report_path = None + validation_provider_report_path = None + cleaning_report_data = None + validation_report_data = None # 识别领域 auto_domain = await detect_domain_from_query_or_columns( @@ -1940,6 +1969,90 @@ def _apply_row(i, r): import traceback traceback.print_exc() + # ==================================================================== + # 阶段3: 数据清洗(可选) + # ==================================================================== + cleaning_enabled = getattr(cfg, "enable_cleaning", False) + if cleaning_enabled: + if not _HAS_CLEANING_PROVIDERS: + print("[CLEAN] cleaning_providers not available, skipping cleaning stage") + else: + try: + print("\n[CLEAN] Starting data cleaning...") + cleaning_mode = getattr(cfg, "cleaning_mode", "comprehensive") + cleaning_rules = getattr(cfg, "cleaning_rules", None) + cleaned_df, cleaning_report = await clean_data( + df=merged, + primary_keys=pk_candidates, + llm_client=client, + mode=cleaning_mode, + rules=cleaning_rules + ) + merged = cleaned_df + if cleaning_report and hasattr(cleaning_report, "to_dict"): + cleaning_report_data = cleaning_report.to_dict() + cleaning_report_path = Path(cfg.out).with_name(Path(cfg.out).stem + "_cleaning_report.json") + Path(cleaning_report_path).write_text( + json.dumps(cleaning_report_data, ensure_ascii=False, indent=2), + encoding="utf-8" + ) + print(f"[CLEAN] Report -> {cleaning_report_path}") + if cleaning_report and getattr(cleaning_report, "cleaned_cells", 0): + print( + f"[CLEAN] ✓ Cleaned {cleaning_report.cleaned_cells} cells across {len(cleaning_report.cleaned_columns)} columns" + ) + except Exception as e: + print(f"[WARN] Cleaning failed: {e}") + import traceback + traceback.print_exc() + + # ==================================================================== + # 阶段4: 数据验证(可选,通常在清洗后) + # ==================================================================== + validation_enabled = getattr(cfg, "enable_validation", False) + if validation_enabled: + if not _HAS_VALIDATION_PROVIDERS: + print("[VALIDATION] validation_providers not available, skipping validation stage") + else: + try: + print("\n[VALIDATION] Starting external data validation...") + validation_source = getattr(cfg, "validation_source", "wikipedia") + validation_columns = getattr(cfg, "validation_columns", None) + validation_api_keys = getattr(cfg, "validation_api_keys", None) or {} + if not validation_api_keys: + fallback_keys = { + "tmdb": os.getenv("TMDB_API_KEY", ""), + "omdb": os.getenv("OMDB_API_KEY", ""), + } + validation_api_keys = {k: v for k, v in fallback_keys.items() if v} + validated_df, validation_report = await validate_data( + df=merged, + primary_keys=pk_candidates, + source=validation_source, + api_keys=validation_api_keys, + validate_columns=validation_columns, + llm_client=client + ) + merged = validated_df + if validation_report and hasattr(validation_report, "to_dict"): + validation_report_data = validation_report.to_dict() + validation_provider_report_path = Path(cfg.out).with_name( + Path(cfg.out).stem + "_validation_provider_report.json" + ) + Path(validation_provider_report_path).write_text( + json.dumps(validation_report_data, ensure_ascii=False, indent=2), + encoding="utf-8" + ) + print(f"[VALIDATION] Report -> {validation_provider_report_path}") + if validation_report and getattr(validation_report, "corrected_cells", 0): + print( + f"[VALIDATION] ✓ Corrected {validation_report.corrected_cells} cells across {validation_report.corrected_rows} rows" + ) + except Exception as e: + print(f"[WARN] Validation failed: {e}") + import traceback + traceback.print_exc() + # >>> 导出点名列并集副产物(如有) <<< if REQUESTED_COLS and requested_subset_df is not None: req_out = Path(cfg.out).with_name(Path(cfg.out).stem + "_requested_subset.csv") @@ -1996,6 +2109,10 @@ def _apply_row(i, r): meta["entity_report"] = entity_report_path if enrich_summary_path: meta["enrich_summary_csv"] = enrich_summary_path + if cleaning_report_path: + meta["cleaning_report"] = cleaning_report_path + if validation_provider_report_path: + meta["validation_report"] = validation_provider_report_path if entity_report_path or enrich_summary_path: meta["enrich"] = { "provider": (provider.name if 'provider' in locals() else None), @@ -2003,6 +2120,10 @@ def _apply_row(i, r): "sample_n": (len(sample_rows) if 'sample_rows' in locals() else None), "primary_keys": pk_candidates if 'pk_candidates' in locals() else [], } + if cleaning_report_data: + meta["cleaning"] = cleaning_report_data + if validation_report_data: + meta["validation"] = validation_report_data out.with_suffix(".meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8") print(f"[OK] CSV -> {out} ({merged.height}x{merged.width})")