Skip to content

Commit 928453c

Browse files
authored
Merge pull request #209 from datakind/feat/schema-validation-during-upload
feat: use PDP cohort converter and support custom converters
2 parents 0857e5d + cbe7afa commit 928453c

2 files changed

Lines changed: 225 additions & 33 deletions

File tree

src/webapp/validation.py

Lines changed: 145 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,32 @@
2020
import tempfile
2121
from contextlib import contextmanager
2222
from functools import lru_cache, partial
23-
from typing import Any, BinaryIO, Dict, Generator, List, Optional, Tuple, Union, cast
23+
from typing import (
24+
Any,
25+
BinaryIO,
26+
Callable,
27+
Dict,
28+
Generator,
29+
List,
30+
Optional,
31+
Tuple,
32+
Union,
33+
cast,
34+
)
2435

2536
import pandas as pd
2637
from pandera import Column, Check, DataFrameSchema
2738
from pandera.errors import SchemaError, SchemaErrors
2839

2940
from edvise.dataio.read import read_raw_pdp_cohort_data, read_raw_pdp_course_data
41+
from edvise.dataio.pdp_cohort_converters import converter_func_cohort
3042
from edvise.utils.data_cleaning import handling_duplicates
3143

3244
from . import validation_pdp_edvise as pdp_edvise
3345

46+
# Optional PDP converter callable (DataFrame -> DataFrame), or None to fall back to the defaults; used for cohort/course.
47+
PDPConverterFunc = Optional[Callable[[pd.DataFrame], pd.DataFrame]]
48+
3449
# --------------------------------------------------------------------------- #
3550
# Logging
3651
# --------------------------------------------------------------------------- #
@@ -49,6 +64,8 @@ def validate_file_reader(
4964
inst_schema: Optional[Dict[Any, Any]] = None,
5065
institution_id: str = "pdp",
5166
institution_identifier: Optional[str] = None,
67+
pdp_cohort_converter_func: PDPConverterFunc = None,
68+
pdp_course_converter_func: PDPConverterFunc = None,
5269
) -> dict[str, Any]:
5370
"""Validates a dataset given a filename and schema selection.
5471
@@ -60,6 +77,8 @@ def validate_file_reader(
6077
institution_id: Key into inst_schema["institutions"]: "edvise", "pdp", or
6178
institution UUID for custom. Default "pdp" for backward compatibility.
6279
institution_identifier: Optional institution identifier (e.g. UUID) for display/context.
80+
pdp_cohort_converter_func: Optional custom PDP cohort converter (school-specific).
81+
pdp_course_converter_func: Optional custom PDP course converter (school-specific).
6382
6483
Returns:
6584
Dict with validation_status, schemas, missing_optional, unknown_extra_columns.
@@ -76,6 +95,8 @@ def validate_file_reader(
7695
allowed_schema,
7796
institution_id,
7897
institution_identifier,
98+
pdp_cohort_converter_func=pdp_cohort_converter_func,
99+
pdp_course_converter_func=pdp_course_converter_func,
79100
)
80101

81102

@@ -724,6 +745,67 @@ def _compute_model_list_and_merged_specs(
724745
PDP_COURSE_DTTM_FORMATS = ("ISO8601", "%Y%m%d.0", "%Y%m%d")
725746

726747

748+
def _validate_pdp_converter_callables(
    pdp_cohort_converter_func: PDPConverterFunc,
    pdp_course_converter_func: PDPConverterFunc,
) -> None:
    """Reject PDP converter arguments that are provided but not callable.

    ``None`` is accepted for either argument. The cohort converter is checked
    before the course converter, so when both are invalid the cohort error is
    the one that is raised.

    Args:
        pdp_cohort_converter_func: Optional cohort converter to check.
        pdp_course_converter_func: Optional course converter to check.

    Raises:
        HardValidationError: If a non-None converter is not callable (raised
            as a validation error so the API returns 400).
    """
    # (converter, error message) pairs, in the order they must be checked.
    checks = (
        (
            pdp_cohort_converter_func,
            "pdp_cohort_converter_func must be callable (DataFrame -> DataFrame)",
        ),
        (
            pdp_course_converter_func,
            "pdp_course_converter_func must be callable (DataFrame -> DataFrame)",
        ),
    )
    for converter, message in checks:
        if converter is None or callable(converter):
            continue
        raise HardValidationError(schema_errors=message, failure_cases=[])
767+
768+
769+
def _convert_pdp_schema_errors_to_hard(
    e: Union[SchemaErrors, SchemaError], model_set: set[str]
) -> None:
    """Log a Pandera schema failure and re-raise it as HardValidationError.

    This function never returns normally: it always raises.

    Args:
        e: The Pandera ``SchemaErrors`` / ``SchemaError`` that was caught.
        model_set: Model names being validated; used only in the log message.

    Raises:
        HardValidationError: Built from ``e`` by
            ``pdp_edvise._convert_schema_errors_to_hard_validation_error`` and
            chained to ``e`` as its cause.
    """
    log_template = "PDP edvise schema validation failed: model_set=%s, error=%s"
    logger.error(log_template, model_set, e, exc_info=True)

    # Empty mappings/specs are passed through to the converter.
    # NOTE(review): presumably because this read path has no raw<->canonical
    # column mapping of its own - confirm.
    raise pdp_edvise._convert_schema_errors_to_hard_validation_error(
        e,
        raw_to_canon={},
        canon_to_raw={},
        merged_specs={},
    ) from e
783+
784+
785+
def _read_pdp_validated_dataframe(
    path: str,
    model_set: set[str],
    cohort_converter: Callable[[pd.DataFrame], pd.DataFrame],
    course_converter_func: PDPConverterFunc,
) -> pd.DataFrame:
    """Read and validate a single-model PDP file, dispatching on ``model_set``.

    Args:
        path: Filesystem path of the CSV to read.
        model_set: Requested model names; must be exactly ``{"STUDENT"}`` or
            ``{"COURSE"}``.
        cohort_converter: Converter passed to ``read_raw_pdp_cohort_data`` as
            ``converter_func`` for STUDENT data.
        course_converter_func: Optional custom converter forwarded to
            ``_read_pdp_course_edvise`` for COURSE data; may be None.

    Returns:
        The DataFrame returned by the selected reader.

    Raises:
        HardValidationError: If ``model_set`` is neither ``{"STUDENT"}`` nor
            ``{"COURSE"}``.
    """
    if model_set == {"STUDENT"}:
        return read_raw_pdp_cohort_data(
            file_path=path,
            schema=pdp_edvise.get_edvise_schema_for_models(["STUDENT"]),
            converter_func=cohort_converter,
            spark_session=None,
        )

    if model_set == {"COURSE"}:
        return _read_pdp_course_edvise(
            path, course_converter_func=course_converter_func
        )

    # Set iteration order is not stable across processes (string hash
    # randomization), so sort the names to keep the error message
    # deterministic for logs, API responses and tests.
    requested_models = sorted(str(m) for m in model_set)
    raise HardValidationError(
        schema_errors=f"PDP single-model expected; got models={requested_models}",
        failure_cases=[],
    )
807+
808+
727809
@contextmanager
728810
def _path_for_edvise_read(filename: Src, enc: str) -> Generator[str, None, None]:
729811
"""
@@ -776,27 +858,37 @@ def _path_for_edvise_read(filename: Src, enc: str) -> Generator[str, None, None]
776858
pass
777859

778860

779-
def _read_pdp_course_edvise(path: str) -> pd.DataFrame:
861+
def _read_pdp_course_edvise(
862+
path: str,
863+
course_converter_func: PDPConverterFunc = None,
864+
) -> pd.DataFrame:
780865
"""
781866
Read and validate PDP course data via edvise (same as pipeline).
782867
783-
Tries each datetime format with each converter: first
784-
handling_duplicates(..., school_type="pdp"), then handling_duplicates(df)
785-
for older edvise. Raises HardValidationError if all attempts fail.
868+
Tries each datetime format with each converter. If a custom
869+
course_converter_func is provided (e.g. from a school), it is tried first;
870+
then the default handling_duplicates(..., school_type="pdp"), then
871+
handling_duplicates for older edvise. Raises HardValidationError if all
872+
attempts fail.
786873
787874
Args:
788875
path: Path to course CSV.
876+
course_converter_func: Optional custom converter (e.g. converter_func_course)
877+
that schools can provide; if None, only default converters are used.
789878
790879
Returns:
791880
Validated DataFrame (same as pipeline output).
792881
793882
Raises:
794883
HardValidationError: If no (converter, format) pair succeeded.
795884
"""
796-
converters = (
885+
default_converters = (
797886
partial(handling_duplicates, school_type="pdp"),
798887
handling_duplicates,
799888
)
889+
converters = (
890+
(course_converter_func,) if course_converter_func is not None else ()
891+
) + default_converters
800892
last_error: Optional[Exception] = None
801893
for converter in converters:
802894
for fmt in PDP_COURSE_DTTM_FORMATS:
@@ -839,19 +931,27 @@ def _validate_pdp_with_edvise_read(
839931
enc: str,
840932
model_list: List[str],
841933
institution_id: str,
934+
pdp_cohort_converter_func: PDPConverterFunc = None,
935+
pdp_course_converter_func: PDPConverterFunc = None,
842936
) -> Dict[str, Any]:
843937
"""
844938
Validate PDP cohort or course via edvise read + schema (same as pipeline).
845939
846940
Resolves filename to a path (temp file if file-like), then calls
847-
read_raw_pdp_cohort_data or read_raw_pdp_course_data. Converts Pandera
848-
SchemaErrors to HardValidationError for API/formatter consistency.
941+
read_raw_pdp_cohort_data or read_raw_pdp_course_data. Uses the same
942+
converter functions as the edvise repo: cohort converter filters dual
943+
enrollment students (DE/DS/SE); course converter handles duplicates.
944+
Schools can provide custom converters via the optional func args.
849945
850946
Args:
851947
filename: Path or file-like to CSV.
852948
enc: Encoding (from sniff_encoding) for file-like decode.
853949
model_list: Single model, e.g. ["STUDENT"] or ["COURSE"].
854950
institution_id: Institution schema key (e.g. "pdp").
951+
pdp_cohort_converter_func: Optional custom cohort converter; if None,
952+
uses converter_func_cohort from edvise (filters DE/DS/SE).
953+
pdp_course_converter_func: Optional custom course converter (e.g.
954+
converter_func_course); if None, uses default handling_duplicates.
855955
856956
Returns:
857957
Dict with validation_status, schemas, missing_optional,
@@ -863,23 +963,19 @@ def _validate_pdp_with_edvise_read(
863963
_reset_to_start_if_possible(filename)
864964
model_set = {str(m).strip().upper() for m in model_list if m}
865965

966+
_validate_pdp_converter_callables(
967+
pdp_cohort_converter_func, pdp_course_converter_func
968+
)
969+
cohort_converter = pdp_cohort_converter_func or converter_func_cohort
970+
866971
with _path_for_edvise_read(filename, enc) as path:
867972
try:
868-
if model_set == {"STUDENT"}:
869-
df = read_raw_pdp_cohort_data(
870-
file_path=path,
871-
schema=pdp_edvise.get_edvise_schema_for_models(["STUDENT"]),
872-
converter_func=None,
873-
spark_session=None,
874-
)
875-
elif model_set == {"COURSE"}:
876-
df = _read_pdp_course_edvise(path)
877-
else:
878-
raise HardValidationError(
879-
schema_errors=f"PDP single-model expected; got models={model_list}",
880-
failure_cases=[],
881-
)
882-
973+
df = _read_pdp_validated_dataframe(
974+
path,
975+
model_set,
976+
cohort_converter,
977+
pdp_course_converter_func,
978+
)
883979
return {
884980
"validation_status": "passed",
885981
"schemas": model_list,
@@ -888,16 +984,19 @@ def _validate_pdp_with_edvise_read(
888984
"normalized_df": df,
889985
}
890986
except (SchemaErrors, SchemaError) as e:
891-
logger.error(
892-
"PDP edvise schema validation failed: model_set=%s, error=%s",
893-
model_set,
894-
e,
895-
exc_info=True,
896-
)
897-
hard = pdp_edvise._convert_schema_errors_to_hard_validation_error(
898-
e, raw_to_canon={}, canon_to_raw={}, merged_specs={}
987+
_convert_pdp_schema_errors_to_hard(e, model_set)
988+
except HardValidationError:
989+
raise
990+
except Exception as e:
991+
logger.exception(
992+
"PDP validation failed: model_set=%s, error=%s", model_set, e
899993
)
900-
raise hard from e
994+
raise HardValidationError(
995+
schema_errors=f"PDP validation failed (model_set={model_set!r}): {e}",
996+
failure_cases=[str(e)],
997+
) from e
998+
999+
return {} # Unreachable: every path above returns or raises
9011000

9021001

9031002
# --------------------------------------------------------------------------- #
@@ -912,6 +1011,8 @@ def validate_dataset(
9121011
models: Union[str, List[str], None] = None,
9131012
institution_id: str = "pdp",
9141013
institution_identifier: Optional[str] = None,
1014+
pdp_cohort_converter_func: PDPConverterFunc = None,
1015+
pdp_course_converter_func: PDPConverterFunc = None,
9151016
) -> Dict[str, Any]:
9161017
"""
9171018
Validate a dataset against merged base/extension schemas.
@@ -920,6 +1021,10 @@ def validate_dataset(
9201021
(if applicable) or JSON-based validation. Returns dict with validation_status,
9211022
schemas, normalized_df (or None if empty merged_specs). Raises HardValidationError
9221023
on failure; UnicodeError if encoding is not UTF-8/UTF-16/UTF-32.
1024+
1025+
For PDP uploads, optional pdp_cohort_converter_func and pdp_course_converter_func
1026+
allow schools to supply custom converters (e.g. from config); if None, edvise
1027+
defaults are used (cohort: filter DE/DS/SE; course: handling_duplicates).
9231028
"""
9241029
try:
9251030
enc = sniff_encoding(filename)
@@ -941,7 +1046,14 @@ def validate_dataset(
9411046

9421047
# PDP single-model: use edvise read + validate (same as pipeline)
9431048
if pdp_edvise.get_edvise_schema_for_upload(institution_id, model_list) is not None:
944-
return _validate_pdp_with_edvise_read(filename, enc, model_list, institution_id)
1049+
return _validate_pdp_with_edvise_read(
1050+
filename,
1051+
enc,
1052+
model_list,
1053+
institution_id,
1054+
pdp_cohort_converter_func=pdp_cohort_converter_func,
1055+
pdp_course_converter_func=pdp_course_converter_func,
1056+
)
9451057

9461058
(
9471059
raw_to_canon,

0 commit comments

Comments
 (0)