Skip to content

Commit d688be3

Browse files
authored
Merge pull request #202 from datakind/feat/schema-validation-during-upload
feat: validate PDP uploads with repo schemas, write normalized output to validated/
2 parents fbfc9df + dd21d3c commit d688be3

12 files changed

Lines changed: 2247 additions & 227 deletions

src/webapp/gcsutil.py

Lines changed: 114 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
"""Cloud storage related helper functions."""
22

33
import datetime
4+
import io
5+
import logging
6+
from typing import Any, Dict, List, Optional
7+
8+
import pandas as pd
49
from pydantic import BaseModel
510
from google.cloud import storage
611
import google.auth
712
from google.auth.transport import requests
813

914
from .config import gcs_vars, databricks_vars
10-
from .validation import validate_file_reader
11-
from typing import Any, List, Optional, Dict
12-
import logging
15+
from .validation import validate_file_reader, HardValidationError
1316

1417
# Set the logging
1518
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
@@ -20,10 +23,10 @@
2023

2124

2225
def rename_file(
23-
bucket_name,
24-
file_name,
25-
new_file_name,
26-
):
26+
bucket_name: str,
27+
file_name: str,
28+
new_file_name: str,
29+
) -> None:
2730
"""Moves a blob from one bucket to another with a new name."""
2831
storage_client = storage.Client()
2932
source_bucket = storage_client.bucket(bucket_name)
@@ -324,9 +327,14 @@ def validate_file(
324327
base_schema: dict,
325328
inst_schema: Optional[Dict[Any, Any]] = None,
326329
institution_id: str = "pdp",
330+
institution_identifier: Optional[str] = None,
327331
) -> List[str]:
328332
"""Validate that a file conforms to one of the allowed schemas.
329333
334+
On success: archives the original to raw/{file_name}, writes the normalized
335+
(canonical columns, coerced dtypes) DataFrame to validated/{file_name}, and
336+
deletes from unvalidated/. Downstream uses validated/ only; raw/ is kept for record.
337+
330338
Args:
331339
bucket_name: GCS bucket name.
332340
file_name: Blob name under unvalidated/.
@@ -335,39 +343,124 @@ def validate_file(
335343
inst_schema: Optional extension schema with institutions.* blocks.
336344
institution_id: Key into inst_schema["institutions"]: "edvise", "pdp", or
337345
institution UUID for custom. Default "pdp" for backward compatibility.
346+
institution_identifier: Optional institution ID (e.g. UUID). Reserved for
347+
future use; Edvise uses JSON-based validation only (different shape).
338348
339349
Returns:
340350
List of inferred schema names (e.g. ["STUDENT"]).
351+
352+
Raises:
353+
ValueError: If file not in unvalidated/, validated/ already exists, or
354+
normalized_df was not returned.
355+
HardValidationError: If validation fails (propagated from validator).
341356
"""
357+
if not file_name or not file_name.strip():
358+
raise ValueError("file_name is required and must be non-empty.")
359+
if "/" in file_name:
360+
raise ValueError("file_name must not contain '/'.")
361+
if not allowed_schemas:
362+
raise ValueError("allowed_schemas must not be empty.")
363+
342364
client = storage.Client()
343365
bucket = client.bucket(bucket_name)
344366
blob = bucket.blob(f"unvalidated/{file_name}")
345-
new_blob_name = f"validated/{file_name}"
346-
schems: List[str] = []
367+
if not blob.exists():
368+
raise ValueError(
369+
f"File not found: unvalidated/{file_name}. "
370+
"Upload the file to unvalidated/ before validating."
371+
)
372+
373+
inferred_schema_names, normalized_df = (
374+
self._run_validation_and_get_normalized_df(
375+
blob,
376+
file_name,
377+
allowed_schemas,
378+
base_schema,
379+
inst_schema,
380+
institution_id,
381+
institution_identifier,
382+
)
383+
)
384+
if normalized_df is None:
385+
raise ValueError(
386+
"Validation succeeded but normalized_df was not returned; "
387+
"cannot write validated output (e.g. empty schema list)."
388+
)
389+
390+
validated_blob_name = f"validated/{file_name}"
391+
validated_blob = bucket.blob(validated_blob_name)
392+
if validated_blob.exists():
393+
raise ValueError(validated_blob_name + ": File already exists.")
394+
395+
self._archive_raw_and_write_validated(bucket, blob, file_name, normalized_df)
396+
return inferred_schema_names
397+
398+
def _archive_raw_and_write_validated(
399+
self,
400+
bucket: Any,
401+
blob: Any,
402+
file_name: str,
403+
normalized_df: pd.DataFrame,
404+
) -> None:
405+
"""Copy blob to raw/, write normalized DataFrame to validated/, delete from unvalidated/."""
406+
raw_blob_name = f"raw/{file_name}"
407+
validated_blob_name = f"validated/{file_name}"
408+
bucket.copy_blob(blob, bucket, raw_blob_name)
409+
logging.debug("Archived original to %s", raw_blob_name)
410+
self._write_dataframe_to_gcs_as_csv(bucket, validated_blob_name, normalized_df)
411+
logging.debug("Wrote normalized data to %s", validated_blob_name)
412+
blob.delete()
413+
logging.debug("Validation complete: validated=normalized, raw=archived")
414+
415+
def _run_validation_and_get_normalized_df(
416+
self,
417+
blob: Any,
418+
file_name: str,
419+
allowed_schemas: list[str],
420+
base_schema: dict,
421+
inst_schema: Optional[Dict[Any, Any]],
422+
institution_id: str,
423+
institution_identifier: Optional[str],
424+
) -> tuple[List[str], Any]:
425+
"""Run validation on blob content; return inferred schema names and normalized DataFrame."""
347426
try:
348427
with blob.open("r") as file:
349-
schemas = validate_file_reader(
428+
result = validate_file_reader(
350429
file,
351430
allowed_schemas,
352431
base_schema,
353432
inst_schema,
354433
institution_id=institution_id,
434+
institution_identifier=institution_identifier,
355435
)
356-
schems = [str(s) for s in schemas.get("schemas", [])]
357-
logging.debug(
358-
f"If you see this file validation was successful {schems}"
359-
)
436+
inferred_schema_names = [str(s) for s in result.get("schemas", [])]
437+
logging.debug(
438+
"Validation successful for %s: %s", file_name, inferred_schema_names
439+
)
440+
return inferred_schema_names, result.get("normalized_df")
441+
except HardValidationError:
442+
raise
443+
except (ValueError, UnicodeError) as e:
444+
logging.exception("Validation failed for %s: %s", file_name, e)
445+
raise
360446
except Exception as e:
447+
# Log any other error with context before re-raising (no silent failures).
361448
logging.exception("Validation failed for %s: %s", file_name, e)
362449
raise
363450

364-
new_blob = bucket.blob(new_blob_name)
365-
if new_blob.exists():
366-
raise ValueError(new_blob_name + ": File already exists.")
367-
bucket.copy_blob(blob, bucket, new_blob_name)
368-
blob.delete()
369-
logging.debug("If you see this file validation was complete")
370-
return schems
451+
def _write_dataframe_to_gcs_as_csv(
452+
self, bucket: Any, blob_name: str, normalized_df: pd.DataFrame
453+
) -> None:
454+
"""Write a DataFrame to GCS as UTF-8 CSV. Used for validated/ output."""
455+
csv_buffer = io.StringIO()
456+
normalized_df.to_csv(
457+
csv_buffer, index=False, encoding="utf-8", lineterminator="\n"
458+
)
459+
blob = bucket.blob(blob_name)
460+
blob.upload_from_string(
461+
csv_buffer.getvalue().encode("utf-8"),
462+
content_type="text/csv; charset=utf-8",
463+
)
371464

372465
def get_file_contents(self, bucket_name: str, file_name: str) -> Any:
373466
"""Returns a file as a bytes object."""

0 commit comments

Comments
 (0)