Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
3745aea
Consolidate colas + cola_parsed_data into single table
zukadek Apr 6, 2026
b432531
Merge postgres_schema.dbml into cola_data_model.dbml; delete duplicate
zukadek Apr 6, 2026
167f491
Add files via upload
saifislam2026 Apr 7, 2026
4800947
Add files via upload
saifislam2026 Apr 7, 2026
413b55a
Fix Postgres image upsert + add e2e test pipeline
zukadek Apr 7, 2026
9c5b2d7
Fix brand_name/completed_date overwrite and add download_status to im…
zukadek Apr 7, 2026
507e603
Remove outdated saifislam manual file drops (cola_data_model_0407)
zukadek Apr 8, 2026
05ed9fb
Add .DS_Store and *.duckdb to .gitignore
zukadek Apr 8, 2026
72a9689
Add files via upload
saifislam2026 Apr 14, 2026
2d472aa
Add files via upload
saifislam2026 Apr 14, 2026
f16b4a9
Add files via upload
saifislam2026 Apr 22, 2026
f95b419
Merge pull request #2 from TTB-OA/feature/schema-consolidation-azure-…
saifislam2026 Apr 24, 2026
febb396
Add files via upload
saifislam2026 Apr 29, 2026
e64d7ae
Delete code/cola_azure_ingest_0429.py
saifislam2026 May 10, 2026
adf691d
Delete code/cola_azure_ingest_image_april2026.py
saifislam2026 May 10, 2026
e9338ff
Delete code/cola_detail_ingestion_april2026.py
saifislam2026 May 10, 2026
d3ec79e
Delete code/cola_image_dimension_backfill_Task3.py
saifislam2026 May 10, 2026
e808266
Delete code/cola_image_dimension_backfill_Task4b.py
saifislam2026 May 10, 2026
83404b8
Delete code/cola_image_dimension_extractor_april2026.py
saifislam2026 May 10, 2026
02452e6
Delete code/cola_image_pipeline_Task4a.py
saifislam2026 May 10, 2026
126676c
Delete code/cola_image_pipeline_github.py
saifislam2026 May 10, 2026
69e85d8
Delete code/cola_pipeline_timing_benchmark_Task5.py
saifislam2026 May 10, 2026
207f0e2
Delete code/cola_validation_report_with_external_Task1.py
saifislam2026 May 10, 2026
acef2c8
Delete code/ttb_api_to_postgres_loader_0429.py
saifislam2026 May 10, 2026
134b4d6
Delete code/ttb_api_to_postgres_loader_10pages_Task2.py
saifislam2026 May 10, 2026
5a61934
Delete code/api_loader_historic_metadata.py
saifislam2026 May 10, 2026
9b7de4e
Add files via upload
saifislam2026 May 11, 2026
dcda6b0
Delete docs/cola_pipeline_timing_benchmark_output_20260414_172151.txt
saifislam2026 May 11, 2026
1762168
Add files via upload
saifislam2026 May 11, 2026
b5af9d4
Delete code/program1_postgres_ingest_with_handwritten_file.py
saifislam2026 May 14, 2026
4176e03
Delete code/program2_azure_ingest_with_handwritten_image.py
saifislam2026 May 14, 2026
38b7dc9
Add files via upload
saifislam2026 May 14, 2026
9696668
Add files via upload
saifislam2026 May 20, 2026
58c3376
Delete code/historical_ingest.py
saifislam2026 May 27, 2026
af5f27c
Delete code/incremental_ingest.py
saifislam2026 May 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .DS_Store
Binary file not shown.
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ data/
raw_images/

# VS Code settings
.vscode/
.vscode/

# Mac system files
.DS_Store

# Local DuckDB database
*.duckdb
87 changes: 87 additions & 0 deletions code/azure_blob_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
Azure Blob Storage helper for the TTB COLA image pipeline.

Provides upload, existence check, and URL generation for label images
stored in Azure Blob Storage using Service Principal authentication.

Required environment variables (set in .env):
AZURE_TENANT_ID - Azure AD tenant ID
AZURE_CLIENT_ID - Service principal client ID
AZURE_CLIENT_SECRET - Service principal client secret
AZURE_STORAGE_ACCOUNT_NAME - Storage account name (e.g. ttbstoragetesting)
AZURE_CONTAINER_NAME - Blob container name
"""

import logging
import os

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

TENANT_ID = os.getenv("AZURE_TENANT_ID", "")
CLIENT_ID = os.getenv("AZURE_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET", "")
STORAGE_ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME", "")
CONTAINER_NAME = os.getenv("AZURE_CONTAINER_NAME", "")

ACCOUNT_URL = f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net"


def get_blob_service_client() -> BlobServiceClient | None:
"""
Return an authenticated BlobServiceClient using Service Principal credentials.
Returns None if credentials are missing or authentication fails.
"""
if not all([TENANT_ID, CLIENT_ID, CLIENT_SECRET, STORAGE_ACCOUNT_NAME, CONTAINER_NAME]):
logger.warning("Azure Blob credentials incomplete — check .env for AZURE_* vars.")
return None
try:
credential = ClientSecretCredential(
tenant_id=TENANT_ID,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
)
client = BlobServiceClient(account_url=ACCOUNT_URL, credential=credential)
# Verify access by listing containers (lightweight check)
client.get_container_client(CONTAINER_NAME).get_container_properties()
logger.info(f"Azure Blob client initialized for container '{CONTAINER_NAME}'")
return client
except Exception as e:
logger.warning(f"Failed to initialize Azure Blob client: {e}")
return None


def blob_exists(client: BlobServiceClient, blob_name: str) -> bool:
"""Return True if the blob already exists in the container."""
try:
return client.get_blob_client(CONTAINER_NAME, blob_name).exists()
except Exception as e:
logger.debug(f"Error checking blob existence for {blob_name}: {e}")
return False


def upload_to_blob(client: BlobServiceClient, local_path: str, blob_name: str) -> str | None:
"""
Upload a local file to Azure Blob Storage.
Returns the public blob URL on success, None on failure.
"""
try:
if blob_exists(client, blob_name):
logger.debug(f"Blob already exists, skipping upload: {blob_name}")
return f"{ACCOUNT_URL}/{CONTAINER_NAME}/{blob_name}"

blob_client = client.get_blob_client(CONTAINER_NAME, blob_name)
with open(local_path, "rb") as f:
blob_client.upload_blob(f, overwrite=True)

url = f"{ACCOUNT_URL}/{CONTAINER_NAME}/{blob_name}"
logger.debug(f"Uploaded to Azure Blob: {blob_name}")
return url
except Exception as e:
logger.warning(f"Failed to upload {blob_name} to Azure Blob: {e}")
return None
193 changes: 91 additions & 102 deletions code/fetch_colas_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,32 @@ def get_db_connection() -> duckdb.DuckDBPyConnection:
pass
# Migrate: add columns introduced after initial schema
migrations = [
# Legacy column additions for existing DBs
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS status_code varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS status varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS received_code varchar",
"ALTER TABLE cola_parsed_data ADD COLUMN IF NOT EXISTS fax_number varchar",
"ALTER TABLE cola_parsed_data ADD COLUMN IF NOT EXISTS vendor_code varchar",
"ALTER TABLE cola_parsed_data ADD COLUMN IF NOT EXISTS for_sale_in varchar",
"ALTER TABLE cola_parsed_data ADD COLUMN IF NOT EXISTS bottle_capacity varchar",
# Detail fields merged into colas
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS applicant_name text",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS mailing_address text",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS permit_number text",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS formula varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS grape_varietal varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS appelation varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS phone_number varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS fax_number varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS email varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS application_type varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS date_of_application varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS qualifications text",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS vendor_code varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS for_sale_in varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS bottle_capacity varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS front_label_url varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS back_label_url varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS neck_label_url varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS strip_label_url varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS other_label_url varchar",
"ALTER TABLE colas ADD COLUMN IF NOT EXISTS parsed_on datetime",
# Junction tables for one-to-many fields
"""CREATE TABLE IF NOT EXISTS cola_qualifications (
cola_id BIGINT NOT NULL,
Expand Down Expand Up @@ -258,10 +277,11 @@ def _to_int(val) -> int | None:

def map_to_colas_record(item: dict, detail: dict | None = None) -> dict:
"""Map an API COLA item to the colas table schema.
Uses cached lookup tables for descriptions; detail (from /colas/{id}) used when available.
Includes both core fields (list endpoint) and detail fields (/colas/{id}).
Detail fields are populated when detail is provided (--fetch-details), else None.
"""
src = detail or item
origin_code = item.get("originCode", "")
src = detail or item
origin_code = item.get("originCode", "")
class_type_code = item.get("classTypeCode", "")

# Prefer nested objects from detail, fall back to cached lookup tables
Expand All @@ -274,7 +294,8 @@ def map_to_colas_record(item: dict, detail: dict | None = None) -> dict:
permits = src.get("permits") or []
permit_id = src.get("permitId") or next((p.get("permitId") for p in permits), None)

return {
record: dict = {
# ── Core fields ───────────────────────────────────────────────────────
"cola_id": _to_int(item.get("ttbId")),
"permit_num": permit_id,
"serial_num": item.get("serialNum"),
Expand All @@ -291,90 +312,68 @@ def map_to_colas_record(item: dict, detail: dict | None = None) -> dict:
"scraped_on": get_standard_timestamp(),
}

if detail:
# ── Detail fields (from /colas/{id}) ─────────────────────────────────
submitter = detail.get("submitter") or {}
qualifications = detail.get("qualifications") or []
grape_varietals = detail.get("grapeVarietals") or []

def map_to_parsed_record(list_item: dict, detail: dict) -> dict:
"""Map a COLA detail response to the cola_parsed_data table schema.
list_item: from the paginated list endpoint (has completedDate).
detail: from /colas/{id} (has submitter, qualifications, grapeVarietals, permits).
"""
submitter = detail.get("submitter") or {}
qualifications = detail.get("qualifications") or []
grape_varietals = detail.get("grapeVarietals") or []
permits = detail.get("permits") or []

# Qualifications: use qualificationRef.longDesc, fall back to cached QUALIFICATIONS lookup
qual_parts = []
for q in qualifications:
ref = q.get("qualificationRef") or {}
ref_id = ref.get("refQualificationId")
text = ref.get("longDesc") or QUALIFICATIONS.get(ref_id) or q.get("qualftnCommentText") or ""
if text:
qual_parts.append(text)
qual_text = " | ".join(qual_parts) or None

# Grape varietals: use vartlName, fall back to cached GRAPE_VARIETALS lookup
grape_text = "; ".join(
g.get("vartlName") or GRAPE_VARIETALS.get(g.get("vartlId")) or str(g.get("vartlId", ""))
for g in grape_varietals
if g.get("vartlName") or g.get("vartlId")
) if grape_varietals else None

# Permits: use primary permit address for mailing address
primary_permit = next((p for p in permits if p.get("primaryPermitFlg") == "Y"), permits[0] if permits else {})
permit_nums = "; ".join(p.get("permitId") for p in permits if p.get("permitId")) or list_item.get("permitId")
address_parts = [
primary_permit.get("permitFrstStrtAddr"),
primary_permit.get("permitSecndStrtAddr"),
primary_permit.get("permitCityAddr"),
primary_permit.get("permitStateAddr"),
primary_permit.get("permitZipAddr"),
primary_permit.get("permitCntryAddr"),
]
mailing_address = ", ".join(p for p in address_parts if p) or None

# Submitter name: detail uses submtrFrstName/submtrLastName
first = submitter.get("submtrFrstName") or ""
last = submitter.get("submtrLastName") or ""
applicant_name = (primary_permit.get("permitName") or
" ".join(x for x in [first, last] if x) or None)

class_type_obj = detail.get("classType") or {}

return {
"cola_id": _to_int(list_item.get("ttbId")),
"ttb_id": list_item.get("ttbId"),
"ct_code": list_item.get("classTypeCode"),
"or_code": list_item.get("originCode"),
"permit_number": permit_nums,
"source_dm_im": list_item.get("receivedCode"),
"serial_number": list_item.get("serialNum"),
"commodity": class_type_obj.get("description") or CLASS_TYPES.get(list_item.get("classTypeCode", "")),
"brand_name": list_item.get("brandName"),
"fanciful_name": list_item.get("fancifulName"),
"applicant_name": applicant_name,
"mailing_address": mailing_address,
"formula": detail.get("formulaNum"),
"grape_varietal": grape_text,
"appelation": None, # not in API
"phone_number": submitter.get("telNo"),
"fax_number": submitter.get("faxNo"),
"email": None, # not in API (SubmitterDTO has no email field)
"application_type": None, # not in API
"date_of_application": None, # not in API
"qualifications": qual_text,
"date_issued": list_item.get("completedDate"),
"ttb_status": list_item.get("status"),
"ttb_ct_description": class_type_obj.get("description"),
"vendor_code": detail.get("vendorCode"),
"for_sale_in": detail.get("forSaleIn"),
"bottle_capacity": str(detail.get("bottleCapacity")) if detail.get("bottleCapacity") is not None else None,
"front_label_url": None, # still requires HTML scraping
"back_label_url": None,
"neck_label_url": None,
"strip_label_url": None,
"other_label_url": None,
"parsed_on": get_standard_timestamp(),
}
qual_parts = []
for q in qualifications:
ref = q.get("qualificationRef") or {}
ref_id = ref.get("refQualificationId")
text = ref.get("longDesc") or QUALIFICATIONS.get(ref_id) or q.get("qualftnCommentText") or ""
if text:
qual_parts.append(text)
qual_text = " | ".join(qual_parts) or None

grape_text = "; ".join(
g.get("vartlName") or GRAPE_VARIETALS.get(g.get("vartlId")) or str(g.get("vartlId", ""))
for g in grape_varietals
if g.get("vartlName") or g.get("vartlId")
) if grape_varietals else None

primary_permit = next((p for p in permits if p.get("primaryPermitFlg") == "Y"), permits[0] if permits else {})
permit_nums = "; ".join(p.get("permitId") for p in permits if p.get("permitId")) or item.get("permitId")
address_parts = [
primary_permit.get("permitFrstStrtAddr"),
primary_permit.get("permitSecndStrtAddr"),
primary_permit.get("permitCityAddr"),
primary_permit.get("permitStateAddr"),
primary_permit.get("permitZipAddr"),
primary_permit.get("permitCntryAddr"),
]
mailing_address = ", ".join(p for p in address_parts if p) or None
first = submitter.get("submtrFrstName") or ""
last = submitter.get("submtrLastName") or ""
applicant_name = (primary_permit.get("permitName") or
" ".join(x for x in [first, last] if x) or None)

record.update({
"applicant_name": applicant_name,
"mailing_address": mailing_address,
"permit_number": permit_nums,
"formula": detail.get("formulaNum"),
"grape_varietal": grape_text,
"appelation": None, # not in API
"phone_number": submitter.get("telNo"),
"fax_number": submitter.get("faxNo"),
"email": None, # not in API
"application_type": None, # not in API
"date_of_application": None, # not in API
"qualifications": qual_text,
"vendor_code": detail.get("vendorCode"),
"for_sale_in": detail.get("forSaleIn"),
"bottle_capacity": str(detail.get("bottleCapacity")) if detail.get("bottleCapacity") is not None else None,
"front_label_url": None, # still requires HTML scraping
"back_label_url": None,
"neck_label_url": None,
"strip_label_url": None,
"other_label_url": None,
"parsed_on": get_standard_timestamp(),
})

return record


def upsert_junction_tables(
Expand Down Expand Up @@ -558,20 +557,13 @@ def fetch_and_save(conn: duckdb.DuckDBPyConnection | None) -> int:
details_map[ttb_id] = detail
time.sleep(REQUEST_DELAY)

colas_rows = [map_to_colas_record(r, details_map.get(r.get("ttbId", ""))) for r in records]
parsed_rows = (
[map_to_parsed_record(r, details_map[r["ttbId"]]) for r in records if r.get("ttbId") in details_map]
if FETCH_DETAILS else []
)
colas_rows = [map_to_colas_record(r, details_map.get(r.get("ttbId", ""))) for r in records]

# ── DuckDB upsert (skipped in PG_ONLY mode) ───────────────────────────
if not PG_ONLY and conn is not None:
n = batch_upsert_to_db(conn, cast(list[dict[str, Any]], colas_rows), "colas", "cola_id", batch_size=BATCH_SIZE)
saved += n
logger.info(f"Page {page}: upserted {n} colas ({saved:,}/{effective_total:,} total)")
if parsed_rows:
batch_upsert_to_db(conn, cast(list[dict[str, Any]], parsed_rows), "cola_parsed_data", "cola_id", batch_size=BATCH_SIZE)
logger.info(f" → also saved {len(parsed_rows)} parsed_data records")
else:
saved += len(colas_rows)
logger.info(f"Page {page}: processed {len(colas_rows)} colas ({saved:,}/{effective_total:,} total)")
Expand All @@ -580,9 +572,6 @@ def fetch_and_save(conn: duckdb.DuckDBPyConnection | None) -> int:
if PG_MODE and _pg_conn is not None:
pg_n = pg.upsert_colas(_pg_conn, colas_rows)
logger.info(f" → Postgres: upserted {pg_n} colas")
if parsed_rows:
pg_pn = pg.upsert_parsed_data(_pg_conn, parsed_rows)
logger.info(f" → Postgres: upserted {pg_pn} parsed_data records")

# Upsert junction tables (qualifications, grape varietals) — requires details + DuckDB
if FETCH_DETAILS and details_map and not PG_ONLY and conn is not None:
Expand Down
Loading