Skip to content

Commit 59a3276

Browse files
authored
Merge pull request #153 from datakind/develop
testing h2o pipeline on staging
2 parents 45900ce + 4f915e0 commit 59a3276

9 files changed

Lines changed: 825 additions & 438 deletions

File tree

src/webapp/database.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,9 @@ class ModelTable(Base):
511511
)
512512
# version is unused and not currently supported. The webapp only knows a model by its name, and any usage of a model will use only the live version.
513513
version: Mapped[int] = mapped_column(Integer, default=0)
514+
framework: Mapped[str | None] = mapped_column(
515+
String(VAR_CHAR_STANDARD_LENGTH), nullable=False, default="sklearn"
516+
)
514517

515518
# Within a given institution, there should be no duplicated model names.
516519
__table_args__ = (UniqueConstraint("name", "inst_id", name="model_name_inst_uc"),)
@@ -548,6 +551,9 @@ class JobTable(Base):
548551
String(VAR_CHAR_STANDARD_LENGTH), nullable=True
549552
)
550553
completed: Mapped[bool] = mapped_column(nullable=True)
554+
framework: Mapped[str | None] = mapped_column(
555+
String(VAR_CHAR_STANDARD_LENGTH), nullable=False, default="sklearn"
556+
)
551557

552558

553559
class DocType(enum.Enum):

src/webapp/databricks.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
# The name of the deployed pipeline in Databricks. Must match exactly.
3737
PDP_INFERENCE_JOB_NAME = "github_sourced_pdp_inference_pipeline"
38+
PDP_H2O_INFERENCE_JOB_NAME = "edvise_github_sourced_pdp_inference_pipeline"
3839

3940

4041
class DatabricksInferenceRunRequest(BaseModel):
@@ -44,7 +45,7 @@ class DatabricksInferenceRunRequest(BaseModel):
4445
# Note that the following should be the filepath.
4546
filepath_to_type: dict[str, list[SchemaType]]
4647
model_name: str
47-
model_type: str = "sklearn"
48+
model_type: str
4849
# The email where notifications will get sent.
4950
email: str
5051
gcp_external_bucket_name: str
@@ -98,7 +99,17 @@ def setup_new_inst(self, inst_name: str) -> None:
9899
db_inst_name = databricksify_inst_name(inst_name)
99100
cat_name = databricks_vars["CATALOG_NAME"]
100101
for medallion in MEDALLION_LEVELS:
101-
w.schemas.create(name=f"{db_inst_name}_{medallion}", catalog_name=cat_name)
102+
try:
103+
w.schemas.create(
104+
name=f"{db_inst_name}_{medallion}", catalog_name=cat_name
105+
)
106+
except Exception as e:
107+
LOGGER.exception(
108+
f"Failed to provision schemas in databricks for {db_inst_name}_{medallion}: {e}"
109+
)
110+
raise ValueError(
111+
f"setup_new_inst(): Failed to provision schemas in databricks for {db_inst_name}_{medallion}: {e}"
112+
)
102113
LOGGER.info(
103114
f"Creating medallion level schemas for {db_inst_name} & {medallion}."
104115
)
@@ -192,16 +203,22 @@ def run_pdp_inference(
192203

193204
db_inst_name = databricksify_inst_name(req.inst_name)
194205

206+
if req.model_type == "sklearn":
207+
pipeline_type = PDP_INFERENCE_JOB_NAME
208+
elif req.model_type == "h2o":
209+
pipeline_type = PDP_H2O_INFERENCE_JOB_NAME
210+
else:
211+
raise ValueError("Invalid model framework assigned to institution model")
195212
try:
196-
job = next(w.jobs.list(name=PDP_INFERENCE_JOB_NAME), None)
213+
job = next(w.jobs.list(name=pipeline_type), None)
197214
if not job or job.job_id is None:
198215
raise ValueError(
199-
f"run_pdp_inference(): Job '{PDP_INFERENCE_JOB_NAME}' was not found or has no job_id."
216+
f"run_pdp_inference(): Job '{pipeline_type}' was not found or has no job_id for '{gcs_vars['GCP_SERVICE_ACCOUNT_EMAIL']}' and '{databricks_vars['DATABRICKS_HOST_URL']}'."
200217
)
201218
job_id = job.job_id
202-
LOGGER.info(f"Resolved job ID for '{PDP_INFERENCE_JOB_NAME}': {job_id}")
219+
LOGGER.info(f"Resolved job ID for '{pipeline_type}': {job_id}")
203220
except Exception as e:
204-
LOGGER.exception(f"Job lookup failed for '{PDP_INFERENCE_JOB_NAME}'.")
221+
LOGGER.exception(f"Job lookup failed for '{pipeline_type}'.")
205222
raise ValueError(f"run_pdp_inference(): Failed to find job: {e}")
206223

207224
try:

src/webapp/gcsutil.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,9 @@ def validate_file(
340340
f"If you see this file validation was successful {schems}"
341341
)
342342
except Exception as e:
343-
blob.delete()
344-
raise e
343+
logging.exception("Validation failed for %s: %s", file_name, e)
344+
raise
345+
345346
new_blob = bucket.blob(new_blob_name)
346347
if new_blob.exists():
347348
raise ValueError(new_blob_name + ": File already exists.")

0 commit comments

Comments
 (0)