Product pipeline #6

Open
Ali88-hub wants to merge 10 commits into migration from
product-pipeline

Conversation

@Ali88-hub
Collaborator

Summary

Adds the product data pipeline (extraction → cleaning → tables) alongside
finalization work on the patient pipeline. Output now includes a
product_data parquet/BQ table, plus an incremental processing flow keyed off
tracker_metadata.

This branch represents the cumulative Python migration on top of dev.
The recent (product-focused) commits are the primary delta worth reviewing;
earlier commits are part of the established migration baseline.

What's new

Product pipeline (most recent commits):

  • Shared extraction layer + product-side extractor (src/a4d/extract/product.py)
  • Per-sheet cleaning pipeline matching R's loop semantics
    (src/a4d/clean/product.py, partitions by [sheet, product] via .over())
  • Table aggregation (src/a4d/tables/product.py)
  • Incremental processing via tracker_metadata state filter
    (src/a4d/state/)
  • CLI integration: a4d run-pipeline runs both arms; --skip-product
    available for patient-only runs
  • Source-vs-output reconciliation validators for both arms
    (src/a4d/validate/source_vs_output_*.py)

Patient pipeline:

  • Typo-rescue for known-misparsed patient IDs
  • Multi-insulin CSV emission
  • Shared tracker helpers extracted to src/a4d/pipeline/tracker.py

Reference data / config:

  • reference_data/validation_rules.yaml (centralized validation thresholds)
  • Updated clinic_data.xlsx

Test plan

  • uv sync && uv run pytest
  • just run (full pipeline, both arms) on a small tracker subset
  • just run --skip-product (patient-only path still works)
  • Source-vs-output reconciliation passes for product
  • Spot-check product_data.parquet against R reference output
  • CI green (Python 3.13, ruff, type checks)

Ali88-hub and others added 10 commits May 4, 2026 16:34
- New extract/common.py centralizes get_tracker_year, find_month_sheets,
  extract_tracker_month, clean_excel_errors so the upcoming product extractor
  can reuse them without duplicating patient.py's logic.
- extract/patient.py re-exports the shared helpers; behavior unchanged.
- tables/patient.py accepts a pre-loaded DataFrame to avoid re-reading the
  cleaned parquet for each table (static/monthly/annual).
- pipeline/patient.py loads the cleaned parquet once and threads it through;
  drops the vestigial `force` kwarg.
- clean/patient.py: tuple-unpack rewrite of the date-validation row loop
  (skips per-row dict construction). Behavior-preserving.
- config.py: surface min_tracker_year/max_tracker_year as Pydantic defaults
  (2017/2030) so the new tests can override without breaking existing runs.
- tests/test_tables/test_patient.py updated for the new DataFrame-input
  signatures.

Patient parquets remain byte-identical to migration; this is a pure refactor.
Three intentional patient-output deltas vs. migration:

1. Date typo rescue: clean/date_parser.py adds rescue_date_typos() and
   TYPO_REPLACEMENTS so common Excel-date typos (e.g., misspelled month
   strings) round-trip to a valid date instead of becoming Undefined.
   clean/converters.py wires the rescue through parse_date_column.
   errors.py registers the "typo_rescued" log code.

2. Multi-insulin CSV acceptance: clean/validators.py adds the
   allow_csv_subset flag so 2024+ trackers' comma-separated
   insulin_subtype values (e.g., "pre-mixed,rapid-acting") survive
   validation in canonical case instead of being replaced by Undefined.
   reference_data/validation_rules.yaml flips insulin_subtype on.
   clean/patient.py: docstring update on _derive_insulin_fields.

3. load_numeric_ranges helper + numeric_ranges: YAML block (consumed by
   the validate/ tree shipped in a later commit). Carrying both now keeps
   the YAML self-consistent.
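A minimal sketch of the allow_csv_subset idea: accept a comma-separated cell when every element is a valid category, and emit it back in canonical case. The allowed set and function name here are illustrative, not the real validators.py API.

```python
# Hypothetical canonical-case lookup; the real rules come from
# reference_data/validation_rules.yaml.
ALLOWED = {
    "pre-mixed": "pre-mixed",
    "rapid-acting": "rapid-acting",
    "long-acting": "long-acting",
}

def validate_insulin_subtype(value: str, allow_csv_subset: bool = True) -> str:
    parts = [p.strip().lower() for p in value.split(",")]
    if not allow_csv_subset and len(parts) > 1:
        # Multi-valued cells are rejected when the flag is off.
        return "Undefined"
    if all(p in ALLOWED for p in parts):
        # Re-emit every element in canonical case.
        return ",".join(ALLOWED[p] for p in parts)
    return "Undefined"
```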

Plus a small per-row dedup optimization in clean/converters.py and
the matching test updates.

Lays the groundwork that the product cleaning pipeline (next commit) needs:

- extract/product.py: read the Stock_Summary section across month sheets
  (parallels extract/patient.py and reuses extract/common helpers).
- extract/wide_format.py: Mandalay-specific wide-format handling
  (column expansion 2020-21; cell splitting 2017-19).
- clean/schema_product.py: 19-column product meta schema + helpers.
- reference/products.py: known-product reference loader (categories,
  canonical names) backing product validation.

Reusable patient-side polish that the product layer now depends on:

- reference/synonyms.py: cache load_*_mapper with @lru_cache and fix the
  scalar-string -> list normalization so single-string YAML entries no
  longer iterate as characters.
- reference/loaders.py: explicit UTF-8 in load_yaml.
- tables/logs.py: schema_overrides on parse_log_file.
- clean/schema.py: docstring touch-up.
- clean/product.py implements R steps 2.0-2.21 from
  helper_product_data.R: synonym mapping, type conversion,
  patient-id repair, supplier-name preservation, units/balance
  reconciliation, end-row zeroing, and the per-sheet partition
  required for the .over() rolling balance to match R's per-sheet
  loop. Reuses safe_convert_column, fix_patient_id, and
  load_product_mapper from migration.
- tables/product.py aggregates cleaned product parquets into the
  single product_data table.
- tests/test_clean/test_product.py covers the cleaning steps;
  tests/test_tables/test_link_product_patient.py exercises the
  cross-product/patient link assumptions used by downstream tables.

Adds the manifest-driven incremental skip path that future commits will
expose behind --incremental:

- state/source.py + manifest.py: load the previous run's manifest from
  BigQuery, falling back to local parquet, then to an empty manifest.
- state/filter.py: skip trackers whose MD5 + per-pipeline completion
  flags match the previous manifest.
- tables/metadata.py: build tracker_metadata (MD5 + per-tracker output-
  presence flags) at table-creation time; md5_file is reused by filter.py.
- gcp/bigquery.py: add select_tracker_metadata and register product_data
  + tracker_metadata in PARQUET_TO_TABLE.
- state/__init__.py: additive re-exports for the new helpers.

Tests cover each state module, the metadata table builder, and the new
BigQuery accessor.
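The skip decision can be sketched like this; the manifest shape and helper names are illustrative, not the actual state/filter.py and tables/metadata.py APIs.

```python
import hashlib
from pathlib import Path

def md5_file(path: Path) -> str:
    """MD5 of a tracker file, streamed in chunks to bound memory."""
    h = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

def should_skip(tracker: Path, manifest: dict[str, dict]) -> bool:
    """Skip a tracker when its MD5 and both per-pipeline completion
    flags match the previous run's manifest (hypothetical shape)."""
    prev = manifest.get(tracker.name)
    if prev is None:
        return False  # never seen before: process it
    return (
        prev.get("md5") == md5_file(tracker)
        and prev.get("patient_done", False)
        and prev.get("product_done", False)
    )
```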

Wires the product cleaning + state machinery into a runnable pipeline:

- pipeline/product.py: orchestrates product extract+clean+table for the
  full corpus, mirroring pipeline/patient.py.
- pipeline/tracker.py: adds process_tracker_product so the per-tracker
  worker can run patient and product arms together.
- cli.py: introduces process-product, create-product-tables, and the
  unified run-pipeline command (patient + product + drive/GCS/BigQuery);
  adds --force and --incremental flags across process-patient,
  process-product, and run-pipeline. Imports the new state helpers and
  product entry points. The full ~810-line diff lands in one commit
  because the new commands and flags interleave with existing patient
  command bodies.
- tests/test_cli/test_force_incremental.py covers --force/--incremental
  for both process-* commands and run-pipeline.
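The flag layout can be sketched with argparse as below; this is a shape-only sketch, and the real cli.py may use a different CLI framework and option wiring.

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="a4d")
    sub = parser.add_subparsers(dest="command", required=True)
    # --force / --incremental are shared across all three commands.
    for name in ("process-patient", "process-product", "run-pipeline"):
        cmd = sub.add_parser(name)
        cmd.add_argument("--force", action="store_true",
                         help="reprocess even if outputs already exist")
        cmd.add_argument("--incremental", action="store_true",
                         help="skip trackers unchanged since the last manifest")
    # run-pipeline additionally supports patient-only runs.
    sub.choices["run-pipeline"].add_argument(
        "--skip-product", action="store_true", help="patient-only run")
    return parser
```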

Breaking change:

- tables/__init__.py drops the wildcard re-exports from .patient,
  .clinic, .logs, .metadata, .product. Callers must now import from the
  specific submodule (a4d.tables.patient, a4d.tables.product, ...).
  Verified zero affected call sites in src/, tests/, and scripts/.

Plus the doc/config drag-along:

- justfile: new product + run-pipeline recipes.
- readme.md, docs/CLAUDE.md, docs/migration/MIGRATION_GUIDE.md,
  docs/migration/PYTHON_IMPROVEMENTS.md: product pipeline + state docs.
- docs/PRODUCT_DATA_PIPELINE_FEATURE.md moved into docs/migration/.
- .gitignore: ignore .Rproj.user, .Rhistory, CLAUDE.local.md, and
  r-archive/config.yml.

Adds the source-vs-output validators that re-read each tracker's source
cells and compare against the cleaned parquet to flag silent data loss
or schema drift before tables are uploaded.

- validate/source_vs_output_patient.py + source_vs_output_product.py
  walk every patient/product row in the cleaned output, locate the
  matching source cell, and emit a structured diff (typed as
  match/converted/missing/unparseable). Activates the load_numeric_ranges
  helper and numeric_ranges: YAML block carried in the typo-rescue commit.
- validate/common.py holds the shared diff and IO primitives.
- Tests cover both patient and product paths against fixture trackers.

Read query rows directly and build the polars DataFrame with an
explicit schema instead of going through pandas. Removes a hidden
dependency on pandas (not declared in pyproject) and makes the
schema deterministic when the result set is empty.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Remove explicit encoding="utf-8" from open() calls and the
env_file_encoding setting (utf-8 is the default on the platforms
we target), and drop the Windows backslash-to-forward-slash
conversion in GCS blob names since Path.relative_to already
yields POSIX separators on macOS/Linux.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Removes the windows-shell directive, [unix]/[windows] recipe
attributes, and the [windows] PowerShell clean recipe. The project
targets macOS (dev) + Linux (Cloud Run / Docker prod); these shims
were drag-along noise from 85891e7 and were never present on
migration. Recipes that rely on bash heredocs will now fail loudly
on Windows, which is correct for an unsupported platform.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@Ali88-hub Ali88-hub requested a review from pmayd May 5, 2026 13:21