Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
1450b94
feat: added extension generator script
Mesh-ach Aug 11, 2025
6970af9
fix: adjusted schema generator
Mesh-ach Aug 12, 2025
7aaccca
fix: adjusted schema generator
Mesh-ach Aug 12, 2025
2807df3
fix: adjusted schema generator
Mesh-ach Aug 12, 2025
0b4bbfa
fix: linting
Mesh-ach Aug 12, 2025
82c6942
fix: linting
Mesh-ach Aug 12, 2025
8ed0434
fix: linting
Mesh-ach Aug 12, 2025
3f13887
fix: linting
Mesh-ach Aug 12, 2025
9df1e99
fix: linting
Mesh-ach Aug 12, 2025
71863c4
fix: linting
Mesh-ach Aug 12, 2025
c528470
fix: linting
Mesh-ach Aug 12, 2025
b09518c
fix: linting
Mesh-ach Aug 12, 2025
41c35df
fix: linting
Mesh-ach Aug 12, 2025
aaa8017
fix: linting
Mesh-ach Aug 12, 2025
af17d3f
fix: linting
Mesh-ach Aug 12, 2025
887f946
fix: linting
Mesh-ach Aug 12, 2025
696bdf4
fix: linting
Mesh-ach Aug 12, 2025
e2f294c
fix: linting
Mesh-ach Aug 12, 2025
6ca86fc
fix: linting
Mesh-ach Aug 12, 2025
b7141c8
fix: linting
Mesh-ach Aug 12, 2025
908cd1d
adjusted schema generatio logic
Mesh-ach Aug 12, 2025
d93a7a9
adjusted schema generatio logic
Mesh-ach Aug 12, 2025
f340153
adjusted schema generatio logic
Mesh-ach Aug 12, 2025
d4b3244
adjusted schema generatio logic
Mesh-ach Aug 12, 2025
dd90b69
adjusted schema generatio logic
Mesh-ach Aug 12, 2025
48dfcd6
adjusted schema generatio logic
Mesh-ach Aug 12, 2025
c4657e0
attempting to pin deps so that repo is more robust to package updates
Aug 12, 2025
40f22e9
fixed test issues
Mesh-ach Aug 12, 2025
aa46254
fixed test issues
Mesh-ach Aug 12, 2025
68ba40c
fixed test issues
Mesh-ach Aug 12, 2025
55fff94
fixed test issues
Mesh-ach Aug 12, 2025
e262392
fixed test issues
Mesh-ach Aug 12, 2025
0d407a8
fixed test issues
Mesh-ach Aug 12, 2025
e86350b
Merge pull request #141 from datakind/fix/pinning-deps
Mesh-ach Aug 12, 2025
bddab22
Merge branch 'develop' of github.com-work:datakind/sst-app-api into E…
Mesh-ach Aug 12, 2025
168dc1e
fix: deps issues pulled from dev
Mesh-ach Aug 12, 2025
a931c77
fix: deps issues pulled from dev
Mesh-ach Aug 12, 2025
9f25f4e
renamed confusing func
Mesh-ach Aug 12, 2025
a9137fe
Merge pull request #140 from datakind/ExtensionGenerator
Mesh-ach Aug 12, 2025
87ea884
fixL linting
Mesh-ach Aug 12, 2025
a027953
Merge pull request #142 from datakind/ExtensionGenerator
Mesh-ach Aug 12, 2025
f7bf35f
added delete batch endpoint
Mesh-ach Aug 13, 2025
bc01678
added delete batch endpoint
Mesh-ach Aug 13, 2025
f7e8b19
fix linting errors
Mesh-ach Aug 13, 2025
1da5dfa
fix linting errors
Mesh-ach Aug 13, 2025
daaa86c
fix linting errors
Mesh-ach Aug 13, 2025
85341ec
fix linting errors
Mesh-ach Aug 13, 2025
2c3ce4f
Merge pull request #143 from datakind/DeleteBatch
Mesh-ach Aug 13, 2025
9a32f25
fix linting errors
Mesh-ach Aug 13, 2025
12f1119
Merge pull request #144 from datakind/DeleteBatch
Mesh-ach Aug 13, 2025
f3bd7fa
fix linting errors
Mesh-ach Aug 13, 2025
cdb28cf
fix linting errors
Mesh-ach Aug 13, 2025
123c05b
fix linting errors
Mesh-ach Aug 13, 2025
0741bcd
Merge pull request #145 from datakind/DeleteBatch
Mesh-ach Aug 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,41 @@ requires-python = ">=3.10,<3.13"
dependencies = [
"databricks-sdk~=0.38.0",
"pydantic~=2.10",
"fastapi[standard]>=0.115.4",
"google-cloud>=0.34.0",
"google-cloud-storage>=2.18.2",
"paramiko>=3.5.0",
"cloud-sql-python-connector[pymysql]>=1.14.0",
"sqlalchemy>=2.0.36",
"pyjwt>=2.10.1",
"passlib>=1.7.4",
"bcrypt>=4.2.0",
"crypto>=1.4.1",
"python-dotenv>=1.0.1",
"strenum>=0.4.15",
"fastapi[standard]~=0.115.4",
"google-cloud-storage~=2.18.2",
"paramiko~=3.5.0",
"cloud-sql-python-connector[pymysql]~=1.14.0",
"sqlalchemy~=2.0.36",
"pyjwt~=2.10.1",
"passlib~=1.7.4",
"bcrypt~=4.2.0",
"pycryptodome~=3.20.0",
"python-dotenv~=1.0.1",
"strenum~=0.4.15",
"tomli~=2.0; python_version<'3.11'",
"jsonpickle>=4.0.1",
"requests>=2.0.0",
"types-requests",
"types-paramiko",
"pandas",
"six",
"types-six",
"fuzzywuzzy",
"databricks-sql-connector",
"jsonpickle~=4.0.1",
"requests~=2.32.0",
"types-requests~=2.32.0.0",
"types-paramiko~=3.5.0.0",
"pandas~=2.0",
"six~=1.16.0",
"thefuzz[speedup]~=0.22.1",
"databricks-sql-connector~=3.5.0",
"pandera~=0.13",
"mlflow"
"mlflow~=2.15.0"
]

[project.urls]
Repository = "https://github.com/datakind/sst-app-api"

[dependency-groups]
dev = [
"black>=25.1.0",
"coverage>=7.6.9",
"black~=25.1.0",
"coverage~=7.6.9",
"ipykernel~=6.29",
"jupyterlab~=4.2",
"mypy~=1.11",
"pylint>=3.3.2",
"pylint~=3.3.2",
"pytest~=8.3",
"ruff~=0.8",
]
Expand Down
189 changes: 188 additions & 1 deletion src/webapp/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@
Disposition,
StatementState,
)
from google.cloud import storage
from .validation_extension import generate_extension_schema
from .config import databricks_vars, gcs_vars
from .utilities import databricksify_inst_name, SchemaType
from typing import List, Any, Dict
from typing import List, Any, Dict, IO, cast, Optional
from databricks.sdk.errors import DatabricksError
from fastapi import HTTPException

try:
import tomllib as _toml # Py 3.11+
except ModuleNotFoundError:
import tomli as _toml # Py ≤ 3.10
import pandas as pd
import re

# Setting up logger
LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -366,3 +376,180 @@ def fetch_table_data(

# Combine column names with corresponding row values
return [dict(zip(column_names, row)) for row in data_rows]

def get_key_for_file(
self, mapping: Dict[str, Any], file_name: str
) -> Optional[str]:
"""
Case-insensitive match of file_name against mapping values.
Values may be:
- str literal (e.g., "student.csv") → allow optional base suffixes before the ext.
- str regex (e.g., r"^course_.*\.csv$") → re.IGNORECASE fullmatch.
- compiled regex (re.Pattern) → fullmatch, adding IGNORECASE if missing.
- list of any of the above.
"""
# normalize filename (handles windows paths + stray whitespace)
name = os.path.basename(file_name.replace("\\", "/")).strip()

REGEX_META = re.compile(r"[()\[\]\{\}\|\?\+\*\\]")

def looks_like_regex(s: str) -> bool:
s = s.strip()
return (
s.startswith("^") or s.endswith("$") or REGEX_META.search(s) is not None
)

def matches_one(pat: Any) -> bool:
# compiled regex
if isinstance(pat, re.Pattern):
# ensure case-insensitive
flags = pat.flags | re.IGNORECASE
return re.fullmatch(re.compile(pat.pattern, flags), name) is not None

# string literal / regex
if isinstance(pat, str):
p = pat.strip()

# exact literal (case-insensitive)
if name.casefold() == p.casefold():
return True

if looks_like_regex(p):
try:
return re.fullmatch(p, name, flags=re.IGNORECASE) is not None
except re.error:
return False

# literal with suffix tolerance
p_base, p_ext = os.path.splitext(p)
if p_ext:
# ^base(?:[._-].+)?ext$
rx = re.compile(
rf"^{re.escape(p_base)}(?:[._-].+)?{re.escape(p_ext)}$",
re.IGNORECASE,
)
else:
# ^literal(?:[._-].+)?(?:\..+)?$
rx = re.compile(
rf"^{re.escape(p)}(?:[._-].+)?(?:\..+)?$",
re.IGNORECASE,
)
return rx.fullmatch(name) is not None

# unsupported type
return False

for key, val in mapping.items():
items = val if isinstance(val, list) else [val]
for pat in items:
if matches_one(pat):
return key

return None

def create_custom_schema_extension(
self,
bucket_name: str,
inst_query: Any,
file_name: str,
base_schema: Dict[str, Any], # pass base schema dict in
extension_schema: Optional[dict] = None, # existing extension or None
) -> Any:
if (
os.getenv("SST_SKIP_EXT_GEN") == "1"
): # skip using workspace client for tests
LOGGER.info("SST_SKIP_EXT_GEN=1; skipping Databricks extension generation.")
return None

# 1) Databricks client
try:
w = WorkspaceClient(
host=databricks_vars["DATABRICKS_HOST_URL"],
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
)
LOGGER.info("Successfully created Databricks WorkspaceClient.")
except Exception as e:
LOGGER.exception("WorkspaceClient init failed")
raise ValueError(f"Workspace client initialization failed: {e}")

# 2) Fetch & parse config.toml to get validation_mapping
try:
inst_name = inst_query[0][0].name
inst_id_raw = inst_query[0][0].id
inst_id = str(inst_id_raw) # be robust if id is not a string
config_volume_path = (
f"/Volumes/staging_sst_01/"
f"{databricksify_inst_name(inst_name)}_bronze/bronze_volume/config.toml"
)
LOGGER.info("Attempting to download from %s", config_volume_path)
response = w.files.download(config_volume_path)
stream = cast(IO[bytes], response.contents)
file_bytes = stream.read()
LOGGER.info("Download successful, received %d bytes", len(file_bytes))
except Exception as e:
LOGGER.exception("Failed to fetch config.toml")
raise HTTPException(500, detail=f"Failed to fetch config: {e}")

try:
cfg = _toml.loads(file_bytes.decode("utf-8"))
mapping = cfg["webapp"]["validation_mapping"]
except KeyError:
raise HTTPException(
404, detail="Missing [webapp].validation_mapping in config.toml"
)
except Exception as e:
LOGGER.exception("Invalid TOML")
raise HTTPException(400, detail=f"Invalid TOML in {file_name}: {e}")

if not isinstance(mapping, dict):
raise HTTPException(
400, detail="validation_mapping must be a TOML table (dictionary)"
)

key = self.get_key_for_file(mapping, file_name) # e.g., "student"
if key is None:
raise HTTPException(
404, detail=f"{file_name} not found in {inst_name} validation_mapping"
)

key_lc = key.lower()

# 4) If this model already exists in the provided extension for this institution, skip
if extension_schema is not None:
if not isinstance(extension_schema, dict):
raise HTTPException(
400, detail="extension_schema must be a dict if provided"
)

inst_block = extension_schema.get("institutions", {}).get(inst_id, {})
data_models = inst_block.get("data_models", {})
existing_keys_lc = {str(k).lower() for k in data_models.keys()}

if key_lc in existing_keys_lc:
LOGGER.info(
"Model '%s' already present for institution '%s' — skipping (return None).",
key,
inst_id,
)
return None # <-- sentinel: do not write

# 5) Read the unvalidated CSV from GCS
try:
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(f"unvalidated/{file_name}")
with blob.open("r") as fh:
df = pd.read_csv(fh)
except Exception as e:
LOGGER.exception("Failed to read %s from GCS", file_name)
raise HTTPException(500, detail=f"Failed to read {file_name} from GCS: {e}")

updated_extension = generate_extension_schema(
df=df,
models=key, # exactly one model
institution_id=inst_id,
base_schema=base_schema, # reference only, not mutated
existing_extension=extension_schema, # may be None
)

return updated_extension
53 changes: 53 additions & 0 deletions src/webapp/databricks_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import pytest

from .databricks import DatabricksControl


@pytest.fixture
def ctrl():
return DatabricksControl()


def test_exact_literal_case_insensitive(ctrl):
mapping = {"student": "student.csv"}
assert ctrl.get_key_for_file(mapping, "Student.csv") == "student"


def test_literal_with_suffix_and_same_ext(ctrl):
mapping = {"student": "student.csv"}
assert ctrl.get_key_for_file(mapping, "student_20240101.csv") == "student"
assert ctrl.get_key_for_file(mapping, "student-final.csv") == "student"
# should not match a different extension
assert ctrl.get_key_for_file(mapping, "student_20240101.tsv") is None


def test_literal_without_ext_allows_suffix_and_optional_ext(ctrl):
mapping = {"student": "student"}
assert ctrl.get_key_for_file(mapping, "student") == "student"
assert ctrl.get_key_for_file(mapping, "student_v2") == "student"
assert ctrl.get_key_for_file(mapping, "student_v2.csv") == "student"


def test_regex_fullmatch_ignorecase(ctrl):
mapping = {"course": r"^course(?:[._-].+)?\.csv$"}
assert ctrl.get_key_for_file(mapping, "Course_20240101.CSV") == "course"
assert ctrl.get_key_for_file(mapping, "COURSE.csv") == "course"
# ensure fullmatch (not substring)
assert ctrl.get_key_for_file(mapping, "my_course_20240101.csv") is None


def test_list_values_mixed_literal_and_regex(ctrl):
mapping = {"student": ["students.csv", r"^stud\d+\.csv$"]}
assert ctrl.get_key_for_file(mapping, "STUD123.csv") == "student"
assert ctrl.get_key_for_file(mapping, "students_2024.csv") == "student"


def test_invalid_regex_is_ignored(ctrl):
mapping = {"bad": ["(unclosed", "ok.csv"]}
# bad regex should be skipped; literal should match
assert ctrl.get_key_for_file(mapping, "OK.csv") == "bad"


def test_returns_none_when_no_match(ctrl):
mapping = {"student": "student.csv"}
assert ctrl.get_key_for_file(mapping, "unknown.csv") is None
49 changes: 49 additions & 0 deletions src/webapp/gcsutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,55 @@ def delete_file(self, bucket_name: str, file_name: str) -> None:
raise ValueError(file_name + ": File not found.")
blob.delete()

def delete_batch_files(
self,
bucket_name: str,
batch_files: list[str],
) -> Any:
prefix = "validated/"

now_iso = datetime.datetime.now()
deleted: List[Dict[str, str]] = []
not_found: List[str] = []
errors: List[Dict[str, str]] = []

for fname in batch_files:
if not isinstance(fname, str) or not fname.strip():
errors.append(
{
"file": str(fname),
"path": f"{prefix}{fname}",
"error": "invalid filename",
}
)
continue

blob_path = f"{prefix}{fname}"
try:
logger.info("Attempting to delete gs://%s/%s", bucket_name, blob_path)
# One-liner delete; raises NotFound if missing
self.delete_file(bucket_name=bucket_name, file_name=blob_path)
logger.info("Delete successful: gs://%s/%s", bucket_name, blob_path)
deleted.append(
{"file": fname, "path": blob_path, "deleted_at": str(now_iso)}
)
except ValueError:
logger.warning(
"Blob or bucket not found: gs://%s/%s", bucket_name, blob_path
)
not_found.append(fname)
except Exception as e: # network/other unexpected errors
logger.exception(
"Unexpected error deleting gs://%s/%s", bucket_name, blob_path
)
errors.append({"file": fname, "path": blob_path, "error": str(e)})

return {
"deleted": deleted,
"not_found": not_found,
"errors": errors,
}

def validate_file(
self,
bucket_name: str,
Expand Down
Loading
Loading