diff --git a/data_pipeline/stages/assemble_validated_events.py b/data_pipeline/stages/assemble_validated_events.py index 5dbc055..5dc48f7 100644 --- a/data_pipeline/stages/assemble_validated_events.py +++ b/data_pipeline/stages/assemble_validated_events.py @@ -217,11 +217,14 @@ def error(msg): for table_name in EVENT_TABLES: df = load_logical_table( - contracted_path, table_name, log_info=info, log_error=error + contracted_path, + table_name, + log_info=info, + log_error=error, ) if df is None: - log_error(f"{table_name}: dataset is empty", report) + error(f"{table_name}: dataset is empty") report["status"] = "failed" return report @@ -234,7 +237,7 @@ def error(msg): df_contract = freeze_schema(df_assembled) except Exception as e: - log_error(str(e), report) + error(str(e)) report["status"] = "failed" return report @@ -242,9 +245,10 @@ def error(msg): output_path = run_context.assembled_path / "assembled_events.parquet" if not export_file(df_contract, output_path): - log_error("Export failed", report) + error("Export failed") report["status"] = "failed" + info(f"Export success: assembled_events.parquet ({len(df_contract)} rows)") return report diff --git a/data_pipeline/stages/build_bi_semantic_layer.py b/data_pipeline/stages/build_bi_semantic_layer.py index 1b1cfe7..267ff92 100644 --- a/data_pipeline/stages/build_bi_semantic_layer.py +++ b/data_pipeline/stages/build_bi_semantic_layer.py @@ -5,6 +5,87 @@ # - Define and lock analytical grains for consistent aggregation and reporting # - Enforce referential integrity between fact and dimension tables +import pandas as pd +from typing import Dict, List +from data_pipeline.shared.run_context import RunContext +from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file + + +# ------------------------------------------------------------ +# SEMANTIC REPORT & LOGS +# ------------------------------------------------------------ + + +def init_report(): + return {"status": "success", "errors": [], "info": []} + + +def log_info(message: str, report: Dict[str, List[str]]) -> None: + print(f"[INFO] {message}") + report["info"].append(message) + + +def log_error(message: str, report: Dict[str, List[str]]) -> None: + print(f"[ERROR] {message}") + report["errors"].append(message) + + +# ------------------------------------------------------------ +# SEMANTIC LAYERING AND SCHEMA ENFORCEMENT +# ------------------------------------------------------------ + + +# ------------------------------------------------------------ +# BUILD BI SEMANTIC +# ------------------------------------------------------------ + + +def build_semantic_layer(run_context: RunContext) -> Dict: + + report = init_report() + + def info(msg): + log_info(msg, report) + + def error(msg): + log_error(msg, report) + + assembled_path = run_context.assembled_path + + df = load_logical_table( + assembled_path, + "assembled_events", + log_info=info, + log_error=error, + ) + + if df is None or df.empty: + error("assembled events is empty") + report["status"] = "failed" + + return report + + fact_seller = pd.DataFrame() + dim_seller = pd.DataFrame() + + fulfillment_tables = { + "seller_week_fulfillment_fact.parquet": fact_seller, + "dim_seller.parquet": dim_seller, + } + + for table_name, table in fulfillment_tables.items(): + + output_path = run_context.semantic_path / table_name + + if not export_file(table, output_path): + error(f"{table_name}: Export failed") + report["status"] = "failed" + break + + info(f"Export success: {table_name} ({len(table)} rows)") + + return report + # ============================================================================= # END OF SCRIPT diff --git a/data_pipeline/stages/validate_raw_data.py b/data_pipeline/stages/validate_raw_data.py index 80e27b7..db9a320 100644 --- a/data_pipeline/stages/validate_raw_data.py +++ b/data_pipeline/stages/validate_raw_data.py @@ -364,7 +364,7 @@ def error(msg: str): df = load_logical_table(base_path, table_name, log_info=info, log_error=error) if df is None: - log_error(f"{table_name} logical table is missing", report) + error(f"{table_name} logical table is missing") continue loaded_table_names.add(table_name) @@ -385,7 +385,7 @@ def error(msg: str): missing_tables = sorted(expected_tables - loaded_table_names) if missing_tables: - log_error(f"missing expected table(s) {missing_tables}", report) + error(f"missing expected table(s) {missing_tables}") run_cross_table_validations(tables, report)