diff --git a/.vscode/settings.json b/.vscode/settings.json index bd18990..4cc41ad 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,6 @@ { "python.testing.pytestArgs": [ - "data_pipeline" + "tests" ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, diff --git a/data_pipeline/stages/assemble_validated_events.py b/data_pipeline/stages/assemble_validated_events.py index 7c65207..485c675 100644 --- a/data_pipeline/stages/assemble_validated_events.py +++ b/data_pipeline/stages/assemble_validated_events.py @@ -20,7 +20,7 @@ def init_report(): - return {"status": "success", "error": [], "info": []} + return {"status": "success", "errors": [], "info": []} def log_info(message: str, report: Dict[str, List[str]]) -> None: @@ -30,7 +30,7 @@ def log_info(message: str, report: Dict[str, List[str]]) -> None: def log_error(message: str, report: Dict[str, list[str]]) -> None: print(f"[ERROR] {message}") - report["error"].append(message) + report["errors"].append(message) # ------------------------------------------------------------ @@ -153,12 +153,22 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame: ENFORCED_DTYPES = { "order_id": "string", - "order_year": "int64", + "seller_id": "string", + "product_id": "string", + "order_status": "string", + "order_purchase_timestamp": "datetime64[ns]", + "order_approved_at": "datetime64[ns]", + "order_delivered_timestamp": "datetime64[ns]", "lead_time_days": "int64", "approval_lag_days": "int64", "delivery_delay_days": "int64", + "order_year": "int64", } + missing_cols = set(ENFORCED_SCHEMA) - set(df.columns) + if missing_cols: + raise RuntimeError(f"missing required columns: {sorted(missing_cols)}") + df_contract = df[ENFORCED_SCHEMA].copy() df_contract = df_contract.astype(ENFORCED_DTYPES) df_contract = df_contract.sort_values("order_id").reset_index(drop=True) diff --git a/pyproject.toml b/pyproject.toml index 207be20..15d281a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ line-length = 88 [tool.pytest.ini_options] -testpaths = ["data_pipeline/tests"] +testpaths = ["tests"] [tool.ruff] ignore = ["E501"] diff --git a/data_pipeline/tests/__init__.py b/tests/__init__.py similarity index 100% rename from data_pipeline/tests/__init__.py rename to tests/__init__.py diff --git a/data_pipeline/tests/shared/__init__.py b/tests/shared/__init__.py similarity index 100% rename from data_pipeline/tests/shared/__init__.py rename to tests/shared/__init__.py diff --git a/data_pipeline/tests/shared/test_raw_loader_exporter.py b/tests/shared/test_raw_loader_exporter.py similarity index 100% rename from data_pipeline/tests/shared/test_raw_loader_exporter.py rename to tests/shared/test_raw_loader_exporter.py diff --git a/data_pipeline/tests/shared/test_run_context.py b/tests/shared/test_run_context.py similarity index 100% rename from data_pipeline/tests/shared/test_run_context.py rename to tests/shared/test_run_context.py diff --git a/data_pipeline/tests/stages/__init__.py b/tests/stages/__init__.py similarity index 100% rename from data_pipeline/tests/stages/__init__.py rename to tests/stages/__init__.py diff --git a/data_pipeline/tests/stages/test_apply_raw_data_contract.py b/tests/stages/test_apply_raw_data_contract.py similarity index 92% rename from data_pipeline/tests/stages/test_apply_raw_data_contract.py rename to tests/stages/test_apply_raw_data_contract.py index f276f49..6e735f5 100644 --- a/data_pipeline/tests/stages/test_apply_raw_data_contract.py +++ b/tests/stages/test_apply_raw_data_contract.py @@ -4,7 +4,6 @@ import pandas as pd import pytest -from shutil import copytree from data_pipeline.shared.run_context import RunContext from data_pipeline.stages.apply_raw_data_contract import ( @@ -181,9 +180,6 @@ def test_remove_impossible_timestamps_drops_invalid_rows(invalid_temporal_order_ def test_apply_contract_event_fact_success(tmp_path): - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - df = pd.DataFrame( { "order_id": [1, 2, 3, 4], @@ -214,12 +210,10 @@ def test_apply_contract_event_fact_success(tmp_path): } ) - df.to_csv(raw_dir / "df_orders_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + df.to_csv(run_context.raw_snapshot_path / "df_orders_2026_01.csv", index=False) report, _ = apply_contract(run_context, "df_orders") @@ -246,9 +240,6 @@ def test_apply_contract_unknown_table(tmp_path): def test_apply_contract_duplicate_on_entity_reference(tmp_path): - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - df = pd.DataFrame( { "customer_id": [1, 1, 3], # 1 exact duplicate @@ -258,12 +249,10 @@ def test_apply_contract_duplicate_on_entity_reference(tmp_path): } ) - df.to_csv(raw_dir / "df_customers_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + df.to_csv(run_context.raw_snapshot_path / "df_customers_2026_01.csv", index=False) report, _ = apply_contract(run_context, "df_customers") @@ -274,9 +263,6 @@ def test_apply_contract_duplicate_on_entity_reference(tmp_path): def test_apply_contract_duplicate_on_transactional_detail(tmp_path): - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - df = pd.DataFrame( { "order_id": [ @@ -292,12 +278,10 @@ def test_apply_contract_duplicate_on_transactional_detail(tmp_path): } ) - df.to_csv(raw_dir / "df_payments_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + df.to_csv(run_context.raw_snapshot_path / "df_payments_2026_01.csv", index=False) report, _ = apply_contract(run_context, "df_payments") @@ -308,9 +292,6 @@ def test_apply_contract_duplicate_on_transactional_detail(tmp_path): def test_apply_contract_cascade_drop_with_order_id(tmp_path): - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - df_order = pd.DataFrame( { "order_id": ["o1", "o2", "o3"], @@ -355,14 +336,21 @@ def test_apply_contract_cascade_drop_with_order_id(tmp_path): } ) - df_order.to_csv(raw_dir / "df_orders_2026_01.csv", index=False) - df_payments.to_csv(raw_dir / "df_payments_2026_01.csv", index=False) - df_order_items.to_csv(raw_dir / "df_order_items_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + df_order.to_csv( + run_context.raw_snapshot_path / "df_orders_2026_01.csv", + index=False, + ) + df_payments.to_csv( + run_context.raw_snapshot_path / "df_payments_2026_01.csv", + index=False, + ) + df_order_items.to_csv( + run_context.raw_snapshot_path / "df_order_items_2026_01.csv", + index=False, + ) invalid_ids = set() diff --git a/tests/stages/test_assemble_validated_events.py b/tests/stages/test_assemble_validated_events.py new file mode 100644 index 0000000..3b5c88e --- /dev/null +++ b/tests/stages/test_assemble_validated_events.py @@ -0,0 +1,392 @@ +# ============================================================================= +# UNIT TESTS FOR validate_raw_data.py +# ============================================================================= + +import pandas as pd +import pytest + +from data_pipeline.shared.run_context import RunContext +from data_pipeline.stages.assemble_validated_events import ( + init_report, + log_info, + log_error, + merge_data, + derive_fields, + freeze_schema, + assemble_events, +) + + +@pytest.fixture +def empty_report(): + return init_report() + + +@pytest.fixture +def valid_orders_df(): + return pd.DataFrame( + { + "order_id": ["o1", "o2"], + "customer_id": ["cos1", "cos2"], + "order_status": ["delivered", "delivered"], + "order_purchase_timestamp": [ + "2023-01-02 09:00:00", + "2023-01-10 14:00:00", + ], + "order_approved_at": [ + "2023-01-03 09:00:00", + "2023-01-11 14:00:00", + ], + "order_delivered_timestamp": [ + "2023-01-06 09:00:00", + "2023-01-16 14:00:00", + ], + "order_estimated_delivery_date": [ + "2023-01-05", + "2023-01-15", + ], + } + ) + + +@pytest.fixture +def valid_order_items_df(): + return pd.DataFrame( + { + "order_id": ["o1", "o2"], + "product_id": ["prod1", "prod2"], + "seller_id": ["seller1", "seller2"], + "price": [12.3, 45.6], + "shipping_charges": [1.23, 4.56], + } + ) + + +@pytest.fixture +def valid_payments_df(): + return pd.DataFrame( + { + "order_id": ["o1", "o2"], + "payment_sequential": [1, 2], + "payment_type": ["credit", "cash"], + "payment_installments": [4, 5], + "payment_value": [100.1, 50.2], + } + ) + + +@pytest.fixture +def valid_derived_df(): + return pd.DataFrame( + { + "order_id": pd.Series( + ["o1", "o2"], + dtype="string", + ), + "seller_id": pd.Series(["seller1", "seller2"], dtype="string"), + "product_id": pd.Series(["prod1", "prod2"], dtype="string"), + "order_status": pd.Series(["delivered", "cancelled"], dtype="string"), + "order_purchase_timestamp": pd.Series( + [ + "2023-01-02 09:00:00", + "2023-01-10 14:00:00", + ], + dtype="datetime64[ns]", + ), + "order_approved_at": pd.Series( + [ + "2023-01-03 09:00:00", + "2023-01-11 14:00:00", + ], + dtype="datetime64[ns]", + ), + "order_delivered_timestamp": pd.Series( + [ + "2023-01-06 09:00:00", + "2023-01-16 14:00:00", + ], + dtype="datetime64[ns]", + ), + "order_estimated_delivery_date": [ + "2023-01-05", + "2023-01-15", + ], + "lead_time_days": pd.Series([3, 5], dtype="int64"), + "approval_lag_days": pd.Series([1, 1], dtype="int64"), + "delivery_delay_days": pd.Series([1, 1], dtype="int64"), + "order_date": pd.Series(pd.to_datetime(["2023-01-02", "2023-01-10"]).date), + "order_year": pd.Series([2023, 2023], dtype="int64"), + "order_year_week": pd.Series(["2023-W01", "2023-W02"], dtype="string"), + "run_id": "dummy_run_id", + } + ) + + +# ============================================================================= +# REPORTING & LOGS +# ============================================================================= + + +def test_init_report_structure(): + report = init_report() + + assert set(report.keys()) == {"status", "errors", "info"} + assert all(isinstance(v, list | str) for v in report.values()) + + +def test_log_error_appends_only_to_errors(empty_report): + log_error("errors", empty_report) + + assert empty_report["errors"] == ["errors"] + + +def test_log_info_appends_only_to_info(empty_report): + log_info("info", empty_report) + + assert empty_report["info"] == ["info"] + + +# ============================================================================= +# MERGING DATA +# ============================================================================= + + +def test_merge_data_preserve_grain( + valid_orders_df, + valid_order_items_df, + valid_payments_df, +): + + result = merge_data( + { + "df_orders": valid_orders_df, + "df_order_items": valid_order_items_df, + "df_payments": valid_payments_df, + } + ) + + assert len(result) == 2 + assert result.duplicated().any() == False + + +def test_merge_detects_cardinality_violation( + valid_orders_df, + valid_order_items_df, +): + + duplicated_payments_df = pd.DataFrame( + { + "order_id": ["o1", "o1"], + "payment_sequential": [1, 1], + "payment_type": ["credit", "credit"], + "payment_installments": [4, 4], + "payment_value": [100.1, 100.1], + } + ) + + with pytest.raises(RuntimeError): + merge_data( + { + "df_orders": valid_orders_df, + "df_order_items": valid_order_items_df, + "df_payments": duplicated_payments_df, + } + ) + + +# ============================================================================= +# DERIVING FIELDS +# ============================================================================= + + +def test_derived_fields_correctness(valid_derived_df): + + source_cols = [ + "order_id", + "seller_id", + "product_id", + "order_status", + "order_purchase_timestamp", + "order_approved_at", + "order_delivered_timestamp", + "order_estimated_delivery_date", + ] + + expected_cols = [ + "lead_time_days", + "approval_lag_days", + "delivery_delay_days", + "order_date", + "order_year", + "order_year_week", + "run_id", + ] + + source_df = valid_derived_df[source_cols].copy() + expected_df = valid_derived_df[expected_cols].copy() + + result = derive_fields(source_df, "dummy_run_id") + + pd.testing.assert_frame_equal( + result[expected_cols].reset_index(drop=True), + expected_df.reset_index(drop=True), + check_dtype=False, + ) + + +# ============================================================================= +# FREEZING SCHEMA +# ============================================================================= + + +def test_freeze_schema_enforces_strict_schema_success(valid_derived_df): + + expected_dtypes = { + "order_id": "string", + "seller_id": "string", + "product_id": "string", + "order_status": "string", + "order_purchase_timestamp": "datetime64[ns]", + "order_approved_at": "datetime64[ns]", + "order_delivered_timestamp": "datetime64[ns]", + "lead_time_days": "int64", + "approval_lag_days": "int64", + "delivery_delay_days": "int64", + "order_date": "object", + "order_year": "int64", + } + + result = freeze_schema(valid_derived_df) + + for col, correct_dtype in expected_dtypes.items(): + assert str(result[col].dtype) == correct_dtype + + +def test_freeze_schema_fails_on_missing_column(valid_derived_df): + + missing_required_column = valid_derived_df.drop(columns="seller_id") + + with pytest.raises(RuntimeError): + freeze_schema(missing_required_column) + + +# ============================================================================= +# ASSEMBLING DATA +# ============================================================================= + + +def test_assemble_data_success( + tmp_path, + valid_orders_df, + valid_order_items_df, + valid_payments_df, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id") + run_context.initialize_directories() + + valid_orders_df.to_parquet( + run_context.contracted_path / "df_orders_contracted.parquet", + index=False, + ) + valid_order_items_df.to_parquet( + run_context.contracted_path / "df_order_items_contracted.parquet", + index=False, + ) + valid_payments_df.to_parquet( + run_context.contracted_path / "df_payments_contracted.parquet", + index=False, + ) + + report = assemble_events(run_context) + + assert report["status"] == "success" + + output_file = run_context.assembled_path / "assembled_events.parquet" + + assert output_file.exists() + + +def test_assemble_data_fails_on_missing_column( + tmp_path, + valid_orders_df, + valid_order_items_df, + valid_payments_df, +): + + run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id") + run_context.initialize_directories() + + invalid_order_items_df = valid_order_items_df.drop(columns="seller_id") + + valid_orders_df.to_parquet( + run_context.contracted_path / "df_orders_contracted.parquet", + index=False, + ) + + invalid_order_items_df.to_parquet( + run_context.contracted_path / "df_order_items_contracted.parquet", + index=False, + ) + + valid_payments_df.to_parquet( + run_context.contracted_path / "df_payments_contracted.parquet", index=False + ) + + report = assemble_events(run_context) + output_file = run_context.assembled_path / "assembled_events.parquet" + + assert report["status"] == "failed" + assert output_file.exists() == False + assert any( + "missing required columns: ['seller_id']" in error for error in report["errors"] + ) + + +def test_assemble_data_fails_on_cardinality( + tmp_path, + valid_orders_df, + valid_order_items_df, +): + + duplicated_payments_df = pd.DataFrame( + { + "order_id": ["o1", "o1"], + "payment_sequential": [1, 1], + "payment_type": ["credit", "credit"], + "payment_installments": [4, 4], + "payment_value": [100.1, 100.1], + } + ) + + run_context = RunContext.create(base_path=tmp_path, run_id="dummy_run_id") + run_context.initialize_directories() + + valid_orders_df.to_parquet( + run_context.contracted_path / "df_orders_contracted.parquet", + index=False, + ) + + valid_order_items_df.to_parquet( + run_context.contracted_path / "df_order_items_contracted.parquet", + index=False, + ) + + duplicated_payments_df.to_parquet( + run_context.contracted_path / "df_payments_contracted.parquet", index=False + ) + + report = assemble_events(run_context) + output_file = run_context.assembled_path / "assembled_events.parquet" + + assert report["status"] == "failed" + assert output_file.exists() == False + assert any( + "Cardinality violation detected: expected 1 row per order" in error + for error in report["errors"] + ) + + +# ============================================================================= +# UNIT TESTS END +# ============================================================================= diff --git a/data_pipeline/tests/stages/test_validate_raw_data.py b/tests/stages/test_validate_raw_data.py similarity index 86% rename from data_pipeline/tests/stages/test_validate_raw_data.py rename to tests/stages/test_validate_raw_data.py index 21cc4dd..d7005f5 100644 --- a/data_pipeline/tests/stages/test_validate_raw_data.py +++ b/tests/stages/test_validate_raw_data.py @@ -5,7 +5,6 @@ import pandas as pd import pytest -from shutil import copytree from data_pipeline.shared.run_context import RunContext from data_pipeline.stages.validate_raw_data import ( init_report, @@ -76,10 +75,7 @@ def valid_customers_df(): return pd.DataFrame( { "customer_id": [1, 2], - "customer_zip_code_prefix": [ - "zip1", - "zip2", - ], + "customer_zip_code_prefix": ["zip1", "zip2"], "customer_city": ["city1", "city2"], "customer_state": ["state1", "state2"], } @@ -136,7 +132,8 @@ def test_log_info_appends_only_to_info(empty_report): def test_base_validation_fails_on_missing_allowed_column( - empty_report, valid_customers_df + empty_report, + valid_customers_df, ): df = valid_customers_df.drop(columns="customer_city") ok = run_base_validations( @@ -161,7 +158,8 @@ def test_base_validation_fails_on_missing_allowed_column( def test_base_validation_fails_on_invalid_extra_column( - empty_report, valid_customers_df + empty_report, + valid_customers_df, ): df = valid_customers_df @@ -211,7 +209,8 @@ def test_base_validation_fails_on_missing_pk(empty_report): def test_base_validation_logs_error_on_conflicting_duplicate_pk( - empty_report, valid_products_df + empty_report, + valid_products_df, ): df = valid_products_df @@ -287,7 +286,12 @@ def test_base_validation_passes_with_non_fatal_issues(empty_report): df, "df_customers", ["customer_id"], - ["customer_id", "customer_zip_code_prefix", "customer_city", "customer_state"], + [ + "customer_id", + "customer_zip_code_prefix", + "customer_city", + "customer_state", + ], empty_report, ) @@ -304,14 +308,20 @@ def test_base_validation_passes_with_non_fatal_issues(empty_report): # ------------------------------------------------------------ -def test_event_fact_validation_passes(valid_orders_df, empty_report): +def test_event_fact_validation_passes( + valid_orders_df, + empty_report, +): ok = run_event_fact_validations(valid_orders_df, "df_orders", empty_report) assert ok is True assert empty_report["errors"] or empty_report["warnings"] == [] -def test_event_fact_fails_on_missing_timestamp(valid_orders_df, empty_report): +def test_event_fact_fails_on_missing_timestamp( + valid_orders_df, + empty_report, +): df = valid_orders_df.drop(columns=["order_approved_at"]) ok = run_event_fact_validations(df, "df_orders", empty_report) @@ -326,7 +336,8 @@ def test_event_fact_fails_on_missing_timestamp(valid_orders_df, empty_report): def test_event_fact_logs_warning_on_invalid_temporal_order( - valid_orders_df, empty_report + valid_orders_df, + empty_report, ): valid_orders_df["order_approved_at"] = [ "2022-12-01 10:05:20", @@ -348,7 +359,10 @@ def test_event_fact_logs_warning_on_invalid_temporal_order( # ------------------------------------------------------------ -def test_transaction_detail_passes(valid_transaction_df, empty_report): +def test_transaction_detail_passes( + valid_transaction_df, + empty_report, +): ok = run_transaction_detail_validations( valid_transaction_df, "df_payments", empty_report ) @@ -358,6 +372,7 @@ def test_transaction_detail_passes(valid_transaction_df, empty_report): def test_transaction_detail_fails_on_negative_value(empty_report): + df = pd.DataFrame({"order_id": ["o1"], "payment_value": [-10]}) ok = run_transaction_detail_validations(df, "df_payments", empty_report) @@ -376,7 +391,9 @@ def test_transaction_detail_fails_on_negative_value(empty_report): def test_cross_table_validation_passes( - valid_orders_df, valid_transaction_df, empty_report + valid_orders_df, + valid_transaction_df, + empty_report, ): tables = { "df_orders": valid_orders_df, @@ -418,21 +435,30 @@ def test_validation_passes( valid_products_df, ): - # Dummy raw structure - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - - # Export to snapshot directory - valid_orders_df.to_csv(raw_dir / "df_orders_2026_01.csv", index=False) - valid_order_items_df.to_csv(raw_dir / "df_order_items_2026_01.csv", index=False) - valid_transaction_df.to_csv(raw_dir / "df_payments_2026_01.csv", index=False) - valid_customers_df.to_csv(raw_dir / "df_customers_2026_01.csv", index=False) - valid_products_df.to_csv(raw_dir / "df_products_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + # Export to snapshot directory + valid_orders_df.to_csv( + run_context.raw_snapshot_path / "df_orders_2026_01.csv", + index=False, + ) + valid_order_items_df.to_csv( + run_context.raw_snapshot_path / "df_order_items_2026_01.csv", + index=False, + ) + valid_transaction_df.to_csv( + run_context.raw_snapshot_path / "df_payments_2026_01.csv", + index=False, + ) + valid_customers_df.to_csv( + run_context.raw_snapshot_path / "df_customers_2026_01.csv", + index=False, + ) + valid_products_df.to_csv( + run_context.raw_snapshot_path / "df_products_2026_01.csv", + index=False, + ) report = apply_validation(run_context) @@ -448,19 +474,26 @@ def test_validation_fails_on_missing_logical_table( valid_products_df, ): - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - - # Missing df_payments on snapshot - valid_orders_df.to_csv(raw_dir / "df_orders_2026_01.csv", index=False) - valid_order_items_df.to_csv(raw_dir / "df_order_items_2026_01.csv", index=False) - valid_customers_df.to_csv(raw_dir / "df_customers_2026_01.csv", index=False) - valid_products_df.to_csv(raw_dir / "df_products_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + # Missing df_payments on snapshot + valid_orders_df.to_csv( + run_context.raw_snapshot_path / "df_orders_2026_01.csv", + index=False, + ) + valid_order_items_df.to_csv( + run_context.raw_snapshot_path / "df_order_items_2026_01.csv", + index=False, + ) + valid_customers_df.to_csv( + run_context.raw_snapshot_path / "df_customers_2026_01.csv", + index=False, + ) + valid_products_df.to_csv( + run_context.raw_snapshot_path / "df_products_2026_01.csv", + index=False, + ) report = apply_validation(run_context) @@ -471,17 +504,15 @@ def test_validation_fails_on_missing_logical_table( def test_validation_fails_on_multiple_errors(tmp_path, valid_orders_df): - raw_dir = tmp_path / "raw" - raw_dir.mkdir() - df_orders = valid_orders_df - df_orders.to_csv(raw_dir / "df_orders_2026_01.csv", index=False) - run_context = RunContext.create(base_path=tmp_path) run_context.initialize_directories() - copytree(raw_dir, run_context.raw_snapshot_path, dirs_exist_ok=True) + df_orders.to_csv( + run_context.raw_snapshot_path / "df_orders_2026_01.csv", + index=False, + ) report = apply_validation(run_context) diff --git a/data_pipeline/tests/test_run_pipeline.py b/tests/test_run_pipeline.py similarity index 100% rename from data_pipeline/tests/test_run_pipeline.py rename to tests/test_run_pipeline.py