@@ -87,18 +87,52 @@ class ArchiverTask(celery.Task):
8787 max_retries = 0
8888 ignore_result = False
8989
90+ def load_archive_job (self , job_pk , retry_if_missing = True , task_id = None , kwargs = None ):
91+ """Load an ArchiveJob and optionally retry bound tasks if row is missing."""
92+ job = ArchiveJob .load (job_pk )
93+ if job :
94+ return job
95+
96+ request = getattr (self , 'request' , None )
97+ request_kwargs = kwargs or getattr (request , 'kwargs' , None ) or {}
98+ context = {
99+ 'job_pk' : job_pk ,
100+ 'registration_id' : request_kwargs .get ('dst_pk' ),
101+ 'task_id' : task_id or getattr (request , 'id' , None ),
102+ 'task_name' : self .name ,
103+ 'retries' : getattr (request , 'retries' , None ),
104+ 'max_retries' : self .max_retries ,
105+ }
106+ should_retry = (
107+ retry_if_missing
108+ and context ['retries' ] is not None
109+ and context ['max_retries' ] is not None
110+ and context ['retries' ] < context ['max_retries' ]
111+ )
112+ context ['should_retry' ] = should_retry
113+
114+ error = ArchiverStateError ({
115+ 'error' : 'ArchiveJob not found' ,
116+ ** context ,
117+ })
118+ if should_retry :
119+ raise self .retry (exc = error )
120+
121+ sentry .log_message (
122+ f'ArchiveJob { job_pk } not found during archiver task execution' ,
123+ extra_data = context ,
124+ )
125+ sentry .log_exception (error )
126+ raise error
127+
90128 def on_failure (self , exc , task_id , args , kwargs , einfo ):
91- job = ArchiveJob .load (kwargs .get ('job_pk' ))
92- compact_traceback = utils .compact_traceback (einfo )
93- if not job :
94- archiver_state_exc = ArchiverStateError ({
95- 'exception' : exc ,
96- 'args' : args ,
97- 'kwargs' : kwargs ,
98- 'einfo' : compact_traceback ,
99- })
100- sentry .log_exception (archiver_state_exc )
101- raise archiver_state_exc
129+ job_pk = kwargs .get ('job_pk' )
130+ job = self .load_archive_job (job_pk , retry_if_missing = False , task_id = task_id , kwargs = kwargs )
131+ compact_traceback = utils .compact_traceback (
132+ einfo ,
133+ max_lines = 20 ,
134+ max_chars = 3000 ,
135+ )
102136
103137 if job .status == ARCHIVER_FAILURE :
104138 # already captured
@@ -161,9 +195,15 @@ def get_addon_from_gv(src_node, addon_name, requesting_user):
161195 )
162196
163197
164- @celery_app .task (base = ArchiverTask , ignore_result = False )
198+ @celery_app .task (
199+ bind = True ,
200+ base = ArchiverTask ,
201+ ignore_result = False ,
202+ max_retries = 3 ,
203+ default_retry_delay = 60 ,
204+ )
165205@logged ('stat_addon' )
166- def stat_addon (addon_short_name , job_pk ):
206+ def stat_addon (self , addon_short_name , job_pk ):
167207 """Collect metadata about the file tree of a given addon
168208
169209 :param addon_short_name: AddonConfig.short_name of the addon to be examined
@@ -178,7 +218,7 @@ def stat_addon(addon_short_name, job_pk):
178218 addon_name = 'dataverse'
179219 version = 'latest' if addon_short_name .split ('-' )[- 1 ] == 'draft' else 'latest-published'
180220 create_app_context ()
181- job = ArchiveJob . load (job_pk )
221+ job = self . load_archive_job (job_pk )
182222 src , dst , user = job .info ()
183223
184224 src_addon = None
@@ -206,9 +246,15 @@ def stat_addon(addon_short_name, job_pk):
206246 return result
207247
208248
209- @celery_app .task (base = ArchiverTask , ignore_result = False )
249+ @celery_app .task (
250+ bind = True ,
251+ base = ArchiverTask ,
252+ ignore_result = False ,
253+ max_retries = 3 ,
254+ default_retry_delay = 60 ,
255+ )
210256@logged ('make_copy_request' )
211- def make_copy_request (job_pk , url , data ):
257+ def make_copy_request (self , job_pk , url , data ):
212258 """Make the copy request to the WaterButler API and handle
213259 successful and failed responses
214260
@@ -218,7 +264,7 @@ def make_copy_request(job_pk, url, data):
218264 :return: None
219265 """
220266 create_app_context ()
221- job = ArchiveJob . load (job_pk )
267+ job = self . load_archive_job (job_pk )
222268 src , dst , user = job .info ()
223269 logger .info (f"Sending copy request for addon: { data ['provider' ]} on node: { dst ._id } " )
224270 cookie = furl (url ).query .params .get ('cookie' )
@@ -235,9 +281,15 @@ def make_waterbutler_payload(dst_id, rename):
235281 'provider' : settings .ARCHIVE_PROVIDER ,
236282 }
237283
238- @celery_app .task (base = ArchiverTask , ignore_result = False )
284+ @celery_app .task (
285+ bind = True ,
286+ base = ArchiverTask ,
287+ ignore_result = False ,
288+ max_retries = 3 ,
289+ default_retry_delay = 60 ,
290+ )
239291@logged ('archive_addon' )
240- def archive_addon (addon_short_name , job_pk ):
292+ def archive_addon (self , addon_short_name , job_pk ):
241293 """Archive the contents of an addon by making a copy request to the
242294 WaterButler API
243295
@@ -246,7 +298,7 @@ def archive_addon(addon_short_name, job_pk):
246298 :return: None
247299 """
248300 create_app_context ()
249- job = ArchiveJob . load (job_pk )
301+ job = self . load_archive_job (job_pk )
250302 src , dst , user = job .info ()
251303 logger .info (f'Archiving addon: { addon_short_name } on node: { src ._id } ' )
252304
@@ -274,9 +326,15 @@ def archive_addon(addon_short_name, job_pk):
274326 data = make_waterbutler_payload (dst ._id , rename )
275327 make_copy_request .delay (job_pk = job_pk , url = url , data = data )
276328
277- @celery_app .task (base = ArchiverTask , ignore_result = False )
329+ @celery_app .task (
330+ bind = True ,
331+ base = ArchiverTask ,
332+ ignore_result = False ,
333+ max_retries = 3 ,
334+ default_retry_delay = 60 ,
335+ )
278336@logged ('archive_node' )
279- def archive_node (stat_results , job_pk ):
337+ def archive_node (self , stat_results , job_pk ):
280338 """First use the results of #stat_node to check disk usage of the
281339 initiated registration, then either fail the registration or
282340 create a celery.group group of subtasks to archive addons
@@ -286,7 +344,7 @@ def archive_node(stat_results, job_pk):
286344 :return: None
287345 """
288346 create_app_context ()
289- job = ArchiveJob . load (job_pk )
347+ job = self . load_archive_job (job_pk )
290348 src , dst , user = job .info ()
291349 logger .info (f'Archiving node: { src ._id } ' )
292350
@@ -381,7 +439,7 @@ def archive_success(self, dst_pk, job_pk):
381439 )
382440 self .retry (exc = err )
383441
384- job = ArchiveJob . load (job_pk )
442+ job = self . load_archive_job (job_pk )
385443 if not job .sent :
386444 job .sent = True
387445 job .save ()
0 commit comments