Skip to content

Commit e0bc4c0

Browse files
committed
feat: Add metadata lifecycle + integrity gate
- initialize metadata and finalize_run in orchestrator - implement pre-publish integrity validation (publish_lifecycle.py) - integrate integrity gate into run_pipeline - add test coverage for publish_lifecycle
1 parent 2a5c391 commit e0bc4c0

8 files changed

Lines changed: 490 additions & 23 deletions

data_pipeline/run_pipeline.py

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,23 @@
44

55
from pathlib import Path
66
from shutil import copytree
7+
from datetime import datetime as dt
78
import sys
89
import json
910

11+
1012
from data_pipeline.shared.table_configs import TABLE_CONFIG
1113
from data_pipeline.shared.run_context import RunContext
1214
from data_pipeline.stages.validate_raw_data import apply_validation
1315
from data_pipeline.stages.apply_raw_data_contract import apply_contract
1416
from data_pipeline.stages.assemble_validated_events import assemble_events
1517
from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer
18+
from data_pipeline.stages.publish_lifecycle import run_integrity_gate
19+
20+
21+
# ------------------------------------------------------------
22+
# SUPPORTING UTILITIES
23+
# ------------------------------------------------------------
1624

1725

1826
def snapshot_raw(run_context: RunContext) -> None:
@@ -39,29 +47,100 @@ def persist_json(path: Path, payload: dict) -> None:
3947
json.dump(payload, f, indent=2)
4048

4149

50+
def initiliaze_metadata(run_context: RunContext) -> None:
51+
"""
52+
Run metadata initializer.
53+
54+
Creates the run-scoped metadata record at pipeline start to
55+
establish lifecycle tracking and publish eligibility state.
56+
"""
57+
58+
payload = {
59+
"run_id": run_context.run_id,
60+
"status": "RUNNING",
61+
"started_at": dt.utcnow().isoformat(),
62+
"completed_at": None,
63+
"published": False,
64+
}
65+
66+
persist_json(run_context.metadata_path, payload)
67+
68+
69+
def finalize_run(run_context: RunContext, status: str) -> None:
70+
"""
71+
Run metadata finalizer.
72+
73+
Updates the run metadata record with terminal status and
74+
completion timestamp.
75+
"""
76+
77+
if not run_context.metadata_path.exists():
78+
raise RuntimeError("metadata.json missing during finalization")
79+
80+
with open(run_context.metadata_path, "r") as file:
81+
payload = json.load(file)
82+
83+
payload["status"] = status
84+
payload["complete_at"] = dt.utcnow().isoformat()
85+
86+
if status == "SUCCESS":
87+
payload["published"] = True
88+
89+
else:
90+
payload["published"] = False
91+
92+
persist_json(run_context.metadata_path, payload)
93+
94+
95+
# ------------------------------------------------------------
96+
# PIPELINE ORCHESTRATOR
97+
# ------------------------------------------------------------
98+
99+
42100
def main() -> None:
101+
"""
102+
Pipeline execution controller.
103+
104+
Execution order:
105+
106+
1. Initialize run context and directory structure.
107+
2. Capture raw snapshot and initialize metadata.
108+
3. Run initial validation on raw data.
109+
- Exit if structural errors exist.
110+
4. Apply table contracts in configured parent → child order,
111+
propagating invalid order_ids.
112+
5. Rerun validation on contracted data.
113+
- Exit if any errors or warnings remain.
114+
6. Assemble the core event table.
115+
- Exit on assembly failure.
116+
7. Build semantic layer tables.
117+
- Exit on semantic failure.
118+
8. Run pre-publish semantic integrity gate.
119+
- Exit if gate fails.
120+
9. Exit process with success code.
121+
"""
122+
43123
run_context = RunContext.create()
44124
run_context.initialize_directories()
45125

46126
# Create raw snapshot at runtime
47127
snapshot_raw(run_context)
48-
49-
report_validation_initial = []
128+
initiliaze_metadata(run_context)
50129

51130
# Initial validation
52131
validation_initial = apply_validation(run_context)
53-
report_validation_initial.append(validation_initial)
54132

55133
persist_json(
56134
run_context.logs_path / "validation_initial.json",
57135
{
58136
"run_id": run_context.run_id,
59-
"report": report_validation_initial,
137+
"report": validation_initial,
60138
},
61139
)
62140

63141
# Early exit for structural errors else apply contract
64142
if validation_initial["errors"]:
143+
finalize_run(run_context, "FAILED")
65144
sys.exit(1)
66145

67146
report_contract = []
@@ -90,60 +169,68 @@ def main() -> None:
90169
},
91170
)
92171

93-
report_validation_post_contract = []
94-
95172
# Rerun validation on CONTRACTED data
96173
validation_post_contract = apply_validation(
97174
run_context,
98175
base_path=run_context.contracted_path,
99176
)
100177

101-
report_validation_post_contract.append(validation_post_contract)
102-
103178
persist_json(
104179
run_context.logs_path / "validation_post_contract.json",
105180
{
106181
"run_id": run_context.run_id,
107-
"report": report_validation_post_contract,
182+
"report": validation_post_contract,
108183
},
109184
)
110185

111186
# Intervention: Either manual fixing or escalate the data to source owner
112187
if validation_post_contract["errors"] or validation_post_contract["warnings"]:
188+
finalize_run(run_context, "FAILED")
113189
sys.exit(1)
114190

115-
report_assemble = []
116-
117191
# Assemble event table
118192
assemble = assemble_events(run_context)
119-
report_assemble.append(assemble)
120193

121194
persist_json(
122195
run_context.logs_path / "assemble_report.json",
123196
{
124197
"run_id": run_context.run_id,
125-
"report": report_assemble,
198+
"report": assemble,
126199
},
127200
)
128201

129202
if assemble["status"] == "failed":
203+
finalize_run(run_context, "FAILED")
130204
sys.exit(1)
131205

132-
report_semantic = []
133-
134206
# Semantic modeling
135207
semantic = build_semantic_layer(run_context)
136-
report_semantic.append(semantic)
137208

138209
persist_json(
139210
run_context.logs_path / "semantic_report.json",
140211
{
141212
"run_id": run_context.run_id,
142-
"report": report_semantic,
213+
"report": semantic,
143214
},
144215
)
145216

146217
if semantic["status"] == "failed":
218+
finalize_run(run_context, "FAILED")
219+
sys.exit(1)
220+
221+
# Pre-publish semantic integrity validation
222+
gate = run_integrity_gate(run_context)
223+
224+
persist_json(
225+
run_context.logs_path / "publish_integrity_report.json",
226+
{
227+
"run_id": run_context.run_id,
228+
"report": gate,
229+
},
230+
)
231+
232+
if gate["status"] == "failed":
233+
finalize_run(run_context, "FAILED")
147234
sys.exit(1)
148235

149236
sys.exit(0)

data_pipeline/stages/build_bi_semantic_layer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ def error(msg):
271271

272272
seller_semantic_tables = {
273273
f"seller_week_performance_fact_{year}_{month}.parquet": seller_fact_contracted,
274-
f"dim_seller_{year}_{month}.parquet": seller_dim_contracted,
274+
f"seller_dim_{year}_{month}.parquet": seller_dim_contracted,
275275
}
276276

277277
for table_name, table in seller_semantic_tables.items():
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# =============================================================================
2+
# PUBLISH ACTIVATION GATE
3+
# =============================================================================
4+
5+
import pandas as pd
6+
7+
from typing import Dict, List
8+
from data_pipeline.shared.run_context import RunContext
9+
from data_pipeline.shared.table_configs import (
10+
SELLER_FACT_ENFORCED_SCHEMA,
11+
SELLER_DIM_ENFORCED_SCHEMA,
12+
)
13+
14+
# ------------------------------------------------------------
15+
# ASSEMBLE REPORT & LOGS
16+
# ------------------------------------------------------------
17+
18+
19+
def init_report():
20+
return {"status": "success", "errors": [], "info": []}
21+
22+
23+
def log_info(message: str, report: Dict[str, List[str]]) -> None:
24+
print(f"[INFO] {message}")
25+
report["info"].append(message)
26+
27+
28+
def log_error(message: str, report: Dict[str, list[str]]) -> None:
29+
print(f"[ERROR] {message}")
30+
report["errors"].append(message)
31+
32+
33+
# ------------------------------------------------------------
34+
# PRE-PUBLISH INTEGRITY GATE
35+
# ------------------------------------------------------------
36+
37+
38+
def run_integrity_gate(run_context: RunContext) -> Dict:
39+
"""
40+
Pre-publish semantic integrity gate.
41+
42+
Verifies that the semantic layer is complete, structurally valid,
43+
and safe for downstream consumption before any publish action.
44+
45+
Chronological behavior:
46+
47+
- Initializes run-scoped reporting.
48+
- Validates semantic output directory exists.
49+
- Confirms actual parquet file set exactly matches the expected set.
50+
- Loads each required semantic table.
51+
- Validates each table is readable and non-empty.
52+
- Verifies required schema columns are present per table type.
53+
- Emits success signal when all checks pass.
54+
55+
Gate intent:
56+
57+
- Detect partial publishes
58+
- Detect schema drift entering BI layer
59+
- Detect empty or corrupt semantic outputs
60+
"""
61+
62+
report = init_report()
63+
semantic_path = run_context.semantic_path
64+
65+
year = run_context.run_id[:4]
66+
month = run_context.run_id[4:6]
67+
68+
# Validate semantic directory exists
69+
if not semantic_path.exists():
70+
log_error("Semantic directory is missing", report)
71+
report["status"] = "failed"
72+
73+
return report
74+
75+
# Validate expected semantic file set exactly matches required set
76+
seller_expected_files = {
77+
f"seller_week_performance_fact_{year}_{month}.parquet",
78+
f"seller_dim_{year}_{month}.parquet",
79+
}
80+
81+
seller_actual_files = {
82+
file.name for file in run_context.semantic_path.glob("*.parquet")
83+
}
84+
85+
if seller_actual_files != seller_expected_files:
86+
log_error("Semantic file set mismatch", report)
87+
report["status"] = "failed"
88+
89+
return report
90+
91+
# Validate required parquet files exist
92+
for file_name in seller_expected_files:
93+
path = semantic_path / file_name
94+
95+
try:
96+
df = pd.read_parquet(path)
97+
98+
except Exception as e:
99+
log_error(f"{file_name} failed to load: {e}", report)
100+
report["status"] = "failed"
101+
102+
return report
103+
104+
# Validate dataframe not empty
105+
if df is None or df.empty:
106+
log_error(f"{file_name} logical table missing or empty", report)
107+
report["status"] = "failed"
108+
109+
return report
110+
111+
# Validate required schema columns present
112+
if "seller_week_performance_fact" in file_name:
113+
required_cols = SELLER_FACT_ENFORCED_SCHEMA
114+
else:
115+
required_cols = SELLER_DIM_ENFORCED_SCHEMA
116+
117+
missing = set(required_cols) - set(df.columns)
118+
119+
if missing:
120+
log_error(f"{file_name} required column(s): {sorted(missing)}", report)
121+
report["status"] = "failed"
122+
123+
return report
124+
125+
log_info("Pre-publishing validation passed", report)
126+
return report
127+
128+
129+
# =============================================================================
130+
# END OF SCRIPT
131+
# =============================================================================

data_pipeline/stages/validate_raw_data.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,13 @@
2323
# ------------------------------------------------------------
2424

2525

26-
def init_report() -> Dict[str, List[str]]:
27-
return {"errors": [], "warnings": [], "info": []}
26+
def init_report():
27+
return {
28+
"status": "success",
29+
"errors": [],
30+
"warnings": [],
31+
"info": [],
32+
}
2833

2934

3035
def log_info(message: str, report: Dict[str, List[str]]) -> None:
@@ -389,6 +394,9 @@ def error(msg: str):
389394

390395
run_cross_table_validations(tables, report)
391396

397+
if len(report["warnings"] or report["errors"]) > 0:
398+
report["status"] = "failed"
399+
392400
return report
393401

394402

tests/stages/test_build_bi_semantic_layer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ def test_build_semantic_layer_success(tmp_path, valid_assembled_df):
290290
run_context.semantic_path / "seller_week_performance_fact_dumm_y_.parquet"
291291
)
292292

293-
output_path_dim = run_context.semantic_path / "dim_seller_dumm_y_.parquet"
293+
output_path_dim = run_context.semantic_path / "seller_dim_dumm_y_.parquet"
294294

295295
assert report["status"] == "success"
296296
assert output_path_seller.exists()

0 commit comments

Comments
 (0)