Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion data_pipeline/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from data_pipeline.stages.publish_lifecycle import (
run_integrity_gate,
promote_semantic_version,
activate_published_version,
)


Expand Down Expand Up @@ -84,7 +85,7 @@ def finalize_run(run_context: RunContext, status: str) -> None:
payload = json.load(file)

payload["status"] = status
payload["complete_at"] = dt.utcnow().isoformat()
payload["completed_at"] = dt.utcnow().isoformat()

if status == "SUCCESS":
payload["published"] = True
Expand Down Expand Up @@ -251,6 +252,21 @@ def main() -> None:
finalize_run(run_context, "FAILED")
sys.exit(1)

finalize_run(run_context, "SUCCESS")

activation = activate_published_version(run_context)

persist_json(
run_context.logs_path / "publish_activation_report.json",
{
"run_id": run_context.run_id,
"report": activation,
},
)

if activation["status"] == "failed":
sys.exit(1)

sys.exit(0)


Expand Down
56 changes: 55 additions & 1 deletion data_pipeline/stages/publish_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import pandas as pd
import shutil
from datetime import datetime as dt
import json
import os

from typing import Dict, List
from data_pipeline.shared.run_context import RunContext
Expand Down Expand Up @@ -157,7 +160,7 @@ def promote_semantic_version(run_context: RunContext) -> Dict:
report = init_report()

semantic_path = run_context.semantic_path
version_path = run_context.version_path
version_path = run_context.version_path / "seller_semantic"

if version_path.exists():
report["status"] = "failed"
Expand Down Expand Up @@ -191,6 +194,57 @@ def promote_semantic_version(run_context: RunContext) -> Dict:
return report


# ------------------------------------------------------------
# PUBLISHED ATOMIC POINTER
# ------------------------------------------------------------


def activate_published_version(run_context: RunContext) -> Dict:
"""
Published version activation step.

Atomically updates the latest-version pointer to the newly promotedsemantic snapshot. <br>
Guarantee BI dashboards read only fully published versions.

Chronological behavior:

- Initializes run-scoped reporting.
- Builds the pointer payload with run lineage metadata.
- Writes payload to a temporary pointer file.
- Atomically swaps the temporary file into the latest pointer path.
- Emits success signal when the swap completes.

Notes:
- Uses temp-file + os.replace for atomicity.
- Assumes version promotion has already succeeded.
"""

report = init_report()

latest_path = run_context.latest_pointer_path
tmp_path = latest_path.with_suffix(".tmp")

payload = {
"run_id": run_context.run_id,
"version": f"v{run_context.run_id}",
"published_at": dt.utcnow().isoformat(),
}

try:
with open(tmp_path, "w") as file:
json.dump(payload, file, indent=2)

os.replace(tmp_path, latest_path)

except Exception as e:
report["status"] = "failed"
log_error(str(e), report)

log_info("Atomic pointer swap successful", report)

return report


# =============================================================================
# END OF SCRIPT
# =============================================================================
3 changes: 2 additions & 1 deletion tests/stages/test_publish_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ def test_promote_semantic_version_fails_on_making_directory(tmp_path):
run_context.initialize_directories()

# Force mkdir to raise
run_context.version_path.mkdir(parents=True)
published_version_path = run_context.version_path / "seller_semantic"
published_version_path.mkdir(parents=True)

report = promote_semantic_version(run_context)

Expand Down
16 changes: 8 additions & 8 deletions tests/test_run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ def test_main_success(monkeypatch, tmp_path):
}, # Pass, status success
)

monkeypatch.setattr(
"data_pipeline.run_pipeline.promote_semantic_version",
lambda *a, **k: {
"status": "success",
"errors": [],
"info": [],
}, # Pass, status success
)
# monkeypatch.setattr(
# "data_pipeline.run_pipeline.promote_semantic_version",
# lambda *a, **k: {
# "status": "success",
# "errors": [],
# "info": [],
# }, # Pass, status success
# )

monkeypatch.setattr(
"data_pipeline.run_pipeline.snapshot_raw",
Expand Down