Skip to content

Commit 66c1390

Browse files
committed
refactor: replace allowed_column with required_columns and allow extra raw columns
1 parent e852ec0 commit 66c1390

4 files changed

Lines changed: 21 additions & 43 deletions

File tree

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
omit =
33
data_pipeline/prototypes/*
44
data_pipeline/shared/table_configs.py
5+
data_pipeline/stages/*
56
*/__init__.py

data_pipeline/shared/table_configs.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"df_orders": {
1111
"role": "event_fact",
1212
"primary_key": ["order_id"],
13-
"allowed_column": [
13+
"required_column": [
1414
"order_id",
1515
"customer_id",
1616
"order_status",
@@ -23,45 +23,36 @@
2323
"df_order_items": {
2424
"role": "transaction_detail",
2525
"primary_key": ["order_id"],
26-
"allowed_column": [
26+
"required_column": [
2727
"order_id",
2828
"product_id",
2929
"seller_id",
3030
"price",
31-
"shipping_charges",
3231
],
3332
},
3433
"df_customers": {
3534
"role": "entity_reference",
3635
"primary_key": ["customer_id"],
37-
"allowed_column": [
36+
"required_column": [
3837
"customer_id",
39-
"customer_zip_code_prefix",
40-
"customer_city",
4138
"customer_state",
4239
],
4340
},
4441
"df_payments": {
4542
"role": "transaction_detail",
4643
"primary_key": ["order_id", "payment_sequential"],
47-
"allowed_column": [
44+
"required_column": [
4845
"order_id",
49-
"payment_sequential",
50-
"payment_type",
51-
"payment_installments",
5246
"payment_value",
5347
],
5448
},
5549
"df_products": {
5650
"role": "entity_reference",
5751
"primary_key": ["product_id"],
58-
"allowed_column": [
52+
"required_column": [
5953
"product_id",
6054
"product_category_name",
6155
"product_weight_g",
62-
"product_length_cm",
63-
"product_height_cm",
64-
"product_width_cm",
6556
],
6657
},
6758
}

data_pipeline/stages/assemble_validated_events.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,6 @@ def assemble_events(run_context: RunContext) -> Dict:
169169
170170
Grain:
171171
- One row per order_id (hard fail on violation)
172-
173-
Non-responsibilities:
174-
- No validation
175-
- No repair
176-
- No business logic
177172
"""
178173

179174
report = init_report()

data_pipeline/stages/validate_raw_data.py

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def run_base_validations(
5656
df: pd.DataFrame,
5757
table_name: str,
5858
primary_key: List[str],
59-
allowed_column: List[str],
59+
required_column: List[str],
6060
report: Dict[str, List[str]],
6161
) -> bool:
6262
"""
@@ -66,14 +66,13 @@ def run_base_validations(
6666
- `errors` - dataset is structurally invalid; downstream validations should stop
6767
- `warnings` - admissible data quality issues that may be repairable
6868
69-
**errors:**
69+
errors:
7070
- dataset is empty
71-
- missing allowed column(s)
72-
- unexpected extra column(s)
71+
- missing required column(s)
7372
- missing primary key column(s)
7473
- conflicting duplicate primary keys
7574
76-
**warnings:**
75+
warnings:
7776
- duplicate columns
7877
- null primary key values
7978
- identical duplicates
@@ -85,19 +84,15 @@ def run_base_validations(
8584
return False
8685

8786
actual = set(df.columns)
88-
allowed = set(allowed_column)
87+
required = set(required_column)
8988

90-
missing_allowed = sorted(allowed - actual)
91-
if missing_allowed:
92-
log_error(f"{table_name}: missing allowed column(s): {missing_allowed}", report)
93-
94-
invalid_column = sorted(actual - allowed)
95-
if invalid_column:
89+
missing_required = sorted(required - actual)
90+
if missing_required:
9691
log_error(
97-
f"{table_name}: non-allowed extra column(s): {invalid_column}", report
92+
f"{table_name}: missing required column(s): {missing_required}", report
9893
)
9994

100-
if missing_allowed or invalid_column:
95+
if missing_required:
10196
return False
10297

10398
missing_pk_columns = [col for col in primary_key if col not in df.columns]
@@ -174,10 +169,10 @@ def run_event_fact_validations(
174169
- `errors` - dataset is structurally invalid; downstream validations should stop
175170
- `warnings` - admissible data quality issues that may be repairable
176171
177-
**errors:**
172+
errors:
178173
- missing required timestamp column(s)
179174
180-
**warnings:**
175+
warnings:
181176
- unparsable timestamp values in required timestamp fields
182177
- approval timestamp earlier than purchase timestamp
183178
- delivery timestamp earlier than purchase timestamp
@@ -246,7 +241,7 @@ def run_transaction_detail_validations(
246241
247242
Collects error-level data quality findings.
248243
249-
**errors:**
244+
errors:
250245
- negative values in numeric columns
251246
"""
252247

@@ -278,10 +273,10 @@ def run_cross_table_validations(
278273
- `warnings` - admissible referential integrity issues
279274
- `info` - validation skipped due to missing required tables
280275
281-
**info:**
276+
info:
282277
- validation skipped when required tables are unavailable
283278
284-
**warnings:**
279+
warnings:
285280
- order items referencing non-existent order_id
286281
- payments referencing non-existent order_id
287282
"""
@@ -341,10 +336,6 @@ def apply_validation(run_context: RunContext, base_path: Path | None = None) ->
341336
Severity model:
342337
- errors: structurally invalid → halt upstream
343338
- warnings: admissible but repairable issues
344-
345-
Non-responsibilities:
346-
- No data mutation
347-
- No process termination
348339
"""
349340

350341
if base_path is None:
@@ -374,7 +365,7 @@ def error(msg: str):
374365
tables[table_name] = df
375366

376367
if not run_base_validations(
377-
df, table_name, config["primary_key"], config["allowed_column"], report
368+
df, table_name, config["primary_key"], config["required_column"], report
378369
):
379370
continue
380371

0 commit comments

Comments
 (0)