Skip to content

Commit 55c2009

Browse files
Merge pull request #228 from datakind/feat/remove-custom-institutions
feat: remove custom institutions, enforce PDP / Edvise / Legacy in API and uploads
2 parents 7dbc362 + 485ac01 commit 55c2009

11 files changed

Lines changed: 611 additions & 550 deletions

src/webapp/databricks.py

Lines changed: 0 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -13,19 +13,16 @@
1313
)
1414
from google.cloud import storage
1515
from google.api_core import exceptions as gcs_errors
16-
from .validation_extension import generate_extension_schema
1716
from .config import databricks_vars, gcs_vars
1817
from .utilities import databricksify_inst_name, SchemaType
1918
from typing import List, Any, Dict, Optional
20-
from fastapi import HTTPException
2119
import requests
2220
import hashlib
2321
import json
2422
import gzip
2523
from cachetools import TTLCache
2624
import threading
2725
import re
28-
import pandas as pd
2926

3027
# Setting up logger
3128
LOGGER = logging.getLogger(__name__)
@@ -623,78 +620,3 @@ def matches_one(pat: Any) -> bool:
623620
return key
624621

625622
return None
626-
627-
def create_custom_schema_extension(
628-
self,
629-
bucket_name: str,
630-
inst_query: Any,
631-
file_name: str,
632-
base_schema: Dict[str, Any], # pass base schema dict in
633-
extension_schema: Optional[dict] = None, # existing extension or None
634-
) -> Any:
635-
if (
636-
os.getenv("SST_SKIP_EXT_GEN") == "1"
637-
): # skip using workspace client for tests
638-
LOGGER.info("SST_SKIP_EXT_GEN=1; skipping Databricks extension generation.")
639-
return None
640-
641-
inst_name = inst_query.name
642-
inst_id = str(inst_query.id)
643-
644-
mapping = {
645-
"course": [
646-
"course.csv",
647-
"courses.csv",
648-
r"^(?=.*AR_DEIDENTIFIED)(?=.*COURSE).*\.csv$",
649-
],
650-
"student": ["student.csv", r"^(?=.*AR_DEIDENTIFIED)(?!.*COURSE).*\.csv$"],
651-
"semester": ["semester.csv"],
652-
}
653-
654-
key = self.get_key_for_file(mapping, file_name) # e.g., "student"
655-
if key is None:
656-
raise HTTPException(
657-
404, detail=f"{file_name} not found in {inst_name} validation_mapping"
658-
)
659-
660-
key_lc = key.lower()
661-
662-
# 4) If this model already exists in the provided extension for this institution, skip
663-
if extension_schema is not None:
664-
if not isinstance(extension_schema, dict):
665-
raise HTTPException(
666-
400, detail="extension_schema must be a dict if provided"
667-
)
668-
669-
inst_block = extension_schema.get("institutions", {}).get(inst_id, {})
670-
data_models = inst_block.get("data_models", {})
671-
existing_keys_lc = {str(k).lower() for k in data_models.keys()}
672-
673-
if key_lc in existing_keys_lc:
674-
LOGGER.info(
675-
"Model '%s' already present for institution '%s' — skipping (return None).",
676-
key,
677-
inst_id,
678-
)
679-
return None # <-- sentinel: do not write
680-
681-
# 5) Read the unvalidated CSV from GCS
682-
try:
683-
client = storage.Client()
684-
bucket = client.bucket(bucket_name)
685-
blob = bucket.blob(f"unvalidated/{file_name}")
686-
with blob.open("r") as fh:
687-
df = pd.read_csv(fh)
688-
except Exception as e:
689-
LOGGER.exception("Failed to read %s from GCS", file_name)
690-
raise HTTPException(500, detail=f"Failed to read {file_name} from GCS: {e}")
691-
692-
updated_extension = generate_extension_schema(
693-
df=df,
694-
models=key, # exactly one model
695-
institution_id=inst_id,
696-
base_schema=base_schema, # reference only, not mutated
697-
existing_extension=extension_schema, # may be None
698-
)
699-
700-
return updated_extension

src/webapp/gcsutil.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -378,7 +378,7 @@ def validate_file(
378378
base_schema: Base schema dict.
379379
inst_schema: Optional extension schema with institutions.* blocks.
380380
institution_id: Key into inst_schema["institutions"]: "edvise", "pdp",
381-
"legacy" (any-format uploads), or institution UUID for custom. Default "pdp".
381+
or "legacy" (any-format uploads). Default "pdp".
382382
institution_identifier: Optional institution ID (e.g. UUID). Reserved for
383383
future use; Edvise uses JSON-based validation only (different shape).
384384

src/webapp/routers/data.py

Lines changed: 7 additions & 139 deletions
Original file line number | Diff line number | Diff line change
@@ -1340,31 +1340,6 @@ def _get_validation_base_schema(sess: Session) -> Tuple[Any, Any, float]:
13401340
return (base_schema_id, base_schema, now)
13411341

13421342

1343-
def _ext_models_set(doc: Optional[dict], inst: Any, inst_id: str) -> set[str]:
1344-
"""Extract model keys from extension document (root or institutions.* layout).
1345-
1346-
Args:
1347-
doc: Extension schema JSON doc (or None).
1348-
inst: Institution row (for id in institutions lookup).
1349-
inst_id: Institution id string (for institutions lookup).
1350-
1351-
Returns:
1352-
Set of lowercase model names (e.g. {"student", "course"}).
1353-
"""
1354-
if not doc or not isinstance(doc, dict):
1355-
return set()
1356-
if isinstance(doc.get("data_models"), dict):
1357-
return {str(k).lower() for k in doc["data_models"].keys()}
1358-
inst_key_candidates = {str(getattr(inst, "id", "")), inst_id}
1359-
insts = doc.get("institutions", {})
1360-
if isinstance(insts, dict):
1361-
for key in inst_key_candidates:
1362-
block = insts.get(key)
1363-
if isinstance(block, dict) and isinstance(block.get("data_models"), dict):
1364-
return {str(k).lower() for k in block["data_models"].keys()}
1365-
return set()
1366-
1367-
13681343
def _resolve_edvise_schema(
13691344
sess: Session, now: float
13701345
) -> Tuple[str, Optional[Dict[str, Any]]]:
@@ -1422,109 +1397,6 @@ def _resolve_pdp_schema(
14221397
return (schema_namespace, inst_schema)
14231398

14241399

1425-
def _persist_custom_schema_extension(
1426-
sess: Session,
1427-
inst_id: str,
1428-
schema_extension: Dict[str, Any],
1429-
base_schema_id: Any,
1430-
cache_key: str,
1431-
) -> None:
1432-
"""Deactivate existing extension records and insert new one; update cache."""
1433-
import time
1434-
1435-
existing_extensions = (
1436-
sess.execute(
1437-
select(SchemaRegistryTable).where(
1438-
SchemaRegistryTable.inst_id == str_to_uuid(inst_id),
1439-
SchemaRegistryTable.doc_type == DocType.extension,
1440-
SchemaRegistryTable.is_active.is_(True),
1441-
)
1442-
)
1443-
.scalars()
1444-
.all()
1445-
)
1446-
for existing in existing_extensions:
1447-
existing.is_active = False
1448-
new_record = SchemaRegistryTable(
1449-
doc_type=DocType.extension,
1450-
inst_id=str_to_uuid(inst_id),
1451-
is_pdp=False, # type: ignore
1452-
version_label="1.0.0",
1453-
extends_schema_id=base_schema_id,
1454-
json_doc=schema_extension,
1455-
is_active=True,
1456-
)
1457-
sess.add(new_record)
1458-
sess.flush()
1459-
logging.info(
1460-
"Schema record inserted for '%s' (deactivated %d existing)",
1461-
inst_id,
1462-
len(existing_extensions),
1463-
)
1464-
STATE._ext_cache[cache_key] = (time.monotonic() + EXT_TTL, schema_extension)
1465-
1466-
1467-
def _resolve_custom_schema(
1468-
sess: Session,
1469-
inst: Any,
1470-
inst_id: str,
1471-
now: float,
1472-
allowed_schemas: List[str],
1473-
bucket: str,
1474-
base_schema: dict,
1475-
base_schema_id: Any,
1476-
file_name: str,
1477-
) -> Tuple[str, Optional[Dict[str, Any]]]:
1478-
"""Resolve schema namespace and extension for custom (non-PDP/ES/legacy) institutions."""
1479-
schema_namespace = str(getattr(inst, "id", ""))
1480-
ext_cache = STATE._ext_cache
1481-
key = str(getattr(inst, "id", ""))
1482-
cached = ext_cache.get(key)
1483-
if cached and now < cached[0]:
1484-
inst_schema = cached[1]
1485-
else:
1486-
inst_schema = sess.execute(
1487-
select(SchemaRegistryTable.json_doc)
1488-
.where(
1489-
SchemaRegistryTable.inst_id == getattr(inst, "id", None),
1490-
SchemaRegistryTable.is_active.is_(True),
1491-
SchemaRegistryTable.doc_type == DocType.extension,
1492-
)
1493-
.limit(1)
1494-
).scalar_one_or_none()
1495-
ext_cache[key] = (now + EXT_TTL, inst_schema)
1496-
inferred_lower = {m.lower() for m in allowed_schemas}
1497-
ext_models = _ext_models_set(inst_schema, inst, inst_id)
1498-
if inferred_lower.issubset(ext_models):
1499-
return (schema_namespace, inst_schema)
1500-
dbc = DatabricksControl()
1501-
schema_extension: Optional[Dict[str, Any]] = dbc.create_custom_schema_extension(
1502-
bucket_name=bucket,
1503-
inst_query=inst,
1504-
file_name=file_name,
1505-
base_schema=base_schema,
1506-
extension_schema=inst_schema,
1507-
)
1508-
if schema_extension is not None:
1509-
try:
1510-
_persist_custom_schema_extension(
1511-
sess, inst_id, schema_extension, base_schema_id, key
1512-
)
1513-
except IntegrityError as e:
1514-
sess.rollback()
1515-
logging.warning("IntegrityError: %s", e)
1516-
except Exception as e:
1517-
sess.rollback()
1518-
logging.error("Unexpected DB error: %s", e)
1519-
raise HTTPException(
1520-
status_code=500,
1521-
detail=f"Unexpected database error while inserting file record: {e}",
1522-
)
1523-
return (schema_namespace, schema_extension)
1524-
logging.info("No-op: extension already contains this model for inst %s", inst_id)
1525-
return (schema_namespace, inst_schema)
1526-
1527-
15281400
def _resolve_schema_namespace_and_extension(
15291401
sess: Session,
15301402
inst: Any,
@@ -1536,7 +1408,7 @@ def _resolve_schema_namespace_and_extension(
15361408
base_schema_id: Any,
15371409
file_name: str,
15381410
) -> Tuple[str, Optional[Dict[str, Any]]]:
1539-
"""Resolve schema_namespace and updated_inst_schema by institution type (edvise/pdp/legacy/custom)."""
1411+
"""Resolve schema_namespace and updated_inst_schema by institution type (edvise/pdp/legacy)."""
15401412
pdp_id = getattr(inst, "pdp_id", None)
15411413
edvise_id = getattr(inst, "edvise_id", None)
15421414
legacy_id = getattr(inst, "legacy_id", None)
@@ -1552,16 +1424,12 @@ def _resolve_schema_namespace_and_extension(
15521424
return _resolve_pdp_schema(sess, now)
15531425
if legacy_id:
15541426
return ("legacy", None)
1555-
return _resolve_custom_schema(
1556-
sess,
1557-
inst,
1558-
inst_id,
1559-
now,
1560-
allowed_schemas,
1561-
bucket,
1562-
base_schema,
1563-
base_schema_id,
1564-
file_name,
1427+
raise HTTPException(
1428+
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
1429+
detail=(
1430+
"Institution configuration error: institution has no pdp_id, edvise_id, "
1431+
"or legacy_id; cannot resolve validation schema."
1432+
),
15651433
)
15661434

15671435

src/webapp/routers/data_test.py

Lines changed: 33 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@
3434
DataOverview,
3535
DataInfo,
3636
_infer_allowed_schemas_from_filename,
37-
_ext_models_set,
3837
)
3938
from fastapi import HTTPException
4039
from ..gcsutil import StorageControl
@@ -164,6 +163,9 @@ def session_fixture():
164163
InstTable(
165164
id=USER_VALID_INST_UUID,
166165
name="school_1",
166+
legacy_id="legacy_test",
167+
pdp_id=None,
168+
edvise_id=None,
167169
created_at=DATETIME_TESTING,
168170
updated_at=DATETIME_TESTING,
169171
),
@@ -917,7 +919,7 @@ def legacy_session_fixture():
917919
legacy_id="legacy123",
918920
pdp_id=None,
919921
edvise_id=None,
920-
schemas=["STUDENT", "COURSE"],
922+
schemas=["UNKNOWN"],
921923
created_at=DATETIME_TESTING,
922924
updated_at=DATETIME_TESTING,
923925
),
@@ -1271,36 +1273,6 @@ def test_infer_allowed_schemas_non_legacy_arbitrary_raises_422() -> None:
12711273
assert "random" in exc_info.value.detail
12721274

12731275

1274-
def test_ext_models_set_none_returns_empty() -> None:
1275-
"""None doc returns empty set."""
1276-
inst = _make_inst()
1277-
assert _ext_models_set(None, inst, "inst-id") == set()
1278-
1279-
1280-
def test_ext_models_set_root_data_models() -> None:
1281-
"""Doc with root data_models returns lowercase keys."""
1282-
inst = _make_inst()
1283-
doc: dict[str, Any] = {"data_models": {"STUDENT": {}, "COURSE": {}}}
1284-
assert _ext_models_set(doc, inst, "x") == {"course", "student"}
1285-
1286-
1287-
def test_ext_models_set_institutions_block() -> None:
1288-
"""Doc with institutions[inst_id].data_models returns keys."""
1289-
inst = _make_inst()
1290-
inst.id = uuid.UUID("12345678-1234-1234-1234-123456789abc") # type: ignore
1291-
doc: dict[str, Any] = {
1292-
"institutions": {
1293-
"12345678123412341234123456789abc": {
1294-
"data_models": {"student": {}, "course": {}},
1295-
}
1296-
}
1297-
}
1298-
assert _ext_models_set(doc, inst, "12345678123412341234123456789abc") == {
1299-
"course",
1300-
"student",
1301-
}
1302-
1303-
13041276
def test_validate_edvise_non_descriptive_filename_returns_422(
13051277
edvise_client: TestClient,
13061278
) -> None:
@@ -1413,6 +1385,35 @@ def test_validation_helper_pdp_and_edvise_mutual_exclusivity(
14131385
edvise_session.commit()
14141386

14151387

1388+
def test_validation_helper_rejects_institution_without_school_type(
1389+
edvise_client: TestClient, edvise_session: sqlalchemy.orm.Session
1390+
) -> None:
1391+
"""Upload validation requires pdp_id, edvise_id, or legacy_id on the institution."""
1392+
inst = edvise_session.execute(
1393+
select(InstTable).where(InstTable.id == EDVISE_INST_UUID)
1394+
).scalar_one()
1395+
saved = (inst.edvise_id, inst.pdp_id, inst.legacy_id)
1396+
inst.edvise_id = None # type: ignore
1397+
inst.pdp_id = None # type: ignore
1398+
inst.legacy_id = None # type: ignore
1399+
edvise_session.commit()
1400+
1401+
from .data import STATE
1402+
1403+
STATE._edvise_cache = (0.0, None)
1404+
1405+
response = edvise_client.post(
1406+
"/institutions/"
1407+
+ uuid_to_str(EDVISE_INST_UUID)
1408+
+ "/input/validate-upload/test_student_file.csv",
1409+
)
1410+
assert response.status_code == 500
1411+
assert "no pdp_id, edvise_id, or legacy_id" in response.json()["detail"]
1412+
1413+
inst.edvise_id, inst.pdp_id, inst.legacy_id = saved
1414+
edvise_session.commit()
1415+
1416+
14161417
def test_edvise_schema_cache(
14171418
edvise_client: TestClient, edvise_session: sqlalchemy.orm.Session
14181419
) -> None:

0 commit comments

Comments
 (0)