Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion data_pipeline/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from data_pipeline.stages.apply_raw_data_contract import apply_contract
from data_pipeline.stages.assemble_validated_events import assemble_events
from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer
from data_pipeline.stages.publish_lifecycle import run_integrity_gate
from data_pipeline.stages.publish_lifecycle import (
run_integrity_gate,
promote_semantic_version,
)


# ------------------------------------------------------------
Expand Down Expand Up @@ -233,6 +236,21 @@ def main() -> None:
finalize_run(run_context, "FAILED")
sys.exit(1)

# Copy validated semantics to version directory
promotion = promote_semantic_version(run_context)

persist_json(
run_context.logs_path / "publish_promotion_report.json",
{
"run_id": run_context.run_id,
"report": promotion,
},
)

if promotion["status"] == "failed":
finalize_run(run_context, "FAILED")
sys.exit(1)

sys.exit(0)


Expand Down
65 changes: 65 additions & 0 deletions data_pipeline/stages/publish_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# =============================================================================

import pandas as pd
import shutil

from typing import Dict, List
from data_pipeline.shared.run_context import RunContext
Expand Down Expand Up @@ -126,6 +127,70 @@ def run_integrity_gate(run_context: RunContext) -> Dict:
return report


# ------------------------------------------------------------
# PROMOTE VALIDATED SEMANTIC
# ------------------------------------------------------------


def promote_semantic_version(run_context: RunContext) -> Dict:
"""
Semantic version promotion step.

Publishes the validated semantic artifacts into the run-scoped
version directory for downstream consumption and lineage tracking.

Chronological behavior:

- Initializes run-scoped reporting.
- Verifies the target version directory does not already exist.
- Creates the version directory for the current run.
- Copies all semantic parquet artifacts into the version directory.
- Emits success signal when promotion completes.

Promotion intent:

- Create an immutable, run-versioned semantic snapshot
- Provide a stable handoff point for BI consumption
- Preserve lineage between run_id and published artifacts
"""

report = init_report()

semantic_path = run_context.semantic_path
version_path = run_context.version_path

if version_path.exists():
report["status"] = "failed"
log_error("Version directory already exists", report)

return report

# Create version directory
try:
version_path.mkdir(parents=True, exist_ok=False)

except Exception as e:
report["status"] = "failed"
log_error(str(e), report)

return report

# Copy validated semantics to version directory
try:
for file in semantic_path.glob("*.parquet"):
shutil.copy2(file, version_path / file.name)

except Exception as e:
report["status"] = "failed"
log_error(str(e), report)

return report

log_info("Semantic artifacts promoted successfully", report)

return report


# =============================================================================
# END OF SCRIPT
# =============================================================================
2 changes: 1 addition & 1 deletion data_pipeline/stages/validate_raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def run_event_fact_validations(
invalid_count = ts.isna().sum()
if invalid_count > 0:
log_warning(
f"{table_name}: {invalid_count} unparsable timestamp value(s) in `{col}`",
f"{table_name}: {invalid_count} unparsable timestamp value(s) in {col}",
report,
)

Expand Down
110 changes: 108 additions & 2 deletions tests/stages/test_publish_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import pandas as pd
import pytest
import shutil

from data_pipeline.shared.run_context import RunContext
from data_pipeline.stages.publish_lifecycle import (
init_report,
log_info,
log_error,
run_integrity_gate,
promote_semantic_version,
)


Expand Down Expand Up @@ -119,9 +121,9 @@ def test_run_integrity_gate_success(
assert "Pre-publishing validation passed" in report["info"]


def test_run_integrity_gate_fails_on_missing_directory():
def test_run_integrity_gate_fails_on_missing_directory(tmp_path):

run_context = RunContext.create()
run_context = RunContext.create(base_path=tmp_path)

report = run_integrity_gate(run_context)

Expand Down Expand Up @@ -227,6 +229,110 @@ def test_run_integrity_gate_fails_on_missing_columns(
)


# ------------------------------------------------------------
# PRE-PUBLISH VALIDATION GATE
# ------------------------------------------------------------


def test_promote_semantic_version_success(
tmp_path,
valid_seller_fact,
valid_seller_dim,
):

run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
run_context.initialize_directories()

valid_seller_fact.to_parquet(
run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet",
index=False,
)

valid_seller_dim.to_parquet(
run_context.semantic_path / "seller_dim_2023_01.parquet",
index=False,
)

report = promote_semantic_version(run_context)

assert "success" in report["status"]
assert "Semantic artifacts promoted successfully" in report["info"]


def test_promote_semantic_version_fails_on_existing_version_directory(
tmp_path,
valid_seller_fact,
valid_seller_dim,
):

run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
run_context.initialize_directories()

valid_seller_fact.to_parquet(
run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet",
index=False,
)

valid_seller_dim.to_parquet(
run_context.semantic_path / "seller_dim_2023_01.parquet",
index=False,
)

# Initial run that created the directory
_ = promote_semantic_version(run_context)

# Fails due to existing version directory on same run_id
report = promote_semantic_version(run_context)

assert "failed" in report["status"]
assert "Version directory already exists" in report["errors"]


def test_promote_semantic_version_fails_on_making_directory(tmp_path):

run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
run_context.initialize_directories()

# Force mkdir to raise
run_context.version_path.mkdir(parents=True)

report = promote_semantic_version(run_context)

assert report["status"] == "failed"
assert any("File exists" in e or "exists" in e for e in report["errors"])


def test_promote_semantic_version_fails_on_copying_semantic(
tmp_path,
monkeypatch,
valid_seller_fact,
valid_seller_dim,
):

run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
run_context.initialize_directories()

valid_seller_fact.to_parquet(
run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet",
index=False,
)
valid_seller_dim.to_parquet(
run_context.semantic_path / "seller_dim_2023_01.parquet",
index=False,
)

# force shutil.copy2 to raise
def mock_copy2(*args, **kwargs):
raise RuntimeError("copy failure")

monkeypatch.setattr(shutil, "copy2", mock_copy2)

report = promote_semantic_version(run_context)

assert report["status"] == "failed"
assert any("copy failure" in e for e in report["errors"])


# =============================================================================
# UNIT TESTS END
# =============================================================================
16 changes: 15 additions & 1 deletion tests/test_run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ def test_main_success(monkeypatch, tmp_path):

monkeypatch.setattr(
"data_pipeline.run_pipeline.apply_validation",
lambda *a, **k: {"errors": [], "warnings": []}, # Pass all validations
lambda *a, **k: {
"errors": [],
"warnings": [],
}, # Pass all validations
)

monkeypatch.setattr(
Expand Down Expand Up @@ -145,6 +148,15 @@ def test_main_success(monkeypatch, tmp_path):
}, # Pass, status success
)

monkeypatch.setattr(
"data_pipeline.run_pipeline.promote_semantic_version",
lambda *a, **k: {
"status": "success",
"errors": [],
"info": [],
}, # Pass, status success
)

monkeypatch.setattr(
"data_pipeline.run_pipeline.snapshot_raw",
lambda *_: None,
Expand All @@ -159,6 +171,8 @@ def test_main_success(monkeypatch, tmp_path):
assert (fake_ctx.logs_path / "validation_post_contract.json").exists()
assert (fake_ctx.logs_path / "assemble_report.json").exists()
assert (fake_ctx.logs_path / "semantic_report.json").exists()
assert (fake_ctx.logs_path / "publish_integrity_report.json").exists()
assert (fake_ctx.logs_path / "publish_promotion_report.json").exists()


# =============================================================================
Expand Down