diff --git a/data_pipeline/run_pipeline.py b/data_pipeline/run_pipeline.py index 28f75f6..9dd79b9 100644 --- a/data_pipeline/run_pipeline.py +++ b/data_pipeline/run_pipeline.py @@ -18,6 +18,7 @@ from data_pipeline.stages.publish_lifecycle import ( run_integrity_gate, promote_semantic_version, + activate_published_version, ) @@ -84,7 +85,7 @@ def finalize_run(run_context: RunContext, status: str) -> None: payload = json.load(file) payload["status"] = status - payload["complete_at"] = dt.utcnow().isoformat() + payload["completed_at"] = dt.utcnow().isoformat() if status == "SUCCESS": payload["published"] = True @@ -251,6 +252,21 @@ def main() -> None: finalize_run(run_context, "FAILED") sys.exit(1) + finalize_run(run_context, "SUCCESS") + + activation = activate_published_version(run_context) + + persist_json( + run_context.logs_path / "publish_activation_report.json", + { + "run_id": run_context.run_id, + "report": activation, + }, + ) + + if activation["status"] == "failed": + sys.exit(1) + sys.exit(0) diff --git a/data_pipeline/stages/publish_lifecycle.py b/data_pipeline/stages/publish_lifecycle.py index cb7fc50..ce6cd66 100644 --- a/data_pipeline/stages/publish_lifecycle.py +++ b/data_pipeline/stages/publish_lifecycle.py @@ -4,6 +4,9 @@ import pandas as pd import shutil +from datetime import datetime as dt +import json +import os from typing import Dict, List from data_pipeline.shared.run_context import RunContext @@ -157,7 +160,7 @@ def promote_semantic_version(run_context: RunContext) -> Dict: report = init_report() semantic_path = run_context.semantic_path - version_path = run_context.version_path + version_path = run_context.version_path / "seller_semantic" if version_path.exists(): report["status"] = "failed" @@ -191,6 +194,57 @@ def promote_semantic_version(run_context: RunContext) -> Dict: return report +# ------------------------------------------------------------ +# PUBLISHED ATOMIC POINTER +# ------------------------------------------------------------ + + +def activate_published_version(run_context: RunContext) -> Dict: + """ + Published version activation step. + + Atomically updates the latest-version pointer to the newly promotedsemantic snapshot.
+ Guarantee BI dashboards read only fully published versions. + + Chronological behavior: + + - Initializes run-scoped reporting. + - Builds the pointer payload with run lineage metadata. + - Writes payload to a temporary pointer file. + - Atomically swaps the temporary file into the latest pointer path. + - Emits success signal when the swap completes. + + Notes: + - Uses temp-file + os.replace for atomicity. + - Assumes version promotion has already succeeded. + """ + + report = init_report() + + latest_path = run_context.latest_pointer_path + tmp_path = latest_path.with_suffix(".tmp") + + payload = { + "run_id": run_context.run_id, + "version": f"v{run_context.run_id}", + "published_at": dt.utcnow().isoformat(), + } + + try: + with open(tmp_path, "w") as file: + json.dump(payload, file, indent=2) + + os.replace(tmp_path, latest_path) + + except Exception as e: + report["status"] = "failed" + log_error(str(e), report) + + log_info("Atomic pointer swap successful", report) + + return report + + # ============================================================================= # END OF SCRIPT # ============================================================================= diff --git a/tests/stages/test_publish_lifecycle.py b/tests/stages/test_publish_lifecycle.py index 7bc4122..20fb8cd 100644 --- a/tests/stages/test_publish_lifecycle.py +++ b/tests/stages/test_publish_lifecycle.py @@ -294,7 +294,8 @@ def test_promote_semantic_version_fails_on_making_directory(tmp_path): run_context.initialize_directories() # Force mkdir to raise - run_context.version_path.mkdir(parents=True) + published_version_path = run_context.version_path / "seller_semantic" + published_version_path.mkdir(parents=True) report = promote_semantic_version(run_context) diff --git a/tests/test_run_pipeline.py b/tests/test_run_pipeline.py index 52a3f5e..7becfdd 100644 --- a/tests/test_run_pipeline.py +++ b/tests/test_run_pipeline.py @@ -148,14 +148,14 @@ def test_main_success(monkeypatch, tmp_path): }, # Pass, status success ) - monkeypatch.setattr( - "data_pipeline.run_pipeline.promote_semantic_version", - lambda *a, **k: { - "status": "success", - "errors": [], - "info": [], - }, # Pass, status success - ) + # monkeypatch.setattr( + # "data_pipeline.run_pipeline.promote_semantic_version", + # lambda *a, **k: { + # "status": "success", + # "errors": [], + # "info": [], + # }, # Pass, status success + # ) monkeypatch.setattr( "data_pipeline.run_pipeline.snapshot_raw",