Skip to content

Commit 3a50f17

Browse files
authored
Merge pull request #91 from datakind/develop
Ingestion Validation
2 parents 1ec9df4 + d85c4ec commit 3a50f17

14 files changed

Lines changed: 4602 additions & 1840 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ dependencies = [
2727
"pandas",
2828
"six",
2929
"types-six",
30-
"fuzzywuzzy"
30+
"fuzzywuzzy",
31+
"databricks-sql-connector",
32+
"pandera~=0.13"
3133
]
3234

3335
[project.urls]

src/webapp/gcsutil.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88

99
from .config import gcs_vars, databricks_vars
1010
from .validation import validate_file_reader
11-
from .utilities import (
12-
SchemaType,
13-
)
11+
from typing import Any, List
12+
import logging
13+
14+
# Set the logging
15+
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
16+
logger = logging.getLogger(__name__)
17+
logger.setLevel(logging.DEBUG)
1418

1519
SIGNED_URL_EXPIRY_MIN = 30
1620

@@ -34,7 +38,7 @@ def rename_file(
3438
# There is also an `if_source_generation_match` parameter, which is not used in this example.
3539
destination_generation_match_precondition = 0
3640

37-
blob_copy = source_bucket.copy_blob(
41+
source_bucket.copy_blob(
3842
source_blob,
3943
new_file_name,
4044
if_generation_match=destination_generation_match_precondition,
@@ -55,7 +59,7 @@ def credentials(self):
5559
self._credentials, self._project_id = google.auth.default()
5660
return self._credentials
5761

58-
def generate_upload_signed_url(self, bucket_name: str, file_name: str) -> str:
62+
def generate_upload_signed_url(self, bucket_name: str, file_name: str) -> Any:
5963
"""Generates a v4 signed URL for uploading a blob using HTTP PUT."""
6064
r = requests.Request()
6165
self.credentials().refresh(r)
@@ -88,7 +92,7 @@ def generate_upload_signed_url(self, bucket_name: str, file_name: str) -> str:
8892

8993
return url
9094

91-
def generate_download_signed_url(self, bucket_name: str, blob_name: str) -> str:
95+
def generate_download_signed_url(self, bucket_name: str, blob_name: str) -> Any:
9296
"""Generates a v4 signed URL for downloading a blob using HTTP GET."""
9397
r = requests.Request()
9498
self.credentials().refresh(r)
@@ -172,7 +176,7 @@ def create_bucket(self, bucket_name: str) -> None:
172176
new_bucket.set_iam_policy(policy)
173177

174178
def list_blobs_in_folder(
175-
self, bucket_name: str, prefix: str, delimiter=None
179+
self, bucket_name: str, prefix: str, delimiter: Any = None
176180
) -> list[str]:
177181
"""Lists all the blobs in the bucket that begin with the prefix.
178182
@@ -218,7 +222,7 @@ def list_blobs_in_folder(
218222

219223
def download_file(
220224
self, bucket_name: str, file_name: str, destination_file_name: str
221-
):
225+
) -> Any:
222226
"""Downloads a blob from the bucket."""
223227

224228
# The path to which the file should be downloaded
@@ -264,17 +268,21 @@ def delete_file(self, bucket_name: str, file_name: str):
264268
blob.delete()
265269

266270
def validate_file(
267-
self, bucket_name: str, file_name: str, allowed_schemas: set[SchemaType]
268-
) -> set[SchemaType]:
271+
self, bucket_name: str, file_name: str, allowed_schemas: list[str]
272+
) -> List[str]:
269273
"""Validate that a file is one of the allowed schemas."""
270274
client = storage.Client()
271275
bucket = client.bucket(bucket_name)
272276
blob = bucket.blob(f"unvalidated/{file_name}")
273277
new_blob_name = f"validated/{file_name}"
274-
schems = set()
278+
schems: List[str] = []
275279
try:
276280
with blob.open("r") as file:
277-
schems = validate_file_reader(file, allowed_schemas)
281+
schemas = validate_file_reader(file, allowed_schemas)
282+
schems = [str(s) for s in schemas.get("schemas", [])]
283+
logging.debug(
284+
f"If you see this file validation was successful {schems}"
285+
)
278286
except Exception as e:
279287
blob.delete()
280288
raise e
@@ -283,6 +291,7 @@ def validate_file(
283291
raise ValueError(new_blob_name + ": File already exists.")
284292
bucket.copy_blob(blob, bucket, new_blob_name)
285293
blob.delete()
294+
logging.debug("If you see this file validation was complete")
286295
return schems
287296

288297
def get_file_contents(self, bucket_name: str, file_name: str):

src/webapp/routers/data.py

Lines changed: 93 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
import uuid
44
from datetime import datetime, date
55

6-
from typing import Annotated, Any, Dict
6+
from typing import Annotated, Any, Dict, List
77
from pydantic import BaseModel
88
from fastapi import APIRouter, Depends, HTTPException, status, Response
99
from sqlalchemy import and_, or_
1010
from sqlalchemy.orm import Session
1111
from sqlalchemy.future import select
12+
import os
13+
import logging
14+
from sqlalchemy.exc import IntegrityError
1215

1316
from ..utilities import (
1417
has_access_to_inst_or_err,
@@ -20,7 +23,6 @@
2023
get_current_active_user,
2124
DataSource,
2225
get_external_bucket_name,
23-
SchemaType,
2426
decode_url_piece,
2527
)
2628

@@ -29,13 +31,17 @@
2931
local_session,
3032
BatchTable,
3133
FileTable,
32-
InstTable,
3334
)
3435

3536
from ..gcsdbutils import update_db_from_bucket
3637

3738
from ..gcsutil import StorageControl
3839

40+
# Set the logging
41+
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
42+
logger = logging.getLogger(__name__)
43+
logger.setLevel(logging.DEBUG)
44+
3945
router = APIRouter(
4046
prefix="/institutions",
4147
tags=["data"],
@@ -91,7 +97,7 @@ class DataInfo(BaseModel):
9197
name: str
9298
data_id: str
9399
# The batch(es) that this data is present in.
94-
batch_ids: set[str] = {}
100+
batch_ids: set[str] = set()
95101
inst_id: str
96102
# Size to the nearest MB.
97103
# size_mb: int
@@ -123,7 +129,7 @@ class ValidationResult(BaseModel):
123129
# Must be unique within an institution to avoid confusion.
124130
name: str
125131
inst_id: str
126-
file_types: set[SchemaType]
132+
file_types: List[str]
127133
source: str
128134

129135

@@ -838,6 +844,33 @@ def download_url_inst_file(
838844
)
839845

840846

847+
def infer_models_from_filename(file_path: str, institution_id: str) -> List[str]:
848+
name = os.path.basename(file_path).lower()
849+
850+
inferred = set()
851+
if "course" in name:
852+
inferred.add("COURSE")
853+
if "student" in name:
854+
inferred.add("STUDENT")
855+
if institution_id == "pdp":
856+
inferred.add("SEMESTER")
857+
if "semester" in name:
858+
inferred.add("SEMESTER")
859+
if "cohort" in name:
860+
inferred.add("STUDENT")
861+
inferred.add("SEMESTER")
862+
863+
if not inferred:
864+
logging.error(
865+
ValueError(
866+
f"Could not infer model(s) from file name: {name}, filenames sould be descriptive of the kind of data it contains e.g. course, cohort"
867+
)
868+
)
869+
inferred.add("UNKNOWN")
870+
871+
return sorted(inferred)
872+
873+
841874
def validation_helper(
842875
source_str: str,
843876
inst_id: str,
@@ -854,51 +887,76 @@ def validation_helper(
854887
detail="File name can't contain '/'.",
855888
)
856889
local_session.set(sql_session)
857-
inst_query_result = (
858-
local_session.get()
859-
.execute(select(InstTable).where(InstTable.id == str_to_uuid(inst_id)))
860-
.all()
861-
)
862-
if len(inst_query_result) == 0:
863-
raise HTTPException(
864-
status_code=status.HTTP_404_NOT_FOUND,
865-
detail="Institution not found.",
866-
)
867-
if len(inst_query_result) > 1:
868-
raise HTTPException(
869-
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
870-
detail="Institution duplicates found.",
871-
)
872-
allowed_schemas = set()
873-
if inst_query_result[0][0].schemas:
874-
allowed_schemas = set(inst_query_result[0][0].schemas)
875890

876-
inferred_schemas = set()
891+
allowed_schemas = None
892+
if not allowed_schemas:
893+
allowed_schemas = infer_models_from_filename(file_name, "pdp")
894+
895+
inferred_schemas: list[str] = []
896+
877897
try:
878898
inferred_schemas = storage_control.validate_file(
879-
get_external_bucket_name(inst_id), file_name, allowed_schemas
899+
get_external_bucket_name(inst_id),
900+
file_name,
901+
allowed_schemas,
902+
)
903+
logging.debug(
904+
f"!!!!!!!!!!Inferred Schemas was successful {list(inferred_schemas)}"
880905
)
881906
except Exception as e:
907+
logging.debug(f"!!!!!!!!!!Inferred Schemas FAILED {e}")
882908
raise HTTPException(
883909
status_code=status.HTTP_400_BAD_REQUEST,
884910
detail="File type is not valid and/or not accepted by this institution: "
885911
+ str(e),
886912
) from e
887-
new_file_record = FileTable(
888-
name=file_name,
889-
inst_id=str_to_uuid(inst_id),
890-
uploader=str_to_uuid(current_user.user_id),
891-
source=source_str,
892-
sst_generated=False,
893-
schemas=list(inferred_schemas),
894-
valid=True,
913+
914+
existing_file = (
915+
local_session.get()
916+
.query(FileTable)
917+
.filter_by(
918+
name=file_name,
919+
inst_id=str_to_uuid(inst_id),
920+
)
921+
.first()
895922
)
896-
local_session.get().add(new_file_record)
923+
924+
if existing_file:
925+
logging.info(f"File '{file_name}' already exists for institution {inst_id}.")
926+
db_status = f"File '{file_name}' already exists for institution {inst_id}."
927+
else:
928+
try:
929+
new_file_record = FileTable(
930+
name=file_name,
931+
inst_id=str_to_uuid(inst_id),
932+
uploader=str_to_uuid(current_user.user_id),
933+
source=source_str,
934+
sst_generated=False,
935+
schemas=list(inferred_schemas),
936+
valid=True,
937+
)
938+
local_session.get().add(new_file_record)
939+
local_session.get().flush()
940+
logging.info(f"File record inserted for '{file_name}'")
941+
db_status = f"File record inserted for '{file_name}'"
942+
except IntegrityError as e:
943+
local_session.get().rollback()
944+
logging.warning(f"IntegrityError: {e}")
945+
db_status = "Already exists"
946+
except Exception as e:
947+
local_session.get().rollback()
948+
logging.error(f"Unexpected DB error: {e}")
949+
raise HTTPException(
950+
status_code=500,
951+
detail=f"Unexpected database error while inserting file record: {e}",
952+
)
953+
897954
return {
898955
"name": file_name,
899956
"inst_id": inst_id,
900-
"file_types": inferred_schemas,
957+
"file_types": list(inferred_schemas),
901958
"source": source_str,
959+
"status": db_status,
902960
}
903961

904962

src/webapp/routers/data_test.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ def test_update_batch(client: TestClient):
557557

558558
def test_validate_success_batch(client: TestClient):
559559
"""Test PATCH /institutions/<uuid>/batch."""
560-
MOCK_STORAGE.validate_file.return_value = {SchemaType.UNKNOWN}
560+
MOCK_STORAGE.validate_file.return_value = ["UNKNOWN"]
561561

562562
# Use validate for manual upload
563563
response_upload = client.post(
@@ -608,28 +608,28 @@ def test_validate_success_batch(client: TestClient):
608608

609609
def test_validate_failure_batch(client: TestClient):
610610
"""Test PATCH /institutions/<uuid>/batch."""
611-
MOCK_STORAGE.validate_file.return_value = {SchemaType.PDP_COHORT}
611+
MOCK_STORAGE.validate_file.return_value = ["COURSE"]
612612
# Authorized.
613613
# Use validate upload
614614
response_upload = client.post(
615615
"/institutions/"
616616
+ uuid_to_str(USER_VALID_INST_UUID)
617-
+ "/input/validate-upload/file_name.csv",
617+
+ "/input/validate-upload/file_name_course.csv",
618618
)
619619
assert response_upload.status_code == 200
620-
assert response_upload.json()["name"] == "file_name.csv"
621-
assert response_upload.json()["file_types"] == ["PDP_COHORT"]
620+
assert response_upload.json()["name"] == "file_name_course.csv"
621+
assert response_upload.json()["file_types"] == ["COURSE"]
622622
assert response_upload.json()["inst_id"] == uuid_to_str(USER_VALID_INST_UUID)
623623
assert response_upload.json()["source"] == "MANUAL_UPLOAD"
624624

625625
# Use valiate sftp
626626
response_sftp = client.post(
627627
"/institutions/"
628628
+ uuid_to_str(USER_VALID_INST_UUID)
629-
+ "/input/validate-upload/file_name.csv",
629+
+ "/input/validate-upload/file_name_course.csv",
630630
)
631631
assert response_sftp.status_code == 200
632-
assert response_sftp.json()["name"] == "file_name.csv"
633-
assert response_sftp.json()["file_types"] == ["PDP_COHORT"]
632+
assert response_sftp.json()["name"] == "file_name_course.csv"
633+
assert response_sftp.json()["file_types"] == ["COURSE"]
634634
assert response_sftp.json()["inst_id"] == uuid_to_str(USER_VALID_INST_UUID)
635635
assert response_sftp.json()["source"] == "MANUAL_UPLOAD"

src/webapp/test_files/financial_sst_pdp.csv

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Student ID,Institution ID,Academic Year,Dependency Status,Housing Status,Cost of Attendance,EFC,Total Institutional Grants,Total State Grants,Total Federal Grants,Unmet Need,Net Price,Applied Aid
1+
Student ID,Institution ID,Academic Year,Dependency Status,Housing Status,Cost of Attendance,EFC,Total Institutional Grants,Total State Grants,Pell Status First Year,Unmet Need,Net Price,Applied Aid
22
999999,99999999,2019-20,Unknown,Off-campus,3505,0,0,0,774,2731,2731,N
33
999998,99999999,2019-20,Independent,Off-campus,4210,0,0,0,3097,1113,1113,Y
44
999997,99999999,2019-20,Dependent,On-campus housing,19938,1768,0,2566,4445,11159,12927,Y

0 commit comments

Comments
 (0)