Skip to content

Commit 50eeb6e

Browse files
committed
feat: Add publish activation versioning
1 parent 35a7986 commit 50eeb6e

4 files changed

Lines changed: 82 additions & 11 deletions

File tree

data_pipeline/run_pipeline.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from data_pipeline.stages.publish_lifecycle import (
1919
run_integrity_gate,
2020
promote_semantic_version,
21+
activate_published_version,
2122
)
2223

2324

@@ -84,7 +85,7 @@ def finalize_run(run_context: RunContext, status: str) -> None:
8485
payload = json.load(file)
8586

8687
payload["status"] = status
87-
payload["complete_at"] = dt.utcnow().isoformat()
88+
payload["completed_at"] = dt.utcnow().isoformat()
8889

8990
if status == "SUCCESS":
9091
payload["published"] = True
@@ -251,6 +252,21 @@ def main() -> None:
251252
finalize_run(run_context, "FAILED")
252253
sys.exit(1)
253254

255+
finalize_run(run_context, "SUCCESS")
256+
257+
activation = activate_published_version(run_context)
258+
259+
persist_json(
260+
run_context.logs_path / "publish_activation_report.json",
261+
{
262+
"run_id": run_context.run_id,
263+
"report": activation,
264+
},
265+
)
266+
267+
if activation["status"] == "failed":
268+
sys.exit(1)
269+
254270
sys.exit(0)
255271

256272

data_pipeline/stages/publish_lifecycle.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
import pandas as pd
66
import shutil
7+
from datetime import datetime as dt
8+
import json
9+
import os
710

811
from typing import Dict, List
912
from data_pipeline.shared.run_context import RunContext
@@ -157,7 +160,7 @@ def promote_semantic_version(run_context: RunContext) -> Dict:
157160
report = init_report()
158161

159162
semantic_path = run_context.semantic_path
160-
version_path = run_context.version_path
163+
version_path = run_context.version_path / "seller_semantic"
161164

162165
if version_path.exists():
163166
report["status"] = "failed"
@@ -191,6 +194,57 @@ def promote_semantic_version(run_context: RunContext) -> Dict:
191194
return report
192195

193196

197+
# ------------------------------------------------------------
198+
# PUBLISHED ATOMIC POINTER
199+
# ------------------------------------------------------------
200+
201+
202+
def activate_published_version(run_context: RunContext) -> Dict:
203+
"""
204+
Published version activation step.
205+
206+
Atomically updates the latest-version pointer to the newly promotedsemantic snapshot. <br>
207+
Guarantee BI dashboards read only fully published versions.
208+
209+
Chronological behavior:
210+
211+
- Initializes run-scoped reporting.
212+
- Builds the pointer payload with run lineage metadata.
213+
- Writes payload to a temporary pointer file.
214+
- Atomically swaps the temporary file into the latest pointer path.
215+
- Emits success signal when the swap completes.
216+
217+
Notes:
218+
- Uses temp-file + os.replace for atomicity.
219+
- Assumes version promotion has already succeeded.
220+
"""
221+
222+
report = init_report()
223+
224+
latest_path = run_context.latest_pointer_path
225+
tmp_path = latest_path.with_suffix(".tmp")
226+
227+
payload = {
228+
"run_id": run_context.run_id,
229+
"version": f"v{run_context.run_id}",
230+
"published_at": dt.utcnow().isoformat(),
231+
}
232+
233+
try:
234+
with open(tmp_path, "w") as file:
235+
json.dump(payload, file, indent=2)
236+
237+
os.replace(tmp_path, latest_path)
238+
239+
except Exception as e:
240+
report["status"] = "failed"
241+
log_error(str(e), report)
242+
243+
log_info("Atomic pointer swap successful", report)
244+
245+
return report
246+
247+
194248
# =============================================================================
195249
# END OF SCRIPT
196250
# =============================================================================

tests/stages/test_publish_lifecycle.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,8 @@ def test_promote_semantic_version_fails_on_making_directory(tmp_path):
294294
run_context.initialize_directories()
295295

296296
# Force mkdir to raise
297-
run_context.version_path.mkdir(parents=True)
297+
published_version_path = run_context.version_path / "seller_semantic"
298+
published_version_path.mkdir(parents=True)
298299

299300
report = promote_semantic_version(run_context)
300301

tests/test_run_pipeline.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ def test_main_success(monkeypatch, tmp_path):
148148
}, # Pass, status success
149149
)
150150

151-
monkeypatch.setattr(
152-
"data_pipeline.run_pipeline.promote_semantic_version",
153-
lambda *a, **k: {
154-
"status": "success",
155-
"errors": [],
156-
"info": [],
157-
}, # Pass, status success
158-
)
151+
# monkeypatch.setattr(
152+
# "data_pipeline.run_pipeline.promote_semantic_version",
153+
# lambda *a, **k: {
154+
# "status": "success",
155+
# "errors": [],
156+
# "info": [],
157+
# }, # Pass, status success
158+
# )
159159

160160
monkeypatch.setattr(
161161
"data_pipeline.run_pipeline.snapshot_raw",

0 commit comments

Comments
 (0)