Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions data_pipeline/stages/assemble_validated_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ def error(msg):
for table_name in EVENT_TABLES:

df = load_logical_table(
contracted_path, table_name, log_info=info, log_error=error
contracted_path,
table_name,
log_info=info,
log_error=error,
)

if df is None:
log_error(f"{table_name}: dataset is empty", report)
error(f"{table_name}: dataset is empty")
report["status"] = "failed"

return report
Expand All @@ -234,17 +237,18 @@ def error(msg):
df_contract = freeze_schema(df_assembled)

except Exception as e:
log_error(str(e), report)
error(str(e))
report["status"] = "failed"

return report

output_path = run_context.assembled_path / "assembled_events.parquet"

if not export_file(df_contract, output_path):
log_error("Export failed", report)
error("Export failed")
report["status"] = "failed"

info(f"Export success: assembled_events.parquet ({len(df_contract)} rows)")
return report


Expand Down
81 changes: 81 additions & 0 deletions data_pipeline/stages/build_bi_semantic_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,87 @@
# - Define and lock analytical grains for consistent aggregation and reporting
# - Enforce referential integrity between fact and dimension tables

import pandas as pd
from typing import Dict, List
from data_pipeline.shared.run_context import RunContext
from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file


# ------------------------------------------------------------
# SEMANTIC REPORT & LOGS
# ------------------------------------------------------------


def init_report():
    """Return a fresh report dict for one stage run.

    The report starts with status "success" and two empty message lists,
    "errors" and "info". New list objects are created on every call.
    """
    report = {"status": "success"}
    report["errors"] = []
    report["info"] = []
    return report


def log_info(message: str, report: Dict[str, List[str]]) -> None:
    """Print an informational message and record it in the report.

    Args:
        message: Text to print (prefixed with "[INFO] ") and to store.
        report: Report dict; the message is appended to report["info"].
    """
    line = f"[INFO] {message}"
    print(line)
    info_entries = report["info"]
    info_entries.append(message)


def log_error(message: str, report: Dict[str, List[str]]) -> None:
    """Print an error message and record it in the report.

    Only report["errors"] is touched; report["status"] is left as-is, so
    callers set it themselves.

    Args:
        message: Text to print (prefixed with "[ERROR] ") and to store.
        report: Report dict; the message is appended to report["errors"].
    """
    line = f"[ERROR] {message}"
    print(line)
    error_entries = report["errors"]
    error_entries.append(message)


# ------------------------------------------------------------
# SEMANTIC LAYERING AND SCHEMA ENFORCEMENT
# ------------------------------------------------------------


# ------------------------------------------------------------
# BUILD BI SEMANTIC LAYER
# ------------------------------------------------------------


def build_semantic_layer(run_context: RunContext) -> Dict:
    """Load the assembled events and export the semantic-layer tables.

    Reads the "assembled_events" logical table from
    ``run_context.assembled_path`` and writes each semantic table under
    ``run_context.semantic_path``. Every message is printed and collected
    in the returned report.

    Args:
        run_context: Supplies ``assembled_path`` and ``semantic_path``.

    Returns:
        The report dict. Its "status" is "failed" when the assembled table
        is missing or empty, or when an export reports failure; otherwise
        it keeps the value set by ``init_report``.
    """
    report = init_report()

    # Closures bind the report so the loader can log without knowing about it.
    def info(msg):
        log_info(msg, report)

    def error(msg):
        log_error(msg, report)

    events = load_logical_table(
        run_context.assembled_path,
        "assembled_events",
        log_info=info,
        log_error=error,
    )

    if events is None or events.empty:
        error("assembled events is empty")
        report["status"] = "failed"
        return report

    # NOTE(review): both tables are empty DataFrames here; presumably
    # placeholders until the fact/dimension logic is implemented - confirm.
    outputs = (
        ("seller_week_fulfillment_fact.parquet", pd.DataFrame()),
        ("dim_seller.parquet", pd.DataFrame()),
    )

    for file_name, frame in outputs:
        destination = run_context.semantic_path / file_name
        exported = export_file(frame, destination)

        if not exported:
            # Stop at the first failed export; later tables are not attempted.
            error(f"{file_name}: Export failed")
            report["status"] = "failed"
            return report

        info(f"Export success: {file_name} ({len(frame)} rows)")

    return report


# =============================================================================
# END OF SCRIPT
Expand Down
4 changes: 2 additions & 2 deletions data_pipeline/stages/validate_raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def error(msg: str):
df = load_logical_table(base_path, table_name, log_info=info, log_error=error)

if df is None:
log_error(f"{table_name} logical table is missing", report)
error(f"{table_name} logical table is missing")
continue

loaded_table_names.add(table_name)
Expand All @@ -385,7 +385,7 @@ def error(msg: str):

missing_tables = sorted(expected_tables - loaded_table_names)
if missing_tables:
log_error(f"missing expected table(s) {missing_tables}", report)
error(f"missing expected table(s) {missing_tables}")

run_cross_table_validations(tables, report)

Expand Down