-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathregister_processing_job.py
More file actions
98 lines (88 loc) · 3.8 KB
/
Copy pathregister_processing_job.py
File metadata and controls
98 lines (88 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
from datetime import datetime
import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
from sqlmodel import select
from sqlmodel.orm.session import Session as SQLModelSession
import murfey.server.prometheus as prom
import murfey.util.db as MurfeyDB
from murfey.server import _transport_object
from murfey.server.ispyb import ISPyBSession
from murfey.util import sanitise
logger = logging.getLogger("murfey.workflows.register_processing_job")
def run(message: dict, murfey_db: SQLModelSession, demo: bool = False):
    """
    Register a processing job and its AutoProcProgram entry in the Murfey database.

    The data collection is looked up from the message's "session_id", "source"
    and "tag" values. If no ProcessingJob exists yet for that data collection
    and the message's "recipe", one is created; when an ISPyB session is
    available the job is first created through the transport object and the
    returned ID is reused for the Murfey record. The same pattern is then
    applied to the AutoProcProgram entry linked to the processing job.

    Args:
        message: Dictionary read for the keys "session_id", "source", "tag" and
            "recipe", and optionally "parameters" and "job_parameters".
        murfey_db: SQLModel session used for all Murfey database access.
        demo: Not used by this function.

    Returns:
        {"success": True} once both records exist, otherwise
        {"success": False, "requeue": <bool>}; "requeue" is False only when no
        transport object is available.
    """
    # Without a transport manager nothing can be registered, so give up at once
    if _transport_object is None:
        logger.error("Unable to find transport manager")
        return {"success": False, "requeue": False}

    logger.info(f"Registering the following processing job: \n{message}")

    # Find the data collection matching this session, source and tag
    session_id = message["session_id"]
    dc_query = (
        select(MurfeyDB.DataCollection, MurfeyDB.DataCollectionGroup)
        .where(MurfeyDB.DataCollection.dcg_id == MurfeyDB.DataCollectionGroup.id)
        .where(MurfeyDB.DataCollectionGroup.session_id == session_id)
        .where(MurfeyDB.DataCollectionGroup.tag == message["source"])
        .where(MurfeyDB.DataCollection.tag == message["tag"])
    )
    dc_rows = murfey_db.exec(dc_query).all()
    if not dc_rows:
        logger.warning(f"No data collection ID found for {sanitise(message['tag'])}")
        return {"success": False, "requeue": True}
    # Each row is a (DataCollection, DataCollectionGroup) pair; use the first match
    dc_id = dc_rows[0][0].id

    # Reuse an existing processing job for this recipe, or register a new one
    recipe = message["recipe"]
    pj_query = (
        select(MurfeyDB.ProcessingJob)
        .where(MurfeyDB.ProcessingJob.recipe == recipe)
        .where(MurfeyDB.ProcessingJob.dc_id == dc_id)
    )
    known_jobs = murfey_db.exec(pj_query).all()
    if known_jobs:
        pid = known_jobs[0].id
    else:
        if ISPyBSession() is None:
            # No ISPyB session: the Murfey record is created without an explicit ID
            new_job = MurfeyDB.ProcessingJob(recipe=recipe, dc_id=dc_id)
        else:
            ispyb_job = ISPyBDB.ProcessingJob(dataCollectionId=dc_id, recipe=recipe)
            # "parameters" is only type-checked here; it is not used further
            run_parameters = message.get("parameters", {})
            assert isinstance(run_parameters, dict)

            job_parameter_map = message.get("job_parameters")
            if job_parameter_map:
                ispyb_params = [
                    ISPyBDB.ProcessingJobParameter(parameterKey=key, parameterValue=value)
                    for key, value in job_parameter_map.items()
                ]
                response = _transport_object.do_create_ispyb_job(
                    ispyb_job, params=ispyb_params
                )
            else:
                response = _transport_object.do_create_ispyb_job(ispyb_job)

            pid = response.get("return_value", None)
            if pid is None:
                return {"success": False, "requeue": True}
            # Mirror the ID returned by the transport call in the Murfey record
            new_job = MurfeyDB.ProcessingJob(id=pid, recipe=recipe, dc_id=dc_id)

        murfey_db.add(new_job)
        murfey_db.commit()
        pid = new_job.id
        murfey_db.close()

    # Look up the preprocessed-movies Prometheus metric labelled with this job
    prom.preprocessed_movies.labels(processing_job=pid)

    # Nothing more to do if an AutoProcProgram entry already exists for the job
    app_query = select(MurfeyDB.AutoProcProgram).where(
        MurfeyDB.AutoProcProgram.pj_id == pid
    )
    if murfey_db.exec(app_query).all():
        return {"success": True}

    if ISPyBSession() is None:
        new_program = MurfeyDB.AutoProcProgram(pj_id=pid)
    else:
        ispyb_program = ISPyBDB.AutoProcProgram(
            processingJobId=pid, processingStartTime=datetime.now()
        )
        status_response = _transport_object.do_update_processing_status(ispyb_program)
        appid = status_response.get("return_value", None)
        if appid is None:
            return {"success": False, "requeue": True}
        new_program = MurfeyDB.AutoProcProgram(id=appid, pj_id=pid)

    murfey_db.add(new_program)
    murfey_db.commit()
    murfey_db.close()
    return {"success": True}