Skip to content

Commit 38efab6

Browse files
author
Michael Aydinbas
committed
Fix GCS upload: timestamped prefix, upload only tables and logs
Each pipeline run now writes to an isolated YYYY/MM/DD/HHMMSS/ prefix so that runs never overwrite each other. Only tables/ and logs/ are uploaded; large intermediate parquets (raw, cleaned) are excluded. Upload failures are now logged with the full GCS exception and traceback instead of just the file path.
1 parent c306479 commit 38efab6

2 files changed

Lines changed: 24 additions & 4 deletions

File tree

a4d-python/src/a4d/cli.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Command-line interface for A4D pipeline."""
22

33
import warnings
4+
from datetime import datetime
45
from pathlib import Path
56
from typing import Annotated
67

@@ -565,8 +566,10 @@ def run_pipeline_cmd(
565566
from a4d.config import settings
566567
from a4d.gcp.bigquery import load_pipeline_tables
567568
from a4d.gcp.storage import download_tracker_files, upload_output
569+
from a4d.tables.clinic import create_table_clinic_static
568570

569571
_workers = workers if workers is not None else settings.max_workers
572+
run_ts = datetime.now().strftime("%Y/%m/%d/%H%M%S")
570573

571574
console.print("\n[bold blue]A4D Full Pipeline[/bold blue]\n")
572575
console.print(f"Data root: {settings.data_root}")
@@ -621,13 +624,30 @@ def run_pipeline_cmd(
621624
raise typer.Exit(1) from e
622625

623626
tables_dir = settings.output_root / "tables"
627+
logs_dir = settings.output_root / "logs"
624628

625-
# Step 4 – Upload output to GCS
629+
# Clinic static table — independent of tracker processing, always created
630+
console.print("[bold]Step 3b/5:[/bold] Creating clinic static table...")
631+
try:
632+
create_table_clinic_static(tables_dir)
633+
console.print(" ✓ Clinic static table created\n")
634+
except Exception as e:
635+
console.print(f" [bold red]Error creating clinic static table: {e}[/bold red]\n")
636+
raise typer.Exit(1) from e
637+
638+
# Step 4 – Upload tables/ and logs/ to GCS under a timestamped prefix
639+
# Each run gets an isolated path: YYYY/MM/DD/HHMMSS/tables/ and .../logs/
640+
# Because no existing object is ever overwritten, the create-only objectCreator permission remains sufficient.
626641
if not skip_upload:
627642
console.print("[bold]Step 4/5:[/bold] Uploading output files to GCS...")
643+
console.print(f" Prefix: {run_ts}/\n")
628644
try:
629-
uploaded = upload_output(source_dir=settings.output_root)
630-
console.print(f" ✓ Uploaded {len(uploaded)} files\n")
645+
uploaded: list[str] = []
646+
if tables_dir.exists():
647+
uploaded += upload_output(source_dir=tables_dir, prefix=f"{run_ts}/tables")
648+
if logs_dir.exists():
649+
uploaded += upload_output(source_dir=logs_dir, prefix=f"{run_ts}/logs")
650+
console.print(f" ✓ Uploaded {len(uploaded)} files to gs://{settings.upload_bucket}/{run_ts}/\n")
631651
except Exception as e:
632652
console.print(f"\n[bold red]Error during GCS upload: {e}[/bold red]\n")
633653
raise typer.Exit(1) from e

a4d-python/src/a4d/gcp/storage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def _blob_name(file_path: Path) -> str:
157157
uploaded.append(future.result())
158158
except Exception:
159159
file_path = futures[future]
160-
logger.error(f"Failed to upload: {file_path}")
160+
logger.exception(f"Failed to upload: {file_path}")
161161

162162
logger.info(f"Uploaded {len(uploaded)} files to gs://{bucket_name}")
163163
return uploaded

0 commit comments

Comments
 (0)