Skip to content

Commit 93d9aa0

Browse files
committed
feat: Add versioned semantic promotion and test coverage
1 parent ed2b140 commit 93d9aa0

5 files changed

Lines changed: 208 additions & 5 deletions

File tree

data_pipeline/run_pipeline.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
from data_pipeline.stages.apply_raw_data_contract import apply_contract
1616
from data_pipeline.stages.assemble_validated_events import assemble_events
1717
from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer
18-
from data_pipeline.stages.publish_lifecycle import run_integrity_gate
18+
from data_pipeline.stages.publish_lifecycle import (
19+
run_integrity_gate,
20+
promote_semantic_version,
21+
)
1922

2023

2124
# ------------------------------------------------------------
@@ -233,6 +236,21 @@ def main() -> None:
233236
finalize_run(run_context, "FAILED")
234237
sys.exit(1)
235238

239+
# Copy validated semantics to version directory
240+
promotion = promote_semantic_version(run_context)
241+
242+
persist_json(
243+
run_context.logs_path / "publish_promotion_report.json",
244+
{
245+
"run_id": run_context.run_id,
246+
"report": promotion,
247+
},
248+
)
249+
250+
if promotion["status"] == "failed":
251+
finalize_run(run_context, "FAILED")
252+
sys.exit(1)
253+
236254
sys.exit(0)
237255

238256

data_pipeline/stages/publish_lifecycle.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# =============================================================================
44

55
import pandas as pd
6+
import shutil
67

78
from typing import Dict, List
89
from data_pipeline.shared.run_context import RunContext
@@ -126,6 +127,70 @@ def run_integrity_gate(run_context: RunContext) -> Dict:
126127
return report
127128

128129

130+
# ------------------------------------------------------------
131+
# PROMOTE VALIDATED SEMANTIC
132+
# ------------------------------------------------------------
133+
134+
135+
def promote_semantic_version(run_context: RunContext) -> Dict:
    """
    Semantic version promotion step.

    Publishes the validated semantic artifacts into the run-scoped
    version directory for downstream consumption and lineage tracking.

    Chronological behavior:

    - Initializes run-scoped reporting.
    - Verifies the target version directory does not already exist.
    - Creates the version directory for the current run.
    - Copies all semantic parquet artifacts into the version directory.
    - Removes the version directory again if nothing could be promoted
      (copy error or no artifacts), so no partial snapshot is published.
    - Emits success signal when promotion completes.

    Promotion intent:

    - Create an immutable, run-versioned semantic snapshot
    - Provide a stable handoff point for BI consumption
    - Preserve lineage between run_id and published artifacts

    Parameters:

    - run_context: run-scoped context providing `semantic_path`
      (source of the parquet artifacts) and `version_path` (target).

    Returns:

    - The report dictionary produced by `init_report()`. On any failure
      its "status" is set to "failed" and the reason is recorded through
      `log_error`; failures are reported, never raised.
    """

    report = init_report()

    semantic_path = run_context.semantic_path
    version_path = run_context.version_path

    # A version directory is never overwritten: an existing target means
    # this run_id has already been published (or the path is occupied).
    if version_path.exists():
        report["status"] = "failed"
        log_error("Version directory already exists", report)

        return report

    # Create version directory
    try:
        # exist_ok=False also covers a directory appearing between the
        # check above and this call.
        version_path.mkdir(parents=True, exist_ok=False)

    except Exception as e:
        report["status"] = "failed"
        log_error(str(e), report)

        return report

    # Copy validated semantics to version directory
    try:
        # Sorted so the copy order is deterministic across platforms.
        artifacts = sorted(semantic_path.glob("*.parquet"))

        for artifact in artifacts:
            shutil.copy2(artifact, version_path / artifact.name)

    except Exception as e:
        # Broad on purpose: the contract is to report, not to raise.
        failure = str(e)

    else:
        # An empty snapshot is not a valid publication.
        failure = (
            None if artifacts else "No semantic parquet artifacts found to promote"
        )

    if failure is not None:
        # The directory was created by this call, so removing it is safe.
        # Leaving it behind would expose a partial snapshot and make every
        # retry fail on the "already exists" guard.
        shutil.rmtree(version_path, ignore_errors=True)

        report["status"] = "failed"
        log_error(failure, report)

        return report

    log_info("Semantic artifacts promoted successfully", report)

    return report
192+
193+
129194
# =============================================================================
130195
# END OF SCRIPT
131196
# =============================================================================

data_pipeline/stages/validate_raw_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def run_event_fact_validations(
206206
invalid_count = ts.isna().sum()
207207
if invalid_count > 0:
208208
log_warning(
209-
f"{table_name}: {invalid_count} unparsable timestamp value(s) in `{col}`",
209+
f"{table_name}: {invalid_count} unparsable timestamp value(s) in {col}",
210210
report,
211211
)
212212

tests/stages/test_publish_lifecycle.py

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
import pandas as pd
66
import pytest
7+
import shutil
78

89
from data_pipeline.shared.run_context import RunContext
910
from data_pipeline.stages.publish_lifecycle import (
1011
init_report,
1112
log_info,
1213
log_error,
1314
run_integrity_gate,
15+
promote_semantic_version,
1416
)
1517

1618

@@ -119,9 +121,9 @@ def test_run_integrity_gate_success(
119121
assert "Pre-publishing validation passed" in report["info"]
120122

121123

122-
def test_run_integrity_gate_fails_on_missing_directory():
124+
def test_run_integrity_gate_fails_on_missing_directory(tmp_path):
123125

124-
run_context = RunContext.create()
126+
run_context = RunContext.create(base_path=tmp_path)
125127

126128
report = run_integrity_gate(run_context)
127129

@@ -227,6 +229,110 @@ def test_run_integrity_gate_fails_on_missing_columns(
227229
)
228230

229231

232+
# ------------------------------------------------------------
233+
# SEMANTIC VERSION PROMOTION
234+
# ------------------------------------------------------------
235+
236+
237+
def test_promote_semantic_version_success(
    tmp_path,
    valid_seller_fact,
    valid_seller_dim,
):
    """Promotion succeeds and every semantic parquet lands in the version directory."""

    run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
    run_context.initialize_directories()

    fact_name = "seller_week_performance_fact_2023_01.parquet"
    dim_name = "seller_dim_2023_01.parquet"

    valid_seller_fact.to_parquet(
        run_context.semantic_path / fact_name,
        index=False,
    )

    valid_seller_dim.to_parquet(
        run_context.semantic_path / dim_name,
        index=False,
    )

    report = promote_semantic_version(run_context)

    # Exact match, consistent with the failure tests; a substring check
    # would also accept unrelated status values.
    assert report["status"] == "success"
    assert "Semantic artifacts promoted successfully" in report["info"]

    # The report alone does not prove anything was published.
    for artifact_name in (fact_name, dim_name):
        assert (run_context.version_path / artifact_name).is_file()
261+
262+
def test_promote_semantic_version_fails_on_existing_version_directory(
    tmp_path,
    valid_seller_fact,
    valid_seller_dim,
):
    """A second promotion for the same run_id is rejected by the existence guard."""

    run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
    run_context.initialize_directories()

    valid_seller_fact.to_parquet(
        run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet",
        index=False,
    )

    valid_seller_dim.to_parquet(
        run_context.semantic_path / "seller_dim_2023_01.parquet",
        index=False,
    )

    # Initial run that created the directory; verify the precondition so a
    # broken first promotion cannot make the second assertion pass by accident.
    first_report = promote_semantic_version(run_context)

    assert first_report["status"] != "failed"
    assert run_context.version_path.exists()

    # Fails due to existing version directory on same run_id
    report = promote_semantic_version(run_context)

    assert report["status"] == "failed"
    assert "Version directory already exists" in report["errors"]
290+
291+
def test_promote_semantic_version_fails_on_making_directory(tmp_path, monkeypatch):
    """A failure raised by mkdir itself is reported as a failed promotion."""

    run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
    run_context.initialize_directories()

    # Force mkdir to raise. Pre-creating the directory would be caught by the
    # earlier `exists()` guard and never reach the mkdir error branch, so the
    # method is patched on the path's class instead.
    # NOTE(review): assumes version_path is a pathlib-style object whose class
    # exposes `mkdir` - confirm against RunContext.
    def failing_mkdir(*args, **kwargs):
        raise OSError("mkdir failure")

    monkeypatch.setattr(type(run_context.version_path), "mkdir", failing_mkdir)

    report = promote_semantic_version(run_context)

    assert report["status"] == "failed"
    assert any("mkdir failure" in e for e in report["errors"])
    assert "Version directory already exists" not in report["errors"]
303+
304+
305+
def test_promote_semantic_version_fails_on_copying_semantic(
    tmp_path,
    monkeypatch,
    valid_seller_fact,
    valid_seller_dim,
):
    """An exception raised while copying artifacts yields a failed report."""

    run_context = RunContext.create(base_path=tmp_path, run_id="20230101T000000_abc123")
    run_context.initialize_directories()

    # Seed the semantic directory with both artifacts so the copy loop runs.
    fact_target = run_context.semantic_path / "seller_week_performance_fact_2023_01.parquet"
    dim_target = run_context.semantic_path / "seller_dim_2023_01.parquet"

    valid_seller_fact.to_parquet(fact_target, index=False)
    valid_seller_dim.to_parquet(dim_target, index=False)

    # Replace shutil.copy2 with a stub that always raises.
    def raise_copy_failure(*args, **kwargs):
        raise RuntimeError("copy failure")

    monkeypatch.setattr(shutil, "copy2", raise_copy_failure)

    report = promote_semantic_version(run_context)

    recorded_errors = report["errors"]

    assert report["status"] == "failed"
    assert any("copy failure" in message for message in recorded_errors)
334+
335+
230336
# =============================================================================
231337
# UNIT TESTS END
232338
# =============================================================================

tests/test_run_pipeline.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ def test_main_success(monkeypatch, tmp_path):
110110

111111
monkeypatch.setattr(
112112
"data_pipeline.run_pipeline.apply_validation",
113-
lambda *a, **k: {"errors": [], "warnings": []}, # Pass all validations
113+
lambda *a, **k: {
114+
"errors": [],
115+
"warnings": [],
116+
}, # Pass all validations
114117
)
115118

116119
monkeypatch.setattr(
@@ -145,6 +148,15 @@ def test_main_success(monkeypatch, tmp_path):
145148
}, # Pass, status success
146149
)
147150

151+
monkeypatch.setattr(
152+
"data_pipeline.run_pipeline.promote_semantic_version",
153+
lambda *a, **k: {
154+
"status": "success",
155+
"errors": [],
156+
"info": [],
157+
}, # Pass, status success
158+
)
159+
148160
monkeypatch.setattr(
149161
"data_pipeline.run_pipeline.snapshot_raw",
150162
lambda *_: None,
@@ -159,6 +171,8 @@ def test_main_success(monkeypatch, tmp_path):
159171
assert (fake_ctx.logs_path / "validation_post_contract.json").exists()
160172
assert (fake_ctx.logs_path / "assemble_report.json").exists()
161173
assert (fake_ctx.logs_path / "semantic_report.json").exists()
174+
assert (fake_ctx.logs_path / "publish_integrity_report.json").exists()
175+
assert (fake_ctx.logs_path / "publish_promotion_report.json").exists()
162176

163177

164178
# =============================================================================

0 commit comments

Comments
 (0)