From 49932e92a4d025cf88ec31be0bf2e801dfb4c57a Mon Sep 17 00:00:00 2001 From: Bryan Melvida <126201239+BLMgithub@users.noreply.github.com> Date: Sat, 28 Feb 2026 10:40:19 +0800 Subject: [PATCH 1/3] initialize feature/publish-gate From 2a5c39184a25b9e97bd92cf0ef46846629f95ba0 Mon Sep 17 00:00:00 2001 From: Bryan Melvida <126201239+BLMgithub@users.noreply.github.com> Date: Sat, 28 Feb 2026 12:14:45 +0800 Subject: [PATCH 2/3] Chore: Move all table configs to shared/table_configs.py --- data_pipeline/shared/table_configs.py | 99 +++++++++++++++++++ .../stages/assemble_validated_events.py | 44 ++------- .../stages/build_bi_semantic_layer.py | 64 +++--------- 3 files changed, 118 insertions(+), 89 deletions(-) diff --git a/data_pipeline/shared/table_configs.py b/data_pipeline/shared/table_configs.py index 781f299..dccf9b1 100644 --- a/data_pipeline/shared/table_configs.py +++ b/data_pipeline/shared/table_configs.py @@ -2,6 +2,10 @@ # TABLE CONFIGURATIONS # ============================================================================= +# ------------------------------------------------------------ +# CONFIGURATIONS FOR validate_raw_data.py +# ------------------------------------------------------------ + TABLE_CONFIG = { "df_orders": { "role": "event_fact", @@ -81,3 +85,98 @@ "order_delivered_timestamp": "%Y-%m-%d %H:%M:%S", "order_estimated_delivery_date": "%Y-%m-%d", } + + +# ------------------------------------------------------------ +# CONFIGURATIONS FOR assemble_validate_events.py +# ------------------------------------------------------------ + +# Assemble events enforced schema and dtypes +ASSEMBLE_ENFORCED_SCHEMA = [ + "order_id", + "order_revenue", + "seller_id", + "product_id", + "order_status", + "order_purchase_timestamp", + "order_approved_at", + "order_delivered_timestamp", + "lead_time_days", + "approval_lag_days", + "delivery_delay_days", + "order_date", + "order_year", + "order_year_week", + "run_id", +] + +ASSEMBLE_ENFORCED_DTYPES = { + "order_id": "string", + "order_revenue": "float64", + "seller_id": "string", + "product_id": "string", + "order_status": "string", + "order_purchase_timestamp": "datetime64[ns]", + "order_approved_at": "datetime64[ns]", + "order_delivered_timestamp": "datetime64[ns]", + "lead_time_days": "int64", + "approval_lag_days": "int64", + "delivery_delay_days": "int64", + "order_date": "datetime64[ns]", + "order_year": "int64", +} + + +# ------------------------------------------------------------ +# CONFIGURATIONS FOR build_bi_semantic_layer.py +# ------------------------------------------------------------ + + +# Seller dimension enforced schema and dtypes +SELLER_DIM_ENFORCED_SCHEMA = [ + "seller_id", + "first_order_date", + "first_order_year_week", + "run_id", +] + +SELLER_DIM_ENFORCED_DTYPES = { + "seller_id": "string", + "first_order_date": "datetime64[ns]", + "first_order_year_week": "string", + "run_id": "string", +} + + +# Seller Facts enforced schema and dtypes +SELLER_FACT_ENFORCED_SCHEMA = [ + "seller_id", + "order_year_week", + "week_start_date", + "run_id", + "weekly_order_count", + "weekly_delivered_orders", + "weekly_cancelled_orders", + "weekly_revenue", + "weekly_avg_lead_time", + "weekly_total_lead_time", + "weekly_avg_delivery_delay", + "weekly_total_delivery_delay", + "weekly_avg_approval_lag", +] + +SELLER_FACT_ENFORCED_DTYPES = { + "seller_id": "string", + "order_year_week": "string", + "week_start_date": "datetime64[ns]", + "run_id": "string", + "weekly_order_count": "int64", + "weekly_delivered_orders": "int64", + "weekly_cancelled_orders": "int64", + "weekly_revenue": "float64", + "weekly_avg_lead_time": "float64", + "weekly_total_lead_time": "int64", + "weekly_avg_delivery_delay": "float64", + "weekly_total_delivery_delay": "int64", + "weekly_avg_approval_lag": "float64", +} diff --git a/data_pipeline/stages/assemble_validated_events.py b/data_pipeline/stages/assemble_validated_events.py index e425762..fe7d862 100644 --- a/data_pipeline/stages/assemble_validated_events.py +++ b/data_pipeline/stages/assemble_validated_events.py @@ -10,6 +10,10 @@ import pandas as pd from typing import Dict, List from data_pipeline.shared.run_context import RunContext +from data_pipeline.shared.table_configs import ( + ASSEMBLE_ENFORCED_SCHEMA, + ASSEMBLE_ENFORCED_DTYPES, +) from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file EVENT_TABLES = ["df_orders", "df_order_items", "df_payments"] @@ -136,46 +140,12 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame: - Resets index to produce a clean output frame """ - ENFORCED_SCHEMA = [ - "order_id", - "order_revenue", - "seller_id", - "product_id", - "order_status", - "order_purchase_timestamp", - "order_approved_at", - "order_delivered_timestamp", - "lead_time_days", - "approval_lag_days", - "delivery_delay_days", - "order_date", - "order_year", - "order_year_week", - "run_id", - ] - - ENFORCED_DTYPES = { - "order_id": "string", - "order_revenue": "float64", - "seller_id": "string", - "product_id": "string", - "order_status": "string", - "order_purchase_timestamp": "datetime64[ns]", - "order_approved_at": "datetime64[ns]", - "order_delivered_timestamp": "datetime64[ns]", - "lead_time_days": "int64", - "approval_lag_days": "int64", - "delivery_delay_days": "int64", - "order_date": "datetime64[ns]", - "order_year": "int64", - } - - missing_cols = set(ENFORCED_SCHEMA) - set(df.columns) + missing_cols = set(ASSEMBLE_ENFORCED_SCHEMA) - set(df.columns) if missing_cols: raise RuntimeError(f"missing required columns: {sorted(missing_cols)}") - df_contract = df[ENFORCED_SCHEMA].copy() - df_contract = df_contract.astype(ENFORCED_DTYPES) + df_contract = df[ASSEMBLE_ENFORCED_SCHEMA].copy() + df_contract = df_contract.astype(ASSEMBLE_ENFORCED_DTYPES) df_contract = df_contract.sort_values("order_id").reset_index(drop=True) return df_contract diff --git a/data_pipeline/stages/build_bi_semantic_layer.py b/data_pipeline/stages/build_bi_semantic_layer.py index 818407c..528f688 100644 --- a/data_pipeline/stages/build_bi_semantic_layer.py +++ b/data_pipeline/stages/build_bi_semantic_layer.py @@ -8,6 +8,12 @@ import pandas as pd from typing import Dict, List, Tuple, Literal from data_pipeline.shared.run_context import RunContext +from data_pipeline.shared.table_configs import ( + SELLER_DIM_ENFORCED_SCHEMA, + SELLER_DIM_ENFORCED_DTYPES, + SELLER_FACT_ENFORCED_SCHEMA, + SELLER_FACT_ENFORCED_DTYPES, +) from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file @@ -140,46 +146,14 @@ def freeze_seller_fact(df: pd.DataFrame) -> pd.DataFrame: fact_contract = df.copy() - FACT_SCHEMA = [ - "seller_id", - "order_year_week", - "week_start_date", - "run_id", - "weekly_order_count", - "weekly_delivered_orders", - "weekly_cancelled_orders", - "weekly_revenue", - "weekly_avg_lead_time", - "weekly_total_lead_time", - "weekly_avg_delivery_delay", - "weekly_total_delivery_delay", - "weekly_avg_approval_lag", - ] - - FACT_ENFORCED_DTYPES = { - "seller_id": "string", - "order_year_week": "string", - "week_start_date": "datetime64[ns]", - "run_id": "string", - "weekly_order_count": "int64", - "weekly_delivered_orders": "int64", - "weekly_cancelled_orders": "int64", - "weekly_revenue": "float64", - "weekly_avg_lead_time": "float64", - "weekly_total_lead_time": "int64", - "weekly_avg_delivery_delay": "float64", - "weekly_total_delivery_delay": "int64", - "weekly_avg_approval_lag": "float64", - } - - missing_cols = set(FACT_SCHEMA) - set(fact_contract.columns) + missing_cols = set(SELLER_FACT_ENFORCED_SCHEMA) - set(fact_contract.columns) if missing_cols: raise RuntimeError( f"seller_weekly_fact missing required column(s): {sorted(missing_cols)}" ) - fact_contract = fact_contract[FACT_SCHEMA].copy() - fact_contract = fact_contract.astype(FACT_ENFORCED_DTYPES) + fact_contract = fact_contract[SELLER_FACT_ENFORCED_SCHEMA].copy() + fact_contract = fact_contract.astype(SELLER_FACT_ENFORCED_DTYPES) fact_contract = fact_contract.sort_values( ["seller_id", "order_year_week"] ).reset_index(drop=True) @@ -204,28 +178,14 @@ def freeze_seller_dim(df: pd.DataFrame) -> pd.DataFrame: dim_contract = df.copy() - DIM_SCHEMA = [ - "seller_id", - "first_order_date", - "first_order_year_week", - "run_id", - ] - - DIM_ENFORCED_DTYPES = { - "seller_id": "string", - "first_order_date": "datetime64[ns]", - "first_order_year_week": "string", - "run_id": "string", - } - - missing_cols = set(DIM_SCHEMA) - set(dim_contract.columns) + missing_cols = set(SELLER_DIM_ENFORCED_SCHEMA) - set(dim_contract.columns) if missing_cols: raise RuntimeError( f"seller_dim missing required column(s): {sorted(missing_cols)}" ) - dim_contract = dim_contract[DIM_SCHEMA].copy() - dim_contract = dim_contract.astype(DIM_ENFORCED_DTYPES) + dim_contract = dim_contract[SELLER_DIM_ENFORCED_SCHEMA].copy() + dim_contract = dim_contract.astype(SELLER_DIM_ENFORCED_DTYPES) dim_contract = dim_contract.sort_values("seller_id").reset_index(drop=True) return dim_contract From e0bc4c01c2bb8f42e57d87f0e0647b51770950d3 Mon Sep 17 00:00:00 2001 From: Bryan Melvida <126201239+BLMgithub@users.noreply.github.com> Date: Sat, 28 Feb 2026 16:57:02 +0800 Subject: [PATCH 3/3] feat: Add metadata lifecycle + integrity gate - initialize metadata and finalize_run in orchestrator - implement pre-publish integrity validation (publish_lifecycle.py) - integrate integrity gate into run_pipeline - add test coverage for publish_lifecycle --- data_pipeline/run_pipeline.py | 121 +++++++-- .../stages/build_bi_semantic_layer.py | 2 +- data_pipeline/stages/publish_lifecycle.py | 131 ++++++++++ data_pipeline/stages/validate_raw_data.py | 12 +- tests/stages/test_build_bi_semantic_layer.py | 2 +- tests/stages/test_publish_lifecycle.py | 232 ++++++++++++++++++ tests/stages/test_validate_raw_data.py | 4 +- tests/test_run_pipeline.py | 9 + 8 files changed, 490 insertions(+), 23 deletions(-) create mode 100644 data_pipeline/stages/publish_lifecycle.py create mode 100644 tests/stages/test_publish_lifecycle.py diff --git a/data_pipeline/run_pipeline.py b/data_pipeline/run_pipeline.py index b89926f..73d5ee1 100644 --- a/data_pipeline/run_pipeline.py +++ b/data_pipeline/run_pipeline.py @@ -4,15 +4,23 @@ from pathlib import Path from shutil import copytree +from datetime import datetime as dt import sys import json + from data_pipeline.shared.table_configs import TABLE_CONFIG from data_pipeline.shared.run_context import RunContext from data_pipeline.stages.validate_raw_data import apply_validation from data_pipeline.stages.apply_raw_data_contract import apply_contract from data_pipeline.stages.assemble_validated_events import assemble_events from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer +from data_pipeline.stages.publish_lifecycle import run_integrity_gate + + +# ------------------------------------------------------------ +# SUPPORTING UTILITIES +# ------------------------------------------------------------ def snapshot_raw(run_context: RunContext) -> None: @@ -39,29 +47,100 @@ def persist_json(path: Path, payload: dict) -> None: json.dump(payload, f, indent=2) +def initiliaze_metadata(run_context: RunContext) -> None: + """ + Run metadata initializer. + + Creates the run-scoped metadata record at pipeline start to + establish lifecycle tracking and publish eligibility state. + """ + + payload = { + "run_id": run_context.run_id, + "status": "RUNNING", + "started_at": dt.utcnow().isoformat(), + "completed_at": None, + "published": False, + } + + persist_json(run_context.metadata_path, payload) + + +def finalize_run(run_context: RunContext, status: str) -> None: + """ + Run metadata finalizer. + + Updates the run metadata record with terminal status and + completion timestamp. + """ + + if not run_context.metadata_path.exists(): + raise RuntimeError("metadata.json missing during finalization") + + with open(run_context.metadata_path, "r") as file: + payload = json.load(file) + + payload["status"] = status + payload["complete_at"] = dt.utcnow().isoformat() + + if status == "SUCCESS": + payload["published"] = True + + else: + payload["published"] = False + + persist_json(run_context.metadata_path, payload) + + +# ------------------------------------------------------------ +# PIPELINE ORCHESTRATOR +# ------------------------------------------------------------ + + def main() -> None: + """ + Pipeline execution controller. + + Execution order: + + 1. Initialize run context and directory structure. + 2. Capture raw snapshot and initialize metadata. + 3. Run initial validation on raw data. + - Exit if structural errors exist. + 4. Apply table contracts in configured parent → child order, + propagating invalid order_ids. + 5. Rerun validation on contracted data. + - Exit if any errors or warnings remain. + 6. Assemble the core event table. + - Exit on assembly failure. + 7. Build semantic layer tables. + - Exit on semantic failure. + 8. Run pre-publish semantic integrity gate. + - Exit if gate fails. + 9. Exit process with success code. + """ + run_context = RunContext.create() run_context.initialize_directories() # Create raw snapshot at runtime snapshot_raw(run_context) - - report_validation_initial = [] + initiliaze_metadata(run_context) # Initial validation validation_initial = apply_validation(run_context) - report_validation_initial.append(validation_initial) persist_json( run_context.logs_path / "validation_initial.json", { "run_id": run_context.run_id, - "report": report_validation_initial, + "report": validation_initial, }, ) # Early exit for structural errors else apply contract if validation_initial["errors"]: + finalize_run(run_context, "FAILED") sys.exit(1) report_contract = [] @@ -90,60 +169,68 @@ def main() -> None: }, ) - report_validation_post_contract = [] - # Rerun validation on CONTRACTED data validation_post_contract = apply_validation( run_context, base_path=run_context.contracted_path, ) - report_validation_post_contract.append(validation_post_contract) - persist_json( run_context.logs_path / "validation_post_contract.json", { "run_id": run_context.run_id, - "report": report_validation_post_contract, + "report": validation_post_contract, }, ) # Intervention: Either manual fixing or escalate the data to source owner if validation_post_contract["errors"] or validation_post_contract["warnings"]: + finalize_run(run_context, "FAILED") sys.exit(1) - report_assemble = [] - # Assemble event table assemble = assemble_events(run_context) - report_assemble.append(assemble) persist_json( run_context.logs_path / "assemble_report.json", { "run_id": run_context.run_id, - "report": report_assemble, + "report": assemble, }, ) if assemble["status"] == "failed": + finalize_run(run_context, "FAILED") sys.exit(1) - report_semantic = [] - # Semantic modeling semantic = build_semantic_layer(run_context) - report_semantic.append(semantic) persist_json( run_context.logs_path / "semantic_report.json", { "run_id": run_context.run_id, - "report": report_semantic, + "report": semantic, }, ) if semantic["status"] == "failed": + finalize_run(run_context, "FAILED") + sys.exit(1) + + # Pre-publish semantic integrity validation + gate = run_integrity_gate(run_context) + + persist_json( + run_context.logs_path / "publish_integrity_report.json", + { + "run_id": run_context.run_id, + "report": gate, + }, + ) + + if gate["status"] == "failed": + finalize_run(run_context, "FAILED") sys.exit(1) sys.exit(0) diff --git a/data_pipeline/stages/build_bi_semantic_layer.py b/data_pipeline/stages/build_bi_semantic_layer.py index 528f688..fa899a0 100644 --- a/data_pipeline/stages/build_bi_semantic_layer.py +++ b/data_pipeline/stages/build_bi_semantic_layer.py @@ -271,7 +271,7 @@ def error(msg): seller_semantic_tables = { f"seller_week_performance_fact_{year}_{month}.parquet": seller_fact_contracted, - f"dim_seller_{year}_{month}.parquet": seller_dim_contracted, + f"seller_dim_{year}_{month}.parquet": seller_dim_contracted, } for table_name, table in seller_semantic_tables.items(): diff --git a/data_pipeline/stages/publish_lifecycle.py b/data_pipeline/stages/publish_lifecycle.py new file mode 100644 index 0000000..535db55 --- /dev/null +++ b/data_pipeline/stages/publish_lifecycle.py @@ -0,0 +1,131 @@ +# ============================================================================= +# PUBLISH ACTIVATION GATE +# ============================================================================= + +import pandas as pd + +from typing import Dict, List +from data_pipeline.shared.run_context import RunContext +from data_pipeline.shared.table_configs import ( + SELLER_FACT_ENFORCED_SCHEMA, + SELLER_DIM_ENFORCED_SCHEMA, +) + +# ------------------------------------------------------------ +# ASSEMBLE REPORT & LOGS +# ------------------------------------------------------------ + + +def init_report(): + return {"status": "success", "errors": [], "info": []} + + +def log_info(message: str, report: Dict[str, List[str]]) -> None: + print(f"[INFO] {message}") + report["info"].append(message) + + +def log_error(message: str, report: Dict[str, list[str]]) -> None: + print(f"[ERROR] {message}") + report["errors"].append(message) + + +# ------------------------------------------------------------ +# PRE-PUBLISH INTEGRITY GATE +# ------------------------------------------------------------ + + +def run_integrity_gate(run_context: RunContext) -> Dict: + """ + Pre-publish semantic integrity gate. + + Verifies that the semantic layer is complete, structurally valid, + and safe for downstream consumption before any publish action. + + Chronological behavior: + + - Initializes run-scoped reporting. + - Validates semantic output directory exists. + - Confirms actual parquet file set exactly matches the expected set. + - Loads each required semantic table. + - Validates each table is readable and non-empty. + - Verifies required schema columns are present per table type. + - Emits success signal when all checks pass. + + Gate intent: + + - Detect partial publishes + - Detect schema drift entering BI layer + - Detect empty or corrupt semantic outputs + """ + + report = init_report() + semantic_path = run_context.semantic_path + + year = run_context.run_id[:4] + month = run_context.run_id[4:6] + + # Validate semantic directory exists + if not semantic_path.exists(): + log_error("Semantic directory is missing", report) + report["status"] = "failed" + + return report + + # Validate expected semantic file set exactly matches required set + seller_expected_files = { + f"seller_week_performance_fact_{year}_{month}.parquet", + f"seller_dim_{year}_{month}.parquet", + } + + seller_actual_files = { + file.name for file in run_context.semantic_path.glob("*.parquet") + } + + if seller_actual_files != seller_expected_files: + log_error("Semantic file set mismatch", report) + report["status"] = "failed" + + return report + + # Validate required parquet files exist + for file_name in seller_expected_files: + path = semantic_path / file_name + + try: + df = pd.read_parquet(path) + + except Exception as e: + log_error(f"{file_name} failed to load: {e}", report) + report["status"] = "failed" + + return report + + # Validate dataframe not empty + if df is None or df.empty: + log_error(f"{file_name} logical table missing or empty", report) + report["status"] = "failed" + + return report + + # Validate required schema columns present + if "seller_week_performance_fact" in file_name: + required_cols = SELLER_FACT_ENFORCED_SCHEMA + else: + required_cols = SELLER_DIM_ENFORCED_SCHEMA + + missing = set(required_cols) - set(df.columns) + + if missing: + log_error(f"{file_name} required column(s): {sorted(missing)}", report) + report["status"] = "failed" + + return report + + log_info("Pre-publishing validation passed", report) + return report + + +# ============================================================================= +# END OF SCRIPT +# ============================================================================= diff --git a/data_pipeline/stages/validate_raw_data.py b/data_pipeline/stages/validate_raw_data.py index db9a320..27a754f 100644 --- a/data_pipeline/stages/validate_raw_data.py +++ b/data_pipeline/stages/validate_raw_data.py @@ -23,8 +23,13 @@ # ------------------------------------------------------------ -def init_report() -> Dict[str, List[str]]: - return {"errors": [], "warnings": [], "info": []} +def init_report(): + return { + "status": "success", + "errors": [], + "warnings": [], + "info": [], + } def log_info(message: str, report: Dict[str, List[str]]) -> None: @@ -389,6 +394,9 @@ def error(msg: str): run_cross_table_validations(tables, report) + if len(report["warnings"] or report["errors"]) > 0: + report["status"] = "failed" + return report diff --git a/tests/stages/test_build_bi_semantic_layer.py b/tests/stages/test_build_bi_semantic_layer.py index e1809c9..d53e8cc 100644 --- a/tests/stages/test_build_bi_semantic_layer.py +++ b/tests/stages/test_build_bi_semantic_layer.py @@ -290,7 +290,7 @@ def test_build_semantic_layer_success(tmp_path, valid_assembled_df): run_context.semantic_path / "seller_week_performance_fact_dumm_y_.parquet" ) - output_path_dim = run_context.semantic_path / "dim_seller_dumm_y_.parquet" + output_path_dim = run_context.semantic_path / "seller_dim_dumm_y_.parquet" assert report["status"] == "success" assert output_path_seller.exists() diff --git a/tests/stages/test_publish_lifecycle.py b/tests/stages/test_publish_lifecycle.py new file mode 100644 index 0000000..50d2c90 --- /dev/null +++ b/tests/stages/test_publish_lifecycle.py @@ -0,0 +1,232 @@ +# ============================================================================= +# UNIT TESTS FOR publish_lifecycle.py +# ============================================================================= + +import pandas as pd +import pytest + +from data_pipeline.shared.run_context import RunContext +from data_pipeline.stages.publish_lifecycle import ( + init_report, + log_info, + log_error, + run_integrity_gate, +) + + +@pytest.fixture +def empty_report(): + return init_report() + + +@pytest.fixture +def valid_seller_fact(): + return pd.DataFrame( + { + "seller_id": pd.Series( + ["seller1", "seller2"], + dtype="string", + ), + "order_year_week": pd.Series( + ["2023-W01", "2023-W02"], + dtype="string", + ), + "week_start_date": pd.Series( + ["2023-01-02 09:00:00", "2023-01-04 15:00:00"], + dtype="datetime64[ns]", + ), + "run_id": pd.Series(["run_1", "run_1"], dtype="string"), + "weekly_order_count": pd.Series([12, 34], dtype="int64"), + "weekly_delivered_orders": pd.Series([5, 6], dtype="int64"), + "weekly_cancelled_orders": pd.Series([7, 8], dtype="int64"), + "weekly_revenue": pd.Series([12.3, 45.6], dtype="float64"), + "weekly_avg_lead_time": pd.Series([5.34, 6.45], dtype="float64"), + "weekly_total_lead_time": pd.Series([5, 6], dtype="int64"), + "weekly_avg_delivery_delay": pd.Series([54.50, 67.89], dtype="float64"), + "weekly_total_delivery_delay": pd.Series([10, 11], dtype="int64"), + "weekly_avg_approval_lag": pd.Series([12.3, 14.5], dtype="float64"), + } + ) + + +@pytest.fixture +def valid_seller_dim(): + return pd.DataFrame( + { + "seller_id": pd.Series(["seller1", "seller2"], dtype="string"), + "first_order_date": pd.Series( + ["2023-03-02 09:00:00", "2023-05-04 11:00:00"], dtype="datetime64[ns]" + ), + "first_order_year_week": pd.Series( + ["2023-W03", "2023-W05"], dtype="string" + ), + "run_id": pd.Series(["run_1", "run_1"], dtype="string"), + } + ) + + +# ------------------------------------------------------------ +# # REPORTING & LOGS +# ------------------------------------------------------------ + + +def test_init_report_structure(): + report = init_report() + + assert set(report.keys()) == {"status", "errors", "info"} + assert all(isinstance(v, list | str) for v in report.values()) + + +def test_log_error_appends_only_to_errors(empty_report): + log_error("error", empty_report) + + assert empty_report["errors"] == ["error"] + + +def test_log_info_appends_only_to_info(empty_report): + log_info("info", empty_report) + + assert empty_report["info"] == ["info"] + + +# ------------------------------------------------------------ +# PRE-PUBLISH VALIDATION GATE +# ------------------------------------------------------------ + + +def test_run_integrity_gate_success( + tmp_path, + valid_seller_fact, + valid_seller_dim, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + valid_seller_dim.to_parquet( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + report = run_integrity_gate(run_context) + + assert "success" in report["status"] + assert "Pre-publishing validation passed" in report["info"] + + +def test_run_integrity_gate_fails_on_missing_directory(): + + run_context = RunContext.create() + + report = run_integrity_gate(run_context) + + assert "failed" in report["status"] + assert "Semantic directory is missing" in report["errors"] + + +def test_run_integrity_gate_fails_on_semantic_file_mismatch( + tmp_path, + valid_seller_fact, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + # Missing valid_seller_dim in semantic/ + report = run_integrity_gate(run_context) + + assert "failed" in report["status"] + assert "Semantic file set mismatch" in report["errors"] + + +def test_run_integrity_gate_fails_on_loading_parquet_files( + tmp_path, + valid_seller_fact, + valid_seller_dim, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact.to_csv( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + valid_seller_dim.to_csv( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + report = run_integrity_gate(run_context) + + assert "failed" in report["status"] + assert any("parquet failed to load" in error for error in report["errors"]) + + +def test_run_integrity_gate_fails_on_empty_dataframe(tmp_path): + + empty_seller_fact = pd.DataFrame() + empty_seller_dim = pd.DataFrame() + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + empty_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + empty_seller_dim.to_parquet( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + report = run_integrity_gate(run_context) + + assert "failed" in report["status"] + assert any("logical table missing or empty" in error for error in report["errors"]) + + +def test_run_integrity_gate_fails_on_missing_columns( + tmp_path, + valid_seller_fact, + valid_seller_dim, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact = valid_seller_fact.drop(columns="seller_id") + + valid_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + valid_seller_dim.to_parquet( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + report = run_integrity_gate(run_context) + + assert "failed" in report["status"] + assert any( + "required column(s): ['seller_id']" in error for error in report["errors"] + ) + + +# ============================================================================= +# UNIT TESTS END +# ============================================================================= diff --git a/tests/stages/test_validate_raw_data.py b/tests/stages/test_validate_raw_data.py index d7005f5..145b2ed 100644 --- a/tests/stages/test_validate_raw_data.py +++ b/tests/stages/test_validate_raw_data.py @@ -104,8 +104,8 @@ def valid_products_df(): def test_init_report_structure(): report = init_report() - assert set(report.keys()) == {"errors", "warnings", "info"} - assert all(isinstance(v, list) for v in report.values()) + assert set(report.keys()) == {"status", "errors", "warnings", "info"} + assert all(isinstance(v, list | str) for v in report.values()) def test_log_error_appends_only_to_errors(empty_report): diff --git a/tests/test_run_pipeline.py b/tests/test_run_pipeline.py index 719fdbe..1fb2104 100644 --- a/tests/test_run_pipeline.py +++ b/tests/test_run_pipeline.py @@ -136,6 +136,15 @@ def test_main_success(monkeypatch, tmp_path): }, # Pass, status success ) + monkeypatch.setattr( + "data_pipeline.run_pipeline.run_integrity_gate", + lambda *a, **k: { + "status": "success", + "errors": [], + "info": [], + }, # Pass, status success + ) + monkeypatch.setattr( "data_pipeline.run_pipeline.snapshot_raw", lambda *_: None,