From a814a76d0ff95d7016b7be00977b99e4dd826fa5 Mon Sep 17 00:00:00 2001 From: Bryan Melvida <126201239+BLMgithub@users.noreply.github.com> Date: Tue, 24 Feb 2026 17:15:13 +0800 Subject: [PATCH 1/2] fix: add missing field to schema freeze list --- data_pipeline/stages/assemble_validated_events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/data_pipeline/stages/assemble_validated_events.py b/data_pipeline/stages/assemble_validated_events.py index 485c675..5dbc055 100644 --- a/data_pipeline/stages/assemble_validated_events.py +++ b/data_pipeline/stages/assemble_validated_events.py @@ -59,6 +59,8 @@ def merge_data(tables: dict) -> pd.DataFrame: df_payments, on="order_id", how="left" ) + df_merged = df_merged.rename(columns={"payment_value": "order_revenue"}) + if len(df_merged) != len(df_orders): raise RuntimeError("Cardinality violation detected: expected 1 row per order") @@ -136,6 +138,7 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame: ENFORCED_SCHEMA = [ "order_id", + "order_revenue", "seller_id", "product_id", "order_status", @@ -153,6 +156,7 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame: ENFORCED_DTYPES = { "order_id": "string", + "order_revenue": "float64", "seller_id": "string", "product_id": "string", "order_status": "string", From 23afb81b4596d1d8b2783236e1f05a8482fe2340 Mon Sep 17 00:00:00 2001 From: Bryan Melvida <126201239+BLMgithub@users.noreply.github.com> Date: Tue, 24 Feb 2026 17:48:34 +0800 Subject: [PATCH 2/2] test: update suite for schema change --- tests/stages/test_assemble_validated_events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/stages/test_assemble_validated_events.py b/tests/stages/test_assemble_validated_events.py index 3b5c88e..6a9ab01 100644 --- a/tests/stages/test_assemble_validated_events.py +++ b/tests/stages/test_assemble_validated_events.py @@ -84,6 +84,7 @@ def valid_derived_df(): dtype="string", ), "seller_id": pd.Series(["seller1", "seller2"], dtype="string"), + "order_revenue": pd.Series([12.34, 56.78], dtype="float64"), "product_id": pd.Series(["prod1", "prod2"], dtype="string"), "order_status": pd.Series(["delivered", "cancelled"], dtype="string"), "order_purchase_timestamp": pd.Series( @@ -243,6 +244,7 @@ def test_freeze_schema_enforces_strict_schema_success(valid_derived_df): expected_dtypes = { "order_id": "string", + "order_revenue": "float64", "seller_id": "string", "product_id": "string", "order_status": "string",