Skip to content

Commit 682feaf

Browse files
committed
test: Add assemble stage coverage and refactor tests to use run-scoped paths
1 parent a0cf22b commit 682feaf

4 files changed

Lines changed: 493 additions & 72 deletions

File tree

data_pipeline/stages/assemble_validated_events.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121

2222
def init_report():
23-
return {"status": "success", "error": [], "info": []}
23+
return {"status": "success", "errors": [], "info": []}
2424

2525

2626
def log_info(message: str, report: Dict[str, List[str]]) -> None:
@@ -30,7 +30,7 @@ def log_info(message: str, report: Dict[str, List[str]]) -> None:
3030

3131
def log_error(message: str, report: Dict[str, list[str]]) -> None:
3232
print(f"[ERROR] {message}")
33-
report["error"].append(message)
33+
report["errors"].append(message)
3434

3535

3636
# ------------------------------------------------------------
@@ -153,12 +153,22 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
153153

154154
ENFORCED_DTYPES = {
155155
"order_id": "string",
156-
"order_year": "int64",
156+
"seller_id": "string",
157+
"product_id": "string",
158+
"order_status": "string",
159+
"order_purchase_timestamp": "datetime64[ns]",
160+
"order_approved_at": "datetime64[ns]",
161+
"order_delivered_timestamp": "datetime64[ns]",
157162
"lead_time_days": "int64",
158163
"approval_lag_days": "int64",
159164
"delivery_delay_days": "int64",
165+
"order_year": "int64",
160166
}
161167

168+
missing_cols = set(ENFORCED_SCHEMA) - set(df.columns)
169+
if missing_cols:
170+
raise RuntimeError(f"missing required columns: {sorted(missing_cols)}")
171+
162172
df_contract = df[ENFORCED_SCHEMA].copy()
163173
df_contract = df_contract.astype(ENFORCED_DTYPES)
164174
df_contract = df_contract.sort_values("order_id").reset_index(drop=True)

data_pipeline/tests/stages/test_apply_raw_data_contract.py

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import pandas as pd
66
import pytest
7-
from shutil import copytree
87

98
from data_pipeline.shared.run_context import RunContext
109
from data_pipeline.stages.apply_raw_data_contract import (
@@ -181,9 +180,6 @@ def test_remove_impossible_timestamps_drops_invalid_rows(invalid_temporal_order_
181180

182181
def test_apply_contract_event_fact_success(tmp_path):
183182

184-
raw_dir = tmp_path / "raw"
185-
raw_dir.mkdir()
186-
187183
df = pd.DataFrame(
188184
{
189185
"order_id": [1, 2, 3, 4],
@@ -214,12 +210,10 @@ def test_apply_contract_event_fact_success(tmp_path):
214210
}
215211
)
216212

217-
df.to_csv(raw_dir / "df_orders_2026_01.csv", index=False)
218-
219213
run_context = RunContext.create(base_path=tmp_path)
220214
run_context.initialize_directories()
221215

222-
copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
216+
df.to_csv(run_context.raw_snapshot_path / "df_orders_2026_01.csv", index=False)
223217

224218
report, _ = apply_contract(run_context, "df_orders")
225219

@@ -246,9 +240,6 @@ def test_apply_contract_unknown_table(tmp_path):
246240

247241
def test_apply_contract_duplicate_on_entity_reference(tmp_path):
248242

249-
raw_dir = tmp_path / "raw"
250-
raw_dir.mkdir()
251-
252243
df = pd.DataFrame(
253244
{
254245
"customer_id": [1, 1, 3], # 1 exact duplicate
@@ -258,12 +249,10 @@ def test_apply_contract_duplicate_on_entity_reference(tmp_path):
258249
}
259250
)
260251

261-
df.to_csv(raw_dir / "df_customers_2026_01.csv", index=False)
262-
263252
run_context = RunContext.create(base_path=tmp_path)
264253
run_context.initialize_directories()
265254

266-
copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
255+
df.to_csv(run_context.raw_snapshot_path / "df_customers_2026_01.csv", index=False)
267256

268257
report, _ = apply_contract(run_context, "df_customers")
269258

@@ -274,9 +263,6 @@ def test_apply_contract_duplicate_on_entity_reference(tmp_path):
274263

275264
def test_apply_contract_duplicate_on_transactional_detail(tmp_path):
276265

277-
raw_dir = tmp_path / "raw"
278-
raw_dir.mkdir()
279-
280266
df = pd.DataFrame(
281267
{
282268
"order_id": [
@@ -292,12 +278,10 @@ def test_apply_contract_duplicate_on_transactional_detail(tmp_path):
292278
}
293279
)
294280

295-
df.to_csv(raw_dir / "df_payments_2026_01.csv", index=False)
296-
297281
run_context = RunContext.create(base_path=tmp_path)
298282
run_context.initialize_directories()
299283

300-
copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
284+
df.to_csv(run_context.raw_snapshot_path / "df_payments_2026_01.csv", index=False)
301285

302286
report, _ = apply_contract(run_context, "df_payments")
303287

@@ -308,9 +292,6 @@ def test_apply_contract_duplicate_on_transactional_detail(tmp_path):
308292

309293
def test_apply_contract_cascade_drop_with_order_id(tmp_path):
310294

311-
raw_dir = tmp_path / "raw"
312-
raw_dir.mkdir()
313-
314295
df_order = pd.DataFrame(
315296
{
316297
"order_id": ["o1", "o2", "o3"],
@@ -355,14 +336,21 @@ def test_apply_contract_cascade_drop_with_order_id(tmp_path):
355336
}
356337
)
357338

358-
df_order.to_csv(raw_dir / "df_orders_2026_01.csv", index=False)
359-
df_payments.to_csv(raw_dir / "df_payments_2026_01.csv", index=False)
360-
df_order_items.to_csv(raw_dir / "df_order_items_2026_01.csv", index=False)
361-
362339
run_context = RunContext.create(base_path=tmp_path)
363340
run_context.initialize_directories()
364341

365-
copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True)
342+
df_order.to_csv(
343+
run_context.raw_snapshot_path / "df_orders_2026_01.csv",
344+
index=False,
345+
)
346+
df_payments.to_csv(
347+
run_context.raw_snapshot_path / "df_payments_2026_01.csv",
348+
index=False,
349+
)
350+
df_order_items.to_csv(
351+
run_context.raw_snapshot_path / "df_order_items_2026_01.csv",
352+
index=False,
353+
)
366354

367355
invalid_ids = set()
368356

0 commit comments

Comments
 (0)