diff --git a/data_pipeline/run_pipeline.py b/data_pipeline/run_pipeline.py index 73d5ee1..28f75f6 100644 --- a/data_pipeline/run_pipeline.py +++ b/data_pipeline/run_pipeline.py @@ -15,7 +15,10 @@ from data_pipeline.stages.apply_raw_data_contract import apply_contract from data_pipeline.stages.assemble_validated_events import assemble_events from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer -from data_pipeline.stages.publish_lifecycle import run_integrity_gate +from data_pipeline.stages.publish_lifecycle import ( + run_integrity_gate, + promote_semantic_version, +) # ------------------------------------------------------------ @@ -233,6 +236,21 @@ def main() -> None: finalize_run(run_context, "FAILED") sys.exit(1) + # Copy validated semantics to version directory + promotion = promote_semantic_version(run_context) + + persist_json( + run_context.logs_path / "publish_promotion_report.json", + { + "run_id": run_context.run_id, + "report": promotion, + }, + ) + + if promotion["status"] == "failed": + finalize_run(run_context, "FAILED") + sys.exit(1) + sys.exit(0) diff --git a/data_pipeline/stages/publish_lifecycle.py b/data_pipeline/stages/publish_lifecycle.py index 535db55..cb7fc50 100644 --- a/data_pipeline/stages/publish_lifecycle.py +++ b/data_pipeline/stages/publish_lifecycle.py @@ -3,6 +3,7 @@ # ============================================================================= import pandas as pd +import shutil from typing import Dict, List from data_pipeline.shared.run_context import RunContext @@ -126,6 +127,70 @@ def run_integrity_gate(run_context: RunContext) -> Dict: return report +# ------------------------------------------------------------ +# PROMOTE VALIDATED SEMANTIC +# ------------------------------------------------------------ + + +def promote_semantic_version(run_context: RunContext) -> Dict: + """ + Semantic version promotion step. 
+ + Publishes the validated semantic artifacts into the run-scoped + version directory for downstream consumption and lineage tracking. + + Chronological behavior: + + - Initializes run-scoped reporting. + - Verifies the target version directory does not already exist. + - Creates the version directory for the current run. + - Copies all semantic parquet artifacts into the version directory. + - Emits success signal when promotion completes. + + Promotion intent: + + - Create an immutable, run-versioned semantic snapshot + - Provide a stable handoff point for BI consumption + - Preserve lineage between run_id and published artifacts + """ + + report = init_report() + + semantic_path = run_context.semantic_path + version_path = run_context.version_path + + if version_path.exists(): + report["status"] = "failed" + log_error("Version directory already exists", report) + + return report + + # Create version directory + try: + version_path.mkdir(parents=True, exist_ok=False) + + except Exception as e: + report["status"] = "failed" + log_error(str(e), report) + + return report + + # Copy validated semantics to version directory + try: + for file in semantic_path.glob("*.parquet"): + shutil.copy2(file, version_path / file.name) + + except Exception as e: + report["status"] = "failed" + log_error(str(e), report) + + return report + + log_info("Semantic artifacts promoted successfully", report) + + return report + + # ============================================================================= # END OF SCRIPT # ============================================================================= diff --git a/data_pipeline/stages/validate_raw_data.py b/data_pipeline/stages/validate_raw_data.py index 27a754f..c80d724 100644 --- a/data_pipeline/stages/validate_raw_data.py +++ b/data_pipeline/stages/validate_raw_data.py @@ -206,7 +206,7 @@ def run_event_fact_validations( invalid_count = ts.isna().sum() if invalid_count > 0: log_warning( - f"{table_name}: {invalid_count} unparsable 
timestamp value(s) in `{col}`", + f"{table_name}: {invalid_count} unparsable timestamp value(s) in {col}", report, ) diff --git a/tests/stages/test_publish_lifecycle.py b/tests/stages/test_publish_lifecycle.py index 50d2c90..7bc4122 100644 --- a/tests/stages/test_publish_lifecycle.py +++ b/tests/stages/test_publish_lifecycle.py @@ -4,6 +4,7 @@ import pandas as pd import pytest +import shutil from data_pipeline.shared.run_context import RunContext from data_pipeline.stages.publish_lifecycle import ( @@ -11,6 +12,7 @@ log_info, log_error, run_integrity_gate, + promote_semantic_version, ) @@ -119,9 +121,9 @@ def test_run_integrity_gate_success( assert "Pre-publishing validation passed" in report["info"] -def test_run_integrity_gate_fails_on_missing_directory(): +def test_run_integrity_gate_fails_on_missing_directory(tmp_path): - run_context = RunContext.create() + run_context = RunContext.create(base_path=tmp_path) report = run_integrity_gate(run_context) @@ -227,6 +229,110 @@ def test_run_integrity_gate_fails_on_missing_columns( ) +# ------------------------------------------------------------ +# PROMOTE VALIDATED SEMANTIC +# ------------------------------------------------------------ + + +def test_promote_semantic_version_success( + tmp_path, + valid_seller_fact, + valid_seller_dim, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + valid_seller_dim.to_parquet( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + report = promote_semantic_version(run_context) + + assert "success" in report["status"] + assert "Semantic artifacts promoted successfully" in report["info"] + + +def test_promote_semantic_version_fails_on_existing_version_directory( + tmp_path, + valid_seller_fact, + valid_seller_dim, +): + + run_context = 
RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + + valid_seller_dim.to_parquet( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + # Initial run that created the directory + _ = promote_semantic_version(run_context) + + # Fails due to existing version directory on same run_id + report = promote_semantic_version(run_context) + + assert "failed" in report["status"] + assert "Version directory already exists" in report["errors"] + + +def test_promote_semantic_version_fails_on_making_directory(tmp_path): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + # Pre-create the version directory. NOTE(review): this trips the exists() guard, so mkdir itself never raises here + run_context.version_path.mkdir(parents=True) + + report = promote_semantic_version(run_context) + + assert report["status"] == "failed" + assert any("File exists" in e or "exists" in e for e in report["errors"]) + + +def test_promote_semantic_version_fails_on_copying_semantic( + tmp_path, + monkeypatch, + valid_seller_fact, + valid_seller_dim, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123") + run_context.initialize_directories() + + valid_seller_fact.to_parquet( + run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet", + index=False, + ) + valid_seller_dim.to_parquet( + run_context.semantic_path / "seller_dim_2023_01.parquet", + index=False, + ) + + # force shutil.copy2 to raise + def mock_copy2(*args, **kwargs): + raise RuntimeError("copy failure") + + monkeypatch.setattr(shutil, "copy2", mock_copy2) + + report = promote_semantic_version(run_context) + + assert report["status"] == "failed" + assert any("copy failure" in e for e in report["errors"]) + + +# 
============================================================================= # UNIT TESTS END # ============================================================================= diff --git a/tests/test_run_pipeline.py b/tests/test_run_pipeline.py index 1fb2104..52a3f5e 100644 --- a/tests/test_run_pipeline.py +++ b/tests/test_run_pipeline.py @@ -110,7 +110,10 @@ def test_main_success(monkeypatch, tmp_path): monkeypatch.setattr( "data_pipeline.run_pipeline.apply_validation", - lambda *a, **k: {"errors": [], "warnings": []}, # Pass all validations + lambda *a, **k: { + "errors": [], + "warnings": [], + }, # Pass all validations ) monkeypatch.setattr( @@ -145,6 +148,15 @@ def test_main_success(monkeypatch, tmp_path): }, # Pass, status success ) + monkeypatch.setattr( + "data_pipeline.run_pipeline.promote_semantic_version", + lambda *a, **k: { + "status": "success", + "errors": [], + "info": [], + }, # Pass, status success + ) + monkeypatch.setattr( "data_pipeline.run_pipeline.snapshot_raw", lambda *_: None, @@ -159,6 +171,8 @@ def test_main_success(monkeypatch, tmp_path): assert (fake_ctx.logs_path / "validation_post_contract.json").exists() assert (fake_ctx.logs_path / "assemble_report.json").exists() assert (fake_ctx.logs_path / "semantic_report.json").exists() + assert (fake_ctx.logs_path / "publish_integrity_report.json").exists() + assert (fake_ctx.logs_path / "publish_promotion_report.json").exists() # =============================================================================