55from datetime import timedelta
66
77from django .contrib .auth import authenticate , login , logout
8- from django .db import transaction
8+ from django .db import DatabaseError , transaction
99from django .db .models import Q
1010from django .http import HttpResponse
1111from django .utils import timezone
2424log = logging .getLogger (__name__ )
2525
2626
27+ MAX_SCORE_UPDATE_RETRIES = 5
28+
29+
2730class XQueueViewSet (viewsets .ViewSet ):
2831 """
2932 A collection of services for xqueue-watcher interactions and authentication.
@@ -151,9 +154,23 @@ def get_submission(self, request):
151154 )
152155
153156 if external_grader :
154- try :
155- if external_grader .status != 'pulled' :
157+ submission_id = external_grader .submission .id
158+ if external_grader .status != 'pulled' :
159+ try :
156160 external_grader .update_status ("pulled" )
161+ except ValueError as error :
162+ log .error (
163+ "Invalid status transition for submission %s: current=%s target='pulled'" ,
164+ submission_id ,
165+ external_grader .status ,
166+ exc_info = error
167+ )
168+ return Response (
169+ self .compose_reply (False , 'Unable to transition submission for grading' ),
170+ status = status .HTTP_409_CONFLICT
171+ )
172+
173+ try :
157174 submission_data = {
158175 "grader_payload" : json .dumps ({"grader" : external_grader .grader_file_name }),
159176 "student_info" : json .dumps ({
@@ -166,24 +183,29 @@ def get_submission(self, request):
166183
167184 payload = {
168185 'xqueue_header' : json .dumps ({
169- 'submission_id' : external_grader . submission . id ,
186+ 'submission_id' : submission_id ,
170187 'submission_key' : external_grader .pullkey
171188 }),
172189 'xqueue_body' : json .dumps (submission_data ),
173190 # Xqueue watcher expects this to be a JSON string
174191 'xqueue_files' : json .dumps (get_files_for_grader (external_grader ))
175192 }
176-
177- return Response (
178- self .compose_reply (True , content = json .dumps (payload )),
179- status = status .HTTP_200_OK
193+ except (TypeError , ValueError ) as error :
194+ log .error (
195+ "Unable to serialize submission payload for submission %s: %s" ,
196+ submission_id ,
197+ error
180198 )
181- except ValueError as e :
182199 return Response (
183- self .compose_reply (False , f"Error processing submission: { str ( e ) } " ),
184- status = status .HTTP_400_BAD_REQUEST
200+ self .compose_reply (False , 'Unable to serialize submission payload' ),
201+ status = status .HTTP_500_INTERNAL_SERVER_ERROR
185202 )
186203
204+ return Response (
205+ self .compose_reply (True , content = json .dumps (payload )),
206+ status = status .HTTP_200_OK
207+ )
208+
187209 return Response (
188210 self .compose_reply (False , f"Queue '{ queue_name } ' is empty" ),
189211 status = status .HTTP_200_OK
@@ -207,8 +229,14 @@ def put_result(self, request):
207229 )
208230
209231 try :
210- external_grader = ExternalGraderDetail .objects .select_for_update (
211- nowait = True ).get (submission__id = submission_id )
232+ # Lock the row to prevent duplicate grading attempts from racing and emitting
233+ # duplicate score events for the same submission.
234+ external_grader = (
235+ ExternalGraderDetail .objects
236+ .select_for_update (nowait = True )
237+ .select_related ('submission__student_item' )
238+ .get (submission__id = submission_id )
239+ )
212240 except ExternalGraderDetail .DoesNotExist :
213241 log .error (
214242 "Grader submission_id refers to nonexistent entry in Submission DB: "
@@ -221,6 +249,15 @@ def put_result(self, request):
221249 self .compose_reply (False , 'Submission does not exist' ),
222250 status = status .HTTP_404_NOT_FOUND
223251 )
252+ except DatabaseError :
253+ log .warning (
254+ "Concurrent grade update detected for submission %s; rejecting duplicate request" ,
255+ submission_id
256+ )
257+ return Response (
258+ self .compose_reply (False , 'Submission is currently being processed' ),
259+ status = status .HTTP_409_CONFLICT
260+ )
224261
225262 if not external_grader .pullkey or submission_key != external_grader .pullkey :
226263 log .error (
@@ -235,39 +272,59 @@ def put_result(self, request):
235272 )
236273
237274 # pylint: disable=broad-exception-caught
275+ submission_context = {
276+ 'submission_id' : submission_id ,
277+ 'course_id' : external_grader .submission .student_item .course_id ,
278+ 'user_id' : external_grader .submission .student_item .student_id ,
279+ 'item_id' : external_grader .submission .student_item .item_id ,
280+ 'queue_name' : external_grader .queue_name ,
281+ 'queue_key' : external_grader .queue_key ,
282+ }
283+
238284 try :
239- log .info ("Attempting to set_score..." )
240- set_score (str (external_grader .submission .uuid ),
241- points_earned ,
242- external_grader .points_possible
243- )
285+ log .info (
286+ "Attempting to record score for submission %(submission_id)s (course=%(course_id)s, item=%(item_id)s, user=%(user_id)s)" ,
287+ submission_context
288+ )
289+ set_score (
290+ str (external_grader .submission .uuid ),
291+ points_earned ,
292+ external_grader .points_possible
293+ )
244294 external_grader .grader_reply = score_msg
245- external_grader .save ()
295+ external_grader .save (update_fields = [ 'grader_reply' ] )
246296 external_grader .update_status ('retired' )
247- log .info ("Successfully updated submission score for submission %s" , submission_id )
297+ log .info (
298+ "Successfully updated submission %(submission_id)s for user %(user_id)s" ,
299+ submission_context
300+ )
248301
249- # Modify the event emission in put_result
250302 EXTERNAL_GRADER_SCORE_SUBMITTED .send_event (
251303 send_robust = False ,
252304 score = ExternalGraderScoreData (
253305 points_possible = external_grader .points_possible ,
254306 points_earned = points_earned ,
255- course_id = external_grader . submission . student_item . course_id ,
307+ course_id = submission_context [ ' course_id' ] ,
256308 score_msg = score_msg ,
257309 submission_id = submission_id ,
258- # Extract these from the submission
259- user_id = external_grader .submission .student_item .student_id ,
260- module_id = external_grader .submission .student_item .item_id ,
261- queue_key = external_grader .queue_key ,
262- queue_name = external_grader .queue_name
310+ user_id = submission_context ['user_id' ],
311+ module_id = submission_context ['item_id' ],
312+ queue_key = submission_context ['queue_key' ],
313+ queue_name = submission_context ['queue_name' ]
263314 )
264315 )
265- log .info ("Score event sent to bus successfully" )
316+ log .info (
317+ "External grader score event emitted for submission %(submission_id)s (course=%(course_id)s, item=%(item_id)s, user=%(user_id)s, queue=%(queue_name)s, queue_key=%(queue_key)s)" ,
318+ submission_context
319+ )
266320
267321 except Exception :
268- log .exception ("Error when execute set_score" )
322+ log .exception (
323+ "Error recording score for submission %(submission_id)s (course=%(course_id)s, item=%(item_id)s, user=%(user_id)s, queue=%(queue_name)s)" ,
324+ submission_context
325+ )
269326 # Keep track of how many times we've failed to set_score a grade for this submission
270- if external_grader .num_failures >= 30 :
327+ if external_grader .num_failures + 1 >= MAX_SCORE_UPDATE_RETRIES :
271328 external_grader .update_status ('failed' )
272329 else :
273330 external_grader .update_status ('retry' )
0 commit comments