Code optimization #3
Open
saifislam2026 wants to merge 35 commits into
- fetch_colas_api.py: remove parsed_rows / cola_parsed_data upserts; all fields (core + detail) now written to colas table in one pass
- postgres_helper.py: drop cola_parsed_data table; expand colas schema with all detail columns (applicant, qualifications, grape_varietal, etc.); add ALTER TABLE IF NOT EXISTS migrations for existing DBs; remove upsert_parsed_data(); rewrite upsert_colas() with COALESCE to preserve detail data on core-only updates
- scrape_cola_images.py: remove all_parsed_cola_data collection and cola_parsed_data upsert block; image scraper now only writes image records
- docs/cola_data_model.sql: detail columns merged into colas table, cola_parsed_data table removed; SQL view syntax fixes (CROSS JOIN LATERAL)
- docs/cola_data_model.dbml: single consolidated DBML reflecting merged schema
- azure_blob_helper.py: new helper for Azure Blob Storage uploads
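To illustrate the COALESCE-based upsert described in the commit message above, here is a minimal sketch. It uses an in-memory SQLite database as a stand-in for Postgres so that it runs without a server; the three-column table and the `upsert_cola` helper are hypothetical and are not the PR's actual `upsert_colas()` implementation.

```python
import sqlite3

# Hypothetical, trimmed-down stand-in for the colas table (SQLite, not Postgres).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE colas (cola_id TEXT PRIMARY KEY, brand_name TEXT, applicant TEXT)"
)

# On conflict, keep the stored value whenever the incoming value is NULL.
UPSERT_SQL = """
INSERT INTO colas (cola_id, brand_name, applicant)
VALUES (:cola_id, :brand_name, :applicant)
ON CONFLICT (cola_id) DO UPDATE SET
    brand_name = COALESCE(excluded.brand_name, colas.brand_name),
    applicant  = COALESCE(excluded.applicant,  colas.applicant)
"""


def upsert_cola(conn, record):
    """Upsert one record; keys missing from `record` are sent as NULL."""
    row = {"cola_id": None, "brand_name": None, "applicant": None, **record}
    conn.execute(UPSERT_SQL, row)


# Full write first, then a core-only update that omits the detail column.
upsert_cola(conn, {"cola_id": "1", "brand_name": "Old", "applicant": "Acme"})
upsert_cola(conn, {"cola_id": "1", "brand_name": "New"})
result = conn.execute("SELECT brand_name, applicant FROM colas").fetchone()
```

In this sketch the second call updates `brand_name` but leaves `applicant` untouched, which is the behavior the commit message attributes to core-only updates. One trade-off of this pattern is that a column can no longer be deliberately reset to NULL through the same upsert.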
• reads candidate cola_id values from Postgres
• opens each TTB COLA detail page
• finds electronic label image URLs
• downloads images with the same session
• uploads them to Azure Blob
• writes image metadata rows to cola_images
- postgres_helper.py: align cola_images schema with live DB (drop file_name, local_path, raw_json; add created_by/updated_by); switch unique constraint to (cola_id, blob_name) WHERE blob_name IS NOT NULL; add SAVEPOINT-based migrations so ALTER TABLE / CREATE INDEX failures don't abort the transaction; add deduplication before index creation; drop legacy cola_id+image_type index on startup
- scrape_cola_images.py: add Postgres support (--postgres, --postgres-only flags, PG_MODE/PG_ONLY globals); fix completed_date=None crash in download_cola_detail by deriving date from cola_id prefix; remap img_type -> image_type in Postgres upsert path
- test_pipeline.py: new orchestration script: fetch 50 COLAs to Postgres, query their IDs, scrape images to Azure Blob + Postgres, print timing
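The SAVEPOINT-based migration idea mentioned above can be sketched as follows. This is only an illustration of the pattern, run against in-memory SQLite so it is self-contained; `run_migration` and the savepoint names are hypothetical, and the PR's real helper may differ. In Postgres the savepoint matters more than it does here, because a failed statement otherwise leaves the whole transaction aborted until it is rolled back.

```python
import sqlite3


def run_migration(conn, name, sql):
    """Run one migration inside a savepoint; return True if it applied."""
    conn.execute(f"SAVEPOINT {name}")
    try:
        conn.execute(sql)
    except sqlite3.Error:
        # Undo only this migration, then discard the savepoint.
        conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
        conn.execute(f"RELEASE SAVEPOINT {name}")
        return False
    conn.execute(f"RELEASE SAVEPOINT {name}")
    return True


# isolation_level=None lets the demo issue BEGIN/COMMIT explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("BEGIN")
conn.execute("CREATE TABLE cola_images (cola_id TEXT, blob_name TEXT)")

add_column = "ALTER TABLE cola_images ADD COLUMN created_by TEXT"
first = run_migration(conn, "add_created_by", add_column)   # applies
second = run_migration(conn, "add_created_by", add_column)  # fails: duplicate column

# The outer transaction is still usable after the failed migration.
conn.execute(
    "INSERT INTO cola_images (cola_id, blob_name, created_by) VALUES ('1', 'a.jpg', 'demo')"
)
conn.execute("COMMIT")
row_count = conn.execute("SELECT COUNT(*) FROM cola_images").fetchone()[0]
```

The second call fails because the column already exists, but the insert that follows still succeeds inside the same transaction.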
…age records
- postgres_helper: add COALESCE to all core colas fields so partial upserts from image scraper (cola_id + image_count_to_parse only) no longer overwrite brand_name, completed_date, scraped_on, etc. with NULL; image_count_to_parse remains a direct update since image scraper is authoritative for that field
- scrape_cola_images: set download_status = 'SUCCESS'/'FAILED' on all image records (paper and electronic paths); failed electronic images now append to the image list so they are tracked in DB instead of silently skipped
…blob Feature/schema consolidation azure blob
Pull request overview
This PR adds a two-step "handwritten/fallback" COLA pipeline (Program 1 + Program 2), introduces an Azure Blob Storage helper, wires Azure uploads and a --postgres-only mode into the existing image scraper, and consolidates the previously separate cola_parsed_data table into the colas table. Documentation and lockfile are updated accordingly.
Changes:
- New standalone scripts: program1_postgres_ingest_with_handwritten_file.py (export missing COLA IDs + flag handwritten), program2_azure_ingest_with_handwritten_image.py (fetch fallback images, upload to Azure, write to cola_images).
- Refactors postgres_helper.py / fetch_colas_api.py to fold parsed-detail fields into the colas table and drop cola_parsed_data; scrape_cola_images.py gains Azure Blob upload and --postgres-only paths plus a new azure_blob_helper.py.
- Adds test_pipeline.py, updates docs/, pyproject.toml, uv.lock for azure-identity and azure-storage-blob.
Reviewed changes
Copilot reviewed 13 out of 17 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| code/program1_postgres_ingest_with_handwritten_file.py | New: export missing-image IDs and upsert handwritten flags. |
| code/program2_azure_ingest_with_handwritten_image.py | New: fetch fallback images, upload to Azure, insert into cola_images. |
| code/azure_blob_helper.py | New Azure Blob auth/upload helper using ClientSecretCredential. |
| code/scrape_cola_images.py | Adds Azure upload path and --postgres/--postgres-only modes. |
| code/postgres_helper.py | Merges parsed-data fields into colas; removes cola_parsed_data; updates cola_images schema. |
| code/fetch_colas_api.py | Removes map_to_parsed_record; merges detail mapping into map_to_colas_record. |
| code/test_pipeline.py | New end-to-end smoke test for metadata + image pipeline. |
| docs/README.md | New README for Program 1 / Program 2 (references some non-existent scripts). |
| docs/cola_data_model.dbml, docs/cola_data_model.sql, docs/postgres_schema.dbml | Docs updated/removed to reflect schema consolidation. |
| docs/cola_validation_report_with_external_output_20260414_151643.txt | Adds validation report snapshot. |
| pyproject.toml, uv.lock | Adds azure-identity and azure-storage-blob dependencies. |
| .gitignore | Ignores .DS_Store and *.duckdb. |
Comment on lines +273 to +289

    def db_insert_result(conn, *, cola_id, detail_page_url, source_image_url,
                         blob_url, blob_name, status, error_message):
        sql = """
            INSERT INTO cola_registry.cola_images
                (cola_id, image_type, source_image_url, blob_url, blob_name,
                 download_status, error_message, is_paper_submission,
                 image_width_px, image_height_px, image_dimension_text,
                 created_by, updated_by)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        """
        with conn.cursor() as cur:
            cur.execute(sql, (
                cola_id, "label_fallback_publicViewImage",
                source_image_url, blob_url, blob_name,
                status, error_message, True,
                None, None, None,
                CREATED_BY, CREATED_BY,
Comment on lines +5 to +53

    ## Program 1
    File: program1_postgres_flag_file.py

    Purpose:
    1. API to PG ingest
    2. Apply window filter dates
    3. Flag missing IDs and export CSV

    What it does:
    - Pulls COLA records from TTB API for the selected window.
    - Upserts rows into Postgres table cola_registry.colas.
    - Finds IDs with no SUCCESS image and writes missing flags in Postgres.
    - Exports a flag CSV used by Program 2.

    Window filter arguments:
    - --start-date YYYY-MM-DD
    - --end-date YYYY-MM-DD

    Typical run:
    powershell
    & "h:/My Drive/TTB/.env/.venv/Scripts/python.exe" "h:/My Drive/TTB/Code0508/gitver/program1_postgres_flag_file.py" --start-date 2026-04-23 --end-date 2026-05-07

    ## Program 2
    File: program2_azure_ingest_with_flags.py

    Purpose:
    1. Ingest all IDs
    2. Apply window filter dates
    3. Ingest flagged IDs
    4. Save dimension data in Postgres

    What it does:
    - Reads all window IDs from Postgres and runs all-ID ingest.
    - Reads Program 1 flag CSV and ingests flagged IDs.
    - Uploads images to Azure blob storage.
    - Writes missing and handwritten marker blobs.
    - Inserts SUCCESS image rows into cola_registry.cola_images.
    - Saves dimensions in Postgres: image_width_px, image_height_px, image_dimension_text.

    Window filter arguments:
    - --start-date YYYY-MM-DD
    - --end-date YYYY-MM-DD

    Required input:
    - --flag-csv path from Program 1 output

    Typical run:
    powershell
    & "h:/My Drive/TTB/.env/.venv/Scripts/python.exe" "h:/My Drive/TTB/Code0508/gitver/program2_azure_ingest_with_flags.py" --start-date 2026-04-23 --end-date 2026-05-07 --flag-csv "h:/My Drive/TTB/Code0508/rpt/revise_missing_flags_2026-04-23_to_2026-05-07_YYYYMMDD_HHMMSS.csv"
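The window and flag arguments listed in the README excerpt above could be parsed along these lines. This is a hedged sketch, not the scripts' actual argument handling: marking all three options as required, validating dates with `date.fromisoformat`, and rejecting an inverted window are assumptions made for illustration.

```python
import argparse
from datetime import date


def iso_date(value):
    """argparse type: accept only YYYY-MM-DD and return a date object."""
    try:
        return date.fromisoformat(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected YYYY-MM-DD, got {value!r}")


def build_parser():
    parser = argparse.ArgumentParser(description="Program 2 style argument sketch")
    parser.add_argument("--start-date", type=iso_date, required=True)
    parser.add_argument("--end-date", type=iso_date, required=True)
    # Path to the flag CSV exported by Program 1.
    parser.add_argument("--flag-csv", required=True)
    return parser


def parse_args(argv=None):
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.end_date < args.start_date:
        parser.error("--end-date must not be earlier than --start-date")
    return args


args = parse_args(
    ["--start-date", "2026-04-23", "--end-date", "2026-05-07", "--flag-csv", "flags.csv"]
)
```

Parsing the dates up front means a malformed window fails before any API call or database write is attempted.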
Comment on lines +846 to +851

    id_list = ", ".join(f"'{i}'" for i in TTB_IDS)
    cur.execute(f"""--sql
        SELECT cola_id::text, completed_date
        FROM cola_registry.colas
        WHERE cola_id::text IN ({id_list})
    """)
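The snippet above interpolates IDs directly into the SQL text. A common alternative is to pass the IDs as bound parameters so the driver handles quoting. The sketch below shows the idea with SQLite's `?` placeholders so it runs standalone; the table layout and the `fetch_completed_dates` helper are hypothetical. With psycopg2 the placeholder would be `%s`, and a Python list can be bound to a single `= ANY(%s)` condition.

```python
import sqlite3


def fetch_completed_dates(conn, cola_ids):
    """Look up completed_date for the given IDs using bound parameters."""
    if not cola_ids:
        return []
    # Only the placeholders are formatted into the SQL, never the values.
    placeholders = ", ".join("?" for _ in cola_ids)
    sql = (
        "SELECT cola_id, completed_date FROM colas "
        f"WHERE cola_id IN ({placeholders}) ORDER BY cola_id"
    )
    return conn.execute(sql, list(cola_ids)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE colas (cola_id TEXT PRIMARY KEY, completed_date TEXT)")
conn.executemany(
    "INSERT INTO colas VALUES (?, ?)",
    [("1", "2026-04-23"), ("2", "2026-04-24")],
)

# The second "ID" is a classic injection string; it is treated as plain data.
rows = fetch_completed_dates(conn, ["1", "x' OR '1'='1"])
```

Because the values never become part of the SQL string, an ID containing a quote character cannot change the query's meaning.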
Comment on lines +341 to +399

            with get_db_conn() as conn:
                if db_has_success(conn, cola_id, source_url, blob_name):
                    result["outcome"] = "SKIPPED_EXISTING_SUCCESS"
                    return result

            if not args.execute:
                if args.check_blob_in_dry_run and blob is not None:
                    result["outcome"] = "DRY_RUN_BLOB_EXISTS" if blob.exists() else "DRY_RUN_WOULD_UPLOAD"
                else:
                    result["outcome"] = "DRY_RUN_VALID_IMAGE"
                return result

            if blob is None:
                raise PipelineError("Azure container client is not initialized")

            blob.upload_blob(
                resp.content, overwrite=True,
                content_settings=ContentSettings(content_type=result["content_type"]),
            )
            result["blob_url"] = blob.url

            with get_db_conn() as conn:
                conn.autocommit = False
                with conn.cursor() as cur:
                    cur.execute("SELECT pg_try_advisory_xact_lock(hashtext(%s))", (cola_id,))
                    locked = bool(cur.fetchone()[0])
                if not locked:
                    conn.rollback()
                    result["outcome"] = "SKIPPED_LOCK_NOT_ACQUIRED"
                    return result
                if db_has_success(conn, cola_id, source_url, blob_name):
                    conn.rollback()
                    result["outcome"] = "SKIPPED_RACE_EXISTING_SUCCESS"
                    return result
                db_insert_result(
                    conn, cola_id=cola_id, detail_page_url=detail_page_url,
                    source_image_url=source_url, blob_url=blob.url,
                    blob_name=blob_name, status="SUCCESS", error_message=None,
                )
                conn.commit()

            result["outcome"] = "SUCCESS_UPLOADED"
            return result

        except Exception as exc:
            result["error"] = str(exc)[:500]
            result["outcome"] = "FAILED"

            if args.execute and args.record_failures:
                try:
                    with get_db_conn() as conn:
                        conn.autocommit = False
                        db_insert_result(
                            conn, cola_id=cola_id, detail_page_url=detail_page_url,
                            source_image_url=source_url, blob_url=None,
                            blob_name=result["blob_name"] or None,
                            status="FAILED", error_message=result["error"],
                        )
                        conn.commit()
Comment on lines +400 to +401

                except Exception:
                    pass
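A bare `except Exception: pass` like the one above hides the reason a failure row could not be written. One possible alternative, sketched here with a hypothetical `record_failure_safely` wrapper and logger name, keeps the best-effort behavior but logs the traceback.

```python
import logging

logger = logging.getLogger("cola_pipeline")  # hypothetical logger name


def record_failure_safely(record_failure, cola_id):
    """Call record_failure(cola_id); log instead of silently swallowing errors."""
    try:
        record_failure(cola_id)
        return True
    except Exception:
        # logger.exception logs at ERROR level and includes the traceback.
        logger.exception("Could not record FAILED row for cola_id=%s", cola_id)
        return False


class ListHandler(logging.Handler):
    """Tiny handler that collects messages so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)
logger.propagate = False  # keep the demo quiet on stderr


def broken_insert(cola_id):
    raise RuntimeError("connection lost")


ok = record_failure_safely(broken_insert, "demo-id")
```

The caller still continues after the failure, but the log now shows which COLA ID was affected and why.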
Comment on lines +872 to +874

        pg.upsert_colas(_pg_conn, image_count_updates)
        logger.info(f"Postgres: updated image_count_to_parse for {len(image_count_updates)} COLAs")
    except Exception as e:
Comment on lines +279 to +292

    with get_conn() as conn:
        rows = fetch_missing_rows(conn, args.start_date, args.end_date, args.limit)

        flagged_count = 0
        if not args.no_flag_handwritten:
            ensure_handwritten_flag_table(conn)
            cola_ids = [str(row[0]) for row in rows]
            flagged_count = upsert_handwritten_flags(
                conn, cola_ids,
                args.handwritten_source,
                args.handwritten_reason,
                args.marked_by,
            )
Comment on lines +228 to +231

    with conn.cursor() as cur:
        for cola_id in cola_ids:
            cur.execute(sql, (cola_id, source, reason, marked_by))
    conn.commit()
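The loop above issues one statement per ID. If that becomes a bottleneck, the rows can be submitted as one batch. The sketch below demonstrates the shape with SQLite's `executemany` so it runs without a server; the `handwritten_flags` table and its columns are hypothetical stand-ins. For psycopg2, `psycopg2.extras.execute_values` is the usual batching helper.

```python
import sqlite3

FLAG_SQL = """
INSERT INTO handwritten_flags (cola_id, source, reason, marked_by)
VALUES (?, ?, ?, ?)
ON CONFLICT (cola_id) DO UPDATE SET
    source = excluded.source,
    reason = excluded.reason,
    marked_by = excluded.marked_by
"""


def upsert_handwritten_flags(conn, cola_ids, source, reason, marked_by):
    """Upsert one flag row per distinct ID in a single batch; return the count."""
    # dict.fromkeys drops duplicate IDs while preserving order.
    rows = [(cola_id, source, reason, marked_by) for cola_id in dict.fromkeys(cola_ids)]
    with conn:  # commits on success, rolls back if the batch raises
        conn.executemany(FLAG_SQL, rows)
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE handwritten_flags ("
    "cola_id TEXT PRIMARY KEY, source TEXT, reason TEXT, marked_by TEXT)"
)
flagged = upsert_handwritten_flags(conn, ["1", "2", "1"], "manual", "no image", "demo")
stored = conn.execute("SELECT COUNT(*) FROM handwritten_flags").fetchone()[0]
```

Wrapping the batch in a single transaction also means a failure partway through leaves no partially flagged set behind.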
Clean COLA pipeline
Files added/updated:
code/program1_postgres_ingest_with_handwritten_file.py
code/program2_azure_ingest_with_handwritten_image.py