|
1 | | -# **Assembly Stage** |
| 1 | +# Assembly Stage |
2 | 2 |
|
3 | 3 | **Files:** |
4 | 4 | * **Executor:** [`assembly_executor.py`](../../data_pipeline/assembly/assembly_executor.py) |
|
8 | 8 |
|
9 | 9 |  |
10 | 10 |
|
11 | | -## **System Contract** |
| 11 | +## System Contract |
12 | 12 |
|
13 | 13 | **Purpose** |
14 | 14 |
|
15 | | -Integrates multiple normalized relational tables into a unified, analytical "Event" dataset and extracts high-fidelity "Dimension" references. It transforms raw business facts into a ready-to-model state by enforcing cardinality rules, leveraging the Primitive Integer Pipeline for memory efficiency, and calculating temporal performance metrics. |
| 15 | +Integrates normalized relational tables into a unified analytical dataset and extracts dimension references. This stage enforces cardinality rules, applies integer mapping for memory efficiency, and calculates temporal performance metrics. |
16 | 16 |
|
17 | 17 | **Invariants** |
18 | | -* **Strict Order-ID Grain:** The primary event output is guaranteed to be exactly 1 row per `order_id_int`. Any operation causing cardinality explosion triggers a terminal failure. |
19 | | -* **Inner-Join Priority:** To maintain analytical integrity, orders without corresponding items are purged. |
20 | | -* **Temporal Determinism:** All lead times, lags, and delays are calculated as integer-day durations based on validated UTC timestamps pre-normalized to microsecond resolution. |
21 | | -* **Reference Uniqueness:** Dimension reference tables (Customers, Products) are strictly deduplicated by their primary keys. |
| 18 | +* **Order-ID Grain:** The primary event output maintains a 1:1 grain per `order_id_int`. Cardinality issues trigger a terminal failure. |
| 19 | +* **Join Logic:** Orders without corresponding items are removed to maintain integrity. |
| 20 | +* **Temporal Calculations:** Lead times and delays are calculated as integer-day durations using UTC timestamps. |
| 21 | +* **Reference Uniqueness:** Dimension tables (Customers, Products) are deduplicated by their primary keys. |
22 | 22 |
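The temporal invariant above can be illustrated with a minimal standard-library sketch. The function name `lead_time_days`, the sample dates, and the choice to truncate partial days (rather than round) are assumptions made for illustration; the stage itself may apply a different rule.

```python
from datetime import datetime, timezone
from typing import Optional


def lead_time_days(start: Optional[datetime], end: Optional[datetime]) -> Optional[int]:
    """Whole-day duration between two UTC timestamps, or None if either is missing."""
    if start is None or end is None:
        return None
    if start.tzinfo is None or end.tzinfo is None:
        # Assumption: naive timestamps are rejected rather than silently treated as UTC.
        raise ValueError("timestamps must be timezone-aware (UTC)")
    delta = end.astimezone(timezone.utc) - start.astimezone(timezone.utc)
    # timedelta.days drops the partial day (it floors for negative deltas).
    return delta.days


# Made-up example values: 2 days and 23 hours apart.
purchased = datetime(2018, 1, 1, 10, 0, tzinfo=timezone.utc)
delivered = datetime(2018, 1, 4, 9, 0, tzinfo=timezone.utc)
print(lead_time_days(purchased, delivered))  # 2
```

Returning `None` for a missing timestamp keeps the result nullable, which fits a compact integer column such as `Int16`.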
|
23 | 23 | **Inputs** |
24 | | -* `run_context`: `RunContext` (Path resolution for Silver/contracted and Gold/assembled zones). |
25 | | -* **Source Tables:** `df_orders`, `df_order_items`, `df_payments` (from the contracted layer). |
| 24 | +* `run_context`: Path resolution for Silver and Gold zones. |
| 25 | +* **Source Tables:** `df_orders`, `df_order_items`, `df_payments` from the Silver layer. |
26 | 26 |
|
27 | 27 | **Outputs** |
28 | | -* **Assembly Report:** `dict` (Step-level status and informational logs). |
29 | | -* **Assembled Events:** `parquet` (The unified analytical order-grain table). |
30 | | -* **Dimension Refs:** `parquet` (Unique snapshots of customer and product attributes). |
| 28 | +* **Assembly Report:** Status and informational logs for each step. |
| 29 | +* **Assembled Events:** Unified analytical table at the order grain. |
| 30 | +* **Dimension Refs:** Deduplicated snapshots of customer and product attributes. |
31 | 31 |
|
32 | | -## **Execution Workflow** |
| 32 | +## Execution Workflow |
33 | 33 |
|
34 | | -The **Executor** coordinates two distinct sub-orchestrations: |
| 34 | +The executor manages two distinct workflows: |
35 | 35 |
|
36 | | -### **Workflow I: Event Assembly** |
37 | | -1. **Batch Load:** Fetches the required triplet (`orders`, `items`, `payments`) from the Silver zone. |
38 | | -2. **Merge:** Joins datasets using `merge_data`. It performs an inner join on items and a left join on payments to preserve financial data without losing order context. |
39 | | - * **Optimization:** Employs **Integer-Joins** on pre-mapped `UInt32/UInt64` IDs (e.g., `order_id_int`) provided by the Contract Registrar to drastically reduce memory overhead. Utilizes pre-aggregation on payments and items to ensure a strict 1:1 grain, preventing row explosions. |
40 | | -3. **Derivation:** Executes `derive_fields` to calculate fulfillment lead times and extract ISO-calendar attributes. |
41 | | - * **Optimization:** Applies memory-efficient casting (e.g., `Int16` for durations, `Categorical` for repetitive strings) and drops intermediate columns early to minimize row width. |
42 | | -4. **Schema Freeze:** Projects the final `ASSEMBLE_SCHEMA` and casts all columns to `ASSEMBLE_DTYPES`. |
43 | | - * **Optimization:** Omitted sorting to enable zero-copy streaming, maintaining a non-blocking execution plan compatible with `sink_parquet()`. |
44 | | -5. **Export & Clean:** Persists the table using `sink_parquet()` for streaming output and triggers `gc.collect()` to free memory before dimension processing. |
| 36 | +### Workflow I: Event Assembly |
| 37 | +1. **Load:** Retrieves `orders`, `items`, and `payments` from the Silver zone. |
| 38 | +2. **Merge:** Joins datasets using an inner join for items and a left join for payments. |
| 39 | + * **Optimization:** Uses integer joins on `UInt32/UInt64` IDs to reduce memory overhead. Pre-aggregates payments and items to ensure a 1:1 grain. |
| 40 | +3. **Derivation:** Calculates fulfillment lead times and extracts ISO-calendar attributes. |
| 41 | + * **Optimization:** Applies data type casting (e.g., `Int16` for durations, `Categorical` for repetitive strings) and drops intermediate columns to reduce memory footprint. |
| 42 | +4. **Schema Enforcement:** Projects the final `ASSEMBLE_SCHEMA` and casts to `ASSEMBLE_DTYPES`. |
| 43 | + * **Optimization:** Skips sorting to enable streaming and non-blocking execution. |
| 44 | +5. **Export:** Saves the table using `sink_parquet()` for streaming output and triggers garbage collection. |
45 | 45 |
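The merge step of Workflow I can be sketched in plain Python (deliberately not Polars, so the example has no dependencies). Only `order_id_int` comes from this document; the function name `assemble_events`, the columns `price` and `payment_value`, and the sample rows are hypothetical.

```python
from collections import defaultdict


def assemble_events(orders, items, payments):
    """Join orders to pre-aggregated items (inner) and payments (left)."""
    # Pre-aggregate child tables so each order key appears at most once.
    item_totals = defaultdict(lambda: {"item_count": 0, "items_value": 0.0})
    for row in items:
        agg = item_totals[row["order_id_int"]]
        agg["item_count"] += 1
        agg["items_value"] += row["price"]

    payment_totals = defaultdict(float)
    for row in payments:
        payment_totals[row["order_id_int"]] += row["payment_value"]

    events, seen = [], set()
    for order in orders:
        key = order["order_id_int"]
        if key in seen:
            # Terminal failure: the output must stay at one row per order.
            raise ValueError(f"grain violation: duplicate order_id_int {key}")
        seen.add(key)
        if key not in item_totals:
            continue  # inner join: orders without items are dropped
        events.append({
            **order,
            **item_totals[key],
            # left join: a missing payment becomes a null, not an error
            "payment_value": payment_totals.get(key),
        })
    return events


orders = [{"order_id_int": 1}, {"order_id_int": 2}, {"order_id_int": 3}]
items = [
    {"order_id_int": 1, "price": 10.0},
    {"order_id_int": 1, "price": 5.0},
    {"order_id_int": 3, "price": 7.5},
]
payments = [{"order_id_int": 1, "payment_value": 15.0}]

events = assemble_events(orders, items, payments)
print([e["order_id_int"] for e in events])  # [1, 3]
```

Aggregating items and payments before the join is what keeps the result at one row per order: a raw join on order 1 would otherwise produce two rows.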
|
46 | | -### **Workflow II: Dimension Reference Extraction** |
| 46 | +### Workflow II: Dimension Reference Extraction |
47 | 47 | 1. **Selection:** Iterates through the `DIMENSION_REFERENCES` registry. |
48 | | -2. **Deduplication:** Extracts the required column subset and drops duplicate primary keys. |
49 | | -3. **Export:** Persists each dimension (e.g., `df_customers`) as an independent artifact. |
| 48 | +2. **Deduplication:** Extracts required columns and removes duplicate primary keys. |
| 49 | +3. **Export:** Saves each dimension table as an independent artifact. |
50 | 50 |
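The dimension extraction loop might look like the following sketch. The registry layout (`EXAMPLE_DIMENSION_REFERENCES`), the helper `extract_dimension`, the column names, and the keep-first policy for duplicates are all assumptions; the actual structure of `DIMENSION_REFERENCES` is not shown in this document.

```python
def extract_dimension(rows, primary_key, columns):
    """Project `columns` and keep the first row seen for each primary key."""
    seen = set()
    out = []
    for row in rows:
        key = row[primary_key]
        if key in seen:
            continue  # duplicate primary key: skip
        seen.add(key)
        out.append({col: row[col] for col in columns})
    return out


# Hypothetical registry shape: dimension name -> key and column subset.
EXAMPLE_DIMENSION_REFERENCES = {
    "df_customers": {
        "primary_key": "customer_id_int",
        "columns": ["customer_id_int", "customer_state"],
    },
}

source_rows = [
    {"customer_id_int": 7, "customer_state": "SP", "order_id_int": 1},
    {"customer_id_int": 7, "customer_state": "SP", "order_id_int": 2},
    {"customer_id_int": 9, "customer_state": "RJ", "order_id_int": 3},
]

dimensions = {
    name: extract_dimension(source_rows, spec["primary_key"], spec["columns"])
    for name, spec in EXAMPLE_DIMENSION_REFERENCES.items()
}
print(len(dimensions["df_customers"]))  # 2
```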
|
51 | | -## **Optimization & Memory Invariants** |
| 51 | +## Optimization & Resource Management |
52 | 52 |
|
53 | | -* **Primitive Integer Pipeline:** To operate within 4GB RAM, the pipeline converts 36-byte UUID strings into 8-byte `UInt64` hashes for joins, and 4-byte `UInt32` categoricals for payloads. This is the primary driver of memory efficiency for 36M+ row datasets. |
54 | | -* **Streaming-First Join:** By deferring aggregations until after raw joins on `order_id`, leveraging Polars' streaming engine to avoid massive, materialized hash tables. |
55 | | -* **Low-Level Memory Reclamation:** The executor utilizes `ctypes.CDLL('libc.so.6').malloc_trim(0)` at high-water mark transitions. This forces the Linux allocator to release free memory back to the OS, preventing Cloud Run from terminating the process due to bloated (but unused) heap memory. |
56 | | -* **Zero-Copy Streaming:** `sink_parquet()` is used to prevent the pipeline from fully materializing the assembly result set in memory. |
| 53 | +* **Integer Mapping:** Converts UUID strings to `UInt64` hashes for joins and `UInt32` for payloads. This reduces memory overhead when processing large datasets. |
| 54 | +* **Streaming Joins:** Defers aggregations until after joins, leveraging the Polars streaming engine to avoid large materialized tables. |
| 55 | +* **Memory Reclamation:** Uses `malloc_trim(0)` during high-water mark transitions to release free memory back to the OS. |
| 56 | +* **Zero-Copy Streaming:** Employs `sink_parquet()` to avoid materializing the entire result set in memory. |
57 | 57 |
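The memory reclamation step can be sketched as a guarded helper around the `ctypes.CDLL('libc.so.6').malloc_trim(0)` call named in the earlier revision of this section. The helper name `release_free_memory` and the fallback behaviour on platforms without glibc are assumptions.

```python
import ctypes
import gc
import sys


def release_free_memory() -> bool:
    """Run a GC pass, then ask glibc to return free heap pages to the OS.

    Returns True if malloc_trim was called, False where it is unavailable.
    """
    gc.collect()
    if not sys.platform.startswith("linux"):
        return False  # malloc_trim is a glibc extension
    try:
        libc = ctypes.CDLL("libc.so.6")
        libc.malloc_trim.argtypes = [ctypes.c_size_t]
        libc.malloc_trim.restype = ctypes.c_int
        libc.malloc_trim(0)  # 0 = keep no extra padding at the top of the heap
        return True
    except (OSError, AttributeError):
        # e.g. a musl-based image where libc.so.6 or malloc_trim is missing
        return False


trimmed = release_free_memory()
print(trimmed)
```

Calling this after the events table is written and before dimension processing starts would match the "high-water mark transition" described above.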
|
58 | | -## **Boundaries** |
| 58 | +## Boundaries |
59 | 59 |
|
60 | | -| This component **DOES** | This component **DOES NOT** | |
| 60 | +| This component | This component does NOT | |
61 | 61 | | :--- | :--- | |
62 | | -| Join multiple relational tables into a flat grain. | Perform data cleaning (handled in Contract stage). | |
63 | | -| Calculate time-deltas (e.g., `lead_time_days`). | Perform complex multi-stage aggregations (delegated to Semantic stage). | |
64 | | -| Enforce 1:1 cardinality for the final event grain. | Handle schema validation of raw data. | |
65 | | -| Deduplicate dimension attributes. | Manage partitioning logic (managed by the loader/exporter). | |
66 | | -| Manage peak memory via explicit `gc` triggers and concurrency control. | Change historical values or re-map IDs. | |
67 | | -| Utilize Hash-Joins for high-cardinality keys. | Perform blocking sorts on large datasets. | |
68 | | - |
69 | | -## **Failure & Severity Model** |
70 | | - |
71 | | -### **Operational Failures (System Level)** |
72 | | -* **Task Failure:** Individual transformation steps (Merge, Derive, Freeze) are wrapped in a fail-safe `task_wrapper`. Exceptions are trapped, logged, and return a `failed` status for that step. |
73 | | -* **Executor Trapping:** The top-level orchestration in `assembly_executor.py` uses `try-except-finally` blocks to catch and log unexpected pipeline crashes while ensuring memory reclamation. |
74 | | -* **Loading Missing Table:** If a required table (e.g., `df_orders`) is missing from the Silver zone, the stage returns `failed` immediately. |
75 | | -* **Export Failure:** Disk I/O errors or path resolution issues during the `export_file` call halt the lifecycle. |
76 | | - |
77 | | -### **Functional Findings (Data Level)** |
78 | | -* **Partial Payments:** Orders without payments are allowed (via Left Join); the system fills these with `None/NaN`, which is considered a valid business state rather than a failure. |
| 62 | +| Joins relational tables into a flat grain. | Perform data cleaning (handled in Contract stage). | |
| 63 | +| Calculates time-deltas (e.g., lead times). | Perform multi-stage aggregations (delegated to Semantic stage). | |
| 64 | +| Enforces 1:1 cardinality for the event grain. | Validate raw data schemas. | |
| 65 | +| Deduplicates dimension attributes. | Manage partitioning logic. | |
| 66 | +| Manages peak memory and garbage collection. | Change historical values or re-map IDs. | |
| 67 | +| Uses Hash-Joins for high-cardinality keys. | Perform blocking sorts on large datasets. | |
| 68 | + |
| 69 | +## Failure & Severity Model |
| 70 | + |
| 71 | +### System Failures |
| 72 | +* **Task Failure:** Transformation steps are wrapped in a handler that logs exceptions and returns a failure status. |
| 73 | +* **Executor Safety:** Top-level orchestration uses `try-except-finally` blocks to catch crashes and ensure resource cleanup. |
| 74 | +* **Missing Data:** If a required table is missing from the Silver zone, the stage fails. |
| 75 | +* **I/O Failure:** Storage or path errors during export halt the lifecycle. |
| 76 | + |
| 77 | +### Data Findings |
| 78 | +* **Optional Joins:** Orders without payments are allowed; the system fills missing values with nulls, which is treated as a valid state. |
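
The task-level handling described under System Failures could be implemented as a decorator along these lines. The name `task_wrapper` and the `failed` status appear in the earlier revision of this document; the decorator signature, the report dictionary shape, and the sample steps are assumptions.

```python
import functools
import logging

logger = logging.getLogger("assembly")


def task_wrapper(step_name):
    """Trap exceptions from a step and report a status instead of raising."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
                return {"step": step_name, "status": "success", "result": result}
            except Exception as exc:
                logger.exception("step %s failed", step_name)
                return {"step": step_name, "status": "failed", "error": str(exc)}
        return wrapped
    return decorator


@task_wrapper("merge")
def merge_ok():
    return 3  # hypothetical step returning a row count


@task_wrapper("derive")
def derive_broken():
    raise KeyError("order_purchase_timestamp")  # hypothetical missing column


print(merge_ok()["status"])       # success
print(derive_broken()["status"])  # failed
```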