Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
omit =
data_pipeline/prototypes/*
data_pipeline/shared/table_configs.py
+data_pipeline/stages/build_bi_semantic_layer.py
*/__init__.py
42 changes: 30 additions & 12 deletions data_pipeline/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from data_pipeline.stages.validate_raw_data import apply_validation
from data_pipeline.stages.apply_raw_data_contract import apply_contract
from data_pipeline.stages.assemble_validated_events import assemble_events
+from data_pipeline.stages.build_bi_semantic_layer import build_semantic_layer


def snapshot_raw(run_context: RunContext) -> None:
Expand Down Expand Up @@ -45,22 +46,22 @@ def main() -> None:
# Create raw snapshot at runtime
snapshot_raw(run_context)

-report_validation_1 = []
+report_validation_initial = []

# Initial validation
-validation_1 = apply_validation(run_context)
-report_validation_1.append(validation_1)
+validation_initial = apply_validation(run_context)
+report_validation_initial.append(validation_initial)

persist_json(
-run_context.logs_path / "validation_1.json",
+run_context.logs_path / "validation_initial.json",
{
"run_id": run_context.run_id,
-"report": report_validation_1,
+"report": report_validation_initial,
},
)

# Early exit for structural errors else apply contract
-if validation_1["errors"]:
+if validation_initial["errors"]:
sys.exit(1)

report_contract = []
Expand Down Expand Up @@ -89,26 +90,26 @@ def main() -> None:
},
)

-report_validation_2 = []
+report_validation_post_contract = []

# Rerun validation on CONTRACTED data
-validation_2 = apply_validation(
+validation_post_contract = apply_validation(
run_context,
base_path=run_context.contracted_path,
)

-report_validation_2.append(validation_2)
+report_validation_post_contract.append(validation_post_contract)

persist_json(
-run_context.logs_path / "validation_2.json",
+run_context.logs_path / "validation_post_contract.json",
{
"run_id": run_context.run_id,
-"report": report_validation_2,
+"report": report_validation_post_contract,
},
)

# Intervention: Either manual fixing or escalate the data to source owner
-if validation_2["errors"] or validation_2["warnings"]:
+if validation_post_contract["errors"] or validation_post_contract["warnings"]:
sys.exit(1)

report_assemble = []
Expand All @@ -128,6 +129,23 @@ def main() -> None:
if assemble["status"] == "failed":
sys.exit(1)

+report_semantic = []
+
+# Semantic modeling
+semantic = build_semantic_layer(run_context)
+report_semantic.append(semantic)
+
+persist_json(
+run_context.logs_path / "semantic_report.json",
+{
+"run_id": run_context.run_id,
+"report": report_semantic,
+},
+)
+
+if semantic["status"] == "failed":
+sys.exit(1)
+
sys.exit(0)


Expand Down
4 changes: 2 additions & 2 deletions data_pipeline/shared/raw_loader_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ def load_logical_table(
df = loader(file_path)

if log_info:
-log_info(f"loaded {file_path.name} ({len(df)} rows)")
+log_info(f"Loaded: {file_path.name} ({len(df)} rows)")

dfs.append(df)

except Exception as e:
if log_error:
-log_error(f"failed loading {file_path.name}: {e}")
+log_error(f"Failed loading: {file_path.name}: {e}")

if not dfs:
if log_error:
Expand Down
12 changes: 10 additions & 2 deletions data_pipeline/stages/assemble_validated_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
"lead_time_days": "int64",
"approval_lag_days": "int64",
"delivery_delay_days": "int64",
+"order_date": "datetime64[ns]",
"order_year": "int64",
}

Expand Down Expand Up @@ -242,13 +243,20 @@ def error(msg):

return report

-output_path = run_context.assembled_path / "assembled_events.parquet"
+year = run_context.run_id[:4]
+month = run_context.run_id[4:6]
+
+output_path = (
+run_context.assembled_path / f"assembled_events_{year}_{month}.parquet"
+)

if not export_file(df_contract, output_path):
error("Export failed")
report["status"] = "failed"

-info(f"Export success: assembled_events.parquet ({len(df_contract)} rows)")
+info(
+f"Export success: assembled_events_{year}_{month}.parquet ({len(df_contract)} rows)"
+)
return report


Expand Down
Loading