Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 86 additions & 152 deletions README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions assets/benchmarks/polars/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Measurement Methodology

This section details the methodology used to capture the memory metrics in the [`GCP Stress-Test Metrics (Scaling Efficiency)`](/README.md#gcp-stress-test-metrics-scaling-efficiency)
This section details the methodology used to capture the memory metrics in the [`GCP Stress-Test Metrics (Scaling Efficiency)`](../../../README.md#gcp-stress-test-metrics-scaling-efficiency)

The telemetry logger below was added to the orchestrator for a specific benchmarking run.

Expand Down Expand Up @@ -28,7 +28,7 @@ finally:
stop_event.set()
logger_thread.join()
```
Since `psutil` requires C-extensions to compile, the **Dockerfile** was modified to include the necessary build tools and the package itself. This allowed for benchmarking without altering the project's permanent [`requirements.txt`](/data_pipeline/requirements.txt).
Since `psutil` requires C-extensions to compile, the **Dockerfile** was modified to include the necessary build tools and the package itself. This allowed for benchmarking without altering the project's permanent [`requirements.txt`](../../../data_pipeline/requirements.txt).

```docker
FROM python:3.11-slim
Expand Down
78 changes: 38 additions & 40 deletions docs/data_extract/drive_extractor.md
Original file line number Diff line number Diff line change
@@ -1,65 +1,63 @@
# **Data Extractor Stage**
# Data Extractor Stage

**Files:**
* **Executor:** [`run_extract.py`](../../data_extract/run_extract.py)
* **Logic:** [`extract_logic.py`](../../data_extract/shared/extract_logic.py)
* **Utilities:** [`utils.py`](../../data_extract/shared/utils.py)

**Role:** Source Ingestion and Cloud Mirroring Gateway.
**Role:** Source Ingestion and Storage Mirroring.

## **System Contract**
## System Contract

**Purpose**

Automates the secure transfer of source data from Google Drive to Google Cloud Storage (GCS). It ensures that raw inputs are preserved in an immutable archival zone while simultaneously providing a clean trigger-point for the downstream data pipeline.
Automates the transfer of source data from Google Drive to Google Cloud Storage (GCS). It preserves raw inputs in an archival zone and provides a trigger-point for the downstream data pipeline.

**Invariants**
* **Folder-Level Deduplication:** Every source folder is processed exactly once. Re-execution is blocked by the existence of a `.success` marker in the archival bucket.
* **Dual-Mirroring Guarantee:** Every extracted file must be successfully written to both the **Archival Bucket** (compliance/audit) and the **Pipeline Bucket** (raw landing zone) before the extraction is considered successful.
* **Namespace Protection:** The extractor only operates on subfolders belonging to the strictly defined `PARENT_FOLDER`. It cannot "see" or extract files from the wider Drive environment.
* **Metadata Lineage:** Every extraction run generates a unique `execution_id` (UUID) and a JSON manifest documenting file names, timestamps, and status.
* **Idempotency:** Each source folder is processed once. Re-execution is prevented by checking for a `.success` marker in the archival bucket.
* **Storage Mirroring:** Extracted files are written to both the **Archival Bucket** and the **Pipeline Bucket** for a transfer to be considered successful.
* **Access Scoping:** The extractor only operates on subfolders within the defined `PARENT_FOLDER`. It cannot access files outside this scope.
* **Metadata Logging:** Each extraction run generates a unique `execution_id` and a JSON manifest documenting file names, timestamps, and status.

**Inputs**
* `target_child_folder`: `str` (The identifier of the operational batch to ingest).
* **Drive Service Account**: (Credentials with read-access to the `operations-upload-folder`).
* `target_child_folder`: Identifier of the folder to ingest.
* **Drive Service Account**: Credentials with read-access to the source folder.

**Outputs**
* **Archival Artifacts:** Mirror of source files in `gs://ops-archival-storage-dev/archive/{folder_name}/`.
* **Pipeline Artifacts:** Mirror of source files in `gs://ops-pipeline-storage-dev/raw/`.
* **Success Marker:** An empty `gs://.../{folder_name}.success` file used for idempotency.
* **Extraction Log:** A JSON metadata file summarizing the run.
* **Archival Artifacts:** Mirror of source files in the archival bucket.
* **Pipeline Artifacts:** Mirror of source files in the pipeline's raw landing zone.
* **Success Marker:** An empty `.success` file used for idempotency.
* **Extraction Log:** JSON metadata file summarizing the run.

## **Execution Workflow**
## Execution Workflow

The **Extractor** manages the ingestion lifecycle through the following steps:
The extractor manages the ingestion lifecycle through these steps:

1. **Deduplication Check:** Queries GCS for the success marker. If present, the job terminates immediately with a "Skipped" status.
2. **Hierarchy Resolution:** Uses the Drive API to locate the `folder_id` of the target child, ensuring it resides strictly under the authorized parent root.
3. **Manifest Fetching:** Retrieves a list of all files in the target folder, filtering out system-reserved files (e.g., `instruction.txt`).
4. **Extraction Loop:** For each valid file:
* Downloads the binary content from Google Drive into memory.
* Uploads the content to the **Archival Bucket** (long-term persistence).
* Uploads the content to the **Pipeline Bucket** (transient raw landing).
5. **Audit Persistence:** Generates and uploads the run metadata log.
6. **Marker Placement:** Upon 100% success of the file loop, writes the `.success` file to GCS.
1. **Duplicate Check:** Queries GCS for the success marker. If present, the job terminates with a "Skipped" status.
2. **Path Resolution:** Uses the Drive API to locate the target folder ID and verifies its parent root.
3. **File Discovery:** Lists files in the target folder, filtering out non-data files.
4. **Extraction Loop:** For each file:
* Downloads content from Google Drive to memory.
* Uploads content to the archival and pipeline buckets.
5. **Logging:** Generates and uploads the run metadata log.
6. **Finalization:** Writes the `.success` file to GCS after all files are successfully processed.

## Boundaries

## **Boundaries**

| This component **DOES** | This component **DOES NOT** |
| This component | This component does NOT |
| :--- | :--- |
| Extract files from Google Drive to GCS. | Modify or delete any files in the source Drive. |
| Mirror files across two separate administrative buckets. | Validate the internal schema or data quality of files. |
| Enforce folder-grain idempotency. | Rename or transform file content. |
| Log every file-level transfer result. | Trigger the main pipeline directly (Triggered via GCS events). |
| Filter out non-data files (instruction files). | Handle multi-part Drive uploads (expects completed files). |
| Extracts files from Google Drive to GCS. | Modify or delete files in the source Drive. |
| Mirrors files across separate buckets. | Validate internal schemas or data quality. |
| Enforces folder-level idempotency. | Rename or transform file content. |
| Logs file transfer results. | Trigger the main pipeline directly. |
| Filters non-data files. | Handle multi-part Drive uploads. |

## **Failure & Severity Model**
## Failure & Severity Model

### **Operational Failures (System Level)**
* **Missing Hierarchy:** If the `PARENT_FOLDER` or `target_child_folder` cannot be resolved, the orchestrator returns `exit 1`.
* **API Throttling/Auth:** If Drive or GCS credentials fail, the process halts. No success marker is written, allowing for a clean retry.
### System Failures
* **Resolution Failure:** If folders cannot be identified, the orchestrator returns an error code.
* **API/Auth Failure:** If credentials fail, the process stops without writing a success marker, allowing for retries.

### **Functional Findings (Data Level)**
* **Partial Extraction:** If a single file in a batch fails to upload to either bucket, the entire batch is marked as `failed`. The success marker is **not** written, ensuring that a subsequent run will attempt to re-process the *entire* folder.
* **Empty Source:** If the target folder contains no valid data files, the extractor logs a warning but terminates with `exit 1` (Failure) to prevent a "phantom" successful run from triggering downstream processes.
### Data Findings
* **Partial Extraction:** If any file in a batch fails to upload, the entire batch is marked as failed. The success marker is not written to ensure the entire folder is re-processed.
* **Empty Source:** If no valid data files are found, the extractor logs a warning and returns an error code to prevent triggering downstream processes.
100 changes: 50 additions & 50 deletions docs/data_pipeline/assembly_stage.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# **Assembly Stage**
# Assembly Stage

**Files:**
* **Executor:** [`assembly_executor.py`](../../data_pipeline/assembly/assembly_executor.py)
Expand All @@ -8,71 +8,71 @@

![assembled-stage-diagram](/assets/diagrams/04-assemble-stage-diagram.png)

## **System Contract**
## System Contract

**Purpose**

Integrates multiple normalized relational tables into a unified, analytical "Event" dataset and extracts high-fidelity "Dimension" references. It transforms raw business facts into a ready-to-model state by enforcing cardinality rules, leveraging the Primitive Integer Pipeline for memory efficiency, and calculating temporal performance metrics.
Integrates normalized relational tables into a unified analytical dataset and extracts dimension references. This stage enforces cardinality rules, applies integer mapping for memory efficiency, and calculates temporal performance metrics.

**Invariants**
* **Strict Order-ID Grain:** The primary event output is guaranteed to be exactly 1 row per `order_id_int`. Any operation causing cardinality explosion triggers a terminal failure.
* **Inner-Join Priority:** To maintain analytical integrity, orders without corresponding items are purged.
* **Temporal Determinism:** All lead times, lags, and delays are calculated as integer-day durations based on validated UTC timestamps pre-normalized to microsecond resolution.
* **Reference Uniqueness:** Dimension reference tables (Customers, Products) are strictly deduplicated by their primary keys.
* **Order-ID Grain:** The primary event output maintains a 1:1 grain per `order_id_int`. Cardinality issues trigger a terminal failure.
* **Join Logic:** Orders without corresponding items are removed to maintain integrity.
* **Temporal Calculations:** Lead times and delays are calculated as integer-day durations using UTC timestamps.
* **Reference Uniqueness:** Dimension tables (Customers, Products) are deduplicated by their primary keys.

**Inputs**
* `run_context`: `RunContext` (Path resolution for Silver/contracted and Gold/assembled zones).
* **Source Tables:** `df_orders`, `df_order_items`, `df_payments` (from the contracted layer).
* `run_context`: Path resolution for Silver and Gold zones.
* **Source Tables:** `df_orders`, `df_order_items`, `df_payments` from the Silver layer.

**Outputs**
* **Assembly Report:** `dict` (Step-level status and informational logs).
* **Assembled Events:** `parquet` (The unified analytical order-grain table).
* **Dimension Refs:** `parquet` (Unique snapshots of customer and product attributes).
* **Assembly Report:** Status and informational logs for each step.
* **Assembled Events:** Unified analytical table at the order grain.
* **Dimension Refs:** Deduplicated snapshots of customer and product attributes.

## **Execution Workflow**
## Execution Workflow

The **Executor** coordinates two distinct sub-orchestrations:
The executor manages two distinct workflows:

### **Workflow I: Event Assembly**
1. **Batch Load:** Fetches the required triplet (`orders`, `items`, `payments`) from the Silver zone.
2. **Merge:** Joins datasets using `merge_data`. It performs an inner join on items and a left join on payments to preserve financial data without losing order context.
* **Optimization:** Employs **Integer-Joins** on pre-mapped `UInt32/UInt64` IDs (e.g., `order_id_int`) provided by the Contract Registrar to drastically reduce memory overhead. Utilizes pre-aggregation on payments and items to ensure a strict 1:1 grain, preventing row explosions.
3. **Derivation:** Executes `derive_fields` to calculate fulfillment lead times and extract ISO-calendar attributes.
* **Optimization:** Applies memory-efficient casting (e.g., `Int16` for durations, `Categorical` for repetitive strings) and drops intermediate columns early to minimize row width.
4. **Schema Freeze:** Projects the final `ASSEMBLE_SCHEMA` and casts all columns to `ASSEMBLE_DTYPES`.
* **Optimization:** Omitted sorting to enable zero-copy streaming, maintaining a non-blocking execution plan compatible with `sink_parquet()`.
5. **Export & Clean:** Persists the table using `sink_parquet()` for streaming output and triggers `gc.collect()` to free memory before dimension processing.
### Workflow I: Event Assembly
1. **Load:** Retrieves `orders`, `items`, and `payments` from the Silver zone.
2. **Merge:** Joins datasets using an inner join for items and a left join for payments.
* **Optimization:** Uses integer joins on `UInt32/UInt64` IDs to reduce memory overhead. Pre-aggregates payments and items to ensure a 1:1 grain.
3. **Derivation:** Calculates fulfillment lead times and extracts ISO-calendar attributes.
* **Optimization:** Applies data type casting (e.g., `Int16` for durations, `Categorical` for repetitive strings) and drops intermediate columns to reduce memory footprint.
4. **Schema Enforcement:** Projects the final `ASSEMBLE_SCHEMA` and casts to `ASSEMBLE_DTYPES`.
* **Optimization:** Skips sorting to enable streaming and non-blocking execution.
5. **Export:** Saves the table using `sink_parquet()` for streaming output and triggers garbage collection.

### **Workflow II: Dimension Reference Extraction**
### Workflow II: Dimension Reference Extraction
1. **Selection:** Iterates through the `DIMENSION_REFERENCES` registry.
2. **Deduplication:** Extracts the required column subset and drops duplicate primary keys.
3. **Export:** Persists each dimension (e.g., `df_customers`) as an independent artifact.
2. **Deduplication:** Extracts required columns and removes duplicate primary keys.
3. **Export:** Saves each dimension table as an independent artifact.

## **Optimization & Memory Invariants**
## Optimization & Resource Management

* **Primitive Integer Pipeline:** To operate within 4GB RAM, the pipeline converts 36-byte UUID strings into 8-byte `UInt64` hashes for joins, and 4-byte `UInt32` categoricals for payloads. This is the primary driver of memory efficiency for 36M+ row datasets.
* **Streaming-First Join:** By deferring aggregations until after raw joins on `order_id`, leveraging Polars' streaming engine to avoid massive, materialized hash tables.
* **Low-Level Memory Reclamation:** The executor utilizes `ctypes.CDLL('libc.so.6').malloc_trim(0)` at high-water mark transitions. This forces the Linux allocator to release free memory back to the OS, preventing Cloud Run from terminating the process due to bloated (but unused) heap memory.
* **Zero-Copy Streaming:** `sink_parquet()` is used to prevent the pipeline from fully materializing the assembly result set in memory.
* **Integer Mapping:** Converts UUID strings to `UInt64` hashes for joins and `UInt32` for payloads. This reduces memory overhead when processing large datasets.
* **Streaming Joins:** Defers aggregations until after joins, leveraging the Polars streaming engine to avoid large materialized tables.
* **Memory Reclamation:** Uses `malloc_trim(0)` during high-water mark transitions to release free memory back to the OS.
* **Zero-Copy Streaming:** Employs `sink_parquet()` to avoid materializing the entire result set in memory.

## **Boundaries**
## Boundaries

| This component **DOES** | This component **DOES NOT** |
| This component | This component does NOT |
| :--- | :--- |
| Join multiple relational tables into a flat grain. | Perform data cleaning (handled in Contract stage). |
| Calculate time-deltas (e.g., `lead_time_days`). | Perform complex multi-stage aggregations (delegated to Semantic stage). |
| Enforce 1:1 cardinality for the final event grain. | Handle schema validation of raw data. |
| Deduplicate dimension attributes. | Manage partitioning logic (managed by the loader/exporter). |
| Manage peak memory via explicit `gc` triggers and concurrency control. | Change historical values or re-map IDs. |
| Utilize Hash-Joins for high-cardinality keys. | Perform blocking sorts on large datasets. |

## **Failure & Severity Model**

### **Operational Failures (System Level)**
* **Task Failure:** Individual transformation steps (Merge, Derive, Freeze) are wrapped in a fail-safe `task_wrapper`. Exceptions are trapped, logged, and return a `failed` status for that step.
* **Executor Trapping:** The top-level orchestration in `assembly_executor.py` uses `try-except-finally` blocks to catch and log unexpected pipeline crashes while ensuring memory reclamation.
* **Loading Missing Table:** If a required table (e.g., `df_orders`) is missing from the Silver zone, the stage returns `failed` immediately.
* **Export Failure:** Disk I/O errors or path resolution issues during the `export_file` call halt the lifecycle.

### **Functional Findings (Data Level)**
* **Partial Payments:** Orders without payments are allowed (via Left Join); the system fills these with `None/NaN`, which is considered a valid business state rather than a failure.
| Joins relational tables into a flat grain. | Perform data cleaning (handled in Contract stage). |
| Calculates time-deltas (e.g., lead times). | Perform multi-stage aggregations (delegated to Semantic stage). |
| Enforces 1:1 cardinality for the event grain. | Validate raw data schemas. |
| Deduplicates dimension attributes. | Manage partitioning logic. |
| Manages peak memory and garbage collection. | Change historical values or re-map IDs. |
| Uses Hash-Joins for high-cardinality keys. | Perform blocking sorts on large datasets. |

## Failure & Severity Model

### System Failures
* **Task Failure:** Transformation steps are wrapped in a handler that logs exceptions and returns a failure status.
* **Executor Safety:** Top-level orchestration uses `try-except-finally` blocks to catch crashes and ensure resource cleanup.
* **Missing Data:** If a required table is missing from the Silver zone, the stage fails.
* **I/O Failure:** Storage or path errors during export halt the lifecycle.

### Data Findings
* **Optional Joins:** Orders without payments are allowed; the system fills missing values with nulls, which is treated as a valid state.
Loading
Loading