Skip to content

Commit 10edea6

Browse files
committed
test: Add semantic modeling stage coverage
1 parent 7f8d32a commit 10edea6

3 files changed

Lines changed: 338 additions & 11 deletions

File tree

.coveragerc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@
22
omit =
33
data_pipeline/prototypes/*
44
data_pipeline/shared/table_configs.py
5-
data_pipeline/stages/build_bi_semantic_layer.py
65
*/__init__.py

data_pipeline/stages/build_bi_semantic_layer.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,6 @@ def seller_weekly_semantic(
8686
weekly_avg_approval_lag=("approval_lag_days", "mean"),
8787
)
8888

89-
expected = (
90-
read_assembled[["seller_id", "order_year_week"]].drop_duplicates().shape[0]
91-
)
92-
93-
if len(seller_weekly_fact) != expected:
94-
raise RuntimeError("Fact table grain violation: seller_weekly_fact")
95-
9689
seller_dim = read_assembled.groupby(
9790
"seller_id",
9891
as_index=False,
@@ -102,9 +95,6 @@ def seller_weekly_semantic(
10295
run_id=("run_id", "first"),
10396
)
10497

105-
if len(seller_dim) != read_assembled["seller_id"].nunique():
106-
raise RuntimeError("Dimension table grain violation: seller_dim")
107-
10898
return seller_weekly_fact, seller_dim
10999

110100

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
# =============================================================================
2+
# UNIT TESTS FOR build_bi_sematic_layer.py
3+
# =============================================================================
4+
5+
import pandas as pd
6+
import pytest
7+
8+
from data_pipeline.shared.run_context import RunContext
9+
from data_pipeline.stages.build_bi_semantic_layer import (
10+
init_report,
11+
log_error,
12+
log_info,
13+
seller_weekly_semantic,
14+
freeze_seller_semantic,
15+
build_semantic_layer,
16+
)
17+
18+
19+
@pytest.fixture
20+
def empty_report():
21+
return init_report()
22+
23+
24+
@pytest.fixture
25+
def valid_assembled_df():
26+
return pd.DataFrame(
27+
{
28+
"order_id": pd.Series(
29+
["o1", "o2"],
30+
dtype="string",
31+
),
32+
"seller_id": pd.Series(["seller1", "seller2"], dtype="string"),
33+
"order_revenue": pd.Series([12.34, 56.78], dtype="float64"),
34+
"product_id": pd.Series(["prod1", "prod2"], dtype="string"),
35+
"order_status": pd.Series(["delivered", "cancelled"], dtype="string"),
36+
"order_purchase_timestamp": pd.Series(
37+
[
38+
"2023-01-02 09:00:00",
39+
"2023-01-10 14:00:00",
40+
],
41+
dtype="datetime64[ns]",
42+
),
43+
"order_approved_at": pd.Series(
44+
[
45+
"2023-01-03 09:00:00",
46+
"2023-01-11 14:00:00",
47+
],
48+
dtype="datetime64[ns]",
49+
),
50+
"order_delivered_timestamp": pd.Series(
51+
[
52+
"2023-01-06 09:00:00",
53+
"2023-01-16 14:00:00",
54+
],
55+
dtype="datetime64[ns]",
56+
),
57+
"order_estimated_delivery_date": [
58+
"2023-01-05",
59+
"2023-01-15",
60+
],
61+
"lead_time_days": pd.Series([3, 5], dtype="int64"),
62+
"approval_lag_days": pd.Series([1, 1], dtype="int64"),
63+
"delivery_delay_days": pd.Series([1, 1], dtype="int64"),
64+
"order_date": pd.Series(
65+
["2023-01-02", "2023-01-10"], dtype="datetime64[ns]"
66+
),
67+
"order_year": pd.Series([2023, 2023], dtype="int64"),
68+
"order_year_week": pd.Series(["2023-W01", "2023-W01"], dtype="string"),
69+
"run_id": ["dummy_run_id", "dummy_run_id"],
70+
}
71+
)
72+
73+
74+
@pytest.fixture
75+
def valid_seller_fact():
76+
return pd.DataFrame(
77+
{
78+
"seller_id": pd.Series(["seller1", "seller2"], dtype="string"),
79+
"order_year_week": pd.Series(["2023-W01", "2023-W02"], dtype="string"),
80+
"week_start_date": pd.Series(
81+
["2023-01-02 09:00:00", "2023-01-04 15:00:00"], dtype="datetime64[ns]"
82+
),
83+
"run_id": pd.Series(["run_1", "run_1"], dtype="string"),
84+
"weekly_order_count": pd.Series([12, 34], dtype="int64"),
85+
"weekly_delivered_orders": pd.Series([5, 6], dtype="int64"),
86+
"weekly_cancelled_orders": pd.Series([7, 8], dtype="int64"),
87+
"weekly_revenue": pd.Series([12.3, 45.6], dtype="float64"),
88+
"weekly_avg_lead_time": pd.Series([5.34, 6.45], dtype="float64"),
89+
"weekly_total_lead_time": pd.Series([5, 6], dtype="int64"),
90+
"weekly_avg_delivery_delay": pd.Series([54.50, 67.89], dtype="float64"),
91+
"weekly_total_delivery_delay": pd.Series([10, 11], dtype="int64"),
92+
"weekly_avg_approval_lag": pd.Series([12.3, 14.5], dtype="float64"),
93+
}
94+
)
95+
96+
97+
@pytest.fixture
98+
def valid_seller_dim():
99+
return pd.DataFrame(
100+
{
101+
"seller_id": pd.Series(["seller1", "seller2"], dtype="string"),
102+
"first_order_date": pd.Series(
103+
["2023-03-02 09:00:00", "2023-05-04 11:00:00"], dtype="datetime64[ns]"
104+
),
105+
"first_order_year_week": pd.Series(
106+
["2023-W03", "2023-W05"], dtype="string"
107+
),
108+
"run_id": pd.Series(["run_1", "run_1"], dtype="string"),
109+
}
110+
)
111+
112+
113+
# =============================================================================
114+
# REPORTING & LOGS
115+
# =============================================================================
116+
117+
118+
def test_init_report_structure():
119+
report = init_report()
120+
121+
assert set(report.keys()) == {"status", "errors", "info"}
122+
assert all(isinstance(v, list | str) for v in report.values())
123+
124+
125+
def test_log_error_appends_only_to_errors(empty_report):
126+
log_error("error", empty_report)
127+
128+
assert empty_report["errors"] == ["error"]
129+
130+
131+
def test_log_info_appends_only_to_info(empty_report):
132+
log_info("info", empty_report)
133+
134+
assert empty_report["info"] == ["info"]
135+
136+
137+
# =============================================================================
138+
# SELLER WEEKLY SEMANTIC MODELING AND SCHEMA ENFORCEMENT
139+
# =============================================================================
140+
141+
142+
def test_seller_semantic_model_grain_preserved_success(valid_assembled_df):
143+
144+
seller_fact, seller_dim = seller_weekly_semantic(valid_assembled_df)
145+
expected = (
146+
valid_assembled_df[["seller_id", "order_year_week"]].drop_duplicates().shape[0]
147+
)
148+
149+
# Fact preserved grain
150+
assert len(seller_fact) == expected
151+
152+
# Dimension preserved grain
153+
assert len(seller_dim) == valid_assembled_df["seller_id"].drop_duplicates().shape[0]
154+
155+
156+
def test_seller_semantic_fails_on_multiple_run_ids(valid_assembled_df):
157+
158+
broken_df = valid_assembled_df.copy()
159+
broken_df.loc[1, "run_id"] = "another_run"
160+
161+
with pytest.raises(RuntimeError):
162+
seller_weekly_semantic(broken_df)
163+
164+
165+
def test_freeze_seller_semantic_fact_success(valid_seller_fact):
166+
167+
result_fact = freeze_seller_semantic(valid_seller_fact, "fact")
168+
169+
expected_dtypes = {
170+
"seller_id": "string",
171+
"order_year_week": "string",
172+
"week_start_date": "datetime64[ns]",
173+
"run_id": "string",
174+
"weekly_order_count": "int64",
175+
"weekly_delivered_orders": "int64",
176+
"weekly_cancelled_orders": "int64",
177+
"weekly_revenue": "float64",
178+
"weekly_avg_lead_time": "float64",
179+
"weekly_total_lead_time": "int64",
180+
"weekly_avg_delivery_delay": "float64",
181+
"weekly_total_delivery_delay": "int64",
182+
"weekly_avg_approval_lag": "float64",
183+
}
184+
185+
assert list(result_fact.columns) == [
186+
"seller_id",
187+
"order_year_week",
188+
"week_start_date",
189+
"run_id",
190+
"weekly_order_count",
191+
"weekly_delivered_orders",
192+
"weekly_cancelled_orders",
193+
"weekly_revenue",
194+
"weekly_avg_lead_time",
195+
"weekly_total_lead_time",
196+
"weekly_avg_delivery_delay",
197+
"weekly_total_delivery_delay",
198+
"weekly_avg_approval_lag",
199+
]
200+
201+
for col, correct_dtypes in expected_dtypes.items():
202+
assert str(result_fact[col].dtype) == correct_dtypes
203+
204+
assert result_fact.equals(
205+
result_fact.sort_values(["seller_id", "order_year_week"]).reset_index(drop=True)
206+
)
207+
208+
assert len(result_fact) == len(valid_seller_fact)
209+
210+
211+
def test_freeze_seller_semantic_dimension_success(valid_seller_dim):
212+
213+
result_dim = freeze_seller_semantic(valid_seller_dim, "dim")
214+
215+
assert list(result_dim.columns) == [
216+
"seller_id",
217+
"first_order_date",
218+
"first_order_year_week",
219+
"run_id",
220+
]
221+
222+
assert result_dim["seller_id"].dtype == "string"
223+
assert result_dim["first_order_date"].dtype == "datetime64[ns]"
224+
assert result_dim["first_order_year_week"].dtype == "string"
225+
assert result_dim["run_id"].dtype == "string"
226+
227+
assert len(result_dim) == len(valid_seller_dim)
228+
229+
230+
def test_freeze_seller_semantic_fact_fails_on_missing_column(valid_seller_fact):
231+
232+
broken_seller_fact = valid_seller_fact.copy()
233+
broken_seller_fact.drop(columns="weekly_order_count", inplace=True)
234+
235+
with pytest.raises(RuntimeError):
236+
freeze_seller_semantic(broken_seller_fact, "fact")
237+
238+
239+
def test_freeze_seller_semantic_dimension_fails_on_missing_column(valid_seller_dim):
240+
241+
broken_seller_dim = valid_seller_dim.copy()
242+
broken_seller_dim.drop(columns="first_order_year_week", inplace=True)
243+
244+
with pytest.raises(RuntimeError):
245+
freeze_seller_semantic(broken_seller_dim, "dim")
246+
247+
248+
def test_freeze_seller_semantic_fact_fails_on_duplictes(valid_seller_fact):
249+
250+
broken_seller_fact = valid_seller_fact.copy()
251+
broken_seller_fact.loc[1, "seller_id"] = "seller1"
252+
broken_seller_fact.loc[1, "order_year_week"] = "2023-W01"
253+
254+
with pytest.raises(RuntimeError):
255+
freeze_seller_semantic(broken_seller_fact, "fact")
256+
257+
258+
def test_freeze_seller_semantic_fact_dimension_on_duplictes(valid_seller_dim):
259+
260+
broken_seller_dim = valid_seller_dim.copy()
261+
broken_seller_dim.loc[1, "seller_id"] = "seller1"
262+
263+
with pytest.raises(RuntimeError):
264+
freeze_seller_semantic(broken_seller_dim, "dim")
265+
266+
267+
def test_freeze_seller_semantic_fails_on_table_type(valid_seller_fact):
268+
269+
with pytest.raises(ValueError):
270+
freeze_seller_semantic(valid_seller_fact, "invalid_param") # type: ignore
271+
272+
273+
# =============================================================================
274+
# BUILD BI SEMANTIC
275+
# =============================================================================
276+
277+
278+
def test_build_semantic_layer_success(tmp_path, valid_assembled_df):
279+
280+
run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id")
281+
run_context.initialize_directories()
282+
283+
valid_assembled_df.to_parquet(
284+
run_context.assembled_path / "assembled_events_2023_01.parquet"
285+
)
286+
287+
report = build_semantic_layer(run_context)
288+
289+
output_path_seller = (
290+
run_context.semantic_path / "seller_week_performance_fact_dumm_y_.parquet"
291+
)
292+
293+
output_path_dim = run_context.semantic_path / "dim_seller_dumm_y_.parquet"
294+
295+
assert report["status"] == "success"
296+
assert output_path_seller.exists()
297+
assert output_path_dim.exists()
298+
299+
300+
def test_build_semantic_layer_fails_on_multiple_ids(tmp_path, valid_assembled_df):
301+
302+
broken_assembled = valid_assembled_df.copy()
303+
broken_assembled.loc[1, "run_id"] = "another_run"
304+
305+
run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id")
306+
run_context.initialize_directories()
307+
308+
broken_assembled.to_parquet(
309+
run_context.assembled_path / "assembled_events_2023_01.parquet"
310+
)
311+
312+
report = build_semantic_layer(run_context)
313+
314+
assert report["status"] == "failed"
315+
assert "Multiple run_ids detected" in report["errors"]
316+
317+
318+
def test_build_semantic_layer_fails_on_missing_columns(tmp_path, valid_assembled_df):
319+
320+
broken_assembled = valid_assembled_df.copy()
321+
broken_assembled.drop(columns="approval_lag_days", inplace=True)
322+
323+
run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id")
324+
run_context.initialize_directories()
325+
326+
broken_assembled.to_parquet(
327+
run_context.assembled_path / "assembled_events_2023_01.parquet"
328+
)
329+
330+
report = build_semantic_layer(run_context)
331+
332+
assert report["status"] == "failed"
333+
assert any("approval_lag_days" in error for error in report["errors"])
334+
335+
336+
# =============================================================================
337+
# UNIT TESTS END
338+
# =============================================================================

0 commit comments

Comments
 (0)