Skip to content

Commit 42c608f

Browse files
committed
Reconcile dual jobs systems and fix end-to-end model pipeline

- Wire POST /api/v1/jobs to ModelOrchestrator so SDK jobs actually execute instead of staying stuck in "pending"
- Bridge legacy model train/execute endpoints to create Job records
- Add model_run_id column to Job for status sync with ModelRun
- Fix sklearn pickle version mismatch: generated MODEL.py wrappers always create fresh models for training and fall back gracefully on inference
- Add sklearn retry logic in runner.py for models with old wrapper code
- Pin scikit-learn==1.5.2 in both workspace and model-runner containers
- Fix YAML quoting in _write_model_yaml() for descriptions with colons
- Upgrade /jobs page with auto-polling, slide-in logs panel, toasts
- Remove Jobs tab from Models page (standalone /jobs is now authoritative)
- Add e2e test that simulates notebook #9 via API (register, train, infer)
- Add 40 unit tests for SDK endpoints and jobs API
1 parent e6c8fb1 commit 42c608f

21 files changed

Lines changed: 2578 additions & 269 deletions

File tree

.dockerignore

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# prevent SDK-registered models from being baked into Docker images
2+
core/model_library/**/model.pkl
3+
4+
# python
5+
__pycache__/
6+
*.py[cod]
7+
venv/
8+
.pytest_cache/
9+
.mypy_cache/
10+
11+
# frontend (built separately)
12+
interface/node_modules/
13+
interface/.next/
14+
interface/out/
15+
16+
# IDE / OS
17+
.git/
18+
.idea/
19+
.vscode/
20+
.DS_Store
21+
22+
# local dev
23+
.openuba/
24+
metastore/
25+
spark-warehouse/
26+
*.log
27+
*.tar
28+
29+
# sdk build artifacts
30+
sdk/dist/
31+
sdk/build/
32+
33+
# test screenshots
34+
core/tests/e2e/screenshots/

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ core/model_library/e2e_test_model_*
1515
core/model_library/test_model_*
1616
core/model_library/inference/
1717
core/model_library/*_backup/
18+
# SDK-registered models (contain model.pkl from register_model)
19+
core/model_library/**/model.pkl
1820
core/storage/saved_models/**/model.pkl
1921
core/storage/saved_models/**/*.pt
2022
core/storage/saved_models/**/*.pb

core/api_routers/jobs.py

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from core.db import get_db
1515
from core.repositories.job_repository import JobRepository
16+
from core.repositories.model_repository import ModelRepository
1617
from core.api_schemas.jobs import (
1718
JobCreate, JobUpdate, JobResponse,
1819
JobLogCreate, JobLogResponse,
@@ -31,8 +32,67 @@ async def create_job(
3132
current_user: dict = Depends(require_permission("jobs", "write"))
3233
):
3334
'''
34-
create a new job
35+
create a new job and dispatch execution via the model orchestrator
3536
'''
37+
# verify model exists and is in a runnable state
38+
model_repo = ModelRepository(db)
39+
model = model_repo.get_by_id(job_data.model_id)
40+
if not model:
41+
raise HTTPException(status_code=404, detail="model not found")
42+
if model.status not in ("installed", "active"):
43+
raise HTTPException(
44+
status_code=400,
45+
detail=f"model must be installed or active to run, current status: {model.status}"
46+
)
47+
48+
# map job_type to orchestrator run_type
49+
run_type_map = {"training": "train", "inference": "infer", "evaluation": "infer"}
50+
run_type = run_type_map.get(job_data.job_type, "infer")
51+
52+
# build input_data for the orchestrator
53+
input_data = job_data.input_data.copy() if job_data.input_data else {}
54+
55+
# if dataset_id provided but no explicit data_source, resolve from dataset record
56+
if job_data.dataset_id and "data_source" not in input_data:
57+
from core.db.models import Dataset
58+
dataset = db.query(Dataset).filter(Dataset.id == job_data.dataset_id).first()
59+
if dataset and dataset.source_type:
60+
if dataset.source_type == "elasticsearch" or dataset.source_type == "es":
61+
input_data["data_source"] = "elasticsearch"
62+
if dataset.file_path:
63+
input_data["index_name"] = dataset.file_path
64+
elif dataset.source_type == "spark":
65+
input_data["data_source"] = "spark"
66+
if dataset.file_path:
67+
input_data["table_name"] = dataset.file_path
68+
elif dataset.source_type in ("local_csv", "upload"):
69+
if dataset.file_path:
70+
input_data["data_source"] = "local_csv"
71+
input_data["file_path"] = str(dataset.file_path)
72+
73+
# merge hyperparameters as params
74+
if job_data.hyperparameters:
75+
input_data["params"] = job_data.hyperparameters
76+
77+
# dispatch execution via orchestrator
78+
model_run_id = None
79+
try:
80+
from core.services.model_orchestrator import ModelOrchestrator
81+
orchestrator = ModelOrchestrator()
82+
model_run_id = orchestrator.execute_model_background(
83+
job_data.model_id,
84+
input_data=input_data if input_data else None,
85+
run_type=run_type,
86+
)
87+
logger.info(f"job execution dispatched: model_run_id={model_run_id}")
88+
except Exception as e:
89+
logger.error(f"failed to dispatch job execution: {e}")
90+
raise HTTPException(
91+
status_code=500,
92+
detail=f"failed to dispatch execution: {e}"
93+
)
94+
95+
# create job record linked to the model run
3696
repo = JobRepository(db)
3797
job = repo.create(
3898
name=job_data.name,
@@ -41,9 +101,13 @@ async def create_job(
41101
created_by=UUID(current_user["user_id"]),
42102
dataset_id=job_data.dataset_id,
43103
hardware_tier=job_data.hardware_tier,
44-
hyperparameters=job_data.hyperparameters
104+
hyperparameters=job_data.hyperparameters,
105+
model_run_id=model_run_id,
45106
)
46-
logger.info(f"job created: {job.id}")
107+
# set status to running since orchestrator dispatched
108+
repo.update(job.id, status="running")
109+
110+
logger.info(f"job created: {job.id} -> model_run {model_run_id}")
47111
return job
48112

49113

@@ -59,6 +123,7 @@ async def list_jobs(
59123
):
60124
'''
61125
list jobs with optional filters
126+
status is synced from linked ModelRun at read time
62127
'''
63128
repo = JobRepository(db)
64129
jobs = repo.list_all(
@@ -78,12 +143,13 @@ async def get_job(
78143
current_user: dict = Depends(require_permission("jobs", "read"))
79144
):
80145
'''
81-
get job by id
146+
get job by id, syncs status from linked ModelRun
82147
'''
83148
repo = JobRepository(db)
84149
job = repo.get_by_id(job_id)
85150
if not job:
86151
raise HTTPException(status_code=404, detail="job not found")
152+
job = repo.sync_from_model_run(job)
87153
return job
88154

89155

@@ -119,7 +185,7 @@ async def delete_job(
119185
raise HTTPException(status_code=404, detail="job not found")
120186

121187

122-
@router.get("/jobs/{job_id}/logs", response_model=List[JobLogResponse])
188+
@router.get("/jobs/{job_id}/logs")
123189
async def get_job_logs(
124190
job_id: UUID,
125191
limit: int = Query(1000, ge=1, le=10000),
@@ -128,13 +194,25 @@ async def get_job_logs(
128194
):
129195
'''
130196
get logs for a job
197+
falls back to model_logs from linked ModelRun if no job_logs exist
131198
'''
132199
repo = JobRepository(db)
133200
job = repo.get_by_id(job_id)
134201
if not job:
135202
raise HTTPException(status_code=404, detail="job not found")
136203
logs = repo.get_logs(job_id, limit=limit)
137-
return logs
204+
# normalize to a consistent format (ModelLog has model_run_id, JobLog has job_id)
205+
result = []
206+
for log in logs:
207+
result.append({
208+
"id": str(log.id),
209+
"job_id": str(job_id),
210+
"level": log.level,
211+
"message": log.message,
212+
"logger_name": getattr(log, "logger_name", None),
213+
"created_at": str(log.created_at),
214+
})
215+
return result
138216

139217

140218
@router.get("/jobs/{job_id}/metrics", response_model=List[TrainingMetricResponse])
@@ -188,6 +266,7 @@ async def stream_job_metrics(
188266
'''
189267
SSE endpoint for streaming job metrics in real-time
190268
the frontend connects to this and receives metric updates as they happen
269+
also syncs status from linked ModelRun
191270
'''
192271
import json
193272

@@ -221,9 +300,10 @@ async def event_generator():
221300
}
222301
last_count = current_count
223302

224-
# send job status updates only when something changed
303+
# sync status from model run and send updates
225304
job_now = repo.get_by_id(job_id)
226305
if job_now:
306+
job_now = repo.sync_from_model_run(job_now)
227307
if job_now.status != last_status or current_count > last_count:
228308
yield {
229309
"event": "status",

core/api_routers/models.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from core.db import get_db
1616
from core.repositories.model_repository import ModelRepository
17+
from core.repositories.job_repository import JobRepository
1718
from core.api_schemas.models import ModelCreate, ModelUpdate, ModelResponse
1819
from core.auth import require_permission, get_current_user
1920

@@ -371,6 +372,20 @@ async def train_model(
371372
version_id=_version_id
372373
)
373374
logger.info(f"model training dispatched: run_id={run_id} for model {model_id}")
375+
376+
# create a Job record so this shows up in the /jobs page
377+
try:
378+
job_repo = JobRepository(db)
379+
job_repo.create(
380+
model_id=model_id,
381+
job_type="training",
382+
created_by=UUID(current_user["user_id"]),
383+
hardware_tier="cpu-small",
384+
model_run_id=run_id,
385+
)
386+
except Exception as je:
387+
logger.warning(f"failed to create job record for training: {je}")
388+
374389
return {
375390
"model_id": str(model_id),
376391
"run_id": str(run_id),
@@ -462,6 +477,20 @@ async def execute_model(
462477
artifact_id=_artifact_id
463478
)
464479
logger.info(f"model execution dispatched: run_id={run_id} for model {model_id}")
480+
481+
# create a Job record so this shows up in the /jobs page
482+
try:
483+
job_repo = JobRepository(db)
484+
job_repo.create(
485+
model_id=model_id,
486+
job_type="inference",
487+
created_by=UUID(current_user["user_id"]),
488+
hardware_tier="cpu-small",
489+
model_run_id=run_id,
490+
)
491+
except Exception as je:
492+
logger.warning(f"failed to create job record for inference: {je}")
493+
465494
return {
466495
"model_id": str(model_id),
467496
"run_id": str(run_id),

0 commit comments

Comments (0)