Skip to content

Commit 7f88230

Browse files
author
Michael Aydinbas
committed
rename just commands, fix error in tests, fix max_workers not working for run-local
1 parent 0a7c3c5 commit 7f88230

5 files changed

Lines changed: 95 additions & 66 deletions

File tree

a4d-python/SETUP.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,13 @@ gcloud config set project a4dphase2
5858
# Test with a single file (fastest)
5959
just run-file /path/to/tracker.xlsx
6060

61-
# Process all trackers in A4D_DATA_ROOT, skip GCS/BigQuery
62-
just run --skip-upload
61+
# Process all files already in A4D_DATA_ROOT — no GCS
62+
just run-local
6363

64-
# Full pipeline (downloads from GCS, uploads results, loads into BigQuery)
64+
# Download latest files from GCS, process locally — no upload
65+
just run-download
66+
67+
# Full pipeline: download from GCS, process, upload results + load BigQuery
6568
just run
6669
```
6770

a4d-python/justfile

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,30 +59,27 @@ clean:
5959
find . -type d -name __pycache__ -exec rm -rf {} +
6060
find . -type f -name "*.pyc" -delete
6161

62-
# Run full pipeline (extract + clean + tables)
62+
# Full pipeline: download from GCS, process, upload to GCS + BigQuery
6363
run *ARGS:
64-
uv run a4d process-patient {{ARGS}}
64+
uv run a4d run-pipeline {{ARGS}}
6565

66-
# Run pipeline with 8 workers (parallel processing)
67-
run-parallel:
68-
uv run a4d process-patient --workers 8
66+
# Download from GCS, process locally, no upload
67+
run-download *ARGS:
68+
uv run a4d run-pipeline --skip-upload {{ARGS}}
6969

70-
# Extract and clean only (skip table creation)
71-
run-clean:
72-
uv run a4d process-patient --workers 8 --skip-tables
70+
# Process local files only, no GCS (use files already in data_root)
71+
# Optionally pass a path: just run-local --data-root /path/to/trackers
72+
run-local *ARGS:
73+
uv run a4d process-patient {{ARGS}}
7374

74-
# Force reprocess all files (ignore existing outputs)
75-
run-force:
76-
uv run a4d process-patient --workers 8 --force
75+
# Process a single tracker file
76+
run-file FILE:
77+
uv run a4d process-patient --file "{{FILE}}"
7778

7879
# Create tables from existing cleaned parquet files
7980
create-tables INPUT:
8081
uv run a4d create-tables --input "{{INPUT}}"
8182

82-
# Process a single tracker file
83-
run-file FILE:
84-
uv run a4d process-patient --file "{{FILE}}"
85-
8683
# Build Docker image
8784
docker-build:
8885
docker build -t a4d-python:latest .

a4d-python/src/a4d/cli.py

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from rich.console import Console
99
from rich.table import Table
1010

11-
from a4d.pipeline.patient import process_patient_tables, run_patient_pipeline
11+
from a4d.pipeline.patient import discover_tracker_files, process_patient_tables, run_patient_pipeline
1212
from a4d.tables.logs import create_table_logs
1313

1414
app = typer.Typer(
@@ -69,30 +69,36 @@ def process_patient_cmd(
6969
),
7070
] = None,
7171
workers: Annotated[
72-
int, typer.Option("--workers", "-w", help="Number of parallel workers (1 = sequential)")
73-
] = 1,
72+
int | None, typer.Option("--workers", "-w", help="Number of parallel workers (default: A4D_MAX_WORKERS)")
73+
] = None,
7474
skip_tables: Annotated[
7575
bool, typer.Option("--skip-tables", help="Skip table creation (only extract + clean)")
7676
] = False,
7777
force: Annotated[
7878
bool, typer.Option("--force", help="Force reprocessing (ignore existing outputs)")
7979
] = False,
80-
clean: Annotated[
81-
bool,
82-
typer.Option("--clean", help="Wipe output directory before running (default when --file is used)"),
83-
] = False,
80+
data_root: Annotated[
81+
Path | None,
82+
typer.Option("--data-root", "-d", help="Directory containing tracker files (default: from config)"),
83+
] = None,
8484
output_root: Annotated[
8585
Path | None, typer.Option("--output", "-o", help="Output directory (default: from config)")
8686
] = None,
8787
):
8888
"""Process patient data pipeline.
8989
9090
\b
91+
Output is always cleaned before each run so tables reflect only the
92+
current run's files.
93+
9194
Examples:
92-
# Process all trackers in data_root
95+
# Process all trackers in data_root (from config)
9396
uv run a4d process-patient
9497
95-
# Process specific file (output is always cleaned first)
98+
# Process all trackers in a specific directory
99+
uv run a4d process-patient --data-root /path/to/trackers
100+
101+
# Process specific file
96102
uv run a4d process-patient --file /path/to/tracker.xlsx
97103
98104
# Parallel processing with 8 workers
@@ -101,25 +107,45 @@ def process_patient_cmd(
101107
# Just extract + clean, skip tables
102108
uv run a4d process-patient --skip-tables
103109
"""
104-
console.print("\n[bold blue]A4D Patient Pipeline[/bold blue]\n")
110+
from a4d.config import settings as _settings
105111

106-
# Prepare tracker files list
107-
tracker_files = [file] if file else None
112+
console.print("\n[bold blue]A4D Patient Pipeline[/bold blue]\n")
108113

109-
# Single-file mode always cleans first — there's no reason to keep stale
110-
# outputs from previous runs when testing a specific file.
111-
clean_output = clean or (file is not None)
114+
if file:
115+
tracker_files = [file]
116+
data_root_display = f"{file} (single file)"
117+
elif data_root:
118+
tracker_files = discover_tracker_files(data_root)
119+
if not tracker_files:
120+
console.print(f"[bold red]Error: No tracker files found in {data_root}[/bold red]\n")
121+
raise typer.Exit(1)
122+
data_root_display = str(data_root)
123+
else:
124+
tracker_files = None # pipeline uses settings.data_root
125+
data_root_display = str(_settings.data_root)
126+
127+
_output_root = output_root or _settings.output_root
128+
_workers = workers if workers is not None else _settings.max_workers
129+
130+
console.print(f"Data root: {data_root_display}")
131+
console.print(f"Output root: {_output_root}")
132+
console.print(f"Workers: {_workers}")
133+
if skip_tables:
134+
console.print("Tables: skipped")
135+
if force:
136+
console.print("Force: yes")
137+
console.print()
112138

113139
# Step 1: Extract + clean (table creation handled below for visible progress)
114140
console.print("[bold]Step 1/3:[/bold] Extracting and cleaning tracker files...")
115141
try:
116142
result = run_patient_pipeline(
117143
tracker_files=tracker_files,
118-
max_workers=workers,
144+
max_workers=_workers,
119145
output_root=output_root,
120146
skip_tables=True, # tables created below with console feedback
121147
force=force,
122-
clean_output=clean_output,
148+
clean_output=True,
123149
show_progress=True,
124150
console_log_level="ERROR",
125151
)
@@ -130,9 +156,6 @@ def process_patient_cmd(
130156
# Step 2+3: Table and log creation with console feedback
131157
tables: dict[str, Path] = {}
132158
if not skip_tables and result.successful_trackers > 0:
133-
from a4d.config import settings as _settings
134-
135-
_output_root = output_root or _settings.output_root
136159
cleaned_dir = _output_root / "patient_data_cleaned"
137160
tables_dir = _output_root / "tables"
138161
logs_dir = _output_root / "logs"
@@ -483,14 +506,18 @@ def upload_output_cmd(
483506
@app.command("run-pipeline")
484507
def run_pipeline_cmd(
485508
workers: Annotated[
486-
int, typer.Option("--workers", "-w", help="Number of parallel workers (1 = sequential)")
487-
] = 4,
509+
int | None, typer.Option("--workers", "-w", help="Number of parallel workers (default: A4D_MAX_WORKERS)")
510+
] = None,
488511
force: Annotated[
489512
bool, typer.Option("--force", help="Force reprocessing (ignore existing outputs)")
490513
] = False,
514+
skip_download: Annotated[
515+
bool,
516+
typer.Option("--skip-download", help="Skip GCS download (use files already in data_root)"),
517+
] = False,
491518
skip_upload: Annotated[
492519
bool,
493-
typer.Option("--skip-upload", help="Skip GCS and BigQuery uploads (local testing)"),
520+
typer.Option("--skip-upload", help="Skip GCS and BigQuery upload steps"),
494521
] = False,
495522
):
496523
"""Run the full end-to-end A4D pipeline.
@@ -506,28 +533,33 @@ def run_pipeline_cmd(
506533
507534
\b
508535
Examples:
509-
# Full pipeline with 4 workers
536+
# Full pipeline (download + process + upload)
510537
uv run a4d run-pipeline
511538
512-
# Force reprocess all files
513-
uv run a4d run-pipeline --force
514-
515-
# Local testing without GCS/BigQuery uploads
539+
# Download latest files, process locally, skip upload
516540
uv run a4d run-pipeline --skip-upload
541+
542+
# Process local files only, no download or upload
543+
uv run a4d run-pipeline --skip-download --skip-upload
517544
"""
518545
from a4d.config import settings
519546
from a4d.gcp.bigquery import load_pipeline_tables
520547
from a4d.gcp.storage import download_tracker_files, upload_output
521548

549+
_workers = workers if workers is not None else settings.max_workers
550+
522551
console.print("\n[bold blue]A4D Full Pipeline[/bold blue]\n")
523552
console.print(f"Data root: {settings.data_root}")
524553
console.print(f"Output root: {settings.output_root}")
525-
console.print(f"Workers: {workers}")
554+
console.print(f"Workers: {_workers}")
526555
console.print(f"Project: {settings.project_id}")
527-
console.print(f"Dataset: {settings.dataset}\n")
556+
console.print(f"Dataset: {settings.dataset}")
557+
console.print(f"Download: {'yes' if not skip_download else 'skipped (--skip-download)'}")
558+
console.print(f"Upload: {'yes' if not skip_upload else 'skipped (--skip-upload)'}")
559+
console.print()
528560

529561
# Step 1 – Download tracker files from GCS
530-
if not skip_upload:
562+
if not skip_download:
531563
console.print("[bold]Step 1/5:[/bold] Downloading tracker files from GCS...")
532564
try:
533565
downloaded = download_tracker_files(destination=settings.data_root)
@@ -536,13 +568,13 @@ def run_pipeline_cmd(
536568
console.print(f"\n[bold red]Error during download: {e}[/bold red]\n")
537569
raise typer.Exit(1) from e
538570
else:
539-
console.print("[bold]Step 1/5:[/bold] Skipping GCS download (--skip-upload)\n")
571+
console.print("[bold]Step 1/5:[/bold] Skipping GCS download (--skip-download)\n")
540572

541573
# Step 2+3 – Extract, clean and build tables
542574
console.print("[bold]Steps 2–3/5:[/bold] Processing tracker files...\n")
543575
try:
544576
result = run_patient_pipeline(
545-
max_workers=workers,
577+
max_workers=_workers,
546578
force=force,
547579
show_progress=True,
548580
console_log_level="WARNING",

a4d-python/src/a4d/pipeline/patient.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ def run_patient_pipeline(
132132
133133
Pipeline steps:
134134
1. For each tracker (optionally parallel):
135-
- Extract patient data from Excel → raw parquet
136-
- Clean raw data → cleaned parquet
135+
- Extract patient data from Excel → raw parquet
136+
- Clean raw data → cleaned parquet
137137
2. Create final tables from all cleaned parquets (if not skipped)
138138
139139
Args:
@@ -142,6 +142,7 @@ def run_patient_pipeline(
142142
output_root: Output directory (None = use settings.output_root)
143143
skip_tables: If True, only extract + clean, skip table creation
144144
force: If True, reprocess even if outputs exist
145+
clean_output: If True, wipe patient_data_raw/, patient_data_cleaned/, tables/ and logs/ before run
145146
progress_callback: Optional callback(tracker_name, success) called after each tracker
146147
show_progress: If True, show tqdm progress bar
147148
console_log_level: Console log level (None=INFO, ERROR=quiet, etc)
@@ -175,10 +176,9 @@ def run_patient_pipeline(
175176
if output_root is None:
176177
output_root = settings.output_root
177178

178-
# Wipe previous run's intermediate outputs so tables only reflect this run.
179-
# Does not delete logs (useful for debugging) or the tables dir itself.
179+
# Wipe previous run's outputs so tables reflect only this run.
180180
if clean_output:
181-
for subdir in ("patient_data_raw", "patient_data_cleaned", "tables"):
181+
for subdir in ("patient_data_raw", "patient_data_cleaned", "tables", "logs"):
182182
target = output_root / subdir
183183
if target.exists():
184184
shutil.rmtree(target)
@@ -215,11 +215,7 @@ def run_patient_pipeline(
215215
logger.info("Processing trackers sequentially")
216216

217217
# Use tqdm if requested
218-
iterator = (
219-
tqdm(tracker_files, desc="Processing trackers", unit="file")
220-
if show_progress
221-
else tracker_files
222-
)
218+
iterator = tqdm(tracker_files, desc="Processing trackers", unit="file") if show_progress else tracker_files
223219

224220
for tracker_file in iterator:
225221
if show_progress:
@@ -265,9 +261,7 @@ def run_patient_pipeline(
265261
# Collect results as they complete
266262
futures_iterator = as_completed(futures)
267263
if show_progress:
268-
futures_iterator = tqdm(
269-
futures_iterator, total=len(futures), desc="Processing trackers", unit="file"
270-
)
264+
futures_iterator = tqdm(futures_iterator, total=len(futures), desc="Processing trackers", unit="file")
271265

272266
for future in futures_iterator:
273267
tracker_file = futures[future]

a4d-python/tests/test_cli/test_cli.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def test_upload_tables_help(self):
4343
def test_run_pipeline_help(self):
4444
result = runner.invoke(app, ["run-pipeline", "--help"])
4545
assert result.exit_code == 0
46+
assert "--skip-download" in result.output
4647
assert "--skip-upload" in result.output
4748

4849

@@ -92,6 +93,7 @@ def test_skip_upload_calls_pipeline(self, mock_settings, mock_run_pipeline, tmp_
9293
mock_settings.output_root = tmp_path / "output"
9394
mock_settings.project_id = "test-project"
9495
mock_settings.dataset = "test-dataset"
96+
mock_settings.max_workers = 4
9597

9698
(tmp_path / "data").mkdir()
9799
(tmp_path / "output").mkdir()
@@ -105,7 +107,7 @@ def test_skip_upload_calls_pipeline(self, mock_settings, mock_run_pipeline, tmp_
105107
mock_result.tables = {}
106108
mock_run_pipeline.return_value = mock_result
107109

108-
result = runner.invoke(app, ["run-pipeline", "--skip-upload"])
110+
result = runner.invoke(app, ["run-pipeline", "--skip-download", "--skip-upload"])
109111

110112
mock_run_pipeline.assert_called_once()
111113
assert result.exit_code == 0
@@ -117,6 +119,7 @@ def test_pipeline_failure_exits_nonzero(self, mock_settings, mock_run_pipeline,
117119
mock_settings.output_root = tmp_path / "output"
118120
mock_settings.project_id = "test-project"
119121
mock_settings.dataset = "test-dataset"
122+
mock_settings.max_workers = 4
120123

121124
(tmp_path / "data").mkdir()
122125
(tmp_path / "output").mkdir()
@@ -132,7 +135,7 @@ def test_pipeline_failure_exits_nonzero(self, mock_settings, mock_run_pipeline,
132135
mock_result.tables = {}
133136
mock_run_pipeline.return_value = mock_result
134137

135-
result = runner.invoke(app, ["run-pipeline", "--skip-upload"])
138+
result = runner.invoke(app, ["run-pipeline", "--skip-download", "--skip-upload"])
136139

137140
assert result.exit_code == 1
138141

0 commit comments

Comments
 (0)