3030)
3131from backend .storage .minio_client import download_file , upload_file
3232from backend .utils .docker_runner import Compstrat , run_pipeline
33- from backend .utils .results_sender import send_experiment_results_by_email
33+ from backend .utils .results_sender import (
34+ send_experiment_results_by_email ,
35+ send_task_failed_email ,
36+ )
3437from backend .utils .token_store import mark_task_completed
3538
3639celery_app = Celery ("tasks" , broker = REDIS_URL , backend = REDIS_URL )
@@ -86,98 +89,63 @@ def process_and_cleanup_task(
8689 metrics_by_lang : dict [str , RunstratMetrics ] = {}
8790
8891 try :
89- model_object_key = None
90- try :
91- model_object_key = upload_file (model_path , f"models/{ task_uid } /model.onnx" )
92- except Exception :
93- logger .exception ("MinIO model upload failed for task %s" , task_uid )
92+ model_object_key = upload_file (model_path , f"models/{ task_uid } /model.onnx" )
9493
9594 for lang in langs_to_run :
96- try :
97- csv_src = LANGUAGE_CSVS [lang ]
98- dest = get_thread_filepath (task_uid , LAUNCH_INFO_FILE )
99- os .makedirs (os .path .dirname (dest ), exist_ok = True )
100- shutil .copy2 (csv_src , dest )
101-
102- methods_launched = _count_methods_in_csv (csv_src )
103- suffix = f"_{ lang } "
104- run_pipeline (task_uid , out_suffix = suffix )
105-
106- ai_csv = _ai_csv_path (task_uid , suffix )
107- metrics = compute_metrics (ai_csv )
108- metrics_by_lang [lang ] = metrics
109-
110- results_object_key = None
111- try :
112- results_object_key = upload_file (
113- ai_csv , f"results/{ task_uid } /{ lang } /AI.csv"
114- )
115- except Exception :
116- logger .exception (
117- "MinIO results upload failed for task %s lang %s" ,
118- task_uid ,
119- lang ,
120- )
121-
122- try :
123- save_experiment (
124- experiment_name = experiment ,
125- model_name = model_name ,
126- email = email ,
127- metrics = metrics ,
128- methods_launched = methods_launched ,
129- language = lang ,
130- model_object_key = model_object_key ,
131- results_object_key = results_object_key ,
132- )
133- except Exception :
134- logger .exception (
135- "DB save failed for task %s lang %s" , task_uid , lang
136- )
137- except Exception :
138- logger .exception ("Pipeline failed for task %s lang %s" , task_uid , lang )
139-
140- if language == "all" and len (metrics_by_lang ) >= 1 :
141- all_ai_csvs = [
142- _ai_csv_path (task_uid , f"_{ lang } " )
143- for lang in langs_to_run
144- if lang in metrics_by_lang
145- ]
146- try :
147- combined = combine_metrics (all_ai_csvs )
148- total_launched = sum (
149- _count_methods_in_csv (LANGUAGE_CSVS [lang ])
150- for lang in langs_to_run
151- if lang in metrics_by_lang
152- )
153-
154- merged_csv_path = os .path .join (
155- get_thread_filepath (task_uid , RESULTS_DIR ), "ai_all.csv"
156- )
157- merge_csvs (all_ai_csvs , merged_csv_path )
158-
159- all_results_key = None
160- try :
161- all_results_key = upload_file (
162- merged_csv_path , f"results/{ task_uid } /all/AI.csv"
163- )
164- except Exception :
165- logger .exception (
166- "MinIO all-results upload failed for task %s" , task_uid
167- )
168-
169- save_experiment (
170- experiment_name = experiment ,
171- model_name = model_name ,
172- email = email ,
173- metrics = combined ,
174- methods_launched = total_launched ,
175- language = "all" ,
176- model_object_key = model_object_key ,
177- results_object_key = all_results_key ,
178- )
179- except Exception :
180- logger .exception ("Aggregated DB save failed for task %s" , task_uid )
95+ csv_src = LANGUAGE_CSVS [lang ]
96+ dest = get_thread_filepath (task_uid , LAUNCH_INFO_FILE )
97+ os .makedirs (os .path .dirname (dest ), exist_ok = True )
98+ shutil .copy2 (csv_src , dest )
99+
100+ methods_launched = _count_methods_in_csv (csv_src )
101+ suffix = f"_{ lang } "
102+ run_pipeline (task_uid , out_suffix = suffix )
103+
104+ ai_csv = _ai_csv_path (task_uid , suffix )
105+ metrics = compute_metrics (ai_csv )
106+ metrics_by_lang [lang ] = metrics
107+
108+ results_object_key = upload_file (
109+ ai_csv , f"results/{ task_uid } /{ lang } /AI.csv"
110+ )
111+
112+ save_experiment (
113+ experiment_name = experiment ,
114+ model_name = model_name ,
115+ email = email ,
116+ metrics = metrics ,
117+ methods_launched = methods_launched ,
118+ language = lang ,
119+ model_object_key = model_object_key ,
120+ results_object_key = results_object_key ,
121+ )
122+
123+ if language == "all" :
124+ all_ai_csvs = [_ai_csv_path (task_uid , f"_{ lang } " ) for lang in langs_to_run ]
125+ combined = combine_metrics (all_ai_csvs )
126+ total_launched = sum (
127+ _count_methods_in_csv (LANGUAGE_CSVS [lang ]) for lang in langs_to_run
128+ )
129+
130+ merged_csv_path = os .path .join (
131+ get_thread_filepath (task_uid , RESULTS_DIR ), "ai_all.csv"
132+ )
133+ merge_csvs (all_ai_csvs , merged_csv_path )
134+
135+ all_results_key = upload_file (
136+ merged_csv_path , f"results/{ task_uid } /all/AI.csv"
137+ )
138+
139+ save_experiment (
140+ experiment_name = experiment ,
141+ model_name = model_name ,
142+ email = email ,
143+ metrics = combined ,
144+ methods_launched = total_launched ,
145+ language = "all" ,
146+ model_object_key = model_object_key ,
147+ results_object_key = all_results_key ,
148+ )
181149
182150 send_experiment_results_by_email (
183151 email ,
@@ -188,6 +156,12 @@ def process_and_cleanup_task(
188156 )
189157 except Exception :
190158 logger .exception ("Task %s failed" , task_uid )
159+ try :
160+ send_task_failed_email (email , experiment , filename )
161+ except Exception :
162+ logger .exception (
163+ "Failed to send failure notification for task %s" , task_uid
164+ )
191165
192166 finally :
193167 try :
0 commit comments