
Commit 6c421eb

docs: update validation, contract and publish stage docstring clarity
1 parent f0ec874 commit 6c421eb

6 files changed

Lines changed: 148 additions & 107 deletions


data_pipeline/contract/contract_executor.py

Lines changed: 13 additions & 16 deletions
@@ -17,27 +17,24 @@ def apply_contract(
     """
     Main entry point for the Raw-to-Contracted Stage.

-    This component enforces structural data quality gates based on the logical
-    role of the table. It acts as a subtractive filter and schema-freezer,
-    ensuring only compliant rows and columns reach the Silver (contracted) layer.
-
     Workflow:
-    1. Resolve: Determines table configuration and role (event_fact, entity_reference, etc.).
-    2. Load: Fetches the raw snapshot from the lake's snapshot zone.
-    3. Sequence: Iteratively applies atomic filtering rules (Deduplication, Null-checks, etc.).
-    4. Track: Captures row-level telemetry and identifies compromised 'order_id's.
-    5. Propagate: Returns validated/invalidated IDs to maintain referential integrity.
-    6. Freeze: Executes 'enforce_schema' as the terminal step to project approved columns.
-    7. Export: Persists the contract-compliant dataset to the Silver zone.
+    1. Resolve: Identifies table metadata (role, schema, keys) from the central registry.
+    2. Hydrate: Fetches the raw snapshot from the lake's snapshot zone.
+    3. Delegate: Iteratively applies atomic logic rules (Deduplication, Chronology, Null-checks).
+    4. Validate: Executes 'enforce_schema' as the terminal structural gate.
+    5. Promote: Persists the contract-compliant dataset to the Silver (contracted) zone.

     Operational Guarantees:
-    - Subtractive Only: Filters rows first; never mutates row values (only column types).
-    - Finality: The 'enforce_schema' step guarantees the artifact matches the system registry.
-    - Referential Integrity: Tables processed after 'df_orders' use its output for parent-check filtering.
+    - Subtractive Only: Exclusively filters rows or casts types; never mutates business values.
+    - Referential Safety: Propagates invalidated keys across table boundaries to ensure consistent pruning.
+    - Structural Finality: Guarantees output parity with the ASSEMBLE_SCHEMA specification.
+
+    Side Effects:
+    - Persists a Parquet artifact to the contracted directory.
+    - Updates newly invalidated 'order_id' sets for downstream cross-table pruning.

     Failure Behavior:
-    - Traps logic-step exceptions via a try-except block within the ROLE_STEPS loop.
-    - Marks stage status as 'failed' and returns early upon encountering any transformation error.
+    - Traps logic-step exceptions; logs errors to the report and halts the current table's processing.

     Returns:
         tuple: (Stage Report Dict, Newly Invalidated IDs Set, Validated Order IDs Set)
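For reference, the subtractive loop this docstring describes can be pictured with a few lines of Python. This is a minimal sketch, assuming illustrative helper names (load_snapshot, role_steps, write_contracted) that are not part of this diff; the real executor's API may differ.

def run_contract_stage(table_name, role_steps, load_snapshot, write_contracted):
    # Sketch only: helper names are assumptions, not the repository's real API.
    report = {"table": table_name, "status": "passed", "steps": []}
    newly_invalid_ids = set()
    df = load_snapshot(table_name)
    for step_name, step_fn in role_steps:            # atomic, subtractive rules
        try:
            df, dropped, invalid = step_fn(df)
            newly_invalid_ids |= invalid
            report["steps"].append({"step": step_name, "dropped": dropped})
        except Exception as exc:                      # trap the failing step, halt this table
            report["status"] = "failed"
            report["steps"].append({"step": step_name, "error": str(exc)})
            return report, newly_invalid_ids, set()
    write_contracted(table_name, df)                  # persist the contracted Parquet artifact
    valid_order_ids = set(df["order_id"]) if "order_id" in df.columns else set()
    return report, newly_invalid_ids, valid_order_ids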

data_pipeline/contract/contract_logic.py

Lines changed: 61 additions & 26 deletions
@@ -16,8 +16,14 @@ def deduplicate_exact_events(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
     - Identifies and removes rows where every column value is an exact match.
     - Retains the 'first' encountered instance of the record.

-    Returns:
-        tuple: (Filtered DataFrame, Integer count of dropped rows)
+    Invariants:
+    - Grain: Preserves the original semantic grain while purging physical duplicates.
+
+    Outputs:
+    - Tuple: (Filtered DataFrame, Integer count of dropped rows).
+
+    Failures:
+    - [Structural] Crashes if input is not a pandas DataFrame.
     """

     initial_count = len(df)
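For context, exact-duplicate removal of this kind is usually a single pandas drop_duplicates call; a minimal sketch (illustrative, not the file's actual body):

import pandas as pd

def deduplicate_exact_events_sketch(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    # Keep the first physical copy of each fully identical row.
    deduped = df.drop_duplicates(keep="first")
    return deduped, len(df) - len(deduped)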
@@ -40,14 +46,17 @@ def remove_unparsable_timestamps(df: pd.DataFrame) -> tuple[pd.DataFrame, int, s

     Contract:
     - Evaluates all columns defined in REQUIRED_TIMESTAMPS.
-    - Drops any row containing at least one NaT/unparsable value in these columns.
+    - Subtractive Filtering: Drops any row containing at least one NaT/unparsable value in target columns.

     Invariants:
-    - Does not cast types permanently; performs internal validation only.
-    - Emits 'order_id' of failing rows to prevent orphan processing downstream.
+    - Type Safety: Does not cast types permanently; performs internal validation only.
+    - Lineage: Emits 'order_id' of failing rows to enable cascade pruning downstream.

-    Returns:
-        tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids)
+    Outputs:
+    - Tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids).
+
+    Failures:
+    - [Structural] Crashes if REQUIRED_TIMESTAMPS columns are missing from the DataFrame.
     """

     initial_count = len(df)
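The "validate without casting" contract maps naturally onto pd.to_datetime with errors="coerce" on a copy of the target columns. A minimal sketch, assuming a placeholder REQUIRED_TIMESTAMPS list and an 'order_id' key column (both assumptions, not taken from this diff):

import pandas as pd

REQUIRED_TIMESTAMPS = ["order_purchase_timestamp"]  # illustrative column list

def remove_unparsable_timestamps_sketch(df: pd.DataFrame) -> tuple[pd.DataFrame, int, set]:
    # Parse into a temporary frame so the source columns keep their original dtypes.
    parsed = df[REQUIRED_TIMESTAMPS].apply(pd.to_datetime, errors="coerce")
    bad_mask = parsed.isna().any(axis=1)
    invalid_ids = set(df.loc[bad_mask, "order_id"])
    return df.loc[~bad_mask], int(bad_mask.sum()), invalid_ids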
@@ -82,12 +91,17 @@ def remove_impossible_timestamps(df: pd.DataFrame) -> tuple[pd.DataFrame, int, s
     Enforces logical chronology for the order lifecycle.

     Contract:
-    - Invariant I: Order Approval Date >= Order Purchase Date.
-    - Invariant II: Order Delivery Date >= Order Purchase Date.
-    - Drops rows where the temporal sequence is physically impossible.
+    - Chronological Gate: Order Approval Date >= Order Purchase Date AND Order Delivery Date >= Order Purchase Date.
+    - Subtractive Filtering: Drops rows where the temporal sequence violates physical reality.

-    Returns:
-        tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids)
+    Invariants:
+    - Temporal Alignment: Ensures all orders have a positive or zero lead time.
+
+    Outputs:
+    - Tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids).
+
+    Failures:
+    - [Structural] Crashes if lifecycle timestamp columns are missing.
     """

     purchase_ts = pd.to_datetime(df["order_purchase_timestamp"])
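The chronological gate is a vectorized boolean comparison. A minimal sketch, assuming 'order_approved_at' and 'order_delivered_customer_date' as the approval/delivery column names (only 'order_purchase_timestamp' is visible in this diff):

import pandas as pd

def remove_impossible_timestamps_sketch(df: pd.DataFrame) -> tuple[pd.DataFrame, int, set]:
    purchase = pd.to_datetime(df["order_purchase_timestamp"])
    approved = pd.to_datetime(df["order_approved_at"])                 # column name assumed
    delivered = pd.to_datetime(df["order_delivered_customer_date"])    # column name assumed
    ok = (approved >= purchase) & (delivered >= purchase)              # NaT compares False, so it is dropped
    invalid_ids = set(df.loc[~ok, "order_id"])
    return df.loc[ok], int((~ok).sum()), invalid_ids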
@@ -118,11 +132,17 @@ def remove_rows_with_null_constraint(
     Enforces mandatory data presence (NOT NULL) for a dynamic column list.

     Contract:
-    - Evaluates the subset of columns provided in 'non_nullable_column'.
-    - Drops any row where at least one target column contains a Null/NaN.
+    - Subset Validation: Evaluates only columns provided in 'non_nullable_column'.
+    - Subtractive Filtering: Drops any row where at least one target column contains Null/NaN.
+
+    Invariants:
+    - Data Integrity: Guarantees 100% population for critical join keys and metrics.

-    Returns:
-        tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids)
+    Outputs:
+    - Tuple: (Filtered DataFrame, Count of dropped rows, Set of invalid order_ids).
+
+    Failures:
+    - [Structural] Crashes if 'non_nullable_column' names are not in the DataFrame.
     """

     initial_count = len(df)
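The NOT NULL constraint corresponds to pandas dropna with a subset argument; a minimal sketch (the 'order_id' lookup is an assumption for tables without that key):

import pandas as pd

def remove_rows_with_null_constraint_sketch(df: pd.DataFrame, non_nullable_column: list[str]) -> tuple[pd.DataFrame, int, set]:
    kept = df.dropna(subset=non_nullable_column)           # raises KeyError if a column is absent
    dropped_index = df.index.difference(kept.index)
    invalid_ids = set(df.loc[dropped_index, "order_id"]) if "order_id" in df.columns else set()
    return kept, len(dropped_index), invalid_ids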
@@ -150,11 +170,17 @@ def cascade_drop_by_order_id(
     Enforces referential cleanup based on a blacklist of compromised keys.

     Contract:
-    - Drops any row whose 'order_id' exists in the 'invalid_order_ids' set.
+    - Blacklist Filtering: Drops any row whose 'order_id' exists in 'invalid_order_ids'.
     - Purpose: Prunes child records (items/payments) whose parent orders failed validation.

-    Returns:
-        tuple: (Filtered DataFrame, Integer count of dropped rows)
+    Invariants:
+    - Referential Integrity: Prevents orphan records from reaching the assembly stage.
+
+    Outputs:
+    - Tuple: (Filtered DataFrame, Integer count of dropped rows).
+
+    Failures:
+    - [Structural] Crashes if 'order_id' column is missing.
     """

     initial_count = len(df)
@@ -172,11 +198,17 @@ def enforce_parent_reference(
     Enforces referential integrity based on a whitelist of validated keys.

     Contract:
-    - Drops any row whose 'order_id' is NOT present in the 'valid_order_ids' set.
+    - Whitelist Filtering: Drops any row whose 'order_id' is NOT present in 'valid_order_ids'.
     - Purpose: Final referential gate to ensure total alignment with the 'orders' grain.

-    Returns:
-        tuple: (Filtered DataFrame, Integer count of dropped rows)
+    Invariants:
+    - Data Reliability: Guarantees that every child record has a corresponding valid parent.
+
+    Outputs:
+    - Tuple: (Filtered DataFrame, Integer count of dropped rows).
+
+    Failures:
+    - [Structural] Crashes if 'order_id' column is missing.
     """
     initial_count = len(df)

@@ -200,11 +232,14 @@ def enforce_schema(
     - Type Enforcement: Casts remaining columns to the formats defined in 'dtypes'.

     Invariants:
-    - Column Integrity: The output column count and order strictly match 'required_column'.
-    - Type Safety: Ensures the dataset is ready for downstream analytical joins (e.g., matching IDs).
+    - Structural Integrity: Output exactly matches the modeling specification.
+    - Grain: Preserves the input row count.
+
+    Outputs:
+    - Tuple: (Filtered DataFrame, Integer count of columns removed).

-    Returns:
-        tuple: (Filtered DataFrame, Integer count of columns removed).
+    Failures:
+    - [Structural] Crashes if required columns are missing or if dtypes are incompatible.
     """

     initial_col_count = len(df.columns)
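Schema freezing of this kind is column projection followed by casting; a minimal sketch, assuming 'required_column' lists the approved columns in registry order and 'dtypes' maps column names to pandas dtypes:

import pandas as pd

def enforce_schema_sketch(df: pd.DataFrame, required_column: list[str], dtypes: dict) -> tuple[pd.DataFrame, int]:
    removed = len(df.columns) - len(required_column)
    frozen = df[required_column].astype(dtypes)   # raises if a column is missing or a cast is invalid
    return frozen, removed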

data_pipeline/publish/publish_executor.py

Lines changed: 11 additions & 15 deletions
@@ -16,26 +16,22 @@ def execute_publish_lifecycle(run_context: RunContext) -> Dict:
     """
     Main entry point for the Pipeline Publish Stage.

-    This component manages the transition of analytical artifacts from
-    the internal assembly zones to the production-facing BI environment.
-
     Workflow:
-    1. Integrity Gate: Verifies that the current run has produced all
-       required semantic modules and tables defined in the registry.
-    2. Promotion: Moves/copies artifacts into a permanent, read-only
-       versioned directory (v{run_id}).
-    3. Activation: Performs an atomic update of the 'latest' pointer
-       to switch BI/Reporting traffic to the new version.
+    1. Validate: Executes the 'Integrity Gate' to ensure all semantic artifacts exist and are schema-compliant.
+    2. Promote: Transfers validated artifacts to the permanent versioned publication zone.
+    3. Delegate: Triggers the atomic pointer swap to activate the new version for BI consumers.

     Operational Guarantees:
-    - Atomicity: The 'latest' pointer is updated ONLY if all prior
-      validation and promotion steps succeed.
-    - Immutability: Promoted versions are treated as static snapshots.
-    - Fail-Fast: Any failure in the lifecycle prevents version activation.
+    - Atomicity: The 'latest' version pointer is updated ONLY after successful promotion of all artifacts.
+    - Immutability: Once published, a versioned directory is treated as a static, read-only snapshot.
+    - Fail-Fast: Any failure in validation or promotion immediately halts the lifecycle.
+
+    Side Effects:
+    - Persists a new versioned directory (v{run_id}) in the publication zone.
+    - Mutates the 'latest_version.json' manifest to update the global version pointer.

     Failure Behavior:
-    - Explicit Fail-Fast: Uses 'fail_step' helper to terminate the lifecycle and
-      mark status as 'failed' immediately after any step failure.
+    - Traps step-level failures; logs errors and returns a report with status='failed', preventing version activation.

     Returns:
         Dict: A global publish report containing status and step-level logs.
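The fail-fast lifecycle described above is a short ordered loop over step callables; a minimal sketch, with the step wiring (names and signatures) assumed rather than taken from this diff:

def execute_publish_lifecycle_sketch(run_context, steps):
    # steps: ordered (name, callable) pairs, e.g. integrity gate -> promote -> activate.
    report = {"status": "passed", "steps": []}
    for name, step in steps:
        step_report = step(run_context)
        report["steps"].append({name: step_report})
        if step_report.get("status") == "failed":   # fail fast: activation is never reached
            report["status"] = "failed"
            return report
    return report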

data_pipeline/publish/publish_logic.py

Lines changed: 27 additions & 19 deletions
@@ -46,15 +46,18 @@ def run_integrity_gate(run_context: RunContext) -> Dict:
     Enforces the pre-publication structural completeness contract.

     Contract:
-    - Scans the runtime semantic directory for existence.
-    - Validates that every Module and Table defined in SEMANTIC_MODULES
-      exists as a physical artifact.
+    - Structural Validation: Scans the runtime semantic directory and verifies 1:1 parity with SEMANTIC_MODULES registry.
+    - Schema Enforcement: Validates that all physical Parquet files contain the required column set.

     Invariants:
-    - Failure is triggered if any expected Parquet file is missing.
+    - Completeness: Halts publication if any expected module or table is missing from the file system.
+    - Version Alignment: Ensures all files follow the current run_id timestamp convention.

-    Returns:
-        Dict: A report object containing the success status and findings.
+    Outputs:
+    - Dict: Report containing 'status' and detailed findings.
+
+    Failures:
+    - [Structural] Returns status='failed' if directories are missing, modules mismatch, or schemas are incomplete.
     """

     report = init_report()
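The existence-plus-schema check described above can be sketched with pathlib and pandas; the registry content, directory layout, and required_columns mapping below are assumptions for illustration only:

from pathlib import Path
import pandas as pd

SEMANTIC_MODULES = {"sales": ["fact_orders", "dim_customers"]}   # illustrative registry

def run_integrity_gate_sketch(semantic_dir: Path, required_columns: dict) -> dict:
    report = {"status": "passed", "findings": []}
    for module, tables in SEMANTIC_MODULES.items():
        for table in tables:
            path = semantic_dir / module / f"{table}.parquet"     # layout assumed
            if not path.exists():
                report["findings"].append(f"missing artifact: {path}")
                continue
            cols = set(pd.read_parquet(path).columns)
            missing = set(required_columns.get(table, [])) - cols
            if missing:
                report["findings"].append(f"{table}: missing columns {sorted(missing)}")
    if report["findings"]:
        report["status"] = "failed"
    return report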
@@ -129,15 +132,18 @@ def promote_semantic_version(run_context: RunContext) -> Dict:
     Manages the archival of the current run into the publication zone.

     Contract:
-    - Creates a permanent directory following the 'v{run_id}' convention.
-    - Transfers all semantic artifacts to the versioned destination.
+    - Promote: Transfers validated semantic artifacts from the runtime zone to a permanent versioned destination.
+    - Versioning: Creates a new directory following the 'v{run_id}' physical convention.

     Invariants:
-    - Destination is derived from run_context.published_path.
-    - Relies on the storage_adapter for Local/GCS transparency.
+    - Immutability: Once promoted, artifacts are treated as static, read-only snapshots.
+    - Path Integrity: Destination is derived strictly from run_context.published_path.
+
+    Outputs:
+    - Dict: Report logging the promotion status and any transfer errors.

-    Returns:
-        Dict: A report object logging the promotion status.
+    Failures:
+    - [Operational] Returns status='failed' if the version directory already exists or upload fails.
     """

     report = init_report()
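For a local file system, the promotion step amounts to copying the runtime directory into a v{run_id} destination and refusing to overwrite; a minimal sketch (the real code reportedly goes through a storage adapter for Local/GCS, which is not shown here):

import shutil
from pathlib import Path

def promote_semantic_version_sketch(runtime_dir: Path, published_path: Path, run_id: str) -> dict:
    destination = published_path / f"v{run_id}"
    if destination.exists():                        # never overwrite a published version
        return {"status": "failed", "error": f"{destination} already exists"}
    shutil.copytree(runtime_dir, destination)       # local-only transfer for illustration
    return {"status": "passed", "version": destination.name}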
@@ -172,16 +178,18 @@ def activate_published_version(run_context: RunContext) -> Dict:
     Atomically updates the system-wide 'latest' version pointer.

     Contract:
-    - Generates a JSON manifest containing run_id and publication metadata.
-    - Overwrites the root 'latest_version.json' in the published zone.
+    - Atomic Update: Overwrites the root 'latest_version.json' to shift downstream consumers to the new run.
+    - BI Consistency: Guarantees that analytical tools see the new version only after successful promotion.

     Invariants:
-    - Atomic Update: Local updates use write-and-replace to prevent corruption.
-    - BI Consistency: Downstream consumers see the new version only after
-      this atomic swap is complete.
+    - Pointer Integrity: Manifest always contains current run_id and ISO-8601 publication timestamps.
+    - Atomicity: Local updates use a write-and-replace (os.replace) strategy to prevent manifest corruption.
+
+    Outputs:
+    - Dict: Report logging the activation status.

-    Returns:
-        Dict: A report object logging the activation status.
+    Failures:
+    - [Operational] Returns status='failed' if manifest generation or storage upload (Local/GCS) fails.
     """

     report = init_report()
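The write-and-replace pattern named in the docstring looks like this on a local file system; a minimal sketch assuming the manifest fields shown (only run_id and an ISO-8601 timestamp are mentioned in the diff):

import json
import os
from datetime import datetime, timezone
from pathlib import Path

def activate_published_version_sketch(published_path: Path, run_id: str) -> dict:
    manifest = {
        "run_id": run_id,
        "published_at": datetime.now(timezone.utc).isoformat(),   # ISO-8601 timestamp
    }
    target = published_path / "latest_version.json"
    tmp = published_path / "latest_version.json.tmp"
    tmp.write_text(json.dumps(manifest, indent=2))
    os.replace(tmp, target)        # atomic swap: readers never see a half-written manifest
    return {"status": "passed", "pointer": str(target)}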

data_pipeline/validation/validation_executor.py

Lines changed: 8 additions & 13 deletions
@@ -23,24 +23,19 @@ def apply_validation(run_context: RunContext, base_path: Path | None = None) ->
     """
     Main entry point for the Pipeline Validation Stage.

-    This component serves as the primary diagnostic gate for the data pipeline,
-    ensuring that raw snapshots meet the structural requirements for the
-    subsequent Contract and Assembly stages.
-
     Workflow:
-    1. Loading: Iteratively fetches logical tables from the snapshot zone.
-    2. Base Check: Enforces schema, uniqueness, and null constraints via 'run_base_validations'.
-    3. Role Dispatch: Executes specialized logic (Event/Transaction) based on 'TABLE_CONFIG'.
-    4. Referential Check: Evaluates inter-table integrity (orphans) via 'run_cross_table_validations'.
+    1. Hydrate: Iteratively fetches logical tables from the snapshot zone.
+    2. Delegate: Enforces base structural integrity (Schema, PK, Nulls) for each table.
+    3. Delegate: Executes role-specific domain checks (Event Chronology, Transaction Ranges).
+    4. Delegate: Performs cross-table referential analysis (Orphan Detection).

     Operational Guarantees:
-    - Diagnostic Only: This function is read-only and will never mutate the source data.
-    - Comprehensive Reporting: Captures all failures across all tables before returning; does not fail-fast on the first table error.
-    - Severity: Structural issues are logged as 'errors' while referential issues are 'warnings'.
+    - Diagnostic Only: Read-only; never mutates source snapshots.
+    - Non-Blocking: Processes all tables regardless of individual base validation failures.
+    - Severity Model: Distinguishes between fatal Structural Errors and non-fatal Referential Warnings.

     Failure Behavior:
-    - Non-Blocking: Continues processing remaining tables even if one fails base validations.
-    - Status Update: Sets global report status to 'failed' if any errors or warnings are accumulated.
+    - Sets the global report status to 'failed' if any errors or warnings are accumulated across the dataset.

     Returns:
         Dict: A unified validation report containing 'status' and detailed finding lists.
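The non-blocking, accumulate-then-report behavior can be pictured as a loop that never short-circuits; a minimal sketch, with the check callables and their return shape assumed for illustration:

def apply_validation_sketch(tables: dict, base_checks, cross_table_checks) -> dict:
    # tables: {name: DataFrame}; check callables return lists of finding strings.
    report = {"status": "passed", "errors": [], "warnings": []}
    for name, df in tables.items():                          # non-blocking: every table is inspected
        report["errors"].extend(base_checks(name, df))       # structural findings are errors
    report["warnings"].extend(cross_table_checks(tables))    # referential findings are warnings
    if report["errors"] or report["warnings"]:
        report["status"] = "failed"
    return report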
