Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"python.testing.pytestArgs": [
"data_pipeline"
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
Expand Down
16 changes: 13 additions & 3 deletions data_pipeline/stages/assemble_validated_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


def init_report():
return {"status": "success", "error": [], "info": []}
return {"status": "success", "errors": [], "info": []}


def log_info(message: str, report: Dict[str, List[str]]) -> None:
Expand All @@ -30,7 +30,7 @@ def log_info(message: str, report: Dict[str, List[str]]) -> None:

def log_error(message: str, report: Dict[str, list[str]]) -> None:
print(f"[ERROR] {message}")
report["error"].append(message)
report["errors"].append(message)


# ------------------------------------------------------------
Expand Down Expand Up @@ -153,12 +153,22 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:

ENFORCED_DTYPES = {
"order_id": "string",
"order_year": "int64",
"seller_id": "string",
"product_id": "string",
"order_status": "string",
"order_purchase_timestamp": "datetime64[ns]",
"order_approved_at": "datetime64[ns]",
"order_delivered_timestamp": "datetime64[ns]",
"lead_time_days": "int64",
"approval_lag_days": "int64",
"delivery_delay_days": "int64",
"order_year": "int64",
}

missing_cols = set(ENFORCED_SCHEMA) - set(df.columns)
if missing_cols:
raise RuntimeError(f"missing required columns: {sorted(missing_cols)}")

df_contract = df[ENFORCED_SCHEMA].copy()
df_contract = df_contract.astype(ENFORCED_DTYPES)
df_contract = df_contract.sort_values("order_id").reset_index(drop=True)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
line-length = 88

[tool.pytest.ini_options]
testpaths = ["data_pipeline/tests"]
testpaths = ["tests"]

[tool.ruff]
ignore = ["E501"]
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pandas as pd
import pytest
from shutil import copytree

from data_pipeline.shared.run_context import RunContext
from data_pipeline.stages.apply_raw_data_contract import (
Expand Down Expand Up @@ -181,9 +180,6 @@ def test_remove_impossible_timestamps_drops_invalid_rows(invalid_temporal_order_

def test_apply_contract_event_fact_success(tmp_path):

raw_dir = tmp_path / "raw"
raw_dir.mkdir()

df = pd.DataFrame(
{
"order_id": [1, 2, 3, 4],
Expand Down Expand Up @@ -214,12 +210,10 @@ def test_apply_contract_event_fact_success(tmp_path):
}
)

df.to_csv(raw_dir / "df_orders_2026_01.csv", index=False)

run_context = RunContext.create(base_path=tmp_path)
run_context.initialize_directories()

copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
df.to_csv(run_context.raw_snapshot_path / "df_orders_2026_01.csv", index=False)

report, _ = apply_contract(run_context, "df_orders")

Expand All @@ -246,9 +240,6 @@ def test_apply_contract_unknown_table(tmp_path):

def test_apply_contract_duplicate_on_entity_reference(tmp_path):

raw_dir = tmp_path / "raw"
raw_dir.mkdir()

df = pd.DataFrame(
{
"customer_id": [1, 1, 3], # 1 exact duplicate
Expand All @@ -258,12 +249,10 @@ def test_apply_contract_duplicate_on_entity_reference(tmp_path):
}
)

df.to_csv(raw_dir / "df_customers_2026_01.csv", index=False)

run_context = RunContext.create(base_path=tmp_path)
run_context.initialize_directories()

copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
df.to_csv(run_context.raw_snapshot_path / "df_customers_2026_01.csv", index=False)

report, _ = apply_contract(run_context, "df_customers")

Expand All @@ -274,9 +263,6 @@ def test_apply_contract_duplicate_on_entity_reference(tmp_path):

def test_apply_contract_duplicate_on_transactional_detail(tmp_path):

raw_dir = tmp_path / "raw"
raw_dir.mkdir()

df = pd.DataFrame(
{
"order_id": [
Expand All @@ -292,12 +278,10 @@ def test_apply_contract_duplicate_on_transactional_detail(tmp_path):
}
)

df.to_csv(raw_dir / "df_payments_2026_01.csv", index=False)

run_context = RunContext.create(base_path=tmp_path)
run_context.initialize_directories()

copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
df.to_csv(run_context.raw_snapshot_path / "df_payments_2026_01.csv", index=False)

report, _ = apply_contract(run_context, "df_payments")

Expand All @@ -308,9 +292,6 @@ def test_apply_contract_duplicate_on_transactional_detail(tmp_path):

def test_apply_contract_cascade_drop_with_order_id(tmp_path):

raw_dir = tmp_path / "raw"
raw_dir.mkdir()

df_order = pd.DataFrame(
{
"order_id": ["o1", "o2", "o3"],
Expand Down Expand Up @@ -355,14 +336,21 @@ def test_apply_contract_cascade_drop_with_order_id(tmp_path):
}
)

df_order.to_csv(raw_dir / "df_orders_2026_01.csv", index=False)
df_payments.to_csv(raw_dir / "df_payments_2026_01.csv", index=False)
df_order_items.to_csv(raw_dir / "df_order_items_2026_01.csv", index=False)

run_context = RunContext.create(base_path=tmp_path)
run_context.initialize_directories()

copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
df_order.to_csv(
run_context.raw_snapshot_path / "df_orders_2026_01.csv",
index=False,
)
df_payments.to_csv(
run_context.raw_snapshot_path / "df_payments_2026_01.csv",
index=False,
)
df_order_items.to_csv(
run_context.raw_snapshot_path / "df_order_items_2026_01.csv",
index=False,
)

invalid_ids = set()

Expand Down
Loading