Skip to content

Commit 9d00e6d

Browse files
authored
feat: Add metadata lifecycle + integrity gate
- initialize metadata and finalize_run in orchestrator - implement pre-publish integrity validation (publish_lifecycle.py) - integrate integrity gate into run_pipeline - add test coverage for publish_lifecycle
1 parent 14f680c commit 9d00e6d

10 files changed

Lines changed: 608 additions & 112 deletions

data_pipeline/run_pipeline.py

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,23 @@
44

55
from pathlib import Path
66
from shutil import copytree
7+
from datetime import datetime as dt
78
import sys
89
import json
910

11+
1012
from data_pipeline.shared.table_configs import TABLE_CONFIG
1113
from data_pipeline.shared.run_context import RunContext
1214
from data_pipeline.stages.validate_raw_data import apply_validation
1315
from data_pipeline.stages.apply_raw_data_contract import apply_contract
1416
from data_pipeline.stages.assemble_validated_events import assemble_events
1517
from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer
18+
from data_pipeline.stages.publish_lifecycle import run_integrity_gate
19+
20+
21+
# ------------------------------------------------------------
22+
# SUPPORTING UTILITIES
23+
# ------------------------------------------------------------
1624

1725

1826
def snapshot_raw(run_context: RunContext) -> None:
@@ -39,29 +47,100 @@ def persist_json(path: Path, payload: dict) -> None:
3947
json.dump(payload, f, indent=2)
4048

4149

50+
def initiliaze_metadata(run_context: RunContext) -> None:
51+
"""
52+
Run metadata initializer.
53+
54+
Creates the run-scoped metadata record at pipeline start to
55+
establish lifecycle tracking and publish eligibility state.
56+
"""
57+
58+
payload = {
59+
"run_id": run_context.run_id,
60+
"status": "RUNNING",
61+
"started_at": dt.utcnow().isoformat(),
62+
"completed_at": None,
63+
"published": False,
64+
}
65+
66+
persist_json(run_context.metadata_path, payload)
67+
68+
69+
def finalize_run(run_context: RunContext, status: str) -> None:
70+
"""
71+
Run metadata finalizer.
72+
73+
Updates the run metadata record with terminal status and
74+
completion timestamp.
75+
"""
76+
77+
if not run_context.metadata_path.exists():
78+
raise RuntimeError("metadata.json missing during finalization")
79+
80+
with open(run_context.metadata_path, "r") as file:
81+
payload = json.load(file)
82+
83+
payload["status"] = status
84+
payload["complete_at"] = dt.utcnow().isoformat()
85+
86+
if status == "SUCCESS":
87+
payload["published"] = True
88+
89+
else:
90+
payload["published"] = False
91+
92+
persist_json(run_context.metadata_path, payload)
93+
94+
95+
# ------------------------------------------------------------
96+
# PIPELINE ORCHESTRATOR
97+
# ------------------------------------------------------------
98+
99+
42100
def main() -> None:
101+
"""
102+
Pipeline execution controller.
103+
104+
Execution order:
105+
106+
1. Initialize run context and directory structure.
107+
2. Capture raw snapshot and initialize metadata.
108+
3. Run initial validation on raw data.
109+
- Exit if structural errors exist.
110+
4. Apply table contracts in configured parent → child order,
111+
propagating invalid order_ids.
112+
5. Rerun validation on contracted data.
113+
- Exit if any errors or warnings remain.
114+
6. Assemble the core event table.
115+
- Exit on assembly failure.
116+
7. Build semantic layer tables.
117+
- Exit on semantic failure.
118+
8. Run pre-publish semantic integrity gate.
119+
- Exit if gate fails.
120+
9. Exit process with success code.
121+
"""
122+
43123
run_context = RunContext.create()
44124
run_context.initialize_directories()
45125

46126
# Create raw snapshot at runtime
47127
snapshot_raw(run_context)
48-
49-
report_validation_initial = []
128+
initiliaze_metadata(run_context)
50129

51130
# Initial validation
52131
validation_initial = apply_validation(run_context)
53-
report_validation_initial.append(validation_initial)
54132

55133
persist_json(
56134
run_context.logs_path / "validation_initial.json",
57135
{
58136
"run_id": run_context.run_id,
59-
"report": report_validation_initial,
137+
"report": validation_initial,
60138
},
61139
)
62140

63141
# Early exit for structural errors else apply contract
64142
if validation_initial["errors"]:
143+
finalize_run(run_context, "FAILED")
65144
sys.exit(1)
66145

67146
report_contract = []
@@ -90,60 +169,68 @@ def main() -> None:
90169
},
91170
)
92171

93-
report_validation_post_contract = []
94-
95172
# Rerun validation on CONTRACTED data
96173
validation_post_contract = apply_validation(
97174
run_context,
98175
base_path=run_context.contracted_path,
99176
)
100177

101-
report_validation_post_contract.append(validation_post_contract)
102-
103178
persist_json(
104179
run_context.logs_path / "validation_post_contract.json",
105180
{
106181
"run_id": run_context.run_id,
107-
"report": report_validation_post_contract,
182+
"report": validation_post_contract,
108183
},
109184
)
110185

111186
# Intervention: Either manual fixing or escalate the data to source owner
112187
if validation_post_contract["errors"] or validation_post_contract["warnings"]:
188+
finalize_run(run_context, "FAILED")
113189
sys.exit(1)
114190

115-
report_assemble = []
116-
117191
# Assemble event table
118192
assemble = assemble_events(run_context)
119-
report_assemble.append(assemble)
120193

121194
persist_json(
122195
run_context.logs_path / "assemble_report.json",
123196
{
124197
"run_id": run_context.run_id,
125-
"report": report_assemble,
198+
"report": assemble,
126199
},
127200
)
128201

129202
if assemble["status"] == "failed":
203+
finalize_run(run_context, "FAILED")
130204
sys.exit(1)
131205

132-
report_semantic = []
133-
134206
# Semantic modeling
135207
semantic = build_semantic_layer(run_context)
136-
report_semantic.append(semantic)
137208

138209
persist_json(
139210
run_context.logs_path / "semantic_report.json",
140211
{
141212
"run_id": run_context.run_id,
142-
"report": report_semantic,
213+
"report": semantic,
143214
},
144215
)
145216

146217
if semantic["status"] == "failed":
218+
finalize_run(run_context, "FAILED")
219+
sys.exit(1)
220+
221+
# Pre-publish semantic integrity validation
222+
gate = run_integrity_gate(run_context)
223+
224+
persist_json(
225+
run_context.logs_path / "publish_integrity_report.json",
226+
{
227+
"run_id": run_context.run_id,
228+
"report": gate,
229+
},
230+
)
231+
232+
if gate["status"] == "failed":
233+
finalize_run(run_context, "FAILED")
147234
sys.exit(1)
148235

149236
sys.exit(0)

data_pipeline/shared/table_configs.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
# TABLE CONFIGURATIONS
33
# =============================================================================
44

5+
# ------------------------------------------------------------
6+
# CONFIGURATIONS FOR validate_raw_data.py
7+
# ------------------------------------------------------------
8+
59
TABLE_CONFIG = {
610
"df_orders": {
711
"role": "event_fact",
@@ -81,3 +85,98 @@
8185
"order_delivered_timestamp": "%Y-%m-%d %H:%M:%S",
8286
"order_estimated_delivery_date": "%Y-%m-%d",
8387
}
88+
89+
90+
# ------------------------------------------------------------
91+
# CONFIGURATIONS FOR assemble_validate_events.py
92+
# ------------------------------------------------------------
93+
94+
# Assemble events enforced schema and dtypes
95+
ASSEMBLE_ENFORCED_SCHEMA = [
96+
"order_id",
97+
"order_revenue",
98+
"seller_id",
99+
"product_id",
100+
"order_status",
101+
"order_purchase_timestamp",
102+
"order_approved_at",
103+
"order_delivered_timestamp",
104+
"lead_time_days",
105+
"approval_lag_days",
106+
"delivery_delay_days",
107+
"order_date",
108+
"order_year",
109+
"order_year_week",
110+
"run_id",
111+
]
112+
113+
ASSEMBLE_ENFORCED_DTYPES = {
114+
"order_id": "string",
115+
"order_revenue": "float64",
116+
"seller_id": "string",
117+
"product_id": "string",
118+
"order_status": "string",
119+
"order_purchase_timestamp": "datetime64[ns]",
120+
"order_approved_at": "datetime64[ns]",
121+
"order_delivered_timestamp": "datetime64[ns]",
122+
"lead_time_days": "int64",
123+
"approval_lag_days": "int64",
124+
"delivery_delay_days": "int64",
125+
"order_date": "datetime64[ns]",
126+
"order_year": "int64",
127+
}
128+
129+
130+
# ------------------------------------------------------------
131+
# CONFIGURATIONS FOR build_bi_semantic_layer.py
132+
# ------------------------------------------------------------
133+
134+
135+
# Seller dimension enforced schema and dtypes
136+
SELLER_DIM_ENFORCED_SCHEMA = [
137+
"seller_id",
138+
"first_order_date",
139+
"first_order_year_week",
140+
"run_id",
141+
]
142+
143+
SELLER_DIM_ENFORCED_DTYPES = {
144+
"seller_id": "string",
145+
"first_order_date": "datetime64[ns]",
146+
"first_order_year_week": "string",
147+
"run_id": "string",
148+
}
149+
150+
151+
# Seller Facts enforced schema and dtypes
152+
SELLER_FACT_ENFORCED_SCHEMA = [
153+
"seller_id",
154+
"order_year_week",
155+
"week_start_date",
156+
"run_id",
157+
"weekly_order_count",
158+
"weekly_delivered_orders",
159+
"weekly_cancelled_orders",
160+
"weekly_revenue",
161+
"weekly_avg_lead_time",
162+
"weekly_total_lead_time",
163+
"weekly_avg_delivery_delay",
164+
"weekly_total_delivery_delay",
165+
"weekly_avg_approval_lag",
166+
]
167+
168+
SELLER_FACT_ENFORCED_DTYPES = {
169+
"seller_id": "string",
170+
"order_year_week": "string",
171+
"week_start_date": "datetime64[ns]",
172+
"run_id": "string",
173+
"weekly_order_count": "int64",
174+
"weekly_delivered_orders": "int64",
175+
"weekly_cancelled_orders": "int64",
176+
"weekly_revenue": "float64",
177+
"weekly_avg_lead_time": "float64",
178+
"weekly_total_lead_time": "int64",
179+
"weekly_avg_delivery_delay": "float64",
180+
"weekly_total_delivery_delay": "int64",
181+
"weekly_avg_approval_lag": "float64",
182+
}

data_pipeline/stages/assemble_validated_events.py

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
import pandas as pd
1111
from typing import Dict, List
1212
from data_pipeline.shared.run_context import RunContext
13+
from data_pipeline.shared.table_configs import (
14+
ASSEMBLE_ENFORCED_SCHEMA,
15+
ASSEMBLE_ENFORCED_DTYPES,
16+
)
1317
from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file
1418

1519
EVENT_TABLES = ["df_orders", "df_order_items", "df_payments"]
@@ -136,46 +140,12 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
136140
- Resets index to produce a clean output frame
137141
"""
138142

139-
ENFORCED_SCHEMA = [
140-
"order_id",
141-
"order_revenue",
142-
"seller_id",
143-
"product_id",
144-
"order_status",
145-
"order_purchase_timestamp",
146-
"order_approved_at",
147-
"order_delivered_timestamp",
148-
"lead_time_days",
149-
"approval_lag_days",
150-
"delivery_delay_days",
151-
"order_date",
152-
"order_year",
153-
"order_year_week",
154-
"run_id",
155-
]
156-
157-
ENFORCED_DTYPES = {
158-
"order_id": "string",
159-
"order_revenue": "float64",
160-
"seller_id": "string",
161-
"product_id": "string",
162-
"order_status": "string",
163-
"order_purchase_timestamp": "datetime64[ns]",
164-
"order_approved_at": "datetime64[ns]",
165-
"order_delivered_timestamp": "datetime64[ns]",
166-
"lead_time_days": "int64",
167-
"approval_lag_days": "int64",
168-
"delivery_delay_days": "int64",
169-
"order_date": "datetime64[ns]",
170-
"order_year": "int64",
171-
}
172-
173-
missing_cols = set(ENFORCED_SCHEMA) - set(df.columns)
143+
missing_cols = set(ASSEMBLE_ENFORCED_SCHEMA) - set(df.columns)
174144
if missing_cols:
175145
raise RuntimeError(f"missing required columns: {sorted(missing_cols)}")
176146

177-
df_contract = df[ENFORCED_SCHEMA].copy()
178-
df_contract = df_contract.astype(ENFORCED_DTYPES)
147+
df_contract = df[ASSEMBLE_ENFORCED_SCHEMA].copy()
148+
df_contract = df_contract.astype(ASSEMBLE_ENFORCED_DTYPES)
179149
df_contract = df_contract.sort_values("order_id").reset_index(drop=True)
180150

181151
return df_contract

0 commit comments

Comments
 (0)