diff --git a/.coveragerc b/.coveragerc index 57cd853..9339626 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,4 +2,5 @@ omit = data_pipeline/prototypes/* data_pipeline/shared/table_configs.py + data_pipeline/stages/build_bi_semantic_layer.py */__init__.py \ No newline at end of file diff --git a/data_pipeline/run_pipeline.py b/data_pipeline/run_pipeline.py index 51bdecd..b89926f 100644 --- a/data_pipeline/run_pipeline.py +++ b/data_pipeline/run_pipeline.py @@ -12,6 +12,7 @@ from data_pipeline.stages.validate_raw_data import apply_validation from data_pipeline.stages.apply_raw_data_contract import apply_contract from data_pipeline.stages.assemble_validated_events import assemble_events +from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer def snapshot_raw(run_context: RunContext) -> None: @@ -45,22 +46,22 @@ def main() -> None: # Create raw snapshot at runtime snapshot_raw(run_context) - report_validation_1 = [] + report_validation_initial = [] # Initial validation - validation_1 = apply_validation(run_context) - report_validation_1.append(validation_1) + validation_initial = apply_validation(run_context) + report_validation_initial.append(validation_initial) persist_json( - run_context.logs_path / "validation_1.json", + run_context.logs_path / "validation_initial.json", { "run_id": run_context.run_id, - "report": report_validation_1, + "report": report_validation_initial, }, ) # Early exit for structural errors else apply contract - if validation_1["errors"]: + if validation_initial["errors"]: sys.exit(1) report_contract = [] @@ -89,26 +90,26 @@ def main() -> None: }, ) - report_validation_2 = [] + report_validation_post_contract = [] # Rerun validation on CONTRACTED data - validation_2 = apply_validation( + validation_post_contract = apply_validation( run_context, base_path=run_context.contracted_path, ) - report_validation_2.append(validation_2) + report_validation_post_contract.append(validation_post_contract) persist_json( - run_context.logs_path / "validation_2.json", + run_context.logs_path / "validation_post_contract.json", { "run_id": run_context.run_id, - "report": report_validation_2, + "report": report_validation_post_contract, }, ) # Intervention: Either manual fixing or escalate the data to source owner - if validation_2["errors"] or validation_2["warnings"]: + if validation_post_contract["errors"] or validation_post_contract["warnings"]: sys.exit(1) report_assemble = [] @@ -128,6 +129,23 @@ def main() -> None: if assemble["status"] == "failed": sys.exit(1) + report_semantic = [] + + # Semantic modeling + semantic = build_semantic_layer(run_context) + report_semantic.append(semantic) + + persist_json( + run_context.logs_path / "semantic_report.json", + { + "run_id": run_context.run_id, + "report": report_semantic, + }, + ) + + if semantic["status"] == "failed": + sys.exit(1) + sys.exit(0) diff --git a/data_pipeline/shared/raw_loader_exporter.py b/data_pipeline/shared/raw_loader_exporter.py index 97693ca..365cc2e 100644 --- a/data_pipeline/shared/raw_loader_exporter.py +++ b/data_pipeline/shared/raw_loader_exporter.py @@ -70,13 +70,13 @@ def load_logical_table( df = loader(file_path) if log_info: - log_info(f"loaded {file_path.name} ({len(df)} rows)") + log_info(f"Loaded: {file_path.name} ({len(df)} rows)") dfs.append(df) except Exception as e: if log_error: - log_error(f"failed loading {file_path.name}: {e}") + log_error(f"Failed loading: {file_path.name}: {e}") if not dfs: if log_error: diff --git a/data_pipeline/stages/assemble_validated_events.py b/data_pipeline/stages/assemble_validated_events.py index 5dc48f7..e425762 100644 --- a/data_pipeline/stages/assemble_validated_events.py +++ b/data_pipeline/stages/assemble_validated_events.py @@ -166,6 +166,7 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame: "lead_time_days": "int64", "approval_lag_days": "int64", "delivery_delay_days": "int64", + "order_date": "datetime64[ns]", "order_year": "int64", } @@ -242,13 +243,20 @@ def error(msg): return report - output_path = run_context.assembled_path / "assembled_events.parquet" + year = run_context.run_id[:4] + month = run_context.run_id[4:6] + + output_path = ( + run_context.assembled_path / f"assembled_events_{year}_{month}.parquet" + ) if not export_file(df_contract, output_path): error("Export failed") report["status"] = "failed" - info(f"Export success: assembled_events.parquet ({len(df_contract)} rows)") + info( + f"Export success: assembled_events_{year}_{month}.parquet ({len(df_contract)} rows)" + ) return report diff --git a/data_pipeline/stages/build_bi_semantic_layer.py b/data_pipeline/stages/build_bi_semantic_layer.py index 267ff92..91ec933 100644 --- a/data_pipeline/stages/build_bi_semantic_layer.py +++ b/data_pipeline/stages/build_bi_semantic_layer.py @@ -6,7 +6,7 @@ # - Enforce referential integrity between fact and dimension tables import pandas as pd -from typing import Dict, List +from typing import Dict, List, Tuple, Literal from data_pipeline.shared.run_context import RunContext from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file @@ -31,16 +31,256 @@ def log_error(message: str, report: Dict[str, List[str]]) -> None: # ------------------------------------------------------------ -# SEMANTIC LAYERING AND SCHEMA ENFORCEMENT +# SELLER WEEKLY SEMANTIC MODELING AND SCHEMA ENFORCEMENT # ------------------------------------------------------------ +def seller_weekly_semantic( + df: pd.DataFrame, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Seller weekly semantic builder. + + Transforms the assembled event table into seller-level weekly + performance fact and supporting seller dimension. + + Transform behavior: + + - Validates single-run lineage via `run_id` + - Derives weekly alignment fields and status flags + - Aggregates event data to seller-week grain + - Builds seller dimension from first observed activity + + Fact grain: + - One row per (seller_id, order_year_week) + + Dimension grain: + - One row per seller_id + """ + + read_assembled = df.copy() + + if read_assembled["run_id"].nunique() != 1: + raise RuntimeError("Multiple run_ids detected") + + read_assembled["week_start_date"] = ( + read_assembled["order_date"].dt.to_period("W-MON").dt.start_time + ) + read_assembled["is_delivered"] = read_assembled["order_status"].eq("delivered") + read_assembled["is_cancelled"] = read_assembled["order_status"].eq("cancelled") + + seller_weekly_fact = read_assembled.groupby( + ["seller_id", "order_year_week"], + as_index=False, + ).agg( + week_start_date=("week_start_date", "min"), + run_id=("run_id", "first"), + weekly_order_count=("order_id", "count"), + weekly_delivered_orders=("is_delivered", "sum"), + weekly_cancelled_orders=("is_cancelled", "sum"), + weekly_revenue=("order_revenue", "sum"), + weekly_avg_lead_time=("lead_time_days", "mean"), + weekly_total_lead_time=("lead_time_days", "sum"), + weekly_avg_delivery_delay=("delivery_delay_days", "mean"), + weekly_total_delivery_delay=("delivery_delay_days", "sum"), + weekly_avg_approval_lag=("approval_lag_days", "mean"), + ) + + expected = ( + read_assembled[["seller_id", "order_year_week"]].drop_duplicates().shape[0] + ) + + if len(seller_weekly_fact) != expected: + raise RuntimeError("Fact table grain violation: seller_weekly_fact") + + seller_dim = read_assembled.groupby( + "seller_id", + as_index=False, + ).agg( + first_order_date=("order_date", "min"), + first_order_year_week=("order_year_week", "min"), + run_id=("run_id", "first"), + ) + + if len(seller_dim) != read_assembled["seller_id"].nunique(): + raise RuntimeError("Dimension table grain violation: seller_dim") + + return seller_weekly_fact, seller_dim + + +def freeze_seller_semantic( + df: pd.DataFrame, + table_type: Literal["fact", "dim"], +) -> pd.DataFrame: + """ + Seller semantic contract enforcer. + + Routes the input table to the appropriate fact or dimension + contract freezer and enforces grain integrity before projection. + + Behavior: + + - Validates `table_type` selector + - Applies grain-level duplicate checks: + - fact: (seller_id, order_year_week) + - dim: seller_id + - Dispatches to the corresponding schema freezer + - Returns a BI-ready, schema-stable dataframe + """ + + if table_type not in {"fact", "dim"}: + raise ValueError + + def freeze_seller_fact(df: pd.DataFrame) -> pd.DataFrame: + """ + Seller weekly fact contract enforcement. + + Projects the aggregated seller-week dataset into the approved + fact schema, enforces dtypes, and applies deterministic ordering. + + Enforcement actions: + + - Validates presence of all required fact columns + - Projects to the contract column order + - Casts fields to enforced dtypes + - Sorts by (seller_id, order_year_week) + - Resets index for clean downstream consumption + + """ + + fact_contract = df.copy() + + FACT_SCHEMA = [ + "seller_id", + "order_year_week", + "week_start_date", + "run_id", + "weekly_order_count", + "weekly_delivered_orders", + "weekly_cancelled_orders", + "weekly_revenue", + "weekly_avg_lead_time", + "weekly_total_lead_time", + "weekly_avg_delivery_delay", + "weekly_total_delivery_delay", + "weekly_avg_approval_lag", + ] + + FACT_ENFORCED_DTYPES = { + "seller_id": "string", + "order_year_week": "string", + "week_start_date": "datetime64[ns]", + "run_id": "string", + "weekly_order_count": "int64", + "weekly_delivered_orders": "int64", + "weekly_cancelled_orders": "int64", + "weekly_revenue": "float64", + "weekly_avg_lead_time": "float64", + "weekly_total_lead_time": "int64", + "weekly_avg_delivery_delay": "float64", + "weekly_total_delivery_delay": "int64", + "weekly_avg_approval_lag": "float64", + } + + missing_cols = set(FACT_SCHEMA) - set(fact_contract.columns) + if missing_cols: + raise RuntimeError( + f"seller_weekly_fact missing required column(s): {sorted(missing_cols)}" + ) + + fact_contract = fact_contract[FACT_SCHEMA].copy() + fact_contract = fact_contract.astype(FACT_ENFORCED_DTYPES) + fact_contract = fact_contract.sort_values( + ["seller_id", "order_year_week"] + ).reset_index(drop=True) + + return fact_contract + + def freeze_seller_dim(df: pd.DataFrame) -> pd.DataFrame: + """ + Seller dimension contract enforcement. + + Projects the seller dimension into the approved schema, + enforces dtypes, and applies deterministic ordering. + + Enforcement actions: + + - Validates presence of all required dimension columns + - Projects to the contract column order + - Casts fields to enforced dtypes + - Sorts by seller_id + - Resets index for clean downstream consumption + """ + + dim_contract = df.copy() + + DIM_SCHEMA = [ + "seller_id", + "first_order_date", + "first_order_year_week", + "run_id", + ] + + DIM_ENFORCED_DTYPES = { + "seller_id": "string", + "first_order_date": "datetime64[ns]", + "first_order_year_week": "string", + "run_id": "string", + } + + missing_cols = set(DIM_SCHEMA) - set(dim_contract.columns) + if missing_cols: + raise RuntimeError( + f"seller_dim missing required column(s): {sorted(missing_cols)}" + ) + + dim_contract = dim_contract[DIM_SCHEMA].copy() + dim_contract = dim_contract.astype(DIM_ENFORCED_DTYPES) + dim_contract = dim_contract.sort_values("seller_id").reset_index(drop=True) + + return dim_contract + + if table_type == "fact": + if df.duplicated(["seller_id", "order_year_week"]).any(): + raise RuntimeError("Duplicate seller_id - order_year_week in fact") + + seller_fact_contracted = freeze_seller_fact(df) + + return seller_fact_contracted + + else: + if df["seller_id"].duplicated().any(): + raise RuntimeError("Duplicate seller_id in dimension") + + seller_dim_contracted = freeze_seller_dim(df) + + return seller_dim_contracted + + # ------------------------------------------------------------ # BUILD BI SEMANTIC # ------------------------------------------------------------ def build_semantic_layer(run_context: RunContext) -> Dict: + """ + Semantic layer orchestrator. + + Builds seller performance semantic tables from the assembled + event layer and exports contract-compliant BI artifacts. + + Chronological behavior: + + - Initializes run-scoped reporting and logging helpers. + - Loads the assembled_events logical table. + - Fails fast if the assembled dataset is missing or empty. + - Executes semantic pipeline: + - seller_weekly_semantic (aggregation) + - freeze_seller_semantic (fact and dimension contracts) + - Generates run-partitioned output filenames. + - Exports semantic tables to the run-scoped semantic directory. + - Aggregates all findings into the returned report. + """ report = init_report() @@ -60,20 +300,31 @@ def error(msg): ) if df is None or df.empty: - error("assembled events is empty") + error("assembled_events logical table missing or empty") + report["status"] = "failed" + + return report + + try: + fact_seller, dim_seller = seller_weekly_semantic(df) + seller_fact_contracted = freeze_seller_semantic(fact_seller, "fact") + seller_dim_contracted = freeze_seller_semantic(dim_seller, "dim") + + except Exception as e: + error(str(e)) report["status"] = "failed" return report - fact_seller = pd.DataFrame() - dim_seller = pd.DataFrame() + year = run_context.run_id[:4] + month = run_context.run_id[4:6] - fulfillment_tables = { - "seller_week_fulfillment_fact.parquet": fact_seller, - "dim_seller.parquet": dim_seller, + seller_semantic_tables = { + f"seller_week_performance_fact_{year}_{month}.parquet": seller_fact_contracted, + f"dim_seller_{year}_{month}.parquet": seller_dim_contracted, } - for table_name, table in fulfillment_tables.items(): + for table_name, table in seller_semantic_tables.items(): output_path = run_context.semantic_path / table_name diff --git a/tests/stages/test_assemble_validated_events.py b/tests/stages/test_assemble_validated_events.py index 6a9ab01..8d6d7bf 100644 --- a/tests/stages/test_assemble_validated_events.py +++ b/tests/stages/test_assemble_validated_events.py @@ -254,7 +254,7 @@ def test_freeze_schema_enforces_strict_schema_success(valid_derived_df): "lead_time_days": "int64", "approval_lag_days": "int64", "delivery_delay_days": "int64", - "order_date": "object", + "order_date": "datetime64[ns]", "order_year": "int64", } @@ -300,11 +300,16 @@ def test_assemble_data_success( index=False, ) + year = run_context.run_id[:4] + month = run_context.run_id[4:6] + report = assemble_events(run_context) assert report["status"] == "success" - output_file = run_context.assembled_path / "assembled_events.parquet" + output_file = ( + run_context.assembled_path / f"assembled_events_{year}_{month}.parquet" + ) assert output_file.exists() diff --git a/tests/test_run_pipeline.py b/tests/test_run_pipeline.py index 8e02b1c..719fdbe 100644 --- a/tests/test_run_pipeline.py +++ b/tests/test_run_pipeline.py @@ -41,7 +41,7 @@ def test_main_exits_on_validation_1_errors(monkeypatch, tmp_path): main() assert e.value.code == 1 - assert (fake_ctx.logs_path / "validation_1.json").exists() + assert (fake_ctx.logs_path / "validation_initial.json").exists() def test_main_exits_on_validation_2_issues(monkeypatch, tmp_path): @@ -91,7 +91,7 @@ def fake_validation(*args, **kwargs): main() assert e.value.code == 1 - assert (fake_ctx.logs_path / "validation_2.json").exists() + assert (fake_ctx.logs_path / "validation_post_contract.json").exists() def test_main_success(monkeypatch, tmp_path): @@ -120,7 +120,20 @@ def test_main_success(monkeypatch, tmp_path): monkeypatch.setattr( "data_pipeline.run_pipeline.assemble_events", - lambda *a, **k: {"status": "success", "error": [], "info": []}, + lambda *a, **k: { + "status": "success", + "error": [], + "info": [], + }, # Pass, status success + ) + + monkeypatch.setattr( + "data_pipeline.run_pipeline.build_semantic_layer", + lambda *a, **k: { + "status": "success", + "error": [], + "info": [], + }, # Pass, status success ) monkeypatch.setattr( @@ -132,10 +145,11 @@ def test_main_success(monkeypatch, tmp_path): main() assert e.value.code == 0 - assert (fake_ctx.logs_path / "validation_1.json").exists() + assert (fake_ctx.logs_path / "validation_initial.json").exists() assert (fake_ctx.logs_path / "contract_report.json").exists() - assert (fake_ctx.logs_path / "validation_2.json").exists() + assert (fake_ctx.logs_path / "validation_post_contract.json").exists() assert (fake_ctx.logs_path / "assemble_report.json").exists() + assert (fake_ctx.logs_path / "semantic_report.json").exists() # =============================================================================