Skip to content

Commit 2a5c391

Browse files
committed
Chore: Move all table configs to shared/table_configs.py
1 parent 49932e9 commit 2a5c391

3 files changed

Lines changed: 118 additions & 89 deletions

File tree

data_pipeline/shared/table_configs.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
# TABLE CONFIGURATIONS
33
# =============================================================================
44

5+
# ------------------------------------------------------------
6+
# CONFIGURATIONS FOR validate_raw_data.py
7+
# ------------------------------------------------------------
8+
59
TABLE_CONFIG = {
610
"df_orders": {
711
"role": "event_fact",
@@ -81,3 +85,98 @@
8185
"order_delivered_timestamp": "%Y-%m-%d %H:%M:%S",
8286
"order_estimated_delivery_date": "%Y-%m-%d",
8387
}
88+
89+
90+
# ------------------------------------------------------------
91+
# CONFIGURATIONS FOR assemble_validate_events.py
92+
# ------------------------------------------------------------
93+
94+
# Assemble events enforced schema and dtypes
95+
ASSEMBLE_ENFORCED_SCHEMA = [
96+
"order_id",
97+
"order_revenue",
98+
"seller_id",
99+
"product_id",
100+
"order_status",
101+
"order_purchase_timestamp",
102+
"order_approved_at",
103+
"order_delivered_timestamp",
104+
"lead_time_days",
105+
"approval_lag_days",
106+
"delivery_delay_days",
107+
"order_date",
108+
"order_year",
109+
"order_year_week",
110+
"run_id",
111+
]
112+
113+
ASSEMBLE_ENFORCED_DTYPES = {
114+
"order_id": "string",
115+
"order_revenue": "float64",
116+
"seller_id": "string",
117+
"product_id": "string",
118+
"order_status": "string",
119+
"order_purchase_timestamp": "datetime64[ns]",
120+
"order_approved_at": "datetime64[ns]",
121+
"order_delivered_timestamp": "datetime64[ns]",
122+
"lead_time_days": "int64",
123+
"approval_lag_days": "int64",
124+
"delivery_delay_days": "int64",
125+
"order_date": "datetime64[ns]",
126+
"order_year": "int64",
127+
}
128+
129+
130+
# ------------------------------------------------------------
131+
# CONFIGURATIONS FOR build_bi_semantic_layer.py
132+
# ------------------------------------------------------------
133+
134+
135+
# Seller dimension enforced schema and dtypes
136+
SELLER_DIM_ENFORCED_SCHEMA = [
137+
"seller_id",
138+
"first_order_date",
139+
"first_order_year_week",
140+
"run_id",
141+
]
142+
143+
SELLER_DIM_ENFORCED_DTYPES = {
144+
"seller_id": "string",
145+
"first_order_date": "datetime64[ns]",
146+
"first_order_year_week": "string",
147+
"run_id": "string",
148+
}
149+
150+
151+
# Seller Facts enforced schema and dtypes
152+
SELLER_FACT_ENFORCED_SCHEMA = [
153+
"seller_id",
154+
"order_year_week",
155+
"week_start_date",
156+
"run_id",
157+
"weekly_order_count",
158+
"weekly_delivered_orders",
159+
"weekly_cancelled_orders",
160+
"weekly_revenue",
161+
"weekly_avg_lead_time",
162+
"weekly_total_lead_time",
163+
"weekly_avg_delivery_delay",
164+
"weekly_total_delivery_delay",
165+
"weekly_avg_approval_lag",
166+
]
167+
168+
SELLER_FACT_ENFORCED_DTYPES = {
169+
"seller_id": "string",
170+
"order_year_week": "string",
171+
"week_start_date": "datetime64[ns]",
172+
"run_id": "string",
173+
"weekly_order_count": "int64",
174+
"weekly_delivered_orders": "int64",
175+
"weekly_cancelled_orders": "int64",
176+
"weekly_revenue": "float64",
177+
"weekly_avg_lead_time": "float64",
178+
"weekly_total_lead_time": "int64",
179+
"weekly_avg_delivery_delay": "float64",
180+
"weekly_total_delivery_delay": "int64",
181+
"weekly_avg_approval_lag": "float64",
182+
}

data_pipeline/stages/assemble_validated_events.py

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
import pandas as pd
1111
from typing import Dict, List
1212
from data_pipeline.shared.run_context import RunContext
13+
from data_pipeline.shared.table_configs import (
14+
ASSEMBLE_ENFORCED_SCHEMA,
15+
ASSEMBLE_ENFORCED_DTYPES,
16+
)
1317
from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file
1418

1519
EVENT_TABLES = ["df_orders", "df_order_items", "df_payments"]
@@ -136,46 +140,12 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
136140
- Resets index to produce a clean output frame
137141
"""
138142

139-
ENFORCED_SCHEMA = [
140-
"order_id",
141-
"order_revenue",
142-
"seller_id",
143-
"product_id",
144-
"order_status",
145-
"order_purchase_timestamp",
146-
"order_approved_at",
147-
"order_delivered_timestamp",
148-
"lead_time_days",
149-
"approval_lag_days",
150-
"delivery_delay_days",
151-
"order_date",
152-
"order_year",
153-
"order_year_week",
154-
"run_id",
155-
]
156-
157-
ENFORCED_DTYPES = {
158-
"order_id": "string",
159-
"order_revenue": "float64",
160-
"seller_id": "string",
161-
"product_id": "string",
162-
"order_status": "string",
163-
"order_purchase_timestamp": "datetime64[ns]",
164-
"order_approved_at": "datetime64[ns]",
165-
"order_delivered_timestamp": "datetime64[ns]",
166-
"lead_time_days": "int64",
167-
"approval_lag_days": "int64",
168-
"delivery_delay_days": "int64",
169-
"order_date": "datetime64[ns]",
170-
"order_year": "int64",
171-
}
172-
173-
missing_cols = set(ENFORCED_SCHEMA) - set(df.columns)
143+
missing_cols = set(ASSEMBLE_ENFORCED_SCHEMA) - set(df.columns)
174144
if missing_cols:
175145
raise RuntimeError(f"missing required columns: {sorted(missing_cols)}")
176146

177-
df_contract = df[ENFORCED_SCHEMA].copy()
178-
df_contract = df_contract.astype(ENFORCED_DTYPES)
147+
df_contract = df[ASSEMBLE_ENFORCED_SCHEMA].copy()
148+
df_contract = df_contract.astype(ASSEMBLE_ENFORCED_DTYPES)
179149
df_contract = df_contract.sort_values("order_id").reset_index(drop=True)
180150

181151
return df_contract

data_pipeline/stages/build_bi_semantic_layer.py

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
import pandas as pd
99
from typing import Dict, List, Tuple, Literal
1010
from data_pipeline.shared.run_context import RunContext
11+
from data_pipeline.shared.table_configs import (
12+
SELLER_DIM_ENFORCED_SCHEMA,
13+
SELLER_DIM_ENFORCED_DTYPES,
14+
SELLER_FACT_ENFORCED_SCHEMA,
15+
SELLER_FACT_ENFORCED_DTYPES,
16+
)
1117
from data_pipeline.shared.raw_loader_exporter import load_logical_table, export_file
1218

1319

@@ -140,46 +146,14 @@ def freeze_seller_fact(df: pd.DataFrame) -> pd.DataFrame:
140146

141147
fact_contract = df.copy()
142148

143-
FACT_SCHEMA = [
144-
"seller_id",
145-
"order_year_week",
146-
"week_start_date",
147-
"run_id",
148-
"weekly_order_count",
149-
"weekly_delivered_orders",
150-
"weekly_cancelled_orders",
151-
"weekly_revenue",
152-
"weekly_avg_lead_time",
153-
"weekly_total_lead_time",
154-
"weekly_avg_delivery_delay",
155-
"weekly_total_delivery_delay",
156-
"weekly_avg_approval_lag",
157-
]
158-
159-
FACT_ENFORCED_DTYPES = {
160-
"seller_id": "string",
161-
"order_year_week": "string",
162-
"week_start_date": "datetime64[ns]",
163-
"run_id": "string",
164-
"weekly_order_count": "int64",
165-
"weekly_delivered_orders": "int64",
166-
"weekly_cancelled_orders": "int64",
167-
"weekly_revenue": "float64",
168-
"weekly_avg_lead_time": "float64",
169-
"weekly_total_lead_time": "int64",
170-
"weekly_avg_delivery_delay": "float64",
171-
"weekly_total_delivery_delay": "int64",
172-
"weekly_avg_approval_lag": "float64",
173-
}
174-
175-
missing_cols = set(FACT_SCHEMA) - set(fact_contract.columns)
149+
missing_cols = set(SELLER_FACT_ENFORCED_SCHEMA) - set(fact_contract.columns)
176150
if missing_cols:
177151
raise RuntimeError(
178152
f"seller_weekly_fact missing required column(s): {sorted(missing_cols)}"
179153
)
180154

181-
fact_contract = fact_contract[FACT_SCHEMA].copy()
182-
fact_contract = fact_contract.astype(FACT_ENFORCED_DTYPES)
155+
fact_contract = fact_contract[SELLER_FACT_ENFORCED_SCHEMA].copy()
156+
fact_contract = fact_contract.astype(SELLER_FACT_ENFORCED_DTYPES)
183157
fact_contract = fact_contract.sort_values(
184158
["seller_id", "order_year_week"]
185159
).reset_index(drop=True)
@@ -204,28 +178,14 @@ def freeze_seller_dim(df: pd.DataFrame) -> pd.DataFrame:
204178

205179
dim_contract = df.copy()
206180

207-
DIM_SCHEMA = [
208-
"seller_id",
209-
"first_order_date",
210-
"first_order_year_week",
211-
"run_id",
212-
]
213-
214-
DIM_ENFORCED_DTYPES = {
215-
"seller_id": "string",
216-
"first_order_date": "datetime64[ns]",
217-
"first_order_year_week": "string",
218-
"run_id": "string",
219-
}
220-
221-
missing_cols = set(DIM_SCHEMA) - set(dim_contract.columns)
181+
missing_cols = set(SELLER_DIM_ENFORCED_SCHEMA) - set(dim_contract.columns)
222182
if missing_cols:
223183
raise RuntimeError(
224184
f"seller_dim missing required column(s): {sorted(missing_cols)}"
225185
)
226186

227-
dim_contract = dim_contract[DIM_SCHEMA].copy()
228-
dim_contract = dim_contract.astype(DIM_ENFORCED_DTYPES)
187+
dim_contract = dim_contract[SELLER_DIM_ENFORCED_SCHEMA].copy()
188+
dim_contract = dim_contract.astype(SELLER_DIM_ENFORCED_DTYPES)
229189
dim_contract = dim_contract.sort_values("seller_id").reset_index(drop=True)
230190

231191
return dim_contract

0 commit comments

Comments
 (0)