Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
70d89ab
feat: add bigquery atomic view swap to manage published versioning fo…
BLMgithub Apr 14, 2026
f93888a
feat: add provision for semantics dataset; enable and bind bigquery r…
BLMgithub Apr 14, 2026
bb447f6
feat: implement persistent ID registrar for UInt32 mapping of UUID co…
BLMgithub Apr 17, 2026
24ebbef
refactor: transition assembly and semantic stages to use UInt32 integ…
BLMgithub Apr 17, 2026
4234873
docs: update docstring and stage markdown documents to match refactor…
BLMgithub Apr 17, 2026
b93ad7a
feat: implement disk volume mount provisioning and document workaroun…
BLMgithub Apr 17, 2026
33d1786
refactor: refactor: replace id_registrar memory-intensive anti-join w…
BLMgithub Apr 18, 2026
091b196
refactor: decouple assembly and semantic stage IO
BLMgithub Apr 18, 2026
70ef08c
test: update validation unit test to match refactoring
BLMgithub Apr 21, 2026
54fdefa
feat: add local and gcp path IO adapter for id_mapping; add unit test…
BLMgithub Apr 21, 2026
b6e27f3
feat: implement direct GCS streaming by URIs via BigQuery metadata; u…
BLMgithub Apr 21, 2026
887efa6
fix: id_registrar replacing the accumulated mapped ids when GCS stora…
BLMgithub Apr 23, 2026
d89ca84
feat: implement discovery-first global ID mapping to eliminate redund…
BLMgithub Apr 24, 2026
840f973
feat: codify bigquery datasets and external tables; update ci pipelin…
BLMgithub Apr 24, 2026
16004aa
docs: align project documentation and benchmarks with new pipeline ar…
BLMgithub Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions .gcp/terraforms/bigquery.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# ------------------------------------------------------------
# OPS EXTERNALIZED TABLES (For metadata caching)
# ------------------------------------------------------------

# BigQuery connection used by the external (BigLake) tables in this file.
resource "google_bigquery_connection" "biglake_connection" {
  location      = var.region
  connection_id = "ops_biglake_connection"
  friendly_name = "BigLake Connection for GCS Parquet Scanning"

  # Intentionally empty: no arguments are supplied here. The connection's
  # service account is read back from cloud_resource[0].service_account_id
  # by the bucket IAM binding in this file.
  cloud_resource {}
}

# Grant the BigLake connection's service account read-only object access
# on the pipeline bucket.
resource "google_storage_bucket_iam_member" "biglake_storage_viewer" {
  bucket = google_storage_bucket.ops_pipeline_bucket.name
  role   = "roles/storage.objectViewer"

  # The member is the service account exposed by the connection's
  # cloud_resource block.
  member = format(
    "serviceAccount:%s",
    google_bigquery_connection.biglake_connection.cloud_resource[0].service_account_id,
  )
}

# Dataset that holds the external tables; its ID comes from var.bq_dataset_id.
resource "google_bigquery_dataset" "silver_dataset" {
  location   = var.region
  dataset_id = var.bq_dataset_id

  # Do not drop tables that still exist in the dataset when it is destroyed.
  delete_contents_on_destroy = false
}

locals {
  # Table IDs for the external tables. Each name is also the file-name prefix
  # matched under contracted/ in the pipeline bucket (<name>_*.parquet).
  external_tables = [
    "df_orders",
    "df_customers",
    "df_order_items",
    "df_products",
    "df_payments",
  ]
}

# One external Parquet table per entry in local.external_tables, read through
# the BigLake connection.
resource "google_bigquery_table" "external_tables" {
  for_each = toset(local.external_tables)

  table_id   = each.value
  dataset_id = google_bigquery_dataset.silver_dataset.dataset_id

  external_data_configuration {
    source_format = "PARQUET"
    connection_id = google_bigquery_connection.biglake_connection.name

    # The schema is inferred from the files, so creation might throw an error
    # if contracted/ has no matching objects yet.
    autodetect = true
    source_uris = [
      format(
        "gs://%s/contracted/%s_*.parquet",
        google_storage_bucket.ops_pipeline_bucket.name,
        each.value,
      ),
    ]

    # The metadata cache refresh is triggered manually by the pipeline.
    # NOTE(review): BigQuery's metadata-caching docs pair this setting with a
    # table-level max_staleness; none is set in this resource. Confirm it is
    # configured elsewhere or not required.
    metadata_cache_mode = "MANUAL"
  }

  lifecycle {
    # Terraform rejects any plan that would destroy these tables.
    prevent_destroy = true
  }
}


# ------------------------------------------------------------
# BIGQUERY SEMANTIC DATASETS (For table versioning)
# ------------------------------------------------------------

locals {
  # Dataset IDs for the semantic layer, one dataset per entry.
  semantic_datasets = [
    "seller_semantic",
    "customer_semantic",
    "product_semantic",
  ]

  # Default expiration for versioned tables: 31 days expressed in
  # milliseconds (= 2678400000).
  one_month_ms = 31 * 24 * 60 * 60 * 1000
}

# One dataset per entry in local.semantic_datasets. Tables created in them
# inherit the default expiration set below.
resource "google_bigquery_dataset" "semantic_datasets" {
  for_each = toset(local.semantic_datasets)

  dataset_id  = each.value
  location    = var.region
  description = format("Semantic layer for %s. Tables expire after 1 month.", each.value)

  # Default lifetime applied to tables in the dataset (local.one_month_ms).
  default_table_expiration_ms = local.one_month_ms
  delete_contents_on_destroy  = false

  labels = {
    layer = "semantic"
    env   = var.environment
  }

  lifecycle {
    # Terraform rejects any plan that would destroy these datasets.
    prevent_destroy = true
  }
}
19 changes: 18 additions & 1 deletion .gcp/terraforms/iam_bindings.tf
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ locals {
"roles/monitoring.admin", # Manage Monitoring in monitoring.tf
"roles/logging.configWriter", # Required for log-based alert policies
"roles/iam.serviceAccountAdmin", # Manage Alert policies in monitoring.tf
"roles/iam.admin" # Manage Iam roles
"roles/iam.admin", # Manage Iam roles
"roles/bigquery.admin", # Manage BigQuery datasets and views
"roles/serviceusage.serviceUsageAdmin", # Manage APIs
]
}

Expand Down Expand Up @@ -74,6 +76,21 @@ resource "google_storage_bucket_iam_member" "pipeline_runner_pipeline_access" {
member = "serviceAccount:${google_service_account.platform_accounts["ops-pipeline-sa"].email}"
}

# BigQuery roles bound to the pipeline runner service account.
locals {
  pipeline_bq_roles = [
    "roles/bigquery.dataEditor",
    "roles/bigquery.jobUser",
  ]
}

# Project-level binding of each role in local.pipeline_bq_roles to the
# ops-pipeline-sa service account.
resource "google_project_iam_member" "pipeline_runner_bq_access" {
  for_each = toset(local.pipeline_bq_roles)

  project = var.project_id
  role    = each.value
  member = format(
    "serviceAccount:%s",
    google_service_account.platform_accounts["ops-pipeline-sa"].email,
  )
}


# ------------------------------------------------------------
# GOOGLE SERVICE AGENTS (Pub/Sub)
Expand Down
33 changes: 30 additions & 3 deletions .gcp/terraforms/jobs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,49 @@ resource "google_cloud_run_v2_job" "pipeline" {

resources {
limits = {
cpu = "2"
cpu = "4"
memory = "8Gi"
}
}
env {
name = "POLARS_MAX_THREADS"
value = "2"
value = "4"
}
env {
name = "GCP_REGION"
value = var.region
}
env {
name = "BQ_DATASET_ID"
value = var.bq_dataset_id
}
env {
name = "GCP_PROJECT"
value = var.project_id
}

volume_mounts {
name = "ephemeral-disk-1"
mount_path = "/tmp"
}
}

volumes {
name = "ephemeral-disk-1"
empty_dir {
size_limit = "10Gi"
}
}
}
}
lifecycle {
ignore_changes = [
# Github ci-infra updates image every update
template[0].template[0].containers[0].image,
client,
client_version
client_version,
# Block terraform from defaulting medium to MEMORY, DISK isn't supported by provider yet
template[0].template[0].volumes[0].empty_dir[0].medium
]
}
}
Expand Down
2 changes: 2 additions & 0 deletions .gcp/terraforms/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ locals {
"cloudscheduler.googleapis.com",
"iamcredentials.googleapis.com",
"drive.googleapis.com",
"bigquery.googleapis.com",
"bigqueryconnection.googleapis.com",
]
}

Expand Down
1 change: 1 addition & 0 deletions .gcp/terraforms/storage.tf
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ resource "google_storage_bucket" "ops_pipeline_bucket" {
}
}
}

7 changes: 6 additions & 1 deletion .gcp/terraforms/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ variable "project_id" {
}

variable "region" {
description = "The Default GCP region"
description = "The Project GCP region"
type = string
default = "us-east1"
}
Expand All @@ -24,3 +24,8 @@ variable "alert_email_map" {
description = "List of emails to receive pipeline alerts"
sensitive = true
}

# Dataset ID used for the external-table dataset. It has no default, so a
# value must be supplied (the CI workflow passes TF_VAR_bq_dataset_id).
variable "bq_dataset_id" {
  type        = string
  description = "BigQuery dataset containing externalized GCS tables"
}
1 change: 1 addition & 0 deletions .github/workflows/ci-infra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ jobs:
TF_VAR_region: ${{ env.REGION }}
TF_VAR_github_repo: ${{ env.GITHUB_REPO }}
TF_VAR_alert_email_map: ${{ secrets.ALERT_EMAIL_MAP }}
TF_VAR_bq_dataset_id: ${{secrets.BQ_DATASET_ID}}
run: terraform apply -auto-approve
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ __pycache__/
runtime/
/data/raw
data/published/
data/id_mapping/
data/run_artifact
data/contracted/
assets/benchmarks/benchmark.py
docker-compose.benchmark.yml

# local editor configs
pyrightconfig.json
Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python-envs.defaultEnvManager": "ms-python.python:conda",
"python-envs.defaultPackageManager": "ms-python.python:conda"
"python-envs.defaultPackageManager": "ms-python.python:conda",
"python.terminal.activateEnvironment": false
}
Loading
Loading