Skip to content

Commit fa8cf50

Browse files
committed
test: Update test suites to match refactoring
1 parent f833843 commit fa8cf50

7 files changed

Lines changed: 442 additions & 130 deletions

data_pipeline/run_pipeline.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,23 @@ def initiliaze_metadata(run_context: RunContext) -> None:
5555
establish lifecycle tracking and publish eligibility state.
5656
"""
5757

58+
run_dt = dt.strptime(run_context.run_id[:15], "%Y%m%dT%H%M%S")
59+
5860
payload = {
5961
"run_id": run_context.run_id,
6062
"status": "RUNNING",
6163
"started_at": dt.utcnow().isoformat(),
64+
"run_year": run_dt.year,
65+
"run_month": run_dt.month,
66+
"run_week_of_month": (run_dt.day - 1) // 7 + 1,
6267
"completed_at": None,
6368
"published": False,
6469
}
6570

6671
persist_json(run_context.metadata_path, payload)
6772

6873

69-
def finalize_run(run_context: RunContext, status: str) -> None:
74+
def finalize_metadata(run_context: RunContext, status: str) -> None:
7075
"""
7176
Run metadata finalizer.
7277
@@ -146,7 +151,7 @@ def main() -> None:
146151

147152
# Early exit for structural errors else apply contract
148153
if validation_initial["errors"]:
149-
finalize_run(run_context, "FAILED")
154+
finalize_metadata(run_context, "FAILED")
150155
sys.exit(1)
151156

152157
report_contract = []
@@ -191,7 +196,7 @@ def main() -> None:
191196

192197
# Intervention: Either manual fixing or escalate the data to source owner
193198
if validation_post_contract["errors"] or validation_post_contract["warnings"]:
194-
finalize_run(run_context, "FAILED")
199+
finalize_metadata(run_context, "FAILED")
195200
sys.exit(1)
196201

197202
# Assemble event table
@@ -206,7 +211,7 @@ def main() -> None:
206211
)
207212

208213
if assemble["status"] == "failed":
209-
finalize_run(run_context, "FAILED")
214+
finalize_metadata(run_context, "FAILED")
210215
sys.exit(1)
211216

212217
# Semantic modeling
@@ -221,7 +226,7 @@ def main() -> None:
221226
)
222227

223228
if semantic["status"] == "failed":
224-
finalize_run(run_context, "FAILED")
229+
finalize_metadata(run_context, "FAILED")
225230
sys.exit(1)
226231

227232
# Pre-publish semantic validation
@@ -236,10 +241,10 @@ def main() -> None:
236241
)
237242

238243
if publish["status"] == "failed":
239-
finalize_run(run_context, "FAILED")
244+
finalize_metadata(run_context, "FAILED")
240245
sys.exit(1)
241246

242-
finalize_run(run_context, "SUCCESS")
247+
finalize_metadata(run_context, "SUCCESS")
243248
sys.exit(0)
244249

245250

data_pipeline/stages/build_bi_semantic_layer.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def build_seller_semantic(df: pd.DataFrame, run_context: RunContext) -> Dict:
107107
)
108108

109109
seller_semantic = {
110-
"seller_week_performance_fact": seller_weekly_fact,
110+
"seller_weekly_fact": seller_weekly_fact,
111111
"seller_dim": seller_dim,
112112
}
113113

@@ -232,7 +232,8 @@ def build_product_semantic(df: pd.DataFrame, run_context: RunContext) -> Dict:
232232
read_assembled["is_cancelled"] = read_assembled["order_status"].eq("cancelled")
233233

234234
product_weekly_fact = read_assembled.groupby(
235-
["product_id", "order_year_week"], as_index=False
235+
["product_id", "order_year_week"],
236+
as_index=False,
236237
).agg(
237238
week_start_date=("week_start_date", "min"),
238239
run_id=("run_id", "first"),
@@ -284,7 +285,7 @@ def build_product_semantic(df: pd.DataFrame, run_context: RunContext) -> Dict:
284285
"seller_semantic": {
285286
"builder": build_seller_semantic,
286287
"tables": {
287-
"seller_week_performance_fact": {
288+
"seller_weekly_fact": {
288289
"type": "fact",
289290
"grain": ["seller_id", "order_year_week"],
290291
"schema": SELLER_FACT_SCHEMA,

data_pipeline/stages/publish_lifecycle.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def run_integrity_gate(run_context: RunContext) -> Dict:
9393
actual_files = {file.name for file in module_path.glob("*.parquet")}
9494

9595
if actual_files != expected_files:
96-
log_error("Semantic file set mismatch", report)
96+
log_error(f"Semantic file set mismatch on {module_name}", report)
9797
report["status"] = "failed"
9898

9999
return report
@@ -214,9 +214,14 @@ def activate_published_version(run_context: RunContext) -> Dict:
214214

215215
tmp_path = latest_path.with_suffix(".tmp")
216216

217+
run_dt = dt.strptime(run_context.run_id[:15], "%Y%m%dT%H%M%S")
218+
217219
payload = {
218220
"run_id": run_context.run_id,
219221
"version": f"v{run_context.run_id}",
222+
"run_year": run_dt.year,
223+
"run_month": run_dt.month,
224+
"run_week_of_month": (run_dt.day - 1) // 7 + 1,
220225
"published_at": dt.utcnow().isoformat(),
221226
}
222227

tests/stages/test_assemble_validated_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def valid_derived_df():
8484
dtype="string",
8585
),
8686
"seller_id": pd.Series(["seller1", "seller2"], dtype="string"),
87+
"customer_id": pd.Series(["customer1", "customer2"], dtype="string"),
8788
"order_revenue": pd.Series([12.34, 56.78], dtype="float64"),
8889
"product_id": pd.Series(["prod1", "prod2"], dtype="string"),
8990
"order_status": pd.Series(["delivered", "cancelled"], dtype="string"),
@@ -246,6 +247,7 @@ def test_freeze_schema_enforces_strict_schema_success(valid_derived_df):
246247
"order_id": "string",
247248
"order_revenue": "float64",
248249
"seller_id": "string",
250+
"customer_id": "string",
249251
"product_id": "string",
250252
"order_status": "string",
251253
"order_purchase_timestamp": "datetime64[ns]",

tests/stages/test_build_bi_semantic_layer.py

Lines changed: 77 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,32 @@ def empty_report():
2121
return init_report()
2222

2323

24+
@pytest.fixture
25+
def valid_customers_df():
26+
return pd.DataFrame(
27+
{
28+
"customer_id": pd.Series(["customer1", "customer2"], dtype="string"),
29+
"customer_zip_code_prefix": pd.Series(["zip1", "zip2"], dtype="string"),
30+
"customer_city": pd.Series(["city1", "city2"], dtype="string"),
31+
"customer_state": pd.Series(["state1", "state2"], dtype="string"),
32+
}
33+
)
34+
35+
36+
@pytest.fixture
37+
def valid_products_df():
38+
return pd.DataFrame(
39+
{
40+
"product_id": pd.Series(["prod1", "prod2"], dtype="string"),
41+
"product_category_name": pd.Series(["categ1", "categ2"], dtype="string"),
42+
"product_weight_g": pd.Series([491, 500], dtype="float64"),
43+
"product_length_cm": pd.Series([19.0, 20.0], dtype="float64"),
44+
"product_height_cm": pd.Series([12.0, 13.0], dtype="float64"),
45+
"product_width_cm": pd.Series([16.0, 15.0], dtype="float64"),
46+
}
47+
)
48+
49+
2450
@pytest.fixture
2551
def valid_assembled_df():
2652
return pd.DataFrame(
@@ -30,6 +56,7 @@ def valid_assembled_df():
3056
dtype="string",
3157
),
3258
"seller_id": pd.Series(["seller1", "seller2"], dtype="string"),
59+
"customer_id": pd.Series(["customer1", "customer2"], dtype="string"),
3360
"order_revenue": pd.Series([12.34, 56.78], dtype="float64"),
3461
"product_id": pd.Series(["prod1", "prod2"], dtype="string"),
3562
"order_status": pd.Series(["delivered", "cancelled"], dtype="string"),
@@ -139,9 +166,11 @@ def test_log_info_appends_only_to_info(empty_report):
139166
# =============================================================================
140167

141168

142-
def test_seller_semantic_model_grain_preserved_success(valid_assembled_df):
169+
def test_seller_semantic_model_grain_preserved_success(tmp_path, valid_assembled_df):
170+
171+
run_context = RunContext.create(base_path=tmp_path)
143172

144-
seller_semantic = build_seller_semantic(valid_assembled_df)
173+
seller_semantic = build_seller_semantic(valid_assembled_df, run_context)
145174
expected = (
146175
valid_assembled_df[["seller_id", "order_year_week"]].drop_duplicates().shape[0]
147176
)
@@ -156,21 +185,28 @@ def test_seller_semantic_model_grain_preserved_success(valid_assembled_df):
156185
)
157186

158187

159-
def test_seller_semantic_fails_on_multiple_run_ids(valid_assembled_df):
188+
def test_seller_semantic_fails_on_multiple_run_ids(tmp_path, valid_assembled_df):
189+
190+
run_context = RunContext.create(base_path=tmp_path)
160191

161192
broken_df = valid_assembled_df.copy()
162193
broken_df.loc[1, "run_id"] = "another_run"
163194

164195
with pytest.raises(RuntimeError):
165-
build_seller_semantic(broken_df)
196+
build_seller_semantic(broken_df, run_context)
166197

167198

168199
# =============================================================================
169200
# BUILD BI SEMANTIC
170201
# =============================================================================
171202

172203

173-
def test_build_semantic_layer_success(tmp_path, valid_assembled_df):
204+
def test_build_semantic_layer_success(
205+
tmp_path,
206+
valid_assembled_df,
207+
valid_customers_df,
208+
valid_products_df,
209+
):
174210

175211
run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id")
176212
run_context.initialize_directories()
@@ -179,23 +215,27 @@ def test_build_semantic_layer_success(tmp_path, valid_assembled_df):
179215
run_context.assembled_path / "assembled_events_2023_01.parquet"
180216
)
181217

182-
report = build_semantic_layer(run_context)
218+
valid_customers_df.to_parquet(
219+
run_context.contracted_path / "df_customers_contracted.parquet"
220+
)
183221

184-
for module in SEMANTIC_MODULES:
222+
valid_products_df.to_parquet(
223+
run_context.contracted_path / "df_products_contracted.parquet"
224+
)
185225

186-
output_path_seller = (
187-
run_context.semantic_path
188-
/ module
189-
/ "seller_week_performance_fact_dumm_y_.parquet"
190-
)
226+
report = build_semantic_layer(run_context)
191227

192-
output_path_dim = (
193-
run_context.semantic_path / module / "seller_dim_dumm_y_.parquet"
194-
)
228+
for module_name, module in SEMANTIC_MODULES.items():
229+
for table_name in module["tables"]:
195230

196-
assert report["status"] == "success"
197-
assert output_path_seller.exists()
198-
assert output_path_dim.exists()
231+
outputs_path = (
232+
run_context.semantic_path
233+
/ module_name
234+
/ f"{table_name}_dumm_y_.parquet"
235+
)
236+
237+
assert report["status"] == "success"
238+
assert outputs_path.exists()
199239

200240

201241
def test_build_semantic_layer_fails_on_multiple_ids(tmp_path, valid_assembled_df):
@@ -242,6 +282,25 @@ def test_build_semantic_layer_fails_on_missing_columns(tmp_path, valid_assembled
242282
assert any("approval_lag_days" in error for error in module_error)
243283

244284

285+
def test_build_semantic_layer_fails_on_missing_or_empty_df(tmp_path):
286+
287+
empty_df = pd.DataFrame()
288+
289+
run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id")
290+
run_context.initialize_directories()
291+
292+
empty_df.to_parquet(run_context.assembled_path / "assembled_events_2023_01.parquet")
293+
294+
report = build_semantic_layer(run_context)
295+
296+
assert report["status"] == "failed"
297+
assert report["failed_step"] == "load_tables"
298+
299+
load_error = report["steps"]["load_tables"]["errors"]
300+
301+
assert any("missing or empty" in error for error in load_error)
302+
303+
245304
# =============================================================================
246305
# UNIT TESTS END
247306
# =============================================================================

0 commit comments

Comments
 (0)