4 changes: 4 additions & 0 deletions .gitignore
@@ -72,3 +72,7 @@ Thumbs.db
# Secrets (GCP service accounts, etc.)
secrets/
.opencode
.Rproj.user
CLAUDE.local.md
.Rhistory
r-archive/config.yml
37 changes: 25 additions & 12 deletions docs/CLAUDE.md
@@ -8,25 +8,33 @@ Patient pipeline is complete and deployed to production (Cloud Run).
| Module | Purpose |
|--------|---------|
| `extract/patient.py` | Read Excel trackers → raw parquet (openpyxl, multi-sheet) |
| `extract/product.py` | Read Excel trackers → raw product parquet (month sheets, stock section) |
| `extract/wide_format.py` | Mandalay wide-format handling (column expansion 2020-21; cell splitting 2017-19) |
| `clean/patient.py` | Type conversion, validation, transformations → cleaned parquet |
| `clean/schema.py` | 83-column meta schema matching R output |
| `clean/product.py` | Product cleaning pipeline (R steps 2.0-2.21) → cleaned product parquet |
| `clean/schema.py` | 83-column patient meta schema matching R output |
| `clean/schema_product.py` | 19-column product meta schema + helpers |
| `clean/converters.py` | Safe type conversion with ErrorCollector |
| `clean/validators.py` | Case-insensitive allowed-values validation |
| `clean/transformers.py` | Explicit transformations (regimen, BP splitting, FBG) |
| `clean/date_parser.py` | Flexible date parsing (Excel serials, DD/MM/YYYY, month-year) |
| `tables/patient.py` | Aggregate cleaned parquets → static, monthly, annual tables |
| `tables/product.py` | Aggregate cleaned product parquets → product_data table |
| `tables/clinic.py` | Create clinic static table from reference_data/clinic_data.xlsx |
| `tables/logs.py` | Aggregate error logs → logs table |
| `tables/metadata.py` | Tracker metadata table (MD5 + per-tracker output presence flags) |
| `pipeline/patient.py` | Orchestrate extract+clean per tracker, parallel workers |
| `pipeline/tracker.py` | Per-tracker pipeline execution |
| `pipeline/product.py` | Product pipeline orchestration (mirrors patient, single product_data table) |
| `pipeline/tracker.py` | Per-tracker pipeline execution (patient + product) |
| `pipeline/models.py` | Result dataclasses |
| `gcp/storage.py` | GCS download/upload |
| `gcp/bigquery.py` | BigQuery table load |
| `gcp/drive.py` | Google Drive download (clinic_data.xlsx); file ID hardcoded in module |
| `reference/synonyms.py` | Column name synonym mapping (YAML) |
| `reference/products.py` | Stock_Summary product reference loader (known products, categories) |
| `reference/provinces.py` | Allowed province validation |
| `reference/loaders.py` | YAML loading utilities |
| `state/` | State management module (exists, not yet wired into pipeline) |
| `state/` | Reserved for incremental-processing logic (design in [migration/MIGRATION_GUIDE.md](migration/MIGRATION_GUIDE.md); not yet implemented) |
| `utils/` | Shared utilities |
| `config.py` | Pydantic settings from `.env` / `A4D_*` env vars |
| `logging.py` | loguru setup, `file_logger()` context manager |
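The `tables/metadata.py` row above records an MD5 checksum plus per-tracker output-presence flags. A minimal sketch of how such a row might be assembled (the function name, column names, and return shape here are illustrative assumptions, not the module's actual API):

```python
import hashlib
from pathlib import Path


def tracker_metadata_row(tracker: Path, output_root: Path) -> dict:
    """Hypothetical sketch: one metadata row = file MD5 + output-presence flags."""
    md5 = hashlib.md5(tracker.read_bytes()).hexdigest()
    stem = tracker.stem
    return {
        "file_name": tracker.name,
        "md5": md5,
        # Presence flags: did earlier stages emit cleaned output for this tracker?
        "patient_cleaned": (output_root / "patient_data_cleaned" / f"{stem}.parquet").exists(),
        "product_cleaned": (output_root / "product_data_cleaned" / f"{stem}.parquet").exists(),
    }
```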
@@ -36,24 +44,28 @@ Patient pipeline is complete and deployed to production (Cloud Run).
## CLI Commands

```bash
uv run a4d process-patient # Extract + clean + tables (local run)
uv run a4d create-tables # Re-create all tables (patient, logs, clinic) from existing cleaned parquets
uv run a4d process-patient # Extract + clean + tables (local run, patient)
uv run a4d process-product # Extract + clean + table (local run, product)
uv run a4d create-tables # Re-create patient/logs/clinic tables from existing cleaned parquets
uv run a4d create-product-tables # Re-create product table from existing cleaned parquets
uv run a4d upload-tables # Upload tables to BigQuery
uv run a4d download-trackers # Download tracker files from GCS
uv run a4d upload-output # Upload output directory to GCS
uv run a4d download-reference-data # Download clinic_data.xlsx from Google Drive into reference_data/
uv run a4d run-pipeline # Full end-to-end pipeline (drive download→GCS download→process→upload)
uv run a4d run-pipeline # Full end-to-end pipeline (patient + product arms, drive/GCS/BigQuery)
```

Key options: `--file` (single tracker), `--workers N`, `--force`, `--skip-tables`, `--skip-download`, `--skip-upload`, `--skip-drive-download`.
Key options: `--file` (single tracker), `--workers N`, `--skip-tables`, `--skip-download`, `--skip-upload`, `--skip-drive-download`, `--skip-product`, `--incremental` (skip trackers matching previous run's manifest).

## Output Directory Structure

```text
output/
├── patient_data_raw/ # Raw extracted parquets (one per tracker)
├── patient_data_cleaned/ # Cleaned parquets (one per tracker)
├── tables/ # Final tables: static.parquet, monthly.parquet, annual.parquet, logs.parquet, clinic_data_static.parquet
├── patient_data_raw/ # Raw extracted patient parquets (one per tracker)
├── patient_data_cleaned/ # Cleaned patient parquets (one per tracker)
├── product_data_raw/ # Raw extracted product parquets (one per tracker)
├── product_data_cleaned/ # Cleaned product parquets (one per tracker)
├── tables/ # Final tables: patient_data_{static,monthly,annual}.parquet, product_data.parquet, clinic_data_static.parquet, table_logs.parquet, tracker_metadata.parquet
└── logs/ # Per-tracker log files (JSON)
```
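A finished run can be sanity-checked against this layout. The file names below come from the `tables/` entry above; the helper itself is a hypothetical sketch, not part of the codebase:

```python
from pathlib import Path

# Final tables as listed in the directory structure above.
EXPECTED_TABLES = [
    "patient_data_static.parquet",
    "patient_data_monthly.parquet",
    "patient_data_annual.parquet",
    "product_data.parquet",
    "clinic_data_static.parquet",
    "table_logs.parquet",
    "tracker_metadata.parquet",
]


def missing_tables(output_root: Path) -> list[str]:
    """Return names of expected final tables absent from output/tables/."""
    tables_dir = output_root / "tables"
    return [name for name in EXPECTED_TABLES if not (tables_dir / name).exists()]
```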

@@ -68,5 +80,6 @@ output/
## Migration Status

- **Patient pipeline**: complete, validated against 174 trackers, deployed to production
- **Product pipeline**: not yet started
- **State management**: module exists but not wired into pipeline yet
- **Product pipeline**: complete, merged into `src/a4d/` (2026-04-23).
- **Tracker metadata table**: generated on every `create-tables` / `run-pipeline` run (MD5 + output-presence flags) and uploaded to BigQuery `tracker_metadata`.
- **Incremental processing**: shipped 2026-05-01 behind the `--incremental` CLI flag (opt-in) on `process-patient`, `process-product`, and `run-pipeline`. Skips trackers whose MD5 + completion state match the previous run's manifest (BigQuery → local parquet → empty fallback). Default behaviour unchanged. See `a4d.state` module + [migration/MIGRATION_GUIDE.md](migration/MIGRATION_GUIDE.md) state-management section.
62 changes: 38 additions & 24 deletions docs/migration/MIGRATION_GUIDE.md
Expand Up @@ -2,7 +2,7 @@

Reference for the A4D pipeline migration from R to Python.

**Status**: Phases 0–7 complete. Patient pipeline production-ready. Product pipeline not yet started.
**Status**: Phases 0–9 complete. Patient pipeline production-ready. Product pipeline merged into `src/a4d/` on 2026-04-23.
**Branch**: `migration`

---
@@ -87,48 +87,66 @@ upload-tables # tables/*.parquet → BigQuery

```
src/a4d/
├── extract/patient.py # Excel → raw parquet
├── extract/
│ ├── patient.py # Patient: Excel → raw parquet
│ ├── product.py # Product: Excel month sheets → raw parquet
│ └── wide_format.py # Mandalay wide-format handlers (column/cell)
├── clean/
│ ├── patient.py # Main cleaning pipeline
│ ├── schema.py # 83-column meta schema
│ ├── patient.py # Patient cleaning pipeline
│ ├── product.py # Product cleaning pipeline (R steps 2.0-2.21)
│ ├── schema.py # 83-column patient schema
│ ├── schema_product.py # 19-column product schema
│ ├── converters.py # Safe type conversion + ErrorCollector
│ ├── validators.py # Case-insensitive allowed-values
│ ├── transformers.py # Explicit transformations
│ └── date_parser.py # Flexible date parsing
├── tables/
│ ├── patient.py # static/monthly/annual aggregation
│ ├── product.py # product_data aggregation
│ ├── clinic.py # clinic static table
│ └── logs.py # Error log aggregation
├── pipeline/
│ ├── patient.py # Orchestration + parallel workers
│ ├── tracker.py # Per-tracker execution
│ ├── patient.py # Patient orchestration + parallel workers
│ ├── product.py # Product orchestration (mirrors patient)
│ ├── tracker.py # Per-tracker execution (patient + product)
│ └── models.py # Result dataclasses
├── gcp/
│ ├── storage.py # GCS operations
│ ├── drive.py # Google Drive (clinic_data.xlsx)
│ └── bigquery.py # BigQuery load
├── reference/
│ ├── synonyms.py # Column name mapping (YAML)
│ ├── products.py # Stock_Summary product reference loader
│ ├── provinces.py # Allowed province validation
│ └── loaders.py # YAML loading utilities
├── state/ # State management (exists, not yet wired up)
├── config.py # Pydantic settings from A4D_* env vars
├── logging.py # loguru setup
├── errors.py # Shared error types
└── cli.py # Typer CLI (6 commands)
└── cli.py # Typer CLI (patient + product commands, run-pipeline)
```

### State Management (Designed, Not Yet Active)
### State Management (Incremental Processing)

```
1. Container starts (stateless, fresh)
2. Query BigQuery metadata table
SELECT file_name, file_hash FROM tracker_metadata
3. Compare with current file hashes
4. Process only: new + changed + previously failed
5. Update metadata table (append new records)
SELECT file_name, clinic_code, md5, complete FROM tracker_metadata
3. Compare with current file MD5s
4. Process only: new + changed + previously incomplete
5. Re-publish metadata table (full replace) at end of run
6. Container shuts down (state persists in BigQuery)
```
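Steps 3-4 above reduce to an MD5-plus-completion comparison. A minimal sketch (the function name and manifest shape are assumptions for illustration, not the actual `a4d.state` API):

```python
import hashlib
from pathlib import Path


def select_trackers(trackers: list[Path], manifest: dict[str, dict]) -> list[Path]:
    """Keep trackers that are new, changed, or previously incomplete.

    `manifest` maps file_name -> {"md5": str, "complete": bool}, mirroring the
    tracker_metadata query in step 2.
    """
    to_process = []
    for tracker in trackers:
        md5 = hashlib.md5(tracker.read_bytes()).hexdigest()
        prev = manifest.get(tracker.name)
        if prev is None or prev["md5"] != md5 or not prev["complete"]:
            to_process.append(tracker)  # new, changed, or failed last run
    return to_process
```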

Currently: pipeline processes all trackers found in `data_root`. Incremental logic exists in `state/` but is not wired into `pipeline/patient.py` yet.
Wired up via the `a4d.state` module ([src/a4d/state/](../../src/a4d/state/)) and exposed
through the `--incremental` CLI flag on `process-patient`, `process-product`, and
`run-pipeline`. Source precedence is BigQuery → local
`output_root/tables/tracker_metadata.parquet` → empty manifest, so local devs
without `gcloud auth` get the local-parquet fallback automatically.
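That precedence chain can be sketched as follows. The readers are injected here purely for illustration; the real `a4d.state` loader wires in its own BigQuery and parquet readers:

```python
from pathlib import Path
from typing import Callable, Optional

Manifest = dict[str, dict]  # file_name -> {"md5": ..., "complete": ...}


def load_manifest(
    bigquery_reader: Optional[Callable[[], Manifest]],
    local_path: Path,
    local_reader: Callable[[Path], Manifest],
) -> Manifest:
    """Source precedence: BigQuery, then local parquet, then empty manifest."""
    if bigquery_reader is not None:
        try:
            return bigquery_reader()
        except Exception:
            pass  # e.g. no gcloud auth on a dev machine: fall through
    if local_path.exists():
        return local_reader(local_path)
    return {}  # empty manifest: every tracker looks new, nothing is skipped
```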

The flag is **opt-in**: default behaviour is unchanged (process every tracker
found in `data_root`). Flipping the default to incremental is a separate
decision after a soak window.

---

@@ -226,6 +244,7 @@ with file_logger("clinic_001_patient", output_root) as log:
| 5 | Pipeline integration: `pipeline/patient.py` + parallel processing |
| 6 | GCP: `gcp/storage.py`, `gcp/bigquery.py`, CLI commands |
| 7 | Validation: 174 trackers compared, 8 bugs fixed, production verdict |
| 9 | Product pipeline: merged WIP product modules into `src/a4d/`; `run-pipeline` runs both arms |

---

@@ -238,18 +257,13 @@
- Compare dashboard reports with R pipeline baseline
- Fix any issues discovered during first real run

### Phase 9: Product Pipeline

- `extract/product.py` — same pattern as patient extraction
- `clean/product.py` — same pattern as patient cleaning
- `tables/product.py` — product aggregation tables
- Validate against R product pipeline outputs

### State Management (Incremental Processing)
### Production Scheduling

- `state/` module exists with BigQuery state design
- Wire into `pipeline/patient.py` so only changed/new trackers are processed
- Required before production scheduling (Cloud Run + Cloud Scheduler)
Cloud Run + Cloud Scheduler wiring (cron, image build, deploy manifest). The
state module shipped behind the `--incremental` CLI flag — see the
[State Management](#state-management-incremental-processing) section above.
The default behaviour remains "process every tracker"; flipping the default to
incremental is a separate decision after a soak window.

---

@@ -1,5 +1,7 @@
# Feature Proposal: Product Data Processing Pipeline

> **OBSOLETE — historical reference only.** The product data pipeline described here was implemented and merged into `src/a4d/` on 2026-04-23. This document predates that work and is preserved for context on the original proposal; the "Problem Statement", "Implementation Plan", "Success Criteria", and "Questions for User" sections no longer reflect reality. For current state, see [docs/CLAUDE.md](../CLAUDE.md) Migration Status. For the R↔Python step mapping that guided the implementation, see [product_r_to_python_mapping.md](product_r_to_python_mapping.md).

## Summary

Implement a complete product data processing pipeline in the Python codebase to match the functionality of the R pipeline. This is a critical gap as the R pipeline processes both patient AND product data, while the Python pipeline currently only handles patient data.