Commit 76b4091

docs: document pipeline and infra design; fix legacy cloud path bugs

1 parent bb1eefc · commit 76b4091

11 files changed: 226 additions & 11 deletions

.gcp/terraforms/main.tf

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ locals {
     "eventarc.googleapis.com",
     "cloudscheduler.googleapis.com",
     "iamcredentials.googleapis.com",
+    "drive.googleapis.com",
   ]
 }

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ runtime/
 data/published/
 data/run_artifact
 data/contracted/
+assets/benchmarks/benchmark.py

 # local editor configs
 pyrightconfig.json

README

Lines changed: 149 additions & 6 deletions
@@ -1,6 +1,149 @@
-<p align="center">
-<a href="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-code.yml"><img src="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-code.yml/badge.svg" alt="CI - Code Quality"></a>
-<a href="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-infra.yml"><img src="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-infra.yml/badge.svg" alt="CI - Infra"></a>
-<a href="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-extract.yml"><img src="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-extract.yml/badge.svg" alt="CD - Extract"></a>
-<a href="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-pipeline.yml"><img src="https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-pipeline.yml/badge.svg" alt="CD - Pipeline"></a>
-</p>

(The badge-only header above is replaced by the new README content below.)

# Operations Analytics Pipeline: Scalable Integrity Engine

[![CI - Code Quality & Tests](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-code.yml/badge.svg)](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-code.yml)
[![CI - Infra Enforcement](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-infra.yml/badge.svg)](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/ci-infra.yml)
[![CD - Data Pipeline](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-pipeline.yml/badge.svg)](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-pipeline.yml)
[![CD - Data Extractor](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-extract.yml/badge.svg)](https://github.com/BLMgithub/operations-analytics-pipeline/actions/workflows/cd-extract.yml)

## Overview
Small to mid-sized organizations are often trapped in a cycle where they outgrow the capabilities of spreadsheets but lack the technical infrastructure to migrate to databases. This creates a "structural debt": the very tools that keep a business agile (spreadsheets) are the same tools that make its data fundamentally untrustworthy for scaling or reporting.

### The Solution
This project addresses that challenge with a resilient, event-driven data pipeline on Google Cloud Platform for reliable operational analytics. It enforces data integrity through a strict Medallion architecture (Bronze, Silver, Gold) built on rigid data contracts and validation gates that catch and isolate bad data early in the lifecycle.

### Defensive Pipeline Architecture
![WIP_pipeline_diagram_picture](https://still-working-on-it.need-to-finish-readme.first)

To eliminate the risk of cross-run data contamination and memory bloat, the pipeline employs a defensive state-management strategy in which local compute environments are strictly temporary (a minimal sketch of the run-scoped workspace pattern follows this list):
* **Stateless Orchestration:** Every execution operates within an isolated, deterministic `run_id` workspace that is aggressively purged post-run.
* **Cloud Sync & Purge:** After processing data into the Silver layer, the system syncs the output to Cloud Storage and purges the local environment.
* **Historical Context Pull:** It then safely re-downloads the complete historical state for Gold-layer aggregation, ensuring every run builds analytical models in a clean, untainted environment.
* **Linear Gating:** Stages are strictly gated; a failure at any tier (Ingestion, Contract, or Assembly) prevents downstream processing and ensures partial data is never promoted.
* **Resource-Optimized Compute:** Leverages a lazy-evaluation engine to process large-scale datasets within the strict memory constraints of serverless environments.

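The run-scoped workspace pattern can be summarized as a context manager that provisions an isolated directory keyed by `run_id` and purges it even when a stage fails. This is a simplified, hypothetical sketch; the function and variable names are illustrative and not the project's actual `RunContext` API:

```python
import shutil
import uuid
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def run_workspace(base: Path):
    """Provision an isolated, deterministic workspace for one pipeline run."""
    run_id = uuid.uuid4().hex  # illustrative; the real run_id scheme may differ
    workspace = base / run_id
    workspace.mkdir(parents=True, exist_ok=False)
    try:
        yield run_id, workspace
    finally:
        # Purge local state unconditionally so no run can contaminate the next.
        shutil.rmtree(workspace, ignore_errors=True)

# Usage sketch:
# with run_workspace(Path("runtime")) as (run_id, ws):
#     ...download raw snapshots into ws, apply contracts, sync Silver output...
```
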
### Event-Driven Cloud Infrastructure
![gcp-orchestration-diagram](assets/screenshots/gcp-orchestration-diagram.png)

The underlying infrastructure is entirely serverless, decoupled, and codified via Terraform to ensure reproducibility and security:
* **Orchestrated Compute:** Cloud Scheduler initiates daily extraction via Cloud Run, separating the extraction layer from the main processing logic.
* **Event-Driven Triggers:** Eventarc monitors Cloud Storage for `.success` flags and triggers the main processing job via Cloud Workflows only when extraction succeeds (see the sketch after this list).
* **Zero-Trust CI/CD:** GitHub Actions leverages Workload Identity Federation (WIF) for keyless, secure deployments of all infrastructure and Cloud Run jobs.
* **Integrated Observability:** Native Cloud Logging and Cloud Monitoring provide comprehensive telemetry and automated responder alerts for pipeline health.

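The `.success` flag that Eventarc watches for is simply a zero-byte object written once the extractor finishes. A minimal sketch using the official `google-cloud-storage` client, assuming the flag is written next to the raw snapshots (the bucket and prefix values are illustrative):

```python
from google.cloud import storage  # pip install google-cloud-storage

def plant_success_flag(bucket_name: str, run_prefix: str) -> None:
    """Write a zero-byte .success marker that Eventarc can react to."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(f"{run_prefix}/.success")
    blob.upload_from_string(b"")  # empty payload; the object's existence is the signal

# plant_success_flag("ops-pipeline-storage-dev", "raw/2024-01-01")  # illustrative values
```
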
## Architecture & System Design

### The Medallion Data Model & Contracts

The pipeline does not just move data; it actively defends the analytical layer from upstream anomalies. It enforces a strict Medallion architecture governed by a registry-driven rule engine.

**Bronze (Raw Snapshots)**
* **Purpose:** Immutable, un-typed snapshots of source systems.
* **State:** Temporarily downloaded into the isolated workspace. Data here is assumed to be structurally untrustworthy, containing nulls, duplicates, and orphaned records.

**Silver (The Contract Layer)**
* **Philosophy (Subtractive-Only Logic):** The pipeline never guesses, imputes, or "repairs" bad data. If a record violates the contract, it is explicitly dropped and the loss is logged in the telemetry report.
* **Role-Based Rules:** Tables are classified by role (`event_fact`, `transaction_detail`, `entity_reference`) and subjected to role-specific registry rules (e.g., deduplication, non-null assertions).
* **Referential Integrity (Cascade Cleanup):** The pipeline tracks invalidated parent IDs (e.g., malformed `order_id`s) and propagates them downstream. If an order is dropped, all associated child records (such as line items) are cascade-dropped to prevent orphaned data from polluting joins (see the sketch after this list).
* **Schema Freeze:** Output files are strictly cast to predefined data types and projected to contain only approved columns before being written to Cloud Storage.

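One way to express the cascade-cleanup rule is as a semi-join between each child table and the set of parent IDs that survived the contract. A hedged sketch with Polars `LazyFrame`s; the file paths, columns, and filters are illustrative and do not reproduce the project's contract module:

```python
import polars as pl

# Parent table: drop rows that violate the contract (null keys, duplicate IDs).
orders = pl.scan_parquet("runtime/bronze/orders.parquet")
valid_orders = (
    orders
    .filter(pl.col("order_id").is_not_null() & pl.col("customer_id").is_not_null())
    .unique(subset=["order_id"], keep="first")
)

# Child table: cascade-drop any line item whose parent order was rejected.
items = pl.scan_parquet("runtime/bronze/order_items.parquet")
valid_items = items.join(valid_orders.select("order_id"), on="order_id", how="semi")

# Nothing is loaded eagerly until the sink/collect step executes the plan.
valid_items.sink_parquet("runtime/silver/order_items.parquet")
```
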
**Gold (The Semantic Layer)**
* **Purpose:** Business-ready Fact and Dimension tables modeled for entity-centric and cohort analysis (Customers, Sellers, Products).
* **Strict Grain Enforcement** (a minimal check is sketched after this list):
  * **Temporal:** All fact tables are deterministically aligned to an ISO-week grain (`W-MON`).
  * **Entity:** The engine validates that Dimension tables contain exactly one row per `Entity_ID` and that Fact tables contain exactly one row per `(Entity_ID, order_year_week)`.
* **Lineage Integrity:** The semantic builder aggressively checks that the assembled data belongs to a single `run_id`. Cross-run data contamination triggers a terminal failure, preventing poisoned data from ever reaching production.

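A grain check of this kind reduces to comparing the row count against the count of distinct grain combinations. A minimal, hypothetical sketch (column names follow the data dictionary below, but this is not the project's validation module):

```python
import polars as pl

def assert_unique_grain(df: pl.DataFrame, grain: list[str], table: str) -> None:
    """Fail fast if any grain combination appears more than once."""
    duplicates = df.height - df.select(grain).unique().height
    if duplicates:
        raise RuntimeError(f"{table}: {duplicates} rows violate the {grain} grain")

# Dimension: one row per entity.
# assert_unique_grain(dim_customers, ["customer_id"], "dim_customers")
# Fact: one row per (entity, ISO week).
# assert_unique_grain(fct_weekly, ["customer_id", "order_year_week"], "fct_customer_weekly")
```
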
### Validation Gates & Deployment Integrity

* **Dual-Pass Validation Strategy:**
  * **Initial Validation (Raw Gate):** The orchestrator evaluates raw snapshots. At this stage, `warnings` (such as duplicate IDs or nulls) are tolerated and passed down to the Contract Stage for subtractive cleanup. Only fatal structural errors abort the run.
  * **Post-Contract Revalidation (Silver Gate):** After contract rules are applied, the system re-runs validation. In this phase, `warnings` are escalated to fatal: because the contract stage guarantees a clean schema, any remaining warning triggers a terminal `RuntimeError`, halting the pipeline immediately to prevent downstream corruption.
* **Atomic Publishing Lifecycle:** The pipeline protects the Gold layer by writing intermediate analytical models to isolated temporary directories during computation. Only when *all* semantic modules finish successfully does the system execute an atomic publish via a `latest_version.json` pointer update, guaranteeing that partial or incomplete data is never served to dashboards (see the sketch after this list).
* **Comprehensive Telemetry:**
  * **End-to-End Traceability:** A single `run_id` is propagated through all raw snapshots, metadata logs, and published artifacts to provide absolute lineage tracking.
  * **Resilient Logging:** Even in the event of a fatal crash, the orchestrator's `finally` block guarantees that partial logs and stage reports are synced back to cloud storage before the local workspace is purged, preserving debuggability.

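The pointer-based publish boils down to "write everything into a versioned directory, then swap the pointer last". A hedged sketch, assuming the pointer file simply records which directory dashboards should read; the helper names and JSON fields are illustrative, not the project's publish module:

```python
import json
from pathlib import Path

def publish_atomically(published_root: Path, run_id: str, staged_dir: Path) -> None:
    """Promote a fully built Gold run by flipping the latest_version.json pointer."""
    target = published_root / run_id
    staged_dir.rename(target)  # move the completed models into place first

    pointer = published_root / "latest_version.json"
    tmp = pointer.with_name("latest_version.json.tmp")
    tmp.write_text(json.dumps({"run_id": run_id, "path": str(target)}))
    tmp.replace(pointer)  # OS-level atomic swap: readers see old or new, never partial
```
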
## Performance & Scale

The pipeline is explicitly engineered to process large-scale historical data without breaching the strict memory constraints of serverless compute (Cloud Run). To achieve this, the core execution engine was migrated from Pandas to Polars, using `LazyFrame`s and streaming evaluation (a minimal lazy-query example appears at the end of this section).

**The Benchmark Constraint: 4GB RAM / 2 vCPU**

![polars-vs-pandas](assets/screenshots/pandas-vs-polars.png)
>The data behind this chart is available in [`assets/benchmarks/`](/assets/benchmarks/); instructions for downloading the 15M-row dataset are in [`data/`](/data/README).

* **Measurement Methodology:** Performance profiles were captured by executing the pipeline locally via [`docker-compose.benchmark`](docker-compose.benchmark.yml), configured to mirror the Cloud Run constraints (`memory="4G"`, `cpus="2"`, `POLARS_MAX_THREADS=2`). Resource footprints were tracked via a PowerShell polling script:
```powershell
while ($true) { docker stats --no-stream --format "{{.Name}}, {{.CPUPerc}}, {{.MemUsage}}, {{.MemPerc}}" >> stats_log.csv; Start-Sleep -Seconds 1 }
```
* **The Pandas Ceiling (4M rows in 88s):** Under the legacy Pandas engine, memory became fully saturated (100% of 4GiB) while processing a 4-million-row dataset. Because Pandas executes eagerly and loads entire datasets into memory, anything larger than 4M rows resulted in an inevitable out-of-memory (OOM) crash.
* **The Polars Migration (15M rows in 67s):** By switching to the Polars Lazy API, the pipeline now processes a dataset nearly 4x larger (15 million rows) while reducing execution time from 88 seconds to 67 seconds within the same 4GB/2vCPU constraint.
* **Streaming Evaluation:** Instead of eagerly loading the whole dataset, Polars processes data in batches, drastically reducing the memory footprint.
* **Multi-Core Utilization:** Unlike single-threaded Pandas (peaking at ~100% CPU), the Polars engine parallelizes the workload, consistently utilizing ~200% CPU across both provisioned cores.
* **Zero-Copy Export:** The Semantic stage leverages `sink_parquet` to write analytical models directly to disk via streaming, so memory is freed immediately during the final Gold-layer assembly.

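For readers unfamiliar with the lazy model, the key difference from eager Pandas is that the query is described first and only executed when a sink is requested, which lets the engine process data in batches. A small illustrative sketch (file paths, columns, and the specific aggregation are hypothetical, and streamability of a given plan depends on the Polars version):

```python
import polars as pl

# Describe the query lazily; nothing is read into memory yet.
weekly_revenue = (
    pl.scan_parquet("data/contracted/df_order_items/*.parquet")
    .join(
        pl.scan_parquet("data/contracted/df_orders/*.parquet"),
        on="order_id",
        how="inner",
    )
    .with_columns(week=pl.col("order_purchase_timestamp").dt.truncate("1w"))
    .group_by("week")
    .agg(pl.col("price").sum().alias("revenue"))
)

# Execute the plan and stream the result straight to disk.
weekly_revenue.sink_parquet("data/published/weekly_revenue.parquet")
```
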
## Observability & Alerting

![WIP_google_cloud_dashboard_monitoring_pictures](https://still-working-on-it.need-to-finish-readme.first)

Operational maturity requires assuming things will eventually break. The pipeline features a comprehensive observability suite managed natively via Google Cloud Monitoring and Cloud Logging, codified entirely in Terraform.

### Monitored Telemetry & Dashboards
The custom Cloud Monitoring dashboard tracks granular operational metrics to proactively identify resource bottlenecks and execution failures:

**Pipeline Job Metrics:**
1. **Workflow Execution Traffic:** Measures the volume of finished pipeline runs.
2. **Execution Status Ratio:** Tracks the count of `SUCCESS` vs. `FAILED` runs to monitor overall reliability.
3. **Memory Allocation Bottlenecks:** Plots actual Cloud Run memory usage against a hardcoded 4GB horizontal threshold to visualize proximity to OOM exhaustion.

**Extractor Job Metrics:**
1. **Drive Extractor Latency:** Tracks the billable instance time of the extractor job (the most accurate proxy for API usage cost, as the extractor uses the Drive API continuously during runtime).
2. **Drive API Latencies (Median):** Monitors median response times for core Google Workspace API calls (e.g., `google.apis.drive.v3.DriveFiles.Get` for extraction and `DriveFiles.List` for directory parsing).
3. **Memory Allocation Bottlenecks:** Plots the extractor's memory usage against its own hardcoded 1GB Cloud Run threshold.

### Automated Responders & Alerts
The system monitors specific log payloads across the infrastructure and dispatches `CRITICAL`-severity email alerts to on-call responders with actionable, markdown-formatted runbooks (an illustrative `pipeline_crashed` payload is sketched after this list). Alerts are configured for:
* **Ingestion Failures (`midnight_scheduler_failed`):** Detects IAM permission revocations (403) or token expiries (401) that prevent the daily trigger.
* **Extraction Crashes (`extractor_crashed`):** Captures Python tracebacks if the Drive Extractor fails to pull raw data or plant the `.success` flag.
* **Orchestration Breakdowns (`pipeline_dispatch_failed`):** Catches Eventarc workflow failures if downstream routing breaks.
* **Pipeline Fatals (`pipeline_crashed`):** Detects out-of-memory (OOM) events or unhandled exceptions within the main processing logic, ensuring dashboard consumers are never silently served stale data.

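Log-based alerting of this kind typically keys on a stable field in a structured (JSON) log line. A hypothetical sketch of how an orchestrator might emit the `pipeline_crashed` payload that an alert policy filters on; the field names are illustrative, and the project's actual payload schema is defined in its logging and Terraform configuration:

```python
import json
import logging
import sys

logger = logging.getLogger("data_pipeline")
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

def report_fatal(run_id: str, stage: str, exc: BaseException) -> None:
    """Emit a single structured line that a log-based alert can match on."""
    payload = {
        "event": "pipeline_crashed",  # the field the alert policy filters on
        "severity": "CRITICAL",
        "run_id": run_id,
        "stage": stage,
        "error": repr(exc),
    }
    logger.error(json.dumps(payload))

# try:
#     run_stage(...)
# except Exception as exc:
#     report_fatal(run_id, "semantic", exc)
#     raise
```
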
## Repository Structure

```text
operations-analytics-pipeline/
├── .gcp/
│   └── terraforms/          # IaC for all GCP resources (Cloud Run, Eventarc, Storage, IAM)
├── .github/
│   └── workflows/           # CI/CD pipelines (Terraform apply, Docker build/push, Code quality & test)
├── assets/
│   └── benchmarks/          # Performance profiling logs (Pandas vs Polars memory usage)
├── data/                    # Git-ignored local directories used when simulating cloud storage
│   ├── raw/                 # Extracted snapshot dumps
│   ├── contracted/          # Intermediate Silver-layer files
│   ├── published/           # Final Gold-layer analytical models
│   └── run_artifact/        # Lineage metadata and stage execution logs
├── data_extract/
│   ├── shared/              # Extractor logic and core I/O utilities
│   └── run_extract.py       # The Drive extractor orchestrator
├── data_pipeline/
│   ├── assembly/            # Delta merging and event mapping logic (Gold Pre-processing)
│   ├── contract/            # Subtractive filtering logic (Silver Layer)
│   ├── publish/             # Manages the atomic publish lifecycle of semantic datasets
│   ├── semantic/            # Fact/Dimension table builders (Gold Layer)
│   ├── shared/              # Storage adapters, IO wrappers, and registry configurations
│   ├── validation/          # Dual-pass structural data validation gates
│   └── run_pipeline.py      # The pipeline orchestrator and state manager
├── docs/                    # Detailed architectural and stage-level system contracts
├── runtime/                 # Git-ignored ephemeral workspace used by the local pipeline executor
└── tests/                   # Pytest suite for pipeline logic and validation rules
```

## CI/CD & Security

The project adheres to a strict "Zero-Trust" deployment model: no permanent service account keys are generated, downloaded, or stored as GitHub Secrets.

* **Workload Identity Federation (WIF):** GitHub Actions authenticates to Google Cloud via short-lived, dynamically requested OIDC tokens.
* **Infrastructure as Code:** Infrastructure deployment and IAM bindings are managed strictly via automated `terraform plan` and `terraform apply` workflows.
* **Containerized Artifacts:** Upon passing CI checks, the pipeline and extractor codebases are packaged into Docker images and pushed to GCP Artifact Registry.
(3 binary files changed: 68 Bytes, 125 KB, 91.4 KB; binary content not shown)

data/README

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
# Data & Synthetic Benchmarks

This directory serves as the local state provider for the pipeline when executing in a non-cloud environment. It mimics the structure of the Google Cloud Storage (GCS) buckets, allowing for high-fidelity local simulation and performance benchmarking.

## Synthetic Dataset
To replicate the high-volume environment described in the [Performance & Scale](/README#performance--scale) section, you can download the 15M-row synthetic dataset here: [**Kaggle Dataset Link**](https://15m-row-synthetic-dataset.com)

### Dataset Structure
The downloaded archive contains the following partitions:
* **`raw/`**: Represents the **Bronze Layer**. Contains daily delta CSV snapshots. The `RunContext` class expects this directory to be populated when running locally. (~4.5GB total)
* **`contracted/`**: Represents the **Silver Layer**. Contains accumulated, schema-enforced Parquet files. This acts as the authoritative state for Gold-layer assembly. (~1.55GB total)

## Local Execution Setup
1. Extract the downloaded dataset archive.
2. Copy the `raw/` and `contracted/` directories into this `data/` folder.
3. Note that the `RunContext` manager strictly recognizes only `.parquet` and `.csv` extensions; all other file types are ignored to prevent ingestion noise.

**Execute the local pipeline:**
```
python -m data_pipeline.run_pipeline
```

## Data Dictionary: Contract-Compliant Schema (Silver Layer)
The following tables represent the technical contracts enforced during the **Contract Stage**. Source: [`table_configs.py`](../data_pipeline/shared/table_configs.py). An illustrative registry entry is sketched after the tables.

### Table: `df_orders` (Role: `event_fact`)
| Attribute | Type | PK | Required | Non-nullable |
| :--- | :--- | :--- | :--- | :--- |
| `order_id` | string | True | True | True |
| `customer_id` | string | False | True | True |
| `order_status` | category | False | True | True |
| `order_purchase_timestamp` | datetime64[ns] | False | True | True |
| `order_approved_at` | datetime64[ns] | False | True | False |
| `order_delivered_timestamp` | datetime64[ns] | False | True | False |
| `order_estimated_delivery_date` | datetime64[ns] | False | True | False |

### Table: `df_order_items` (Role: `transaction_detail`)
| Attribute | Type | PK | Required | Non-nullable |
| :--- | :--- | :--- | :--- | :--- |
| `order_id` | string | True | True | True |
| `product_id` | string | False | True | True |
| `seller_id` | string | False | True | True |
| `price` | float32 | False | True | True |

### Table: `df_customers` (Role: `entity_reference`)
| Attribute | Type | PK | Required | Non-nullable |
| :--- | :--- | :--- | :--- | :--- |
| `customer_id` | string | True | True | True |
| `customer_state` | category | False | True | True |
| `customer_city` | category | False | True | True |
| `customer_segment` | category | False | True | True |
| `account_creation_date` | datetime64[ns] | False | True | True |

### Table: `df_payments` (Role: `transaction_detail`)
| Attribute | Type | PK | Required | Non-nullable |
| :--- | :--- | :--- | :--- | :--- |
| `order_id` | string | True | True | True |
| `payment_value` | float32 | False | True | True |

### Table: `df_products` (Role: `entity_reference`)
| Attribute | Type | PK | Required | Non-nullable |
| :--- | :--- | :--- | :--- | :--- |
| `product_id` | string | True | True | True |
| `product_category_name` | category | False | True | True |
| `product_length_cm` | float32 | False | True | True |
| `product_height_cm` | float32 | False | True | True |
| `product_width_cm` | float32 | False | True | True |
| `product_fragility_index` | category | False | True | True |
| `product_weight_g` | float32 | False | True | True |
| `supplier_tier` | category | False | True | True |
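
For orientation, a contract registry like the one these tables describe can be represented as a plain dictionary keyed by table name. This is a hypothetical illustration of the shape only (partial, with assumed key names), not the contents of `table_configs.py`:

```python
# Hypothetical shape of a contract registry; the real definitions live in
# data_pipeline/shared/table_configs.py and may differ in structure.
TABLE_CONFIGS = {
    "df_orders": {
        "role": "event_fact",
        "primary_key": ["order_id"],
        "dtypes": {
            "order_id": "string",
            "customer_id": "string",
            "order_status": "category",
            "order_purchase_timestamp": "datetime64[ns]",
        },
        "non_nullable": ["order_id", "customer_id", "order_status", "order_purchase_timestamp"],
    },
    "df_order_items": {
        "role": "transaction_detail",
        "primary_key": ["order_id"],
        "dtypes": {"order_id": "string", "product_id": "string", "seller_id": "string", "price": "float32"},
        "non_nullable": ["order_id", "product_id", "seller_id", "price"],
    },
}
```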

data_extract/shared/extract_logic.py

Lines changed: 2 additions & 2 deletions
@@ -11,8 +11,8 @@
     GoogleDriveService,
 )

-ARCHIVAL_BUCKET = "gs://operations-archival-bucket"
-PIPELINE_BUCKET = "gs://operations-pipeline-bucket"
+ARCHIVAL_BUCKET = "gs://ops-archival-storage-dev"
+PIPELINE_BUCKET = "gs://ops-pipeline-storage-dev"
 PARENT_FOLDER = "operations-upload-folder"
 MIME_TYPE = "application/vnd.google-apps.folder"
