Skip to content

Commit 195d9a1

Browse files
committed
docs: refactor docstrings to procedural and tight style
1 parent e324931 commit 195d9a1

7 files changed

Lines changed: 161 additions & 175 deletions

File tree

data_pipeline/run_pipeline.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,25 +103,31 @@ def finalize_run(run_context: RunContext, status: str) -> None:
103103

104104
def main() -> None:
105105
"""
106-
Pipeline execution controller.
106+
Pipeline orchestrator.
107107
108108
Execution order:
109-
110-
1. Initialize run context and directory structure.
111-
2. Capture raw snapshot and initialize metadata.
112-
3. Run initial validation on raw data.
113-
- Exit if structural errors exist.
114-
4. Apply table contracts in configured parent → child order,
115-
propagating invalid order_ids.
116-
5. Rerun validation on contracted data.
117-
- Exit if any errors or warnings remain.
118-
6. Assemble the core event table.
119-
- Exit on assembly failure.
120-
7. Build semantic layer tables.
121-
- Exit on semantic failure.
122-
8. Run pre-publish semantic integrity gate.
123-
- Exit if gate fails.
124-
9. Exit process with success code.
109+
1. Snapshot raw data
110+
2. Initialize metadata (RUNNING)
111+
3. Validation → halt on errors
112+
4. Contract enforcement
113+
- Apply role-driven repair
114+
- Propagate invalid order_ids (parent → child)
115+
5. Re-validation → halt on errors/warnings
116+
6. Assemble event table
117+
7. Build semantic layer
118+
8. Pre-publish integrity gate
119+
9. Promote version
120+
10. Finalize metadata (SUCCESS)
121+
11. Atomic activation
122+
123+
Guarantees:
124+
- Deterministic forward-only execution
125+
- Single run isolation
126+
- Only Contract stage mutates data
127+
- Activation occurs only after SUCCESS
128+
129+
Failure behavior:
130+
- Any stage failure → metadata FAILED → process exits
125131
"""
126132

127133
run_context = RunContext.create()

data_pipeline/shared/run_context.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@ def _generate_run_id() -> str:
1717

1818
@dataclass
1919
class RunContext:
20+
"""
21+
Run-scoped execution context.
22+
23+
Responsibilities:
24+
- Generate and hold run_id
25+
- Define all stage directory paths
26+
- Provide consistent path resolution across stages
27+
- Enforce run isolation via run-scoped folders
28+
29+
Guarantees:
30+
- Each run writes only within its own directory tree
31+
- Published artifacts are resolved deterministically from run_id
32+
"""
33+
2034
run_id: str
2135
base_path: str | Path
2236

data_pipeline/stages/apply_raw_data_contract.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -136,26 +136,28 @@ def apply_contract(
136136
run_context: RunContext, table_name: str, invalid_order_ids: set | None = None
137137
) -> tuple[dict, set]:
138138
"""
139-
Applies role-driven deterministic normalization, producing a cleaned
140-
dataset and a structured execution report.
141-
142-
Chronological behavior:
143-
- Initializes contract metrics and error container.
144-
- Validates table eligibility against TABLE_CONFIG.
145-
- Loads the logical table from the raw snapshot.
146-
- Applies role-specific contract steps:
147-
- **event_fact:**
148-
- exact deduplication
149-
- timestamp parse enforcement
150-
- temporal invariant enforcement (produces invalid `order_ids`)
151-
- **transaction_detail:**
152-
- exact deduplication
153-
- optional cascade removal using upstream invalid `order_ids`
154-
- **entity_reference:**
155-
- exact deduplication only.
156-
- Records row-level impact for each enforcement step.
157-
- Writes the contracted output to the contract layer.
158-
- Returns the execution report and any newly invalidated `order_ids`.
139+
Enforce structural contract on a single logical table.
140+
141+
Role-driven behavior:
142+
- event_fact:
143+
- exact deduplication
144+
- remove unparsable timestamps
145+
- remove temporal violations
146+
- emit invalid order_ids
147+
- transaction_detail:
148+
- exact deduplication
149+
- cascade drop invalid order_ids
150+
- entity_reference:
151+
- exact deduplication only
152+
153+
Guarantees:
154+
- Deterministic row removal
155+
- No numeric or domain correction
156+
- Output written to contracted layer
157+
158+
Returns:
159+
- Contract execution report
160+
- Newly invalidated order_ids (if any)
159161
"""
160162

161163
report = {

data_pipeline/stages/assemble_validated_events.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import pandas as pd
1111
from typing import Dict, List
1212
from data_pipeline.shared.run_context import RunContext
13-
from data_pipeline.shared.table_configs import (
13+
from data_pipeline.shared.modeling_configs import (
1414
ASSEMBLE_ENFORCED_SCHEMA,
1515
ASSEMBLE_ENFORCED_DTYPES,
1616
)
@@ -158,19 +158,22 @@ def freeze_schema(df: pd.DataFrame) -> pd.DataFrame:
158158

159159
def assemble_events(run_context: RunContext) -> Dict:
160160
"""
161-
Produces the contract-compliant event fact table from validated logical inputs and produce a consolidated run report
162-
163-
Chronological behavior:
164-
165-
- Initializes run-scoped reporting and logging helpers.
166-
- Loads all required event-source tables from the contracted layer.
167-
- Fails fast if any required table is missing or empty.
168-
- Executes core assembly pipeline:
169-
- merge_data (grain enforcement)
170-
- derive_fields (event enrichment)
171-
- freeze_schema (final contract projection)
172-
- Exports the assembled dataset to the run-scoped output path.
173-
- Aggregates all findings into the returned report.
161+
Assemble contract-compliant event dataset (order grain).
162+
163+
Steps:
164+
- Load contracted event tables
165+
- Merge with cardinality enforcement (1 row per order_id)
166+
- Derive temporal metrics and lineage fields
167+
- Freeze schema and enforce dtypes
168+
- Export deterministic output
169+
170+
Grain:
171+
- One row per order_id (hard fail on violation)
172+
173+
Non-responsibilities:
174+
- No validation
175+
- No repair
176+
- No business logic
174177
"""
175178

176179
report = init_report()

data_pipeline/stages/build_bi_semantic_layer.py

Lines changed: 40 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pandas as pd
99
from typing import Dict, List, Tuple, Literal
1010
from data_pipeline.shared.run_context import RunContext
11-
from data_pipeline.shared.table_configs import (
11+
from data_pipeline.shared.modeling_configs import (
1212
SELLER_DIM_ENFORCED_SCHEMA,
1313
SELLER_DIM_ENFORCED_DTYPES,
1414
SELLER_FACT_ENFORCED_SCHEMA,
@@ -45,23 +45,22 @@ def seller_weekly_semantic(
4545
df: pd.DataFrame,
4646
) -> Tuple[pd.DataFrame, pd.DataFrame]:
4747
"""
48-
Seller weekly semantic builder.
49-
50-
Transforms the assembled event table into seller-level weekly
51-
performance fact and supporting seller dimension.
52-
53-
Transform behavior:
54-
55-
- Validates single-run lineage via `run_id`
56-
- Derives weekly alignment fields and status flags
57-
- Aggregates event data to seller-week grain
58-
- Builds seller dimension from first observed activity
48+
Build seller weekly semantic layer from assembled events.
5949
6050
Fact grain:
61-
- One row per (seller_id, order_year_week)
51+
- 1 row per (seller_id, order_year_week)
6252
6353
Dimension grain:
64-
- One row per seller_id
54+
- 1 row per seller_id
55+
56+
Behavior:
57+
- Enforce single run_id lineage
58+
- Derive ISO week alignment
59+
- Aggregate event metrics to seller-week
60+
61+
Returns:
62+
- Aggregated fact dataframe
63+
- Seller dimension dataframe
6564
"""
6665

6766
read_assembled = df.copy()
@@ -109,40 +108,26 @@ def freeze_seller_semantic(
109108
table_type: Literal["fact", "dim"],
110109
) -> pd.DataFrame:
111110
"""
112-
Seller semantic contract enforcer.
111+
Enforce seller semantic contract.
113112
114-
Routes the input table to the appropriate fact or dimension
115-
contract freezer and enforces grain integrity before projection.
113+
For fact:
114+
- Validate required columns
115+
- Enforce (seller_id, order_year_week) uniqueness
116+
- Apply projection, dtype enforcement, deterministic sort
116117
117-
Behavior:
118+
For dimension:
119+
- Validate required columns
120+
- Enforce seller_id uniqueness
121+
- Apply projection and dtype enforcement
118122
119-
- Validates `table_type` selector
120-
- Applies grain-level duplicate checks:
121-
- fact: (seller_id, order_year_week)
122-
- dim: seller_id
123-
- Dispatches to the corresponding schema freezer
124-
- Returns a BI-ready, schema-stable dataframe
123+
Raises:
124+
- ValueError on invalid table_type; RuntimeError on schema or grain violation
125125
"""
126126

127127
if table_type not in {"fact", "dim"}:
128128
raise ValueError
129129

130130
def freeze_seller_fact(df: pd.DataFrame) -> pd.DataFrame:
131-
"""
132-
Seller weekly fact contract enforcement.
133-
134-
Projects the aggregated seller-week dataset into the approved
135-
fact schema, enforces dtypes, and applies deterministic ordering.
136-
137-
Enforcement actions:
138-
139-
- Validates presence of all required fact columns
140-
- Projects to the contract column order
141-
- Casts fields to enforced dtypes
142-
- Sorts by (seller_id, order_year_week)
143-
- Resets index for clean downstream consumption
144-
145-
"""
146131

147132
fact_contract = df.copy()
148133

@@ -161,20 +146,6 @@ def freeze_seller_fact(df: pd.DataFrame) -> pd.DataFrame:
161146
return fact_contract
162147

163148
def freeze_seller_dim(df: pd.DataFrame) -> pd.DataFrame:
164-
"""
165-
Seller dimension contract enforcement.
166-
167-
Projects the seller dimension into the approved schema,
168-
enforces dtypes, and applies deterministic ordering.
169-
170-
Enforcement actions:
171-
172-
- Validates presence of all required dimension columns
173-
- Projects to the contract column order
174-
- Casts fields to enforced dtypes
175-
- Sorts by seller_id
176-
- Resets index for clean downstream consumption
177-
"""
178149

179150
dim_contract = df.copy()
180151

@@ -216,20 +187,24 @@ def build_semantic_layer(run_context: RunContext) -> Dict:
216187
"""
217188
Semantic layer orchestrator.
218189
219-
Builds seller performance semantic tables from the assembled
220-
event layer and exports contract-compliant BI artifacts.
190+
Builds semantic modules from the assembled event layer and
191+
exports contract-compliant BI artifacts.
192+
193+
Execution:
194+
- Load assembled event dataset (order grain)
195+
- Fail fast if dataset is missing or empty
196+
- Execute registered semantic builders
197+
- Apply module-level freeze contracts
198+
- Export semantic artifacts into run-scoped semantic directory
199+
- Aggregate findings into report
221200
222-
Chronological behavior:
201+
Guarantees:
202+
- Each semantic module enforces its declared grain
203+
- All exported tables are contract-compliant
204+
- No decision logic embedded
223205
224-
- Initializes run-scoped reporting and logging helpers.
225-
- Loads the assembled_events logical table.
226-
- Fails fast if the assembled dataset is missing or empty.
227-
- Executes semantic pipeline:
228-
- seller_weekly_semantic (aggregation)
229-
- freeze_seller_semantic (fact and dimension contracts)
230-
- Generates run-partitioned output filenames.
231-
- Exports semantic tables to the run-scoped semantic directory.
232-
- Aggregates all findings into the returned report.
206+
Failure:
207+
- Any module failure halts semantic stage
233208
"""
234209

235210
report = init_report()

0 commit comments

Comments
 (0)