Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 104 additions & 17 deletions data_pipeline/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@

from pathlib import Path
from shutil import copytree
from datetime import datetime as dt
import sys
import json


from data_pipeline.shared.table_configs import TABLE_CONFIG
from data_pipeline.shared.run_context import RunContext
from data_pipeline.stages.validate_raw_data import apply_validation
from data_pipeline.stages.apply_raw_data_contract import apply_contract
from data_pipeline.stages.assemble_validated_events import assemble_events
from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer
from data_pipeline.stages.publish_lifecycle import run_integrity_gate


# ------------------------------------------------------------
# SUPPORTING UTILITIES
# ------------------------------------------------------------


def snapshot_raw(run_context: RunContext) -> None:
Expand All @@ -39,29 +47,100 @@ def persist_json(path: Path, payload: dict) -> None:
json.dump(payload, f, indent=2)


def initiliaze_metadata(run_context: RunContext) -> None:
"""
Run metadata initializer.

Creates the run-scoped metadata record at pipeline start to
establish lifecycle tracking and publish eligibility state.
"""

payload = {
"run_id": run_context.run_id,
"status": "RUNNING",
"started_at": dt.utcnow().isoformat(),
"completed_at": None,
"published": False,
}

persist_json(run_context.metadata_path, payload)


def finalize_run(run_context: RunContext, status: str) -> None:
"""
Run metadata finalizer.

Updates the run metadata record with terminal status and
completion timestamp.
"""

if not run_context.metadata_path.exists():
raise RuntimeError("metadata.json missing during finalization")

with open(run_context.metadata_path, "r") as file:
payload = json.load(file)

payload["status"] = status
payload["complete_at"] = dt.utcnow().isoformat()

if status == "SUCCESS":
payload["published"] = True

else:
payload["published"] = False

persist_json(run_context.metadata_path, payload)


# ------------------------------------------------------------
# PIPELINE ORCHESTRATOR
# ------------------------------------------------------------


def main() -> None:
"""
Pipeline execution controller.

Execution order:

1. Initialize run context and directory structure.
2. Capture raw snapshot and initialize metadata.
3. Run initial validation on raw data.
- Exit if structural errors exist.
4. Apply table contracts in configured parent → child order,
propagating invalid order_ids.
5. Rerun validation on contracted data.
- Exit if any errors or warnings remain.
6. Assemble the core event table.
- Exit on assembly failure.
7. Build semantic layer tables.
- Exit on semantic failure.
8. Run pre-publish semantic integrity gate.
- Exit if gate fails.
9. Exit process with success code.
"""

run_context = RunContext.create()
run_context.initialize_directories()

# Create raw snapshot at runtime
snapshot_raw(run_context)

report_validation_initial = []
initiliaze_metadata(run_context)

# Initial validation
validation_initial = apply_validation(run_context)
report_validation_initial.append(validation_initial)

persist_json(
run_context.logs_path / "validation_initial.json",
{
"run_id": run_context.run_id,
"report": report_validation_initial,
"report": validation_initial,
},
)

# Early exit for structural errors else apply contract
if validation_initial["errors"]:
finalize_run(run_context, "FAILED")
sys.exit(1)

report_contract = []
Expand Down Expand Up @@ -90,60 +169,68 @@ def main() -> None:
},
)

report_validation_post_contract = []

# Rerun validation on CONTRACTED data
validation_post_contract = apply_validation(
run_context,
base_path=run_context.contracted_path,
)

report_validation_post_contract.append(validation_post_contract)

persist_json(
run_context.logs_path / "validation_post_contract.json",
{
"run_id": run_context.run_id,
"report": report_validation_post_contract,
"report": validation_post_contract,
},
)

# Intervention: Either manual fixing or escalate the data to source owner
if validation_post_contract["errors"] or validation_post_contract["warnings"]:
finalize_run(run_context, "FAILED")
sys.exit(1)

report_assemble = []

# Assemble event table
assemble = assemble_events(run_context)
report_assemble.append(assemble)

persist_json(
run_context.logs_path / "assemble_report.json",
{
"run_id": run_context.run_id,
"report": report_assemble,
"report": assemble,
},
)

if assemble["status"] == "failed":
finalize_run(run_context, "FAILED")
sys.exit(1)

report_semantic = []

# Semantic modeling
semantic = build_semantic_layer(run_context)
report_semantic.append(semantic)

persist_json(
run_context.logs_path / "semantic_report.json",
{
"run_id": run_context.run_id,
"report": report_semantic,
"report": semantic,
},
)

if semantic["status"] == "failed":
finalize_run(run_context, "FAILED")
sys.exit(1)

# Pre-publish semantic integrity validation
gate = run_integrity_gate(run_context)

persist_json(
run_context.logs_path / "publish_integrity_report.json",
{
"run_id": run_context.run_id,
"report": gate,
},
)

if gate["status"] == "failed":
finalize_run(run_context, "FAILED")
sys.exit(1)

sys.exit(0)
Expand Down
99 changes: 99 additions & 0 deletions data_pipeline/shared/table_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# TABLE CONFIGURATIONS
# =============================================================================

# ------------------------------------------------------------
# CONFIGURATIONS FOR validate_raw_data.py
# ------------------------------------------------------------

TABLE_CONFIG = {
"df_orders": {
"role": "event_fact",
Expand Down Expand Up @@ -81,3 +85,98 @@
"order_delivered_timestamp": "%Y-%m-%d %H:%M:%S",
"order_estimated_delivery_date": "%Y-%m-%d",
}


# ------------------------------------------------------------
# CONFIGURATIONS FOR assemble_validate_events.py
# ------------------------------------------------------------

# Assemble events enforced schema and dtypes
ASSEMBLE_ENFORCED_SCHEMA = [
"order_id",
"order_revenue",
"seller_id",
"product_id",
"order_status",
"order_purchase_timestamp",
"order_approved_at",
"order_delivered_timestamp",
"lead_time_days",
"approval_lag_days",
"delivery_delay_days",
"order_date",
"order_year",
"order_year_week",
"run_id",
]

ASSEMBLE_ENFORCED_DTYPES = {
"order_id": "string",
"order_revenue": "float64",
"seller_id": "string",
"product_id": "string",
"order_status": "string",
"order_purchase_timestamp": "datetime64[ns]",
"order_approved_at": "datetime64[ns]",
"order_delivered_timestamp": "datetime64[ns]",
"lead_time_days": "int64",
"approval_lag_days": "int64",
"delivery_delay_days": "int64",
"order_date": "datetime64[ns]",
"order_year": "int64",
}


# ------------------------------------------------------------
# CONFIGURATIONS FOR build_bi_semantic_layer.py
# ------------------------------------------------------------


# Seller dimension enforced schema and dtypes
SELLER_DIM_ENFORCED_SCHEMA = [
"seller_id",
"first_order_date",
"first_order_year_week",
"run_id",
]

SELLER_DIM_ENFORCED_DTYPES = {
"seller_id": "string",
"first_order_date": "datetime64[ns]",
"first_order_year_week": "string",
"run_id": "string",
}


# Seller Facts enforced schema and dtypes
SELLER_FACT_ENFORCED_SCHEMA = [
"seller_id",
"order_year_week",
"week_start_date",
"run_id",
"weekly_order_count",
"weekly_delivered_orders",
"weekly_cancelled_orders",
"weekly_revenue",
"weekly_avg_lead_time",
"weekly_total_lead_time",
"weekly_avg_delivery_delay",
"weekly_total_delivery_delay",
"weekly_avg_approval_lag",
]

SELLER_FACT_ENFORCED_DTYPES = {
"seller_id": "string",
"order_year_week": "string",
"week_start_date": "datetime64[ns]",
"run_id": "string",
"weekly_order_count": "int64",
"weekly_delivered_orders": "int64",
"weekly_cancelled_orders": "int64",
"weekly_revenue": "float64",
"weekly_avg_lead_time": "float64",
"weekly_total_lead_time": "int64",
"weekly_avg_delivery_delay": "float64",
"weekly_total_delivery_delay": "int64",
"weekly_avg_approval_lag": "float64",
}
44 changes: 7 additions & 37 deletions data_pipeline/stages/assemble_validated_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import pandas as pd
from typing import Dict, List
from data_pipeline.shared.run_context import RunContext
from data_pipeline.shared.table_configs import (
ASSEMBLE_ENFORCED_SCHEMA,
ASSEMBLE_ENFORCED_DTYPES,
)
from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file

EVENT_TABLES = ["df_orders", "df_order_items", "df_payments"]
Expand Down Expand Up @@ -136,46 +140,12 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
- Resets index to produce a clean output frame
"""

ENFORCED_SCHEMA = [
"order_id",
"order_revenue",
"seller_id",
"product_id",
"order_status",
"order_purchase_timestamp",
"order_approved_at",
"order_delivered_timestamp",
"lead_time_days",
"approval_lag_days",
"delivery_delay_days",
"order_date",
"order_year",
"order_year_week",
"run_id",
]

ENFORCED_DTYPES = {
"order_id": "string",
"order_revenue": "float64",
"seller_id": "string",
"product_id": "string",
"order_status": "string",
"order_purchase_timestamp": "datetime64[ns]",
"order_approved_at": "datetime64[ns]",
"order_delivered_timestamp": "datetime64[ns]",
"lead_time_days": "int64",
"approval_lag_days": "int64",
"delivery_delay_days": "int64",
"order_date": "datetime64[ns]",
"order_year": "int64",
}

missing_cols = set(ENFORCED_SCHEMA) - set(df.columns)
missing_cols = set(ASSEMBLE_ENFORCED_SCHEMA) - set(df.columns)
if missing_cols:
raise RuntimeError(f"missing required columns: {sorted(missing_cols)}")

df_contract = df[ENFORCED_SCHEMA].copy()
df_contract = df_contract.astype(ENFORCED_DTYPES)
df_contract = df[ASSEMBLE_ENFORCED_SCHEMA].copy()
df_contract = df_contract.astype(ASSEMBLE_ENFORCED_DTYPES)
df_contract = df_contract.sort_values("order_id").reset_index(drop=True)

return df_contract
Expand Down
Loading