Skip to content

Commit 24d7da5

Browse files
committed
feat: Implement seller-week performance semantic layer (fact + dimension)
1 parent 83356c1 commit 24d7da5

7 files changed

Lines changed: 329 additions & 32 deletions

File tree

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,4 +2,5 @@
22
omit =
33
data_pipeline/prototypes/*
44
data_pipeline/shared/table_configs.py
5+
data_pipeline/stages/build_bi_semantic_layer.py
56
*/__init__.py

data_pipeline/run_pipeline.py

Lines changed: 30 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
from data_pipeline.stages.validate_raw_data import apply_validation
1313
from data_pipeline.stages.apply_raw_data_contract import apply_contract
1414
from data_pipeline.stages.assemble_validated_events import assemble_events
15+
from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer
1516

1617

1718
def snapshot_raw(run_context: RunContext) -> None:
@@ -45,22 +46,22 @@ def main() -> None:
4546
# Create raw snapshot at runtime
4647
snapshot_raw(run_context)
4748

48-
report_validation_1 = []
49+
report_validation_initial = []
4950

5051
# Initial validation
51-
validation_1 = apply_validation(run_context)
52-
report_validation_1.append(validation_1)
52+
validation_initial = apply_validation(run_context)
53+
report_validation_initial.append(validation_initial)
5354

5455
persist_json(
55-
run_context.logs_path / "validation_1.json",
56+
run_context.logs_path / "validation_initial.json",
5657
{
5758
"run_id": run_context.run_id,
58-
"report": report_validation_1,
59+
"report": report_validation_initial,
5960
},
6061
)
6162

6263
# Early exit for structural errors else apply contract
63-
if validation_1["errors"]:
64+
if validation_initial["errors"]:
6465
sys.exit(1)
6566

6667
report_contract = []
@@ -89,26 +90,26 @@ def main() -> None:
8990
},
9091
)
9192

92-
report_validation_2 = []
93+
report_validation_post_contract = []
9394

9495
# Rerun validation on CONTRACTED data
95-
validation_2 = apply_validation(
96+
validation_post_contract = apply_validation(
9697
run_context,
9798
base_path=run_context.contracted_path,
9899
)
99100

100-
report_validation_2.append(validation_2)
101+
report_validation_post_contract.append(validation_post_contract)
101102

102103
persist_json(
103-
run_context.logs_path / "validation_2.json",
104+
run_context.logs_path / "validation_post_contract.json",
104105
{
105106
"run_id": run_context.run_id,
106-
"report": report_validation_2,
107+
"report": report_validation_post_contract,
107108
},
108109
)
109110

110111
# Intervention: Either manual fixing or escalate the data to source owner
111-
if validation_2["errors"] or validation_2["warnings"]:
112+
if validation_post_contract["errors"] or validation_post_contract["warnings"]:
112113
sys.exit(1)
113114

114115
report_assemble = []
@@ -128,6 +129,23 @@ def main() -> None:
128129
if assemble["status"] == "failed":
129130
sys.exit(1)
130131

132+
report_semantic = []
133+
134+
# Semantic modeling
135+
semantic = build_semantic_layer(run_context)
136+
report_semantic.append(semantic)
137+
138+
persist_json(
139+
run_context.logs_path / "semantic_report.json",
140+
{
141+
"run_id": run_context.run_id,
142+
"report": report_semantic,
143+
},
144+
)
145+
146+
if semantic["status"] == "failed":
147+
sys.exit(1)
148+
131149
sys.exit(0)
132150

133151

data_pipeline/shared/raw_loader_exporter.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -70,13 +70,13 @@ def load_logical_table(
7070
df = loader(file_path)
7171

7272
if log_info:
73-
log_info(f"loaded {file_path.name} ({len(df)} rows)")
73+
log_info(f"Loaded: {file_path.name} ({len(df)} rows)")
7474

7575
dfs.append(df)
7676

7777
except Exception as e:
7878
if log_error:
79-
log_error(f"failed loading {file_path.name}: {e}")
79+
log_error(f"Failed loading: {file_path.name}: {e}")
8080

8181
if not dfs:
8282
if log_error:

data_pipeline/stages/assemble_validated_events.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -166,6 +166,7 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
166166
"lead_time_days": "int64",
167167
"approval_lag_days": "int64",
168168
"delivery_delay_days": "int64",
169+
"order_date": "datetime64[ns]",
169170
"order_year": "int64",
170171
}
171172

@@ -242,13 +243,20 @@ def error(msg):
242243

243244
return report
244245

245-
output_path = run_context.assembled_path / "assembled_events.parquet"
246+
year = run_context.run_id[:4]
247+
month = run_context.run_id[4:6]
248+
249+
output_path = (
250+
run_context.assembled_path / f"assembled_events_{year}_{month}.parquet"
251+
)
246252

247253
if not export_file(df_contract, output_path):
248254
error("Export failed")
249255
report["status"] = "failed"
250256

251-
info(f"Export success: assembled_events.parquet ({len(df_contract)} rows)")
257+
info(
258+
f"Export success: assembled_events_{year}_{month}.parquet ({len(df_contract)} rows)"
259+
)
252260
return report
253261

254262

0 commit comments

Comments
 (0)