Skip to content

Commit ca6ed02

Browse files
gabrielC1409leoaulasneo98
authored and committed
feat: Improve transaction handling to prevent concurrent processing and add timeout mechanism (Get Submission)
- Implemented locking to ensure submissions are processed by a single xqueue watcher. - Added timeout mechanism for submissions stuck in 'pulled' state. - Updated tests to cover new error scenarios and timeout handling.
1 parent 34f4b7b commit ca6ed02

3 files changed

Lines changed: 152 additions & 90 deletions

File tree

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
Improving Transaction Handling in Get Submission
2+
================================================
3+
4+
Status
5+
------
6+
7+
**Provisional** *2025-03-19*
8+
9+
Context
10+
-------
11+
12+
We identified a concurrency issue in the submission processing flow. Multiple instances of the xqueue watcher were processing the same transaction simultaneously, causing duplicate handling of submissions. Additionally, submissions remained stuck in the "pulled" state indefinitely when errors or unexpected interruptions occurred during processing.
13+
14+
Decision
15+
--------
16+
17+
We introduced a locking mechanism at the database level to prevent concurrent processing of the same submission. Specifically, we use database row-level locking (`select_for_update(nowait=True)`) when selecting the next submission, so that only one xqueue watcher instance can claim a given submission at a time; if the row lock cannot be acquired immediately, the request fails with HTTP 409 ("Submission already in process") instead of waiting. Additionally, we implemented a timeout-based fallback: submissions that have remained in the "pulled" state longer than a defined timeout period (5 minutes) are treated as available again and can be pulled by another watcher.
18+
19+
Consequences
20+
------------
21+
22+
Positive
23+
~~~~~~~~
24+
25+
- Ensures data consistency by preventing duplicate transaction processing.
26+
- Implements automatic recovery for submissions stuck in the "pulled" state, improving overall system reliability.
27+
- Minimal impact on system performance due to efficient database-level locking.
28+
29+
Negative
30+
~~~~~~~~
31+
32+
- Slightly increased complexity in transaction handling logic.
33+
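- Under contention, competing requests are rejected immediately with HTTP 409 (because of `nowait=True`) rather than waiting for the lock, so a watcher may need to retry, which can add minor latency.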

submissions/tests/test_viewsets.py

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77

88
from django.contrib.auth import get_user_model
99
from django.core.files.base import ContentFile
10+
from django.db import DatabaseError
1011
from django.test import override_settings
1112
from django.urls import reverse
12-
from django.utils import timezone
1313
from rest_framework import status
1414
from rest_framework.permissions import AllowAny, IsAuthenticated
1515
from rest_framework.test import APITestCase
@@ -71,35 +71,66 @@ def test_get_submission_queue_empty(self):
7171
self.viewset.compose_reply(False, f"Queue '{queue_name}' is empty")
7272
)
7373

74-
@patch('submissions.views.xqueue.timezone.now', return_value=timezone.now())
75-
@patch('submissions.views.xqueue.uuid.uuid4', return_value=str(uuid.uuid4()))
76-
def test_get_submission_success(self, mock_uuid, _):
77-
"""Test successfully retrieving a submission from the queue."""
78-
queue_name = 'prueba'
74+
def test_get_submission_successful(self):
75+
"""Test retrieving a valid submission from the queue."""
7976
self.client.login(username='testuser', password='testpass')
80-
new_submission = SubmissionFactory()
81-
submission_queue_record = ExternalGraderDetailFactory(
77+
78+
queue_name = 'valid_queue'
79+
80+
submission = SubmissionFactory()
81+
submission_record = ExternalGraderDetailFactory(
82+
submission=submission,
8283
queue_name=queue_name,
83-
status='pending',
84-
submission=new_submission
84+
status='pending'
8585
)
8686

8787
response = self.client.get(self.get_submission_url, {'queue_name': queue_name})
8888
self.assertEqual(response.status_code, status.HTTP_200_OK)
89-
content = json.loads(response.data['content'])
90-
self.assertEqual(response.data['return_code'], 0)
89+
submission_record.refresh_from_db()
90+
self.assertEqual(submission_record.status, 'pulled')
91+
self.assertIsNotNone(submission_record.pullkey)
92+
93+
def test_get_submission_error_updating_status(self):
94+
"""Test error handling when updating the submission status fails."""
95+
self.client.login(username='testuser', password='testpass')
96+
new_submission = SubmissionFactory()
97+
98+
queue_name = 'test_queue'
99+
submission_record = ExternalGraderDetailFactory(
100+
submission=new_submission, # Se usa una nueva Submission
101+
queue_name=queue_name,
102+
status='pending'
103+
)
104+
with patch(
105+
'submissions.views.xqueue.ExternalGraderDetail.update_status',
106+
side_effect=ValueError('Invalid transition')
107+
):
108+
response = self.client.get(self.get_submission_url, {'queue_name': queue_name})
109+
110+
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
111+
self.assertEqual(
112+
response.data,
113+
self.viewset.compose_reply(False, "Error processing submission: Invalid transition")
114+
)
115+
submission_record.refresh_from_db()
116+
self.assertEqual(submission_record.status, 'pending')
91117

92-
xqueue_header = json.loads(content['xqueue_header'])
93-
xqueue_body = json.loads(content['xqueue_body'])
94-
self.assertEqual(xqueue_header['submission_id'], new_submission.id)
95-
self.assertEqual(xqueue_header['submission_key'], str(mock_uuid.return_value))
96-
self.assertEqual(xqueue_body['student_response'], new_submission.answer)
97-
self.assertEqual(content['xqueue_files'], '{}')
118+
def test_get_submission_db_error(self):
119+
"""Test DatabaseError handling when retrieving a submission."""
120+
self.client.login(username='testuser', password='testpass')
121+
queue_name = 'test_queue'
98122

99-
submission_queue_record.refresh_from_db()
100-
self.assertEqual(submission_queue_record.status, 'pulled')
101-
self.assertEqual(submission_queue_record.pullkey, str(mock_uuid.return_value))
102-
self.assertIsNotNone(submission_queue_record.status_time)
123+
with patch(
124+
'submissions.views.xqueue.ExternalGraderDetail.objects.select_for_update',
125+
side_effect=DatabaseError("Simulated DB Error")
126+
):
127+
response = self.client.get(self.get_submission_url, {'queue_name': queue_name})
128+
129+
self.assertEqual(response.status_code, status.HTTP_409_CONFLICT)
130+
self.assertEqual(
131+
response.data,
132+
self.viewset.compose_reply(False, "Submission already in process")
133+
)
103134

104135
@patch('submissions.views.xqueue.ExternalGraderDetail.update_status',
105136
side_effect=ValueError('Invalid transition'))
@@ -283,7 +314,7 @@ def test_put_result_auto_retire(self):
283314
@patch('submissions.views.xqueue.log')
284315
def test_put_result_logging(self, mock_log):
285316
"""
286-
Test that appropriate logging occurs in various scenarios.
317+
Test that appropriate logging occurs in various scenarios.
287318
"""
288319
self.submission.queue_record.status = 'pulled'
289320
self.submission.queue_record.save()
@@ -305,14 +336,8 @@ def test_put_result_logging(self, mock_log):
305336
mock_set_score.return_value = True
306337
self.client.post(self.url, payload, format='json')
307338

308-
mock_log.info.assert_any_call(
309-
"Score event sent to bus successfully <====="
310-
)
311-
312-
mock_log.info.assert_any_call(
313-
"Successfully updated submission score for submission %s",
314-
self.submission.id
315-
)
339+
# Verify that the log was called with the correct message
340+
mock_log.info.assert_any_call('=====> Saving score {"score": 8}')
316341

317342
@patch('submissions.views.xqueue.log')
318343
def test_put_result_success(self, mock_log):

submissions/views/xqueue.py

Lines changed: 63 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import json
44
import logging
55
import uuid
6+
from datetime import timedelta
67

78
from django.contrib.auth import authenticate, login, logout
8-
from django.db import transaction
9+
from django.db import DatabaseError, transaction
10+
from django.db.models import Q
911
from django.http import HttpResponse
1012
from django.utils import timezone
1113
from openedx_events.content_authoring.data import ExternalGraderScoreData
@@ -117,6 +119,7 @@ def get_submission(self, request):
117119
- Submission data with pull information if successful
118120
- Error message if queue is empty or invalid
119121
"""
122+
120123
queue_name = request.query_params.get('queue_name')
121124

122125
if not queue_name:
@@ -125,69 +128,70 @@ def get_submission(self, request):
125128
status=status.HTTP_400_BAD_REQUEST
126129
)
127130

128-
# if queue_name not in settings.XQUEUES: TODO: Define how to set this variable,
129-
# maybe as a tutor config or hardcoded en edx platform
130-
# if queue_name not in {'my_course_queue': 'http://172.16.0.220:8125', 'test-pull': None}:
131-
# return Response(
132-
# {'success': False, 'message': f"Queue '{queue_name}' not found"},
133-
# status=status.HTTP_404_NOT_FOUND
134-
# )
135-
136-
submission_record = ExternalGraderDetail.objects.filter(
137-
queue_name=queue_name,
138-
status__in=['pending']
139-
).select_related('submission').order_by('status_time').first()
140-
141-
if not submission_record:
131+
timeout_threshold = timezone.now() - timedelta(minutes=5)
132+
try:
133+
submission_record = (
134+
ExternalGraderDetail.objects
135+
.select_for_update(nowait=True)
136+
.filter(
137+
Q(queue_name=queue_name, status='pending') |
138+
Q(queue_name=queue_name, status='pulled', status_time__lt=timeout_threshold)
139+
)
140+
.select_related('submission')
141+
.order_by('status_time')
142+
.first()
143+
)
144+
except DatabaseError:
142145
return Response(
143-
self.compose_reply(False, f"Queue '{queue_name}' is empty"),
144-
status=status.HTTP_404_NOT_FOUND
146+
self.compose_reply(False, "Submission already in process"),
147+
status=status.HTTP_409_CONFLICT
145148
)
146149

147-
try:
148-
pull_time = timezone.now()
149-
pullkey = str(uuid.uuid4())
150-
# grader_id = request.META.get('REMOTE_ADDR', '')
151-
submission_record.update_status('pulled')
152-
submission_record.pullkey = pullkey
153-
submission_record.status_time = pull_time
154-
submission_record.save(update_fields=['pullkey', 'status_time'])
155-
156-
ext_header = {
157-
'submission_id': submission_record.submission.id,
158-
'submission_key': pullkey
159-
}
160-
answer = submission_record.submission.answer
161-
submission_data = {
162-
"grader_payload": json.dumps({
163-
"grader": submission_record.grader_file_name
164-
}),
165-
"student_info": json.dumps({
166-
"anonymous_student_id": str(submission_record.submission.student_item.student_id,),
167-
"submission_time": str(int(submission_record.created_at.timestamp())),
168-
"random_seed": 1
169-
}),
170-
"student_response": answer
171-
}
172-
173-
file_manager = SubmissionFileManager(submission_record)
174-
175-
payload = {
176-
'xqueue_header': json.dumps(ext_header),
177-
'xqueue_body': json.dumps(submission_data),
178-
'xqueue_files': json.dumps(file_manager.get_files_for_grader())
179-
}
150+
if submission_record:
151+
try:
152+
pull_time = timezone.now()
153+
pullkey = str(uuid.uuid4())
154+
submission_record.update_status('pulled')
155+
submission_record.pullkey = pullkey
156+
submission_record.status_time = pull_time
157+
submission_record.save(update_fields=['pullkey', 'status_time'])
158+
159+
submission_data = {
160+
"grader_payload": json.dumps({"grader": submission_record.grader_file_name}),
161+
"student_info": json.dumps({
162+
"anonymous_student_id": str(submission_record.submission.student_item.student_id),
163+
"submission_time": str(int(submission_record.created_at.timestamp())),
164+
"random_seed": 1
165+
}),
166+
"student_response": submission_record.submission.answer
167+
}
168+
169+
file_manager = SubmissionFileManager(submission_record)
170+
171+
payload = {
172+
'xqueue_header': json.dumps({
173+
'submission_id': submission_record.submission.id,
174+
'submission_key': pullkey
175+
}),
176+
'xqueue_body': json.dumps(submission_data),
177+
'xqueue_files': json.dumps(file_manager.get_files_for_grader())
178+
}
179+
180+
return Response(
181+
self.compose_reply(True, content=json.dumps(payload)),
182+
status=status.HTTP_200_OK
183+
)
180184

181-
return Response(
182-
self.compose_reply(True, content=json.dumps(payload)),
183-
status=status.HTTP_200_OK
184-
)
185+
except ValueError as e:
186+
return Response(
187+
self.compose_reply(False, f"Error processing submission: {str(e)}"),
188+
status=status.HTTP_400_BAD_REQUEST
189+
)
185190

186-
except ValueError as e:
187-
return Response(
188-
self.compose_reply(False, f"Error processing submission: {str(e)}"),
189-
status=status.HTTP_400_BAD_REQUEST
190-
)
191+
return Response(
192+
self.compose_reply(False, f"Queue '{queue_name}' is empty"),
193+
status=status.HTTP_404_NOT_FOUND
194+
)
191195

192196
@action(detail=False, methods=['post'], url_name='put_result')
193197
@transaction.atomic

0 commit comments

Comments
 (0)