Code optimaiztion #3

Open
saifislam2026 wants to merge 35 commits into main from code-optimaiztion

saifislam2026 commented May 14, 2026

Clean COLA pipeline
Files added/updated:

  1. code/program1_postgres_ingest_with_handwritten_file.py

    • Finds COLA IDs in the selected date window and ingests their data, including handwritten IDs.
    • Exports missing IDs to CSV.
    • Flags those COLAs as handwritten/fallback candidates in Postgres.
  2. code/program2_azure_ingest_with_handwritten_image.py

    • Reads the Program 1 CSV.
    • Fetches fallback direct images from TTB.
    • Uploads valid images to Azure Blob Storage.
    • Inserts SUCCESS or FAILED image rows into cola_registry.cola_images.
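
The hand-off between the two programs can be pictured as a small CSV round trip. The sketch below is illustrative only: the single `cola_id` column, the helper names `write_missing_ids` and `read_missing_ids`, and the sample IDs are assumptions, not details taken from this PR.

```python
import csv
import io
from typing import Iterable, List, TextIO


def write_missing_ids(out: TextIO, cola_ids: Iterable[str]) -> int:
    """Write one COLA ID per row under a 'cola_id' header (assumed layout)."""
    writer = csv.writer(out)
    writer.writerow(["cola_id"])
    count = 0
    for cola_id in cola_ids:
        writer.writerow([cola_id])
        count += 1
    return count


def read_missing_ids(src: TextIO) -> List[str]:
    """Read the IDs back, dropping blanks and duplicates while keeping order."""
    seen = set()
    ids = []
    for row in csv.DictReader(src):
        cola_id = (row.get("cola_id") or "").strip()
        if cola_id and cola_id not in seen:
            seen.add(cola_id)
            ids.append(cola_id)
    return ids


# Round trip through an in-memory buffer instead of a real file.
buffer = io.StringIO()
written = write_missing_ids(
    buffer, ["26001001000001", "26001001000002", "26001001000001"]
)
buffer.seek(0)
loaded = read_missing_ids(buffer)
```

De-duplicating on read is a design choice in this sketch: it would keep Program 2 from fetching the same fallback image twice if Program 1 ever emitted a repeated ID.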

zukadek and others added 30 commits April 6, 2026 14:50
- fetch_colas_api.py: remove parsed_rows / cola_parsed_data upserts; all
  fields (core + detail) now written to colas table in one pass
- postgres_helper.py: drop cola_parsed_data table; expand colas schema with
  all detail columns (applicant, qualifications, grape_varietal, etc.); add
  ALTER TABLE ... ADD COLUMN IF NOT EXISTS migrations for existing DBs; remove
  upsert_parsed_data(); rewrite upsert_colas() with COALESCE to preserve
  detail data on core-only updates
- scrape_cola_images.py: remove all_parsed_cola_data collection and
  cola_parsed_data upsert block; image scraper now only writes image records
- docs/cola_data_model.sql: detail columns merged into colas table,
  cola_parsed_data table removed; SQL view syntax fixes (CROSS JOIN LATERAL)
- docs/cola_data_model.dbml: single consolidated DBML reflecting merged schema
- azure_blob_helper.py: new helper for Azure Blob Storage uploads
• reads candidate cola_id values from Postgres
• opens each TTB COLA detail page
• finds electronic label image URLs
• downloads images with the same session
• uploads them to Azure Blob
• writes image metadata rows to cola_images
- postgres_helper.py: align cola_images schema with live DB (drop file_name,
  local_path, raw_json; add created_by/updated_by); switch unique constraint
  to (cola_id, blob_name) WHERE blob_name IS NOT NULL; add SAVEPOINT-based
  migrations so ALTER TABLE / CREATE INDEX failures don't abort the
  transaction; add deduplication before index creation; drop legacy
  cola_id+image_type index on startup

- scrape_cola_images.py: add Postgres support (--postgres, --postgres-only
  flags, PG_MODE/PG_ONLY globals); fix completed_date=None crash in
  download_cola_detail by deriving date from cola_id prefix; remap img_type
  -> image_type in Postgres upsert path

- test_pipeline.py: new orchestration script — fetch 50 COLAs to Postgres,
  query their IDs, scrape images to Azure Blob + Postgres, print timing
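
The SAVEPOINT-based migration idea mentioned for postgres_helper.py above can be sketched roughly as follows. This is not the PR's code: it uses the standard-library `sqlite3` module as a stand-in for Postgres so that it runs anywhere, and the helper name `run_guarded` and the reduced `cola_images` table are assumptions. In Postgres the pattern matters more, because a failed statement otherwise aborts the whole transaction.

```python
import sqlite3


def run_guarded(conn, statement: str, name: str = "migration_step") -> bool:
    """Run one DDL statement inside a savepoint; undo only that step on failure.

    `name` is an internal constant here, never user input.
    """
    cur = conn.cursor()
    cur.execute(f"SAVEPOINT {name}")
    try:
        cur.execute(statement)
    except sqlite3.Error:
        # Roll back to the savepoint so the outer transaction stays usable.
        cur.execute(f"ROLLBACK TO SAVEPOINT {name}")
        cur.execute(f"RELEASE SAVEPOINT {name}")
        return False
    cur.execute(f"RELEASE SAVEPOINT {name}")
    return True


# isolation_level=None lets the script issue BEGIN/COMMIT explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE cola_images (cola_id TEXT, blob_name TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO cola_images VALUES ('demo-id', 'demo.jpg')")
added = run_guarded(conn, "ALTER TABLE cola_images ADD COLUMN created_by TEXT")
# Adding the same column again fails, but only this step is rolled back.
repeated = run_guarded(conn, "ALTER TABLE cola_images ADD COLUMN created_by TEXT")
conn.execute("COMMIT")

row_count = conn.execute("SELECT COUNT(*) FROM cola_images").fetchone()[0]
```

The row inserted before the failing step survives the commit, which is the property the commit message describes.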
…age records

- postgres_helper: add COALESCE to all core colas fields so partial upserts
  from image scraper (cola_id + image_count_to_parse only) no longer overwrite
  brand_name, completed_date, scraped_on, etc. with NULL; image_count_to_parse
  remains a direct update since image scraper is authoritative for that field
- scrape_cola_images: set download_status = 'SUCCESS'/'FAILED' on all image
  records (paper and electronic paths); failed electronic images now append to
  the image list so they are tracked in DB instead of silently skipped
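
The COALESCE behaviour described above (partial upserts keep existing values, while image_count_to_parse is overwritten directly) can be illustrated with a reduced example. This is a sketch under assumptions: it runs against in-memory SQLite (3.24 or newer, for `ON CONFLICT ... DO UPDATE`) rather than Postgres, and the three-column `colas` table is a cut-down stand-in for the real schema.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO colas (cola_id, brand_name, image_count_to_parse)
VALUES (?, ?, ?)
ON CONFLICT (cola_id) DO UPDATE SET
    -- keep the stored value when the incoming one is NULL
    brand_name = COALESCE(excluded.brand_name, colas.brand_name),
    -- the image scraper is authoritative here, so overwrite directly
    image_count_to_parse = excluded.image_count_to_parse
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE colas ("
    "cola_id TEXT PRIMARY KEY, brand_name TEXT, image_count_to_parse INTEGER)"
)

# First pass: a full metadata row without an image count.
conn.execute(UPSERT_SQL, ("demo-id", "Example Brand", None))
# Second pass: a partial row like the one the image scraper would send.
conn.execute(UPSERT_SQL, ("demo-id", None, 3))

row = conn.execute(
    "SELECT brand_name, image_count_to_parse FROM colas WHERE cola_id = ?",
    ("demo-id",),
).fetchone()
```

After the second upsert the brand name is still present and the image count has been updated.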
…blob

Feature/schema consolidation azure blob

Copilot AI left a comment

Pull request overview

This PR adds a two-step "handwritten/fallback" COLA pipeline (Program 1 + Program 2), introduces an Azure Blob Storage helper, wires Azure uploads and a --postgres-only mode into the existing image scraper, and consolidates the previously separate cola_parsed_data table into the colas table. Documentation and lockfile are updated accordingly.

Changes:

  • New standalone scripts: program1_postgres_ingest_with_handwritten_file.py (export missing COLA IDs + flag handwritten), program2_azure_ingest_with_handwritten_image.py (fetch fallback images, upload to Azure, write to cola_images).
  • Refactors postgres_helper.py / fetch_colas_api.py to fold parsed-detail fields into the colas table and drop cola_parsed_data; scrape_cola_images.py gains Azure Blob upload and --postgres-only paths plus a new azure_blob_helper.py.
  • Adds test_pipeline.py, updates docs/, pyproject.toml, uv.lock for azure-identity and azure-storage-blob.

Reviewed changes

Copilot reviewed 13 out of 17 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| code/program1_postgres_ingest_with_handwritten_file.py | New: export missing-image IDs and upsert handwritten flags. |
| code/program2_azure_ingest_with_handwritten_image.py | New: fetch fallback images, upload to Azure, insert into cola_images. |
| code/azure_blob_helper.py | New Azure Blob auth/upload helper using ClientSecretCredential. |
| code/scrape_cola_images.py | Adds Azure upload path and --postgres/--postgres-only modes. |
| code/postgres_helper.py | Merges parsed-data fields into colas; removes cola_parsed_data; updates cola_images schema. |
| code/fetch_colas_api.py | Removes map_to_parsed_record; merges detail mapping into map_to_colas_record. |
| code/test_pipeline.py | New end-to-end smoke test for metadata + image pipeline. |
| docs/README.md | New README for Program 1 / Program 2 (references some non-existent scripts). |
| docs/cola_data_model.dbml, docs/cola_data_model.sql, docs/postgres_schema.dbml | Docs updated/removed to reflect schema consolidation. |
| docs/cola_validation_report_with_external_output_20260414_151643.txt | Adds validation report snapshot. |
| pyproject.toml, uv.lock | Adds azure-identity and azure-storage-blob dependencies. |
| .gitignore | Ignores .DS_Store and *.duckdb. |

Comment on lines +273 to +289
def db_insert_result(conn, *, cola_id, detail_page_url, source_image_url,
                     blob_url, blob_name, status, error_message):
    sql = """
        INSERT INTO cola_registry.cola_images
            (cola_id, image_type, source_image_url, blob_url, blob_name,
             download_status, error_message, is_paper_submission,
             image_width_px, image_height_px, image_dimension_text,
             created_by, updated_by)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    with conn.cursor() as cur:
        cur.execute(sql, (
            cola_id, "label_fallback_publicViewImage",
            source_image_url, blob_url, blob_name,
            status, error_message, True,
            None, None, None,
            CREATED_BY, CREATED_BY,
Comment thread docs/README.md
Comment on lines +5 to +53
## Program 1
File: program1_postgres_flag_file.py

Purpose:
1. API to PG ingest
2. Apply window filter dates
3. Flag missing IDs and export CSV

What it does:
- Pulls COLA records from TTB API for the selected window.
- Upserts rows into Postgres table cola_registry.colas.
- Finds IDs with no SUCCESS image and writes missing flags in Postgres.
- Exports a flag CSV used by Program 2.

Window filter arguments:
- --start-date YYYY-MM-DD
- --end-date YYYY-MM-DD

Typical run (PowerShell):
& "h:/My Drive/TTB/.env/.venv/Scripts/python.exe" "h:/My Drive/TTB/Code0508/gitver/program1_postgres_flag_file.py" --start-date 2026-04-23 --end-date 2026-05-07

## Program 2
File: program2_azure_ingest_with_flags.py

Purpose:
1. Ingest all IDs
2. Apply window filter dates
3. Ingest flagged IDs
4. Save dimension data in Postgres

What it does:
- Reads all window IDs from Postgres and runs all-ID ingest.
- Reads Program 1 flag CSV and ingests flagged IDs.
- Uploads images to Azure blob storage.
- Writes missing and handwritten marker blobs.
- Inserts SUCCESS image rows into cola_registry.cola_images.
- Saves dimensions in Postgres: image_width_px, image_height_px, image_dimension_text.

Window filter arguments:
- --start-date YYYY-MM-DD
- --end-date YYYY-MM-DD

Required input:
- --flag-csv path from Program 1 output

Typical run (PowerShell):
& "h:/My Drive/TTB/.env/.venv/Scripts/python.exe" "h:/My Drive/TTB/Code0508/gitver/program2_azure_ingest_with_flags.py" --start-date 2026-04-23 --end-date 2026-05-07 --flag-csv "h:/My Drive/TTB/Code0508/rpt/revise_missing_flags_2026-04-23_to_2026-05-07_YYYYMMDD_HHMMSS.csv"
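
For orientation, the window and CSV arguments listed in the README excerpt above could be parsed along these lines. This is a hedged sketch, not the scripts' actual argument handling: making the date flags required, validating their order, and the helper names `parse_day`, `build_parser`, and `parse_args` are all assumptions, and `flags.csv` is a placeholder path.

```python
import argparse
from datetime import date


def parse_day(value: str) -> date:
    """Convert a YYYY-MM-DD string into a date, with a readable CLI error."""
    try:
        return date.fromisoformat(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(
            f"expected YYYY-MM-DD, got {value!r}"
        ) from exc


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Program 2 style arguments (sketch)")
    parser.add_argument("--start-date", type=parse_day, required=True)
    parser.add_argument("--end-date", type=parse_day, required=True)
    parser.add_argument("--flag-csv", required=True, help="CSV written by Program 1")
    return parser


def parse_args(argv):
    parser = build_parser()
    args = parser.parse_args(argv)
    # Reject an inverted window early instead of querying an empty range.
    if args.end_date < args.start_date:
        parser.error("--end-date must not be earlier than --start-date")
    return args


args = parse_args([
    "--start-date", "2026-04-23",
    "--end-date", "2026-05-07",
    "--flag-csv", "flags.csv",
])
```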
Comment on lines +846 to +851
id_list = ", ".join(f"'{i}'" for i in TTB_IDS)
cur.execute(f"""--sql
    SELECT cola_id::text, completed_date
    FROM cola_registry.colas
    WHERE cola_id::text IN ({id_list})
""")
Comment on lines +341 to +399
    with get_db_conn() as conn:
        if db_has_success(conn, cola_id, source_url, blob_name):
            result["outcome"] = "SKIPPED_EXISTING_SUCCESS"
            return result

    if not args.execute:
        if args.check_blob_in_dry_run and blob is not None:
            result["outcome"] = "DRY_RUN_BLOB_EXISTS" if blob.exists() else "DRY_RUN_WOULD_UPLOAD"
        else:
            result["outcome"] = "DRY_RUN_VALID_IMAGE"
        return result

    if blob is None:
        raise PipelineError("Azure container client is not initialized")

    blob.upload_blob(
        resp.content, overwrite=True,
        content_settings=ContentSettings(content_type=result["content_type"]),
    )
    result["blob_url"] = blob.url

    with get_db_conn() as conn:
        conn.autocommit = False
        with conn.cursor() as cur:
            cur.execute("SELECT pg_try_advisory_xact_lock(hashtext(%s))", (cola_id,))
            locked = bool(cur.fetchone()[0])
        if not locked:
            conn.rollback()
            result["outcome"] = "SKIPPED_LOCK_NOT_ACQUIRED"
            return result
        if db_has_success(conn, cola_id, source_url, blob_name):
            conn.rollback()
            result["outcome"] = "SKIPPED_RACE_EXISTING_SUCCESS"
            return result
        db_insert_result(
            conn, cola_id=cola_id, detail_page_url=detail_page_url,
            source_image_url=source_url, blob_url=blob.url,
            blob_name=blob_name, status="SUCCESS", error_message=None,
        )
        conn.commit()

    result["outcome"] = "SUCCESS_UPLOADED"
    return result

except Exception as exc:
    result["error"] = str(exc)[:500]
    result["outcome"] = "FAILED"

    if args.execute and args.record_failures:
        try:
            with get_db_conn() as conn:
                conn.autocommit = False
                db_insert_result(
                    conn, cola_id=cola_id, detail_page_url=detail_page_url,
                    source_image_url=source_url, blob_url=None,
                    blob_name=result["blob_name"] or None,
                    status="FAILED", error_message=result["error"],
                )
                conn.commit()
Comment on lines +400 to +401
except Exception:
    pass
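
The handler above discards any error raised while recording a FAILED row. If visibility into those secondary failures is wanted, one option is to log them instead of passing silently. The sketch below is an assumption about how that might look, not the PR's code: `record_failure_safely`, `broken_writer`, and the logger name are hypothetical.

```python
import logging

logger = logging.getLogger("program2_sketch")


def record_failure_safely(write_failure_row, cola_id: str) -> bool:
    """Try to persist a FAILED row; log (rather than hide) any secondary error."""
    try:
        write_failure_row()
        return True
    except Exception:
        # logger.exception logs at ERROR level and includes the traceback.
        logger.exception("Could not record FAILED row for cola_id=%s", cola_id)
        return False


def broken_writer():
    """Simulates the database being unreachable while recording a failure."""
    raise RuntimeError("database unavailable")


ok = record_failure_safely(lambda: None, "demo-id")
failed = record_failure_safely(broken_writer, "demo-id")
```

The pipeline still continues after a failed write, as in the original handler, but the cause is left in the log.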
Comment on lines +872 to +874
    pg.upsert_colas(_pg_conn, image_count_updates)
    logger.info(f"Postgres: updated image_count_to_parse for {len(image_count_updates)} COLAs")
except Exception as e:
Comment on lines +279 to +292
with get_conn() as conn:
    rows = fetch_missing_rows(conn, args.start_date, args.end_date, args.limit)

    flagged_count = 0
    if not args.no_flag_handwritten:
        ensure_handwritten_flag_table(conn)
        cola_ids = [str(row[0]) for row in rows]
        flagged_count = upsert_handwritten_flags(
            conn, cola_ids,
            args.handwritten_source,
            args.handwritten_reason,
            args.marked_by,
        )

Comment on lines +228 to +231
with conn.cursor() as cur:
    for cola_id in cola_ids:
        cur.execute(sql, (cola_id, source, reason, marked_by))
conn.commit()

3 participants