2020import tempfile
2121from contextlib import contextmanager
2222from functools import lru_cache , partial
23- from typing import Any , BinaryIO , Dict , Generator , List , Optional , Tuple , Union , cast
23+ from typing import (
24+ Any ,
25+ BinaryIO ,
26+ Callable ,
27+ Dict ,
28+ Generator ,
29+ List ,
30+ Optional ,
31+ Tuple ,
32+ Union ,
33+ cast ,
34+ )
2435
2536import pandas as pd
2637from pandera import Column , Check , DataFrameSchema
2738from pandera .errors import SchemaError , SchemaErrors
2839
2940from edvise .dataio .read import read_raw_pdp_cohort_data , read_raw_pdp_course_data
41+ from edvise .dataio .pdp_cohort_converters import converter_func_cohort
3042from edvise .utils .data_cleaning import handling_duplicates
3143
3244from . import validation_pdp_edvise as pdp_edvise
3345
46+ # Optional PDP converter callable (DataFrame -> DataFrame) for cohort/course; the alias includes None (no custom converter supplied).
47+ PDPConverterFunc = Optional [Callable [[pd .DataFrame ], pd .DataFrame ]]
48+
3449# --------------------------------------------------------------------------- #
3550# Logging
3651# --------------------------------------------------------------------------- #
@@ -49,6 +64,8 @@ def validate_file_reader(
4964 inst_schema : Optional [Dict [Any , Any ]] = None ,
5065 institution_id : str = "pdp" ,
5166 institution_identifier : Optional [str ] = None ,
67+ pdp_cohort_converter_func : PDPConverterFunc = None ,
68+ pdp_course_converter_func : PDPConverterFunc = None ,
5269) -> dict [str , Any ]:
5370 """Validates a dataset given a filename and schema selection.
5471
@@ -60,6 +77,8 @@ def validate_file_reader(
6077 institution_id: Key into inst_schema["institutions"]: "edvise", "pdp", or
6178 institution UUID for custom. Default "pdp" for backward compatibility.
6279 institution_identifier: Optional institution identifier (e.g. UUID) for display/context.
80+ pdp_cohort_converter_func: Optional custom PDP cohort converter (school-specific).
81+ pdp_course_converter_func: Optional custom PDP course converter (school-specific).
6382
6483 Returns:
6584 Dict with validation_status, schemas, missing_optional, unknown_extra_columns.
@@ -76,6 +95,8 @@ def validate_file_reader(
7695 allowed_schema ,
7796 institution_id ,
7897 institution_identifier ,
98+ pdp_cohort_converter_func = pdp_cohort_converter_func ,
99+ pdp_course_converter_func = pdp_course_converter_func ,
79100 )
80101
81102
@@ -724,6 +745,67 @@ def _compute_model_list_and_merged_specs(
724745PDP_COURSE_DTTM_FORMATS = ("ISO8601" , "%Y%m%d.0" , "%Y%m%d" )
725746
726747
def _validate_pdp_converter_callables(
    pdp_cohort_converter_func: PDPConverterFunc,
    pdp_course_converter_func: PDPConverterFunc,
) -> None:
    """Reject PDP converter arguments that are neither None nor callable.

    Args:
        pdp_cohort_converter_func: Custom cohort converter, or None.
        pdp_course_converter_func: Custom course converter, or None.

    Raises:
        HardValidationError: If a supplied converter is not callable. The
            cohort converter is checked first, so it is the one reported
            when both arguments are invalid. (The original note says this
            lets the API return 400 -- the mapping lives outside this
            function; confirm against the API layer.)
    """
    # Each entry pairs a candidate converter with the message to raise for it;
    # order matters because the first failing entry wins.
    checks = (
        (
            pdp_cohort_converter_func,
            "pdp_cohort_converter_func must be callable (DataFrame -> DataFrame)",
        ),
        (
            pdp_course_converter_func,
            "pdp_course_converter_func must be callable (DataFrame -> DataFrame)",
        ),
    )
    for candidate, message in checks:
        # None means "no custom converter supplied" and is always acceptable.
        if candidate is None or callable(candidate):
            continue
        raise HardValidationError(schema_errors=message, failure_cases=[])
767+
768+
def _convert_pdp_schema_errors_to_hard(
    e: Union[SchemaErrors, SchemaError], model_set: set[str]
) -> None:
    """Log a Pandera schema failure and re-raise it as HardValidationError.

    This function never returns normally: it always raises.

    Args:
        e: The Pandera SchemaErrors / SchemaError that was caught.
        model_set: Model names being validated; used only in the log line.

    Raises:
        The error object built by
        pdp_edvise._convert_schema_errors_to_hard_validation_error, chained
        to ``e`` as its cause.
    """
    logger.error(
        "PDP edvise schema validation failed: model_set=%s, error=%s",
        model_set,
        e,
        exc_info=True,
    )
    # The converter is given empty column mappings and empty merged specs.
    empty_lookups = {"raw_to_canon": {}, "canon_to_raw": {}, "merged_specs": {}}
    raise pdp_edvise._convert_schema_errors_to_hard_validation_error(
        e, **empty_lookups
    ) from e
783+
784+
def _read_pdp_validated_dataframe(
    path: str,
    model_set: set[str],
    cohort_converter: Callable[[pd.DataFrame], pd.DataFrame],
    course_converter_func: PDPConverterFunc,
) -> pd.DataFrame:
    """Read and validate PDP cohort or course data for a single model.

    Args:
        path: Filesystem path to the CSV to read.
        model_set: Requested model names; must be exactly {"STUDENT"} or
            exactly {"COURSE"}.
        cohort_converter: Converter passed to read_raw_pdp_cohort_data when
            the model is STUDENT.
        course_converter_func: Optional custom converter forwarded to
            _read_pdp_course_edvise when the model is COURSE; may be None.

    Returns:
        The DataFrame returned by the cohort or course reader.

    Raises:
        HardValidationError: If model_set is not exactly one supported model.
    """
    if model_set == {"STUDENT"}:
        return read_raw_pdp_cohort_data(
            file_path=path,
            schema=pdp_edvise.get_edvise_schema_for_models(["STUDENT"]),
            converter_func=cohort_converter,
            spark_session=None,
        )
    if model_set == {"COURSE"}:
        return _read_pdp_course_edvise(
            path, course_converter_func=course_converter_func
        )
    # Set iteration order for strings varies between runs (hash
    # randomization), so sort the names to keep the error message stable.
    raise HardValidationError(
        schema_errors=f"PDP single-model expected; got models={sorted(model_set)}",
        failure_cases=[],
    )
807+
808+
727809@contextmanager
728810def _path_for_edvise_read (filename : Src , enc : str ) -> Generator [str , None , None ]:
729811 """
@@ -776,27 +858,37 @@ def _path_for_edvise_read(filename: Src, enc: str) -> Generator[str, None, None]
776858 pass
777859
778860
779- def _read_pdp_course_edvise (path : str ) -> pd .DataFrame :
861+ def _read_pdp_course_edvise (
862+ path : str ,
863+ course_converter_func : PDPConverterFunc = None ,
864+ ) -> pd .DataFrame :
780865 """
781866 Read and validate PDP course data via edvise (same as pipeline).
782867
783- Tries each datetime format with each converter: first
784- handling_duplicates(..., school_type="pdp"), then handling_duplicates(df)
785- for older edvise. Raises HardValidationError if all attempts fail.
868+ Tries each datetime format with each converter. If a custom
869+ course_converter_func is provided (e.g. from a school), it is tried first;
870+ then the default handling_duplicates(..., school_type="pdp"), then
871+ handling_duplicates for older edvise. Raises HardValidationError if all
872+ attempts fail.
786873
787874 Args:
788875 path: Path to course CSV.
876+ course_converter_func: Optional custom converter (e.g. converter_func_course)
877+ that schools can provide; if None, only default converters are used.
789878
790879 Returns:
791880 Validated DataFrame (same as pipeline output).
792881
793882 Raises:
794883 HardValidationError: If no (converter, format) pair succeeded.
795884 """
796- converters = (
885+ default_converters = (
797886 partial (handling_duplicates , school_type = "pdp" ),
798887 handling_duplicates ,
799888 )
889+ converters = (
890+ (course_converter_func ,) if course_converter_func is not None else ()
891+ ) + default_converters
800892 last_error : Optional [Exception ] = None
801893 for converter in converters :
802894 for fmt in PDP_COURSE_DTTM_FORMATS :
@@ -839,19 +931,27 @@ def _validate_pdp_with_edvise_read(
839931 enc : str ,
840932 model_list : List [str ],
841933 institution_id : str ,
934+ pdp_cohort_converter_func : PDPConverterFunc = None ,
935+ pdp_course_converter_func : PDPConverterFunc = None ,
842936) -> Dict [str , Any ]:
843937 """
844938 Validate PDP cohort or course via edvise read + schema (same as pipeline).
845939
846940 Resolves filename to a path (temp file if file-like), then calls
847- read_raw_pdp_cohort_data or read_raw_pdp_course_data. Converts Pandera
848- SchemaErrors to HardValidationError for API/formatter consistency.
941+ read_raw_pdp_cohort_data or read_raw_pdp_course_data. Uses the same
942+ converter functions as the edvise repo: cohort converter filters dual
943+ enrollment students (DE/DS/SE); course converter handles duplicates.
944+ Schools can provide custom converters via the optional func args.
849945
850946 Args:
851947 filename: Path or file-like to CSV.
852948 enc: Encoding (from sniff_encoding) for file-like decode.
853949 model_list: Single model, e.g. ["STUDENT"] or ["COURSE"].
854950 institution_id: Institution schema key (e.g. "pdp").
951+ pdp_cohort_converter_func: Optional custom cohort converter; if None,
952+ uses converter_func_cohort from edvise (filters DE/DS/SE).
953+ pdp_course_converter_func: Optional custom course converter (e.g.
954+ converter_func_course); if None, uses default handling_duplicates.
855955
856956 Returns:
857957 Dict with validation_status, schemas, missing_optional,
@@ -863,23 +963,19 @@ def _validate_pdp_with_edvise_read(
863963 _reset_to_start_if_possible (filename )
864964 model_set = {str (m ).strip ().upper () for m in model_list if m }
865965
966+ _validate_pdp_converter_callables (
967+ pdp_cohort_converter_func , pdp_course_converter_func
968+ )
969+ cohort_converter = pdp_cohort_converter_func or converter_func_cohort
970+
866971 with _path_for_edvise_read (filename , enc ) as path :
867972 try :
868- if model_set == {"STUDENT" }:
869- df = read_raw_pdp_cohort_data (
870- file_path = path ,
871- schema = pdp_edvise .get_edvise_schema_for_models (["STUDENT" ]),
872- converter_func = None ,
873- spark_session = None ,
874- )
875- elif model_set == {"COURSE" }:
876- df = _read_pdp_course_edvise (path )
877- else :
878- raise HardValidationError (
879- schema_errors = f"PDP single-model expected; got models={ model_list } " ,
880- failure_cases = [],
881- )
882-
973+ df = _read_pdp_validated_dataframe (
974+ path ,
975+ model_set ,
976+ cohort_converter ,
977+ pdp_course_converter_func ,
978+ )
883979 return {
884980 "validation_status" : "passed" ,
885981 "schemas" : model_list ,
@@ -888,16 +984,17 @@ def _validate_pdp_with_edvise_read(
888984 "normalized_df" : df ,
889985 }
890986 except (SchemaErrors , SchemaError ) as e :
891- logger .error (
892- "PDP edvise schema validation failed: model_set=%s, error=%s" ,
893- model_set ,
894- e ,
895- exc_info = True ,
987+ _convert_pdp_schema_errors_to_hard (e , model_set )
988+ except HardValidationError :
989+ raise
990+ except Exception as e :
991+ logger .exception (
992+ "PDP validation failed: model_set=%s, error=%s" , model_set , e
896993 )
897- hard = pdp_edvise . _convert_schema_errors_to_hard_validation_error (
898- e , raw_to_canon = {}, canon_to_raw = {}, merged_specs = {}
899- )
900- raise hard from e
994+ raise HardValidationError (
995+ schema_errors = f"PDP validation failed (model_set= { model_set !r } ): { e } " ,
996+ failure_cases = [ str ( e )],
997+ ) from e
901998
902999
9031000# --------------------------------------------------------------------------- #
@@ -912,6 +1009,8 @@ def validate_dataset(
9121009 models : Union [str , List [str ], None ] = None ,
9131010 institution_id : str = "pdp" ,
9141011 institution_identifier : Optional [str ] = None ,
1012+ pdp_cohort_converter_func : PDPConverterFunc = None ,
1013+ pdp_course_converter_func : PDPConverterFunc = None ,
9151014) -> Dict [str , Any ]:
9161015 """
9171016 Validate a dataset against merged base/extension schemas.
@@ -920,6 +1019,10 @@ def validate_dataset(
9201019 (if applicable) or JSON-based validation. Returns dict with validation_status,
9211020 schemas, normalized_df (or None if empty merged_specs). Raises HardValidationError
9221021 on failure; UnicodeError if encoding is not UTF-8/UTF-16/UTF-32.
1022+
1023+ For PDP uploads, optional pdp_cohort_converter_func and pdp_course_converter_func
1024+ allow schools to supply custom converters (e.g. from config); if None, edvise
1025+ defaults are used (cohort: filter DE/DS/SE; course: handling_duplicates).
9231026 """
9241027 try :
9251028 enc = sniff_encoding (filename )
@@ -941,7 +1044,14 @@ def validate_dataset(
9411044
9421045 # PDP single-model: use edvise read + validate (same as pipeline)
9431046 if pdp_edvise .get_edvise_schema_for_upload (institution_id , model_list ) is not None :
944- return _validate_pdp_with_edvise_read (filename , enc , model_list , institution_id )
1047+ return _validate_pdp_with_edvise_read (
1048+ filename ,
1049+ enc ,
1050+ model_list ,
1051+ institution_id ,
1052+ pdp_cohort_converter_func = pdp_cohort_converter_func ,
1053+ pdp_course_converter_func = pdp_course_converter_func ,
1054+ )
9451055
9461056 (
9471057 raw_to_canon ,
0 commit comments