Skip to content

Commit d516489

Browse files
authored
feat: Add semantic stage run-scoped IO wiring
1 parent abe3f6a commit d516489

3 files changed

Lines changed: 91 additions & 6 deletions

File tree

data_pipeline/stages/assemble_validated_events.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,14 @@ def error(msg):
217217
for table_name in EVENT_TABLES:
218218

219219
df = load_logical_table(
220-
contracted_path, table_name, log_info=info, log_error=error
220+
contracted_path,
221+
table_name,
222+
log_info=info,
223+
log_error=error,
221224
)
222225

223226
if df is None:
224-
log_error(f"{table_name}: dataset is empty", report)
227+
error(f"{table_name}: dataset is empty")
225228
report["status"] = "failed"
226229

227230
return report
@@ -234,17 +237,18 @@ def error(msg):
234237
df_contract = freeze_schema(df_assembled)
235238

236239
except Exception as e:
237-
log_error(str(e), report)
240+
error(str(e))
238241
report["status"] = "failed"
239242

240243
return report
241244

242245
output_path = run_context.assembled_path / "assembled_events.parquet"
243246

244247
if not export_file(df_contract, output_path):
245-
log_error("Export failed", report)
248+
error("Export failed")
246249
report["status"] = "failed"
247250

251+
info(f"Export success: assembled_events.parquet ({len(df_contract)} rows)")
248252
return report
249253

250254

data_pipeline/stages/build_bi_semantic_layer.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,87 @@
55
# - Define and lock analytical grains for consistent aggregation and reporting
66
# - Enforce referential integrity between fact and dimension tables
77

8+
import pandas as pd
9+
from typing import Dict, List
10+
from data_pipeline.shared.run_context import RunContext
11+
from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file
12+
13+
14+
# ------------------------------------------------------------
15+
# SEMANTIC REPORT & LOGS
16+
# ------------------------------------------------------------
17+
18+
19+
def init_report():
20+
return {"status": "success", "errors": [], "info": []}
21+
22+
23+
def log_info(message: str, report: Dict[str, List[str]]) -> None:
24+
print(f"[INFO] {message}")
25+
report["info"].append(message)
26+
27+
28+
def log_error(message: str, report: Dict[str, List[str]]) -> None:
29+
print(f"[ERROR] {message}")
30+
report["errors"].append(message)
31+
32+
33+
# ------------------------------------------------------------
34+
# SEMANTIC LAYERING AND SCHEMA ENFORCEMENT
35+
# ------------------------------------------------------------
36+
37+
38+
# ------------------------------------------------------------
39+
# BUILD BI SEMANTIC
40+
# ------------------------------------------------------------
41+
42+
43+
def build_semantic_layer(run_context: RunContext) -> Dict:
44+
45+
report = init_report()
46+
47+
def info(msg):
48+
log_info(msg, report)
49+
50+
def error(msg):
51+
log_error(msg, report)
52+
53+
assembled_path = run_context.assembled_path
54+
55+
df = load_logical_table(
56+
assembled_path,
57+
"assembled_events",
58+
log_info=info,
59+
log_error=error,
60+
)
61+
62+
if df is None or df.empty:
63+
error("assembled events is empty")
64+
report["status"] = "failed"
65+
66+
return report
67+
68+
fact_seller = pd.DataFrame()
69+
dim_seller = pd.DataFrame()
70+
71+
fulfillment_tables = {
72+
"seller_week_fulfillment_fact.parquet": fact_seller,
73+
"dim_seller.parquet": dim_seller,
74+
}
75+
76+
for table_name, table in fulfillment_tables.items():
77+
78+
output_path = run_context.semantic_path / table_name
79+
80+
if not export_file(table, output_path):
81+
error(f"{table_name}: Export failed")
82+
report["status"] = "failed"
83+
break
84+
85+
info(f"Export success: {table_name} ({len(table)} rows)")
86+
87+
return report
88+
889

990
# =============================================================================
1091
# END OF SCRIPT

data_pipeline/stages/validate_raw_data.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ def error(msg: str):
364364
df = load_logical_table(base_path, table_name, log_info=info, log_error=error)
365365

366366
if df is None:
367-
log_error(f"{table_name} logical table is missing", report)
367+
error(f"{table_name} logical table is missing")
368368
continue
369369

370370
loaded_table_names.add(table_name)
@@ -385,7 +385,7 @@ def error(msg: str):
385385

386386
missing_tables = sorted(expected_tables - loaded_table_names)
387387
if missing_tables:
388-
log_error(f"missing expected table(s) {missing_tables}", report)
388+
error(f"missing expected table(s) {missing_tables}")
389389

390390
run_cross_table_validations(tables, report)
391391

0 commit comments

Comments
 (0)