Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/webapp/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ DATABRICKS_WORKSPACE=""
DATABRICKS_HOST_URL=""
# The service account used to read GCP buckets from Databricks.
DATABRICKS_SERVICE_ACCOUNT_EMAIL=""
# Optional. Numeric job id for edvise_validated_gcs_to_bronze_sync (from Databricks job URL / API).
# If unset, the API resolves the job by name and errors if the name is missing or ambiguous.
# DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID=""

# Datakinders allowed to issue API key. This should be the MINIMUM set. Keep this group small. Pass as a comma separated string structured like so: "abc@dk.org,bcd@dk.org"
# The initial value set is api_key_initial which is the initial API key, this is needed for one-time setup. You can remove this once the api key table is populated.
Expand Down
241 changes: 241 additions & 0 deletions src/webapp/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from pydantic import BaseModel
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.service import catalog
from databricks.sdk.service.sql import (
Format,
Expand Down Expand Up @@ -33,6 +34,173 @@

# The name of the deployed pipeline in Databricks. Must match directly.
PDP_INFERENCE_JOB_NAME = "edvise_github_sourced_pdp_inference_pipeline"
# GCS validated/ → institution bronze_volume/gcs_uploads (edvise bundle job name).
VALIDATED_BRONZE_SYNC_JOB_NAME = "edvise_validated_gcs_to_bronze_sync"
# Optional: numeric Databricks job id. If unset, the job is resolved by name (must be unique).
DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV = "DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID"
# Environment-specific Databricks job ids for deployed API environments.
VALIDATED_BRONZE_SYNC_JOB_IDS_BY_ENV = {
"DEV": 1005654397694881,
"STAGING": 611181637854021,
}

# Must match edvise bundle job parameters (github_validated_bronze_sync.yml).
BRONZE_SYNC_GCS_SOURCE_PREFIX = "validated/"
BRONZE_SYNC_BRONZE_SUBDIR = "gcs_uploads"
BRONZE_SYNC_MAX_OBJECTS = "1000"
BRONZE_SYNC_REQUIRE_AT_LEAST_ONE_FILE = "true"
BRONZE_SYNC_STRICT_MODE = "auto"


def _create_databricks_workspace_client(operation: str) -> WorkspaceClient:
"""
Create a Databricks WorkspaceClient using configured host and GCP service account.

Args:
operation: Label for error messages (e.g. ``run_validated_gcs_to_bronze_sync``).

Returns:
Initialized workspace client.

Raises:
ValueError: If client creation fails.
"""
try:
return WorkspaceClient(
host=databricks_vars["DATABRICKS_HOST_URL"],
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
)
except (OSError, DatabricksError) as exc:
LOGGER.exception(
"Failed to create Databricks WorkspaceClient for %s: host=%s service_account=%s",
operation,
databricks_vars["DATABRICKS_HOST_URL"],
gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
)
raise ValueError(f"{operation}(): Workspace client failed: {exc}") from exc


def _run_databricks_job_now(
workspace: WorkspaceClient,
job_id: int,
job_parameters: dict[str, str],
operation: str,
) -> int:
"""
Start a Databricks job run and return the run id.

Raises:
ValueError: If the Jobs API does not return a run id.
"""
try:
run_job: Any = workspace.jobs.run_now(job_id, job_parameters=job_parameters)
except DatabricksError as exc:
LOGGER.exception(
"Databricks job run failed for %s (job_id=%s).", operation, job_id
)
raise ValueError(f"{operation}(): Job could not be run: {exc}") from exc

if not run_job.response or run_job.response.run_id is None:
raise ValueError(f"{operation}(): No run_id returned.")

return int(run_job.response.run_id)


def _resolve_validated_bronze_sync_job_id(w: WorkspaceClient) -> int:
"""
Return the job id for the GCS→bronze sync job.

Prefer ``DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID`` when set (stable across renames).
Otherwise resolve by exact name, deployed environment, then a unique bundle-prefixed name.
"""
raw = (os.environ.get(DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV) or "").strip()
if raw:
if not raw.isdigit():
raise ValueError(
f"{DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} must be a positive integer "
f"(Databricks job id) if set; got {raw!r}."
)
job_id = int(raw)
if job_id <= 0:
raise ValueError(
f"{DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} must be positive; got {job_id}."
)
LOGGER.info(
"Bronze sync job id from %s=%s",
DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV,
job_id,
)
return job_id

jobs = list(w.jobs.list(name=VALIDATED_BRONZE_SYNC_JOB_NAME))
if len(jobs) == 0:
env_job_id = _resolve_validated_bronze_sync_job_id_by_environment()
if env_job_id is not None:
return env_job_id
if len(jobs) == 0:
jobs = _find_validated_bronze_sync_jobs_by_suffix(w)
if len(jobs) == 0:
raise ValueError(
f"Job named {VALIDATED_BRONZE_SYNC_JOB_NAME!r} or a unique bundle-prefixed "
f"variant was not found. "
f"Deploy the bundle job or set {DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} "
"to the numeric job id from the Databricks UI / API."
)
if len(jobs) > 1:
ids = [j.job_id for j in jobs if j.job_id is not None]
raise ValueError(
f"Multiple ({len(jobs)}) jobs matched {VALIDATED_BRONZE_SYNC_JOB_NAME!r}; "
f"set {DATABRICKS_VALIDATED_BRONZE_SYNC_JOB_ID_ENV} to the correct id. Found job_ids={ids}."
)
job = jobs[0]
if job.job_id is None:
raise ValueError(
f"Job matching {VALIDATED_BRONZE_SYNC_JOB_NAME!r} has no job_id in list response."
)
job_id = job.job_id
LOGGER.info(
"Resolved bronze sync job %r: job_id=%s",
_databricks_job_name(job) or VALIDATED_BRONZE_SYNC_JOB_NAME,
job_id,
)
return job_id


def _resolve_validated_bronze_sync_job_id_by_environment() -> Optional[int]:
"""Return the deployed Databricks job id for the current API environment."""
env = (os.environ.get("ENV") or "").strip().upper()
job_id = VALIDATED_BRONZE_SYNC_JOB_IDS_BY_ENV.get(env)
if job_id is not None:
LOGGER.info("Bronze sync job id from ENV=%s mapping: job_id=%s", env, job_id)
return job_id


def _databricks_job_name(job: Any) -> Optional[str]:
"""Return the display name from a Databricks job list item, if present."""
settings = getattr(job, "settings", None)
name = getattr(settings, "name", None)
if isinstance(name, str):
return name
name = getattr(job, "name", None)
if isinstance(name, str):
return name
return None


def _find_validated_bronze_sync_jobs_by_suffix(w: WorkspaceClient) -> list[Any]:
"""
Find a Databricks Asset Bundle dev-mode job with a prefixed display name.

Development-mode bundle jobs can be named like
``[dev service_principal] edvise_validated_gcs_to_bronze_sync``. Only a
single suffix match is accepted by the caller.
"""
suffix = f" {VALIDATED_BRONZE_SYNC_JOB_NAME}"
return [
job
for job in w.jobs.list()
if (name := _databricks_job_name(job)) is not None and name.endswith(suffix)
]


class DatabricksInferenceRunRequest(BaseModel):
Expand All @@ -53,6 +221,41 @@ class DatabricksInferenceRunResponse(BaseModel):
job_run_id: int


class DatabricksBronzeSyncRequest(BaseModel):
"""Parameters to copy validated GCS objects into the institution bronze volume."""

inst_name: str
gcp_bucket_name: str
# Full object paths in the bucket, e.g. ["validated/file.csv"].
validated_blob_paths: list[str]


class DatabricksBronzeSyncResponse(BaseModel):
"""Result of triggering the bronze sync Databricks job."""

job_run_id: int


def _build_validated_bronze_sync_job_parameters(
req: DatabricksBronzeSyncRequest,
databricks_institution_name: str,
) -> dict[str, str]:
"""Build job_parameters dict for the GCS→bronze sync Databricks job."""
include_json = json.dumps(req.validated_blob_paths, separators=(",", ":"))
return {
"gcp_bucket_name": req.gcp_bucket_name,
"databricks_institution_name": databricks_institution_name,
"DB_workspace": databricks_vars["DATABRICKS_WORKSPACE"],
"sync_run_id": "",
"gcs_source_prefix": BRONZE_SYNC_GCS_SOURCE_PREFIX,
"bronze_subdir": BRONZE_SYNC_BRONZE_SUBDIR,
"max_objects": BRONZE_SYNC_MAX_OBJECTS,
"require_at_least_one_file": BRONZE_SYNC_REQUIRE_AT_LEAST_ONE_FILE,
"strict_mode": BRONZE_SYNC_STRICT_MODE,
"include_blob_paths_json": include_json,
}


def get_filepath_of_filetype(
file_dict: dict[str, list[SchemaType]], file_type: SchemaType
) -> str:
Expand Down Expand Up @@ -261,6 +464,44 @@ def run_pdp_inference(

return DatabricksInferenceRunResponse(job_run_id=run_id)

def run_validated_gcs_to_bronze_sync(
self, req: DatabricksBronzeSyncRequest
) -> DatabricksBronzeSyncResponse:
"""
Trigger the job that copies validated/ objects from GCS into bronze_volume/gcs_uploads.

Args:
req: Institution name, bucket, and full GCS object paths under validated/.

Returns:
Response containing the Databricks job run id (run started, not completed).

Raises:
ValueError: If paths are empty, configuration is invalid, or the job cannot start.
"""
operation = "run_validated_gcs_to_bronze_sync"
if not req.validated_blob_paths:
raise ValueError(f"{operation}: validated_blob_paths must be non-empty.")

LOGGER.info(
"Triggering GCS→bronze sync for institution: %s (%s objects)",
req.inst_name,
len(req.validated_blob_paths),
)

workspace = _create_databricks_workspace_client(operation)
try:
job_id = _resolve_validated_bronze_sync_job_id(workspace)
except ValueError as exc:
LOGGER.exception("Job resolution failed for GCS→bronze sync.")
raise ValueError(f"{operation}(): Failed to resolve job: {exc}") from exc

db_inst_name = databricksify_inst_name(req.inst_name)
job_parameters = _build_validated_bronze_sync_job_parameters(req, db_inst_name)
run_id = _run_databricks_job_now(workspace, job_id, job_parameters, operation)
LOGGER.info("GCS→bronze sync job started. Run ID: %s", run_id)
return DatabricksBronzeSyncResponse(job_run_id=run_id)

def delete_inst(self, inst_name: str) -> None:
"""Cleanup tasks required on the Databricks side to delete an institution."""
db_inst_name = databricksify_inst_name(inst_name)
Expand Down
Loading
Loading