diff --git a/src/webapp/.env.example b/src/webapp/.env.example index 477737c5..cee506f4 100644 --- a/src/webapp/.env.example +++ b/src/webapp/.env.example @@ -32,6 +32,9 @@ DATABRICKS_WORKSPACE="" DATABRICKS_HOST_URL="" # The service account used to read GCP buckets from Databricks. DATABRICKS_SERVICE_ACCOUNT_EMAIL="" +# Optional. Numeric job id for edvise_validated_gcs_to_bronze_sync (from Databricks job URL / API). +# If unset, the API resolves the job by name and errors if the name is missing or ambiguous. +# DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID="" # Datakinders allowed to issue API key. This should be the MINIMUM set. Keep this group small. Pass as a comma separated string structured like so: "abc@dk.org,bcd@dk.org" # The initial value set is api_key_initial which is the initial API key, this is needed for one-time setup. You can remove this once the api key table is populated. diff --git a/src/webapp/databricks.py b/src/webapp/databricks.py index aa4b6b02..4614f4cb 100644 --- a/src/webapp/databricks.py +++ b/src/webapp/databricks.py @@ -4,6 +4,7 @@ import logging from pydantic import BaseModel from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import DatabricksError from databricks.sdk.service import catalog from databricks.sdk.service.sql import ( Format, @@ -33,6 +34,173 @@ # The name of the deployed pipeline in Databricks. Must match directly. PDP_INFERENCE_JOB_NAME = "edvise_github_sourced_pdp_inference_pipeline" +# GCS validated/ → institution bronze_volume/gcs_uploads (edvise bundle job name). +VALIDATED_BRONZE_SYNC_JOB_NAME = "edvise_validated_gcs_to_bronze_sync" +# Optional: numeric Databricks job id. If unset, the job is resolved by name (must be unique). +DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV = "DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID" +# Environment-specific Databricks job ids for deployed API environments. +VALIDATED_BRONZE_SYNC_JOB_IDS_BY_ENV = { + "DEV": 1005654397694881, + "STAGING": 611181637854021, +} + +# Must match edvise bundle job parameters (github_validated_bronze_sync.yml). +BRONZE_SYNC_GCS_SOURCE_PREFIX = "validated/" +BRONZE_SYNC_BRONZE_SUBDIR = "gcs_uploads" +BRONZE_SYNC_MAX_OBJECTS = "1000" +BRONZE_SYNC_REQUIRE_AT_LEAST_ONE_FILE = "true" +BRONZE_SYNC_STRICT_MODE = "auto" + + +def _create_databricks_workspace_client(operation: str) -> WorkspaceClient: + """ + Create a Databricks WorkspaceClient using configured host and GCP service account. + + Args: + operation: Label for error messages (e.g. ``run_validated_gcs_to_bronze_sync``). + + Returns: + Initialized workspace client. + + Raises: + ValueError: If client creation fails. + """ + try: + return WorkspaceClient( + host=databricks_vars["DATABRICKS_HOST_URL"], + google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"], + ) + except (OSError, DatabricksError) as exc: + LOGGER.exception( + "Failed to create Databricks WorkspaceClient for %s: host=%s service_account=%s", + operation, + databricks_vars["DATABRICKS_HOST_URL"], + gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"], + ) + raise ValueError(f"{operation}(): Workspace client failed: {exc}") from exc + + +def _run_databricks_job_now( + workspace: WorkspaceClient, + job_id: int, + job_parameters: dict[str, str], + operation: str, +) -> int: + """ + Start a Databricks job run and return the run id. + + Raises: + ValueError: If the Jobs API does not return a run id. + """ + try: + run_job: Any = workspace.jobs.run_now(job_id, job_parameters=job_parameters) + except DatabricksError as exc: + LOGGER.exception( + "Databricks job run failed for %s (job_id=%s).", operation, job_id + ) + raise ValueError(f"{operation}(): Job could not be run: {exc}") from exc + + if not run_job.response or run_job.response.run_id is None: + raise ValueError(f"{operation}(): No run_id returned.") + + return int(run_job.response.run_id) + + +def _resolve_validated_bronze_sync_job_id(w: WorkspaceClient) -> int: + """ + Return the job id for the GCS→bronze sync job. + + Prefer ``DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID`` when set (stable across renames). + Otherwise resolve by exact name, deployed environment, then a unique bundle-prefixed name. + """ + raw = (os.environ.get(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV) or "").strip() + if raw: + if not raw.isdigit(): + raise ValueError( + f"{DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} must be a positive integer " + f"(Databricks job id) if set; got {raw!r}." + ) + job_id = int(raw) + if job_id <= 0: + raise ValueError( + f"{DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} must be positive; got {job_id}." + ) + LOGGER.info( + "Bronze sync job id from %s=%s", + DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, + job_id, + ) + return job_id + + jobs = list(w.jobs.list(name=VALIDATED_BRONZE_SYNC_JOB_NAME)) + if len(jobs) == 0: + env_job_id = _resolve_validated_bronze_sync_job_id_by_environment() + if env_job_id is not None: + return env_job_id + if len(jobs) == 0: + jobs = _find_validated_bronze_sync_jobs_by_suffix(w) + if len(jobs) == 0: + raise ValueError( + f"Job named {VALIDATED_BRONZE_SYNC_JOB_NAME!r} or a unique bundle-prefixed " + f"variant was not found. " + f"Deploy the bundle job or set {DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} " + "to the numeric job id from the Databricks UI / API." + ) + if len(jobs) > 1: + ids = [j.job_id for j in jobs if j.job_id is not None] + raise ValueError( + f"Multiple ({len(jobs)}) jobs matched {VALIDATED_BRONZE_SYNC_JOB_NAME!r}; " + f"set {DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} to the correct id. Found job_ids={ids}." + ) + job = jobs[0] + if job.job_id is None: + raise ValueError( + f"Job matching {VALIDATED_BRONZE_SYNC_JOB_NAME!r} has no job_id in list response." + ) + job_id = job.job_id + LOGGER.info( + "Resolved bronze sync job %r: job_id=%s", + _databricks_job_name(job) or VALIDATED_BRONZE_SYNC_JOB_NAME, + job_id, + ) + return job_id + + +def _resolve_validated_bronze_sync_job_id_by_environment() -> Optional[int]: + """Return the deployed Databricks job id for the current API environment.""" + env = (os.environ.get("ENV") or "").strip().upper() + job_id = VALIDATED_BRONZE_SYNC_JOB_IDS_BY_ENV.get(env) + if job_id is not None: + LOGGER.info("Bronze sync job id from ENV=%s mapping: job_id=%s", env, job_id) + return job_id + + +def _databricks_job_name(job: Any) -> Optional[str]: + """Return the display name from a Databricks job list item, if present.""" + settings = getattr(job, "settings", None) + name = getattr(settings, "name", None) + if isinstance(name, str): + return name + name = getattr(job, "name", None) + if isinstance(name, str): + return name + return None + + +def _find_validated_bronze_sync_jobs_by_suffix(w: WorkspaceClient) -> list[Any]: + """ + Find a Databricks Asset Bundle dev-mode job with a prefixed display name. + + Development-mode bundle jobs can be named like + ``[dev service_principal] edvise_validated_gcs_to_bronze_sync``. Only a + single suffix match is accepted by the caller. + """ + suffix = f" {VALIDATED_BRONZE_SYNC_JOB_NAME}" + return [ + job + for job in w.jobs.list() + if (name := _databricks_job_name(job)) is not None and name.endswith(suffix) + ] class DatabricksInferenceRunRequest(BaseModel): @@ -53,6 +221,41 @@ class DatabricksInferenceRunResponse(BaseModel): job_run_id: int +class DatabricksBronzeSyncRequest(BaseModel): + """Parameters to copy validated GCS objects into the institution bronze volume.""" + + inst_name: str + gcp_bucket_name: str + # Full object paths in the bucket, e.g. ["validated/file.csv"]. + validated_blob_paths: list[str] + + +class DatabricksBronzeSyncResponse(BaseModel): + """Result of triggering the bronze sync Databricks job.""" + + job_run_id: int + + +def _build_validated_bronze_sync_job_parameters( + req: DatabricksBronzeSyncRequest, + databricks_institution_name: str, +) -> dict[str, str]: + """Build job_parameters dict for the GCS→bronze sync Databricks job.""" + include_json = json.dumps(req.validated_blob_paths, separators=(",", ":")) + return { + "gcp_bucket_name": req.gcp_bucket_name, + "databricks_institution_name": databricks_institution_name, + "DB_workspace": databricks_vars["DATABRICKS_WORKSPACE"], + "sync_run_id": "", + "gcs_source_prefix": BRONZE_SYNC_GCS_SOURCE_PREFIX, + "bronze_subdir": BRONZE_SYNC_BRONZE_SUBDIR, + "max_objects": BRONZE_SYNC_MAX_OBJECTS, + "require_at_least_one_file": BRONZE_SYNC_REQUIRE_AT_LEAST_ONE_FILE, + "strict_mode": BRONZE_SYNC_STRICT_MODE, + "include_blob_paths_json": include_json, + } + + def get_filepath_of_filetype( file_dict: dict[str, list[SchemaType]], file_type: SchemaType ) -> str: @@ -261,6 +464,44 @@ def run_pdp_inference( return DatabricksInferenceRunResponse(job_run_id=run_id) + def run_validated_gcs_to_bronze_sync( + self, req: DatabricksBronzeSyncRequest + ) -> DatabricksBronzeSyncResponse: + """ + Trigger the job that copies validated/ objects from GCS into bronze_volume/gcs_uploads. + + Args: + req: Institution name, bucket, and full GCS object paths under validated/. + + Returns: + Response containing the Databricks job run id (run started, not completed). + + Raises: + ValueError: If paths are empty, configuration is invalid, or the job cannot start. + """ + operation = "run_validated_gcs_to_bronze_sync" + if not req.validated_blob_paths: + raise ValueError(f"{operation}: validated_blob_paths must be non-empty.") + + LOGGER.info( + "Triggering GCS→bronze sync for institution: %s (%s objects)", + req.inst_name, + len(req.validated_blob_paths), + ) + + workspace = _create_databricks_workspace_client(operation) + try: + job_id = _resolve_validated_bronze_sync_job_id(workspace) + except ValueError as exc: + LOGGER.exception("Job resolution failed for GCS→bronze sync.") + raise ValueError(f"{operation}(): Failed to resolve job: {exc}") from exc + + db_inst_name = databricksify_inst_name(req.inst_name) + job_parameters = _build_validated_bronze_sync_job_parameters(req, db_inst_name) + run_id = _run_databricks_job_now(workspace, job_id, job_parameters, operation) + LOGGER.info("GCS→bronze sync job started. Run ID: %s", run_id) + return DatabricksBronzeSyncResponse(job_run_id=run_id) + def delete_inst(self, inst_name: str) -> None: """Cleanup tasks required on the Databricks side to delete an institution.""" db_inst_name = databricksify_inst_name(inst_name) diff --git a/src/webapp/databricks_test.py b/src/webapp/databricks_test.py index 37fa2855..96e7b76e 100644 --- a/src/webapp/databricks_test.py +++ b/src/webapp/databricks_test.py @@ -1,6 +1,20 @@ +from types import SimpleNamespace +from unittest import mock + import pytest -from .databricks import DatabricksControl +from .databricks import ( + BRONZE_SYNC_BRONZE_SUBDIR, + BRONZE_SYNC_GCS_SOURCE_PREFIX, + BRONZE_SYNC_MAX_OBJECTS, + BRONZE_SYNC_REQUIRE_AT_LEAST_ONE_FILE, + BRONZE_SYNC_STRICT_MODE, + DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, + DatabricksBronzeSyncRequest, + DatabricksControl, + _build_validated_bronze_sync_job_parameters, + _resolve_validated_bronze_sync_job_id, +) @pytest.fixture @@ -51,3 +65,164 @@ def test_invalid_regex_is_ignored(ctrl): def test_returns_none_when_no_match(ctrl): mapping = {"student": "student.csv"} assert ctrl.get_key_for_file(mapping, "unknown.csv") is None + + +def test_resolve_bronze_sync_job_id_from_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, "12345") + w = mock.Mock() + assert _resolve_validated_bronze_sync_job_id(w) == 12345 + w.jobs.list.assert_not_called() + + +def test_resolve_bronze_sync_job_id_env_invalid_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, "not-a-number") + w = mock.Mock() + with pytest.raises(ValueError, match="positive integer"): + _resolve_validated_bronze_sync_job_id(w) + + +def test_resolve_bronze_sync_job_id_by_name_single( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + monkeypatch.setenv("ENV", "DEV") + job = mock.Mock(job_id=99) + w = mock.Mock() + w.jobs.list.return_value = [job] + assert _resolve_validated_bronze_sync_job_id(w) == 99 + + +def test_resolve_bronze_sync_job_id_by_name_ambiguous_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + w = mock.Mock() + w.jobs.list.return_value = [mock.Mock(job_id=1), mock.Mock(job_id=2)] + with pytest.raises(ValueError, match="Multiple"): + _resolve_validated_bronze_sync_job_id(w) + + +def test_resolve_bronze_sync_job_id_by_dev_environment( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + monkeypatch.setenv("ENV", "DEV") + w = mock.Mock() + w.jobs.list.return_value = [] + assert _resolve_validated_bronze_sync_job_id(w) == 1005654397694881 + w.jobs.list.assert_called_once_with(name="edvise_validated_gcs_to_bronze_sync") + + +def test_resolve_bronze_sync_job_id_by_staging_environment( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + monkeypatch.setenv("ENV", "STAGING") + w = mock.Mock() + w.jobs.list.return_value = [] + assert _resolve_validated_bronze_sync_job_id(w) == 611181637854021 + + +def test_resolve_bronze_sync_job_id_by_prefixed_bundle_name( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + monkeypatch.setenv("ENV", "LOCAL") + job = SimpleNamespace( + job_id=123, + settings=SimpleNamespace( + name="[dev dev_cloudrun_sa] edvise_validated_gcs_to_bronze_sync" + ), + ) + w = mock.Mock() + w.jobs.list.side_effect = [[], [job]] + assert _resolve_validated_bronze_sync_job_id(w) == 123 + assert w.jobs.list.call_args_list == [ + mock.call(name="edvise_validated_gcs_to_bronze_sync"), + mock.call(), + ] + + +def test_resolve_bronze_sync_job_id_by_prefixed_bundle_name_ambiguous_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + monkeypatch.setenv("ENV", "LOCAL") + w = mock.Mock() + w.jobs.list.side_effect = [ + [], + [ + SimpleNamespace( + job_id=1, + settings=SimpleNamespace( + name="[dev user_a] edvise_validated_gcs_to_bronze_sync" + ), + ), + SimpleNamespace( + job_id=2, + settings=SimpleNamespace( + name="[dev user_b] edvise_validated_gcs_to_bronze_sync" + ), + ), + ], + ] + with pytest.raises(ValueError, match="Multiple"): + _resolve_validated_bronze_sync_job_id(w) + + +def test_resolve_bronze_sync_job_id_by_name_missing_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, raising=False) + monkeypatch.setenv("ENV", "LOCAL") + w = mock.Mock() + w.jobs.list.return_value = [] + with pytest.raises(ValueError, match="not found"): + _resolve_validated_bronze_sync_job_id(w) + + +def test_build_validated_bronze_sync_job_parameters_shape() -> None: + req = DatabricksBronzeSyncRequest( + inst_name="Test School", + gcp_bucket_name="bucket-a", + validated_blob_paths=["validated/student.csv"], + ) + params = _build_validated_bronze_sync_job_parameters(req, "test_school") + assert params["gcp_bucket_name"] == "bucket-a" + assert params["databricks_institution_name"] == "test_school" + assert params["gcs_source_prefix"] == BRONZE_SYNC_GCS_SOURCE_PREFIX + assert params["bronze_subdir"] == BRONZE_SYNC_BRONZE_SUBDIR + assert params["max_objects"] == BRONZE_SYNC_MAX_OBJECTS + assert params["require_at_least_one_file"] == BRONZE_SYNC_REQUIRE_AT_LEAST_ONE_FILE + assert params["strict_mode"] == BRONZE_SYNC_STRICT_MODE + assert params["sync_run_id"] == "" + assert params["include_blob_paths_json"] == '["validated/student.csv"]' + + +def test_run_validated_gcs_to_bronze_sync_calls_run_now_with_bundle_params( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV, "42") + workspace = mock.Mock() + run_response = mock.Mock() + run_response.response.run_id = 9001 + workspace.jobs.run_now.return_value = run_response + + with mock.patch("src.webapp.databricks.WorkspaceClient", return_value=workspace): + ctrl = DatabricksControl() + req = DatabricksBronzeSyncRequest( + inst_name="My Inst", + gcp_bucket_name="my-bucket", + validated_blob_paths=["validated/foo.csv"], + ) + resp = ctrl.run_validated_gcs_to_bronze_sync(req) + + assert resp.job_run_id == 9001 + workspace.jobs.run_now.assert_called_once() + run_args, run_kwargs = workspace.jobs.run_now.call_args + assert run_args[0] == 42 + params = run_kwargs["job_parameters"] + assert params["include_blob_paths_json"] == '["validated/foo.csv"]' + assert params["gcs_source_prefix"] == BRONZE_SYNC_GCS_SOURCE_PREFIX diff --git a/src/webapp/routers/data.py b/src/webapp/routers/data.py index 88092ee2..85a921b9 100644 --- a/src/webapp/routers/data.py +++ b/src/webapp/routers/data.py @@ -1,5 +1,6 @@ """API functions related to data.""" +import json import uuid from datetime import datetime, date from typing import Annotated, Any, Dict, List, Optional, Tuple, Union, cast @@ -43,7 +44,11 @@ DocType, ) -from ..databricks import DatabricksControl +from ..databricks import ( + VALIDATED_BRONZE_SYNC_JOB_NAME, + DatabricksBronzeSyncRequest, + DatabricksControl, +) from ..gcsdbutils import update_db_from_bucket from ..gcsutil import StorageControl @@ -55,6 +60,137 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) + +def _gcs_bronze_sync_skip_reason( + edvise_id: Optional[str], legacy_id: Optional[str] +) -> Optional[str]: + """ + If sync should not run, return a stable reason code; otherwise None. + + Used for logging (skip_reason when sync is not run). + """ + if os.environ.get("ENABLE_GCS_BRONZE_SYNC_ON_VALIDATION", "true").lower() not in ( + "1", + "true", + "yes", + ): + return "env_disabled" + if edvise_id is None and legacy_id is None: + return "not_edvise_or_legacy" + return None + + +def _log_validation_trace_json(event: str, **fields: Any) -> None: + """Emit one JSON object per line for log aggregators (Cloud Logging, Datadog, etc.).""" + payload: Dict[str, Any] = {"event": event, **fields} + logger.info("%s", json.dumps(payload, default=str, separators=(",", ":"))) + + +def _bronze_sync_trace_base( + correlation_id: str, inst_id: str, bucket: str, file_name: str +) -> Dict[str, Any]: + """Shared fields for GCS→bronze trace log lines.""" + return { + "correlation_id": correlation_id, + "inst_id": inst_id, + "bucket": bucket, + "file_name": file_name, + } + + +def _log_bronze_sync_skipped(trace_base: Dict[str, Any], skip_reason: str) -> None: + _log_validation_trace_json( + "gcs_bronze_sync_background_done", + **trace_base, + outcome="skipped", + skip_reason=skip_reason, + ) + + +def _log_bronze_sync_success( + trace_base: Dict[str, Any], + validated_blob_path: str, + job_run_id: int, +) -> None: + _log_validation_trace_json( + "gcs_bronze_sync_background_done", + **trace_base, + outcome="success", + validated_blob_path=validated_blob_path, + databricks_job_run_id=job_run_id, + databricks_job_name=VALIDATED_BRONZE_SYNC_JOB_NAME, + ) + + +def _log_bronze_sync_trigger_failed( + trace_base: Dict[str, Any], validated_blob_path: str, correlation_id: str +) -> None: + _log_validation_trace_json( + "gcs_bronze_sync_background_done", + **trace_base, + outcome="trigger_failed", + validated_blob_path=validated_blob_path, + databricks_job_name=VALIDATED_BRONZE_SYNC_JOB_NAME, + ) + logger.exception( + "Failed to trigger GCS→bronze Databricks job after validation (non-fatal). " + "correlation_id=%s", + correlation_id, + ) + + +def _attempt_gcs_bronze_sync_trigger( + inst_name: str, + bucket: str, + validated_blob_path: str, + databricks_control: DatabricksControl, + trace_base: Dict[str, Any], + correlation_id: str, +) -> None: + """Call Databricks to start the bronze sync job and log success.""" + sync_resp = databricks_control.run_validated_gcs_to_bronze_sync( + DatabricksBronzeSyncRequest( + inst_name=inst_name, + gcp_bucket_name=bucket, + validated_blob_paths=[validated_blob_path], + ) + ) + _log_bronze_sync_success(trace_base, validated_blob_path, sync_resp.job_run_id) + + +def _trigger_gcs_bronze_sync_if_applicable( + inst_name: str, + edvise_id: Optional[str], + legacy_id: Optional[str], + inst_id: str, + file_name: str, + bucket: str, + databricks_control: DatabricksControl, + correlation_id: str, +) -> None: + """Trigger Databricks job to copy validated/ into bronze without waiting for the copy.""" + trace_base = _bronze_sync_trace_base(correlation_id, inst_id, bucket, file_name) + _log_validation_trace_json("gcs_bronze_sync_background_start", **trace_base) + + skip_reason = _gcs_bronze_sync_skip_reason(edvise_id, legacy_id) + if skip_reason is not None: + _log_bronze_sync_skipped(trace_base, skip_reason) + return + + validated_blob_path = f"validated/{file_name}" + try: + _attempt_gcs_bronze_sync_trigger( + inst_name, + bucket, + validated_blob_path, + databricks_control, + trace_base, + correlation_id, + ) + except Exception: + _log_bronze_sync_trigger_failed(trace_base, validated_blob_path, correlation_id) + + # Cache for EDA data - TTL of 10 minutes (600 seconds) # Cache key format: f"{inst_id}:{batch_id}" EDA_CACHE_TTL = int(os.getenv("EDA_CACHE_TTL", "600")) # Default 10 minutes @@ -1539,6 +1675,7 @@ def validation_helper( current_user: BaseUser, storage_control: StorageControl, sql_session: Session, + databricks_control: DatabricksControl, ) -> Any: """Run file validation for an institution and upsert the file record. @@ -1553,6 +1690,7 @@ def validation_helper( current_user: Authenticated user; must have access to inst_id. storage_control: StorageControl instance for GCS and validate_file. sql_session: DB session for institution, schema, and file record. + databricks_control: Starts the GCS→Databricks bronze sync after validation. Returns: Dict with name, inst_id, file_types, source, status. @@ -1592,6 +1730,15 @@ def validation_helper( allowed_schemas = _infer_allowed_schemas_from_filename(file_name, inst) base_schema_id, base_schema, now = _get_validation_base_schema(sess) bucket = get_external_bucket_name(inst_id) + correlation_id = str(uuid.uuid4()) + _log_validation_trace_json( + "validation_request", + correlation_id=correlation_id, + inst_id=inst_id, + bucket=bucket, + file_name=file_name, + validation_source=source_str, + ) schema_namespace, updated_inst_schema = _resolve_schema_namespace_and_extension( sess, inst, @@ -1603,7 +1750,7 @@ def validation_helper( base_schema_id, file_name, ) - return _run_validation_and_upsert_file_record( + result = _run_validation_and_upsert_file_record( bucket, file_name, allowed_schemas, @@ -1616,6 +1763,19 @@ def validation_helper( storage_control, sess, ) + # GCS validated/ write is complete; start the Databricks run now. The API waits + # only for run_now to return a run id, not for cluster startup or file copying. + _trigger_gcs_bronze_sync_if_applicable( + inst.name, + inst.edvise_id, + inst.legacy_id, + inst_id, + file_name, + bucket, + databricks_control, + correlation_id, + ) + return result @router.post( @@ -1627,6 +1787,7 @@ def validate_file_sftp( current_user: Annotated[BaseUser, Depends(get_current_active_user)], storage_control: Annotated[StorageControl, Depends(StorageControl)], sql_session: Annotated[Session, Depends(get_session)], + databricks_control: Annotated[DatabricksControl, Depends(DatabricksControl)], ) -> Any: """Validate a given file pulled from SFTP. The file_name should be url encoded.""" file_name = decode_url_piece(file_name) @@ -1636,7 +1797,13 @@ def validate_file_sftp( detail="SFTP validation needs to be done by a datakinder.", ) return validation_helper( - "PDP_SFTP", inst_id, file_name, current_user, storage_control, sql_session + "PDP_SFTP", + inst_id, + file_name, + current_user, + storage_control, + sql_session, + databricks_control, ) @@ -1649,13 +1816,20 @@ def validate_file_manual_upload( current_user: Annotated[BaseUser, Depends(get_current_active_user)], storage_control: Annotated[StorageControl, Depends(StorageControl)], sql_session: Annotated[Session, Depends(get_session)], + databricks_control: Annotated[DatabricksControl, Depends(DatabricksControl)], ) -> Any: """Validate a given file. The file_name should be url encoded.""" file_name = decode_url_piece(file_name) return validation_helper( - "MANUAL_UPLOAD", inst_id, file_name, current_user, storage_control, sql_session + "MANUAL_UPLOAD", + inst_id, + file_name, + current_user, + storage_control, + sql_session, + databricks_control, ) diff --git a/src/webapp/routers/data_test.py b/src/webapp/routers/data_test.py index 7add80bd..aafc8e4e 100644 --- a/src/webapp/routers/data_test.py +++ b/src/webapp/routers/data_test.py @@ -37,8 +37,11 @@ ) from fastapi import HTTPException from ..gcsutil import StorageControl +from ..databricks import DatabricksControl MOCK_STORAGE = mock.Mock() +MOCK_DATABRICKS = mock.Mock() +MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.return_value = mock.Mock(job_run_id=1) UUID_2 = uuid.UUID("9bcbc782-2e71-4441-afa2-7a311024a5ec") FILE_UUID_1 = uuid.UUID("f0bb3a20-6d92-4254-afed-6a72f43c562a") @@ -214,10 +217,14 @@ def get_current_active_user_override(): def storage_control_override(): return MOCK_STORAGE + def databricks_control_override(): + return MOCK_DATABRICKS + app.include_router(router) app.dependency_overrides[get_session] = get_session_override app.dependency_overrides[get_current_active_user] = get_current_active_user_override app.dependency_overrides[StorageControl] = storage_control_override + app.dependency_overrides[DatabricksControl] = databricks_control_override client = TestClient(app) yield client @@ -898,6 +905,97 @@ def mock_read_csv(bucket_name: str, blob_path: str) -> pd.DataFrame: EDVISE_INST_2_UUID = uuid.UUID("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb") EDVISE_SCHEMA_UUID = uuid.UUID("cccccccc-cccc-cccc-cccc-cccccccccccc") LEGACY_INST_UUID = uuid.UUID("dddddddd-dddd-dddd-dddd-dddddddddddd") +PDP_ONLY_INST_UUID = uuid.UUID("eeeeeeee-eeee-eeee-eeee-eeeeeeeeeeee") + + +@pytest.fixture(name="pdp_only_session") +def pdp_only_session_fixture(): + """Database setup for PDP-only institution (pdp_id set; no edvise_id or legacy_id).""" + engine = sqlalchemy.create_engine( + "sqlite://", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(engine) + pdp_schema_doc = { + "version": "1.0.0", + "institutions": {"pdp": {"data_models": {}}}, + } + try: + with sqlalchemy.orm.Session(engine) as session: + session.add_all( + [ + InstTable( + id=PDP_ONLY_INST_UUID, + name="pdp_only_school", + pdp_id="pdp999", + edvise_id=None, + legacy_id=None, + schemas=["STUDENT"], + created_at=DATETIME_TESTING, + updated_at=DATETIME_TESTING, + ), + SchemaRegistryTable( + doc_type=DocType.base, + is_pdp=False, + is_edvise=False, + version_label="1.0.0", + json_doc={"version": "1.0.0", "base": {"data_models": {}}}, + is_active=True, + created_at=DATETIME_TESTING, + ), + SchemaRegistryTable( + doc_type=DocType.extension, + is_pdp=True, + is_edvise=False, + version_label="pdp-1.0.0", + json_doc=pdp_schema_doc, + is_active=True, + created_at=DATETIME_TESTING, + ), + ] + ) + session.commit() + yield session + finally: + Base.metadata.drop_all(engine) + + +@pytest.fixture(name="pdp_only_client") +def pdp_only_client_fixture( + pdp_only_session: sqlalchemy.orm.Session, monkeypatch: Any +) -> Any: + """Test client for PDP-only institution validation tests.""" + monkeypatch.setenv("SST_SKIP_EXT_GEN", "1") + + def get_session_override(): + return pdp_only_session + + def get_current_active_user_override(): + from ..utilities import AccessType, BaseUser + + return BaseUser( + uuid_to_str(USER_UUID), + uuid_to_str(PDP_ONLY_INST_UUID), + AccessType.MODEL_OWNER, + "abc@example.com", + ) + + def storage_control_override(): + return MOCK_STORAGE + + def databricks_control_override(): + return MOCK_DATABRICKS + + app.include_router(router) + app.dependency_overrides[get_session] = get_session_override + app.dependency_overrides[get_current_active_user] = get_current_active_user_override + app.dependency_overrides[StorageControl] = storage_control_override + app.dependency_overrides[DatabricksControl] = databricks_control_override + + client = TestClient(app) + yield client + app.dependency_overrides.clear() @pytest.fixture(name="legacy_session") @@ -963,10 +1061,14 @@ def get_current_active_user_override(): def storage_control_override(): return MOCK_STORAGE + def databricks_control_override(): + return MOCK_DATABRICKS + app.include_router(router) app.dependency_overrides[get_session] = get_session_override app.dependency_overrides[get_current_active_user] = get_current_active_user_override app.dependency_overrides[StorageControl] = storage_control_override + app.dependency_overrides[DatabricksControl] = databricks_control_override client = TestClient(app) yield client @@ -1083,10 +1185,14 @@ def get_current_active_user_override(): def storage_control_override(): return MOCK_STORAGE + def databricks_control_override(): + return MOCK_DATABRICKS + app.include_router(router) app.dependency_overrides[get_session] = get_session_override app.dependency_overrides[get_current_active_user] = get_current_active_user_override app.dependency_overrides[StorageControl] = storage_control_override + app.dependency_overrides[DatabricksControl] = databricks_control_override client = TestClient(app) yield client @@ -1100,6 +1206,7 @@ def storage_control_override(): def test_validate_file_with_edvise_schema(edvise_client: TestClient) -> None: """Test file upload validation uses Edvise Schema (ES) when edvise_id is set.""" MOCK_STORAGE.validate_file.return_value = ["STUDENT"] + MOCK_DATABRICKS.reset_mock() response = edvise_client.post( "/institutions/" @@ -1118,10 +1225,17 @@ def test_validate_file_with_edvise_schema(edvise_client: TestClient) -> None: call_kwargs = MOCK_STORAGE.validate_file.call_args.kwargs assert call_kwargs.get("institution_identifier") == uuid_to_str(EDVISE_INST_UUID) + # GCS → bronze Databricks job triggered for Edvise schools + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.assert_called_once() + sync_req = MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.call_args[0][0] + assert sync_req.validated_blob_paths == ["validated/edvise_student_file.csv"] + assert sync_req.inst_name == "edvise_school" + def test_validate_file_with_legacy_schema(legacy_client: TestClient) -> None: """Test file upload validation uses legacy (any-format) path when legacy_id is set.""" MOCK_STORAGE.validate_file.return_value = ["STUDENT"] + MOCK_DATABRICKS.reset_mock() response = legacy_client.post( "/institutions/" @@ -1135,10 +1249,75 @@ def test_validate_file_with_legacy_schema(legacy_client: TestClient) -> None: assert response.json()["inst_id"] == uuid_to_str(LEGACY_INST_UUID) assert MOCK_STORAGE.validate_file.called + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.assert_called_once() + sync_req = MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.call_args[0][0] + assert sync_req.validated_blob_paths == ["validated/legacy_student_data.csv"] + assert sync_req.inst_name == "legacy_school" call_kwargs = MOCK_STORAGE.validate_file.call_args.kwargs assert call_kwargs.get("institution_id") == "legacy" +def test_validate_upload_pdp_only_institution_skips_bronze_sync( + pdp_only_client: TestClient, +) -> None: + """PDP-only schools do not trigger GCS→bronze sync (Edvise/Legacy pipeline only).""" + MOCK_STORAGE.validate_file.return_value = ["STUDENT"] + MOCK_DATABRICKS.reset_mock() + + response = pdp_only_client.post( + "/institutions/" + + uuid_to_str(PDP_ONLY_INST_UUID) + + "/input/validate-upload/pdp_student_file.csv", + ) + + assert response.status_code == 200 + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.assert_not_called() + + +def test_validate_upload_skips_bronze_sync_when_env_disabled( + edvise_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + """ENABLE_GCS_BRONZE_SYNC_ON_VALIDATION=false disables bronze sync for Edvise schools.""" + monkeypatch.setenv("ENABLE_GCS_BRONZE_SYNC_ON_VALIDATION", "false") + MOCK_STORAGE.validate_file.return_value = ["STUDENT"] + MOCK_DATABRICKS.reset_mock() + + response = edvise_client.post( + "/institutions/" + + uuid_to_str(EDVISE_INST_UUID) + + "/input/validate-upload/edvise_student_file.csv", + ) + + assert response.status_code == 200 + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.assert_not_called() + + +def test_validate_upload_databricks_trigger_failure_is_non_fatal( + edvise_client: TestClient, +) -> None: + """Databricks trigger errors do not fail validation after the file is validated.""" + MOCK_STORAGE.validate_file.return_value = ["STUDENT"] + MOCK_DATABRICKS.reset_mock() + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.side_effect = RuntimeError( + "network failed" + ) + try: + response = edvise_client.post( + "/institutions/" + + uuid_to_str(EDVISE_INST_UUID) + + "/input/validate-upload/edvise_student_file.csv", + ) + + assert response.status_code == 200 + assert response.json()["name"] == "edvise_student_file.csv" + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.assert_called_once() + finally: + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.side_effect = None + MOCK_DATABRICKS.run_validated_gcs_to_bronze_sync.return_value = mock.Mock( + job_run_id=1 + ) + + def test_validate_upload_rejects_empty_file_name(legacy_client: TestClient) -> None: """Empty or whitespace-only file name returns 422.""" # Trailing space in path decodes to " " @@ -1611,10 +1790,14 @@ def get_current_active_user_override(): def storage_control_override(): return MOCK_STORAGE + def databricks_control_override(): + return MOCK_DATABRICKS + app.include_router(router) app.dependency_overrides[get_session] = get_session_override app.dependency_overrides[get_current_active_user] = get_current_active_user_override app.dependency_overrides[StorageControl] = storage_control_override + app.dependency_overrides[DatabricksControl] = databricks_control_override client = TestClient(app) try: