|
13 | 13 | ) |
14 | 14 | from google.cloud import storage |
15 | 15 | from google.api_core import exceptions as gcs_errors |
16 | | -from .validation_extension import generate_extension_schema |
17 | 16 | from .config import databricks_vars, gcs_vars |
18 | 17 | from .utilities import databricksify_inst_name, SchemaType |
19 | 18 | from typing import List, Any, Dict, Optional |
20 | | -from fastapi import HTTPException |
21 | 19 | import requests |
22 | 20 | import hashlib |
23 | 21 | import json |
24 | 22 | import gzip |
25 | 23 | from cachetools import TTLCache |
26 | 24 | import threading |
27 | 25 | import re |
28 | | -import pandas as pd |
29 | 26 |
|
30 | 27 | # Setting up logger |
31 | 28 | LOGGER = logging.getLogger(__name__) |
@@ -623,78 +620,3 @@ def matches_one(pat: Any) -> bool: |
623 | 620 | return key |
624 | 621 |
|
625 | 622 | return None |
626 | | - |
def create_custom_schema_extension(
    self,
    bucket_name: str,
    inst_query: Any,
    file_name: str,
    base_schema: Dict[str, Any],  # pass base schema dict in
    extension_schema: Optional[dict] = None,  # existing extension or None
) -> Any:
    """Generate a schema extension for one unvalidated CSV in GCS.

    The file name is mapped to a data model key via ``self.get_key_for_file``.
    If the supplied ``extension_schema`` already lists that model for the
    institution, nothing is generated. Otherwise the CSV is read from
    ``unvalidated/<file_name>`` in ``bucket_name`` and handed to
    ``generate_extension_schema`` together with the base schema.

    Args:
        bucket_name: Name of the GCS bucket holding the unvalidated file.
        inst_query: Object exposing ``name`` and ``id`` for the institution.
        file_name: Name of the CSV under the ``unvalidated/`` prefix.
        base_schema: Base schema dict, passed through to the generator.
        extension_schema: Existing extension dict to extend, or ``None``.

    Returns:
        The value returned by ``generate_extension_schema``, or ``None`` when
        generation is skipped (``SST_SKIP_EXT_GEN=1``) or the model is already
        present for the institution. ``None`` means "do not write".

    Raises:
        HTTPException: 404 if ``file_name`` matches no known model pattern,
            400 if ``extension_schema`` is provided but is not a dict,
            500 if the CSV cannot be read from GCS.
    """
    # Escape hatch for tests: skip extension generation entirely.
    if os.getenv("SST_SKIP_EXT_GEN") == "1":
        LOGGER.info("SST_SKIP_EXT_GEN=1; skipping Databricks extension generation.")
        return None

    inst_name = inst_query.name
    inst_id = str(inst_query.id)

    # Model key -> accepted file names / regex patterns.
    mapping = {
        "course": [
            "course.csv",
            "courses.csv",
            r"^(?=.*AR_DEIDENTIFIED)(?=.*COURSE).*\.csv$",
        ],
        "student": ["student.csv", r"^(?=.*AR_DEIDENTIFIED)(?!.*COURSE).*\.csv$"],
        "semester": ["semester.csv"],
    }

    key = self.get_key_for_file(mapping, file_name)  # e.g., "student"
    if key is None:
        raise HTTPException(
            404, detail=f"{file_name} not found in {inst_name} validation_mapping"
        )

    key_lc = key.lower()

    # If this model already exists in the provided extension for this
    # institution, skip generation.
    if extension_schema is not None:
        if not isinstance(extension_schema, dict):
            raise HTTPException(
                400, detail="extension_schema must be a dict if provided"
            )

        # `or {}` also covers keys that are present but hold None, which a
        # plain `.get(key, {})` default would let through as None.
        institutions = extension_schema.get("institutions") or {}
        inst_block = institutions.get(inst_id) or {}
        data_models = inst_block.get("data_models") or {}
        existing_keys_lc = {str(k).lower() for k in data_models}

        if key_lc in existing_keys_lc:
            LOGGER.info(
                "Model '%s' already present for institution '%s' — skipping (return None).",
                key,
                inst_id,
            )
            return None  # sentinel: do not write

    # Read the unvalidated CSV from GCS.
    try:
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(f"unvalidated/{file_name}")
        with blob.open("r") as fh:
            df = pd.read_csv(fh)
    except Exception as e:
        LOGGER.exception("Failed to read %s from GCS", file_name)
        # Chain the cause so the original failure stays attached to the 500.
        raise HTTPException(
            500, detail=f"Failed to read {file_name} from GCS: {e}"
        ) from e

    return generate_extension_schema(
        df=df,
        models=key,  # exactly one model
        institution_id=inst_id,
        base_schema=base_schema,  # reference only, not mutated
        existing_extension=extension_schema,  # may be None
    )
0 commit comments