33
44from pydantic import BaseModel
55from sqlmodel import select
6+ from sqlmodel .orm .session import Session as SQLModelSession
67from werkzeug .utils import secure_filename
78
89from murfey .server import _transport_object
@@ -31,12 +32,12 @@ class SXTTiltSeriesInfo(BaseModel):
3132 xrm_reference : str | None
3233
3334
34- def process_sxt_tilt_series_workflow (
35+ def process_sxt_tilt_series (
3536 visit_name : str ,
3637 session_id : MurfeySessionID ,
3738 tilt_series_info : SXTTiltSeriesInfo ,
38- murfey_db : Session ,
39- ):
39+ murfey_db : SQLModelSession ,
40+ ) -> dict [ str , bool ] :
4041 tilt_series_query = murfey_db .exec (
4142 select (TiltSeries )
4243 .where (TiltSeries .session_id == session_id )
@@ -47,7 +48,7 @@ def process_sxt_tilt_series_workflow(
4748 tilt_series = tilt_series_query [0 ]
4849 if tilt_series .processing_requested :
4950 logger .info (f"Tilt series { tilt_series .tag } has already been processed" )
50- return
51+ return { "success" : True }
5152 else :
5253 tilt_series = TiltSeries (
5354 session_id = session_id ,
@@ -59,6 +60,7 @@ def process_sxt_tilt_series_workflow(
5960 murfey_db .add (tilt_series )
6061 murfey_db .commit ()
6162
63+ # Find all processing jobs registered for this tilt series
6264 collected_ids = murfey_db .exec (
6365 select (DataCollectionGroup , DataCollection , ProcessingJob , AutoProcProgram )
6466 .where (DataCollectionGroup .session_id == session_id )
@@ -67,8 +69,11 @@ def process_sxt_tilt_series_workflow(
6769 .where (DataCollection .dcg_id == DataCollectionGroup .id )
6870 .where (ProcessingJob .dc_id == DataCollection .id )
6971 .where (AutoProcProgram .pj_id == ProcessingJob .id )
70- .where (ProcessingJob .recipe == "sxt-aretomo" )
71- ).one ()
72+ ).all ()
73+ if len (collected_ids ) == 0 :
74+ logger .warning (f"No processing recipes found for { tilt_series .tag } " )
75+ return {"success" : False , "requeue" : False }
76+
7277 instrument_name = (
7378 murfey_db .exec (select (Session ).where (Session .id == session_id ))
7479 .one ()
@@ -78,46 +83,66 @@ def process_sxt_tilt_series_workflow(
7883 instrument_name
7984 ]
8085
86+ # Find the visit folder and any subfolders needed
8187 parts = [secure_filename (p ) for p in Path (tilt_series_info .txrm ).parts ]
8288 visit_idx = parts .index (visit_name )
8389 core = Path (* Path (tilt_series_info .txrm ).parts [: visit_idx + 1 ])
8490 ppath = Path (
8591 "/" .join (secure_filename (p ) for p in Path (tilt_series_info .txrm ).parts )
8692 )
87- sub_dataset = "/" .join (ppath .relative_to (core ).parts [:- 1 ])
88- extra_path = machine_config .processed_extra_directory
89- stack_file = (
90- core
91- / machine_config .processed_directory_name
92- / sub_dataset
93- / extra_path
94- / "Tomograms"
95- / f"{ tilt_series .tag } _stack.mrc"
96- )
97- stack_file .parent .mkdir (parents = True , exist_ok = True )
98- zocalo_message = {
99- "recipes" : ["sxt-aretomo" ],
100- "parameters" : {
101- "txrm_file" : tilt_series_info .txrm ,
102- "xrm_reference" : tilt_series_info .xrm_reference or "" ,
103- "dcid" : collected_ids [1 ].id ,
104- "appid" : collected_ids [3 ].id ,
105- "stack_file" : str (stack_file ),
106- "tilt_axis" : 0 ,
107- "pixel_size" : tilt_series_info .pixel_size ,
108- "manual_tilt_offset" : - tilt_series_info .tilt_offset ,
109- "node_creator_queue" : machine_config .node_creator_queue ,
110- },
111- }
112- if _transport_object :
113- logger .info (
114- f"Sending Zocalo message for processing: { sanitise (str (zocalo_message ))} "
115- )
116- _transport_object .send ("processing_recipe" , zocalo_message , new_connection = True )
117- else :
118- logger .info (
119- f"No transport object found. Zocalo message would be { sanitise (str (zocalo_message ))} "
93+ sub_dataset = "/" .join (ppath .relative_to (core ).parts [1 :- 1 ])
94+
95+ # Loop over all processing jobs, and send the alignment recipe for it
96+ for recipe_ids in collected_ids :
97+ # Stack file path needs to contain both recipe name and tilt series name
98+ stack_file = (
99+ core
100+ / machine_config .processed_directory_name
101+ / machine_config .processed_extra_directory
102+ / sub_dataset
103+ / tilt_series .tag
104+ / recipe_ids [2 ].recipe
105+ / "Tomograms"
106+ / f"{ tilt_series .tag } _stack.mrc"
120107 )
108+ stack_file .parent .mkdir (parents = True , exist_ok = True )
109+
110+ # Send message to rabbitmq
111+ zocalo_message = {
112+ "recipes" : [recipe_ids [2 ].recipe ],
113+ "parameters" : {
114+ "txrm_file" : tilt_series_info .txrm ,
115+ "xrm_reference" : tilt_series_info .xrm_reference or "" ,
116+ "dcid" : recipe_ids [1 ].id ,
117+ "appid" : recipe_ids [3 ].id ,
118+ "stack_file" : str (stack_file ),
119+ "tilt_axis" : 0 ,
120+ "pixel_size" : tilt_series_info .pixel_size ,
121+ "manual_tilt_offset" : - tilt_series_info .tilt_offset ,
122+ "node_creator_queue" : machine_config .node_creator_queue ,
123+ },
124+ }
125+ if _transport_object :
126+ logger .info (
127+ f"Sending Zocalo message for processing: { sanitise (str (zocalo_message ))} "
128+ )
129+ _transport_object .send (
130+ "processing_recipe" , zocalo_message , new_connection = True
131+ )
132+ else :
133+ logger .info (
134+ f"No transport object found. Zocalo message would be { sanitise (str (zocalo_message ))} "
135+ )
121136 tilt_series .processing_requested = True
122137 murfey_db .add (tilt_series )
123138 murfey_db .commit ()
139+ return {"success" : True }
140+
141+
142+ def run (message : dict , murfey_db : SQLModelSession ) -> dict [str , bool ]:
143+ return process_sxt_tilt_series (
144+ message ["visit_name" ],
145+ message ["session_id" ],
146+ SXTTiltSeriesInfo (** message ["tilt_series_info" ]),
147+ murfey_db ,
148+ )
0 commit comments