Skip to content

Commit bb945b2

Browse files
committed
Address second review: stampede lock, cache key consistency, error handling, tests

- Add cache.add lock to prevent duplicate task dispatch on concurrent requests
- Pass combined_cache_key from view to task to avoid key mismatch from race conditions between dispatch and execution
- Handle Conference.DoesNotExist in task for deleted conferences
- Clean up computing lock in finally block
- Align frontend poll timeout (3min) with error cache TTL (2min)
- Add integration tests: task cache population, error caching, missing conference
- Add stampede prevention test (cache.add returns False)
1 parent 8d65665 commit bb945b2

4 files changed

Lines changed: 165 additions & 26 deletions

File tree

backend/reviews/admin.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -470,13 +470,15 @@ def review_recap_compute_analysis_view(self, request, review_session_id):
470470
if cached_result is not None:
471471
return JsonResponse(cached_result)
472472

473-
# Dispatch the Celery task to the heavy_processing queue
474-
compute_recap_analysis.apply_async(
475-
args=[conference.id],
476-
kwargs={"force_recompute": force_recompute},
477-
queue="heavy_processing",
478-
)
479-
check_pending_heavy_processing_work.delay()
473+
# Use cache.add as a lock to prevent duplicate task dispatch
474+
computing_key = f"{combined_cache_key}:computing"
475+
if cache.add(computing_key, True, timeout=600):
476+
compute_recap_analysis.apply_async(
477+
args=[conference.id, combined_cache_key],
478+
kwargs={"force_recompute": force_recompute},
479+
queue="heavy_processing",
480+
)
481+
check_pending_heavy_processing_work.delay()
480482

481483
return JsonResponse({"status": "processing"})
482484

backend/reviews/tasks.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,26 @@
66

77

88
@app.task
9-
def compute_recap_analysis(conference_id, force_recompute=False):
9+
def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=False):
1010
from django.core.cache import cache
1111

12+
from conferences.models import Conference
1213
from reviews.admin import get_accepted_submissions
1314
from reviews.similar_talks import (
14-
_get_cache_key,
1515
compute_similar_talks,
1616
compute_topic_clusters,
1717
)
1818

19-
from conferences.models import Conference
19+
try:
20+
conference = Conference.objects.get(id=conference_id)
21+
except Conference.DoesNotExist:
22+
logger.error(
23+
"Conference %s not found for recap analysis", conference_id
24+
)
25+
return
2026

21-
conference = Conference.objects.get(id=conference_id)
2227
accepted_submissions = list(get_accepted_submissions(conference))
2328

24-
combined_cache_key = _get_cache_key(
25-
"recap_analysis", conference_id, accepted_submissions
26-
)
27-
2829
try:
2930
similar_talks = compute_similar_talks(
3031
accepted_submissions,
@@ -72,6 +73,8 @@ def compute_recap_analysis(conference_id, force_recompute=False):
7273
cache.set(
7374
combined_cache_key,
7475
{"status": "error", "message": "Analysis failed. Please try again."},
75-
60 * 5,
76+
60 * 2,
7677
)
7778
raise
79+
finally:
80+
cache.delete(f"{combined_cache_key}:computing")

backend/reviews/templates/reviews-recap.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ <h2 class="recap-section-title">🔗 Similar Talks</h2>
573573
var pollTimer = null;
574574
var pollStartTime = null;
575575
var pollAttempt = 0;
576-
var POLL_TIMEOUT = 120000;
576+
var POLL_TIMEOUT = 180000;
577577

578578
function getNextPollInterval() {
579579
// Exponential backoff: 1s, 2s, 3s, 5s, 5s, 5s...

backend/reviews/tests/test_recap.py

Lines changed: 143 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pytest
44
from django.contrib.admin import AdminSite
55
from django.core.exceptions import PermissionDenied
6+
from django.test import override_settings
67

78
from conferences.tests.factories import ConferenceFactory
89
from reviews.admin import ReviewSessionAdmin
@@ -182,16 +183,24 @@ def test_recap_view_redirects_when_shortlist_not_visible(rf, mocker):
182183
# --- review_recap_compute_analysis_view tests ---
183184

184185

186+
FAKE_CACHE_KEY = "recap_analysis:conf_test:abc123"
187+
188+
185189
def _mock_analysis_deps(mocker, cache_return=None):
186190
"""Mock the lazy-imported dependencies used in the compute analysis view."""
187-
mock_cache = mocker.patch("django.core.cache.cache.get", return_value=cache_return)
191+
mock_cache_get = mocker.patch(
192+
"django.core.cache.cache.get", return_value=cache_return
193+
)
194+
mock_cache_add = mocker.patch(
195+
"django.core.cache.cache.add", return_value=True
196+
)
188197
mock_task = mocker.patch("reviews.tasks.compute_recap_analysis.apply_async")
189198
mock_check = mocker.patch("pycon.tasks.check_pending_heavy_processing_work.delay")
190199
mocker.patch(
191200
"reviews.similar_talks._get_cache_key",
192-
return_value="recap_analysis:conf_test:abc123",
201+
return_value=FAKE_CACHE_KEY,
193202
)
194-
return mock_cache, mock_task, mock_check
203+
return mock_cache_get, mock_cache_add, mock_task, mock_check
195204

196205

197206
def test_compute_analysis_view_returns_cached_result(rf, mocker):
@@ -226,7 +235,9 @@ def test_compute_analysis_view_returns_cached_result(rf, mocker):
226235
},
227236
}
228237

229-
mock_cache, mock_task, _ = _mock_analysis_deps(mocker, cache_return=cached_data)
238+
mock_cache_get, _, mock_task, _ = _mock_analysis_deps(
239+
mocker, cache_return=cached_data
240+
)
230241

231242
request = rf.get("/")
232243
request.user = user
@@ -248,7 +259,7 @@ def test_compute_analysis_view_returns_cached_result(rf, mocker):
248259
def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker):
249260
user, conference, review_session, submissions = _create_recap_setup()
250261

251-
_, mock_task, mock_check = _mock_analysis_deps(mocker, cache_return=None)
262+
_, _, mock_task, mock_check = _mock_analysis_deps(mocker, cache_return=None)
252263

253264
request = rf.get("/")
254265
request.user = user
@@ -261,7 +272,7 @@ def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker):
261272
assert data == {"status": "processing"}
262273

263274
mock_task.assert_called_once_with(
264-
args=[conference.id],
275+
args=[conference.id, FAKE_CACHE_KEY],
265276
kwargs={"force_recompute": False},
266277
queue="heavy_processing",
267278
)
@@ -272,7 +283,7 @@ def test_compute_analysis_view_dispatches_task_on_cache_miss(rf, mocker):
272283
def test_compute_analysis_view_dispatches_task_with_recompute(rf, mocker):
273284
user, conference, review_session, submissions = _create_recap_setup()
274285

275-
_, mock_task, _ = _mock_analysis_deps(mocker, cache_return=None)
286+
_, _, mock_task, _ = _mock_analysis_deps(mocker, cache_return=None)
276287

277288
request = rf.get("/?recompute=1")
278289
request.user = user
@@ -293,7 +304,9 @@ def test_compute_analysis_view_recompute_skips_cache(rf, mocker):
293304
user, conference, review_session, submissions = _create_recap_setup()
294305

295306
cached_data = {"submissions_list": [], "topic_clusters": {"topics": []}}
296-
mock_cache, mock_task, _ = _mock_analysis_deps(mocker, cache_return=cached_data)
307+
mock_cache_get, _, mock_task, _ = _mock_analysis_deps(
308+
mocker, cache_return=cached_data
309+
)
297310

298311
request = rf.get("/?recompute=1")
299312
request.user = user
@@ -305,10 +318,33 @@ def test_compute_analysis_view_recompute_skips_cache(rf, mocker):
305318
assert data == {"status": "processing"}
306319

307320
# Cache should NOT have been checked when recompute=1
308-
mock_cache.assert_not_called()
321+
mock_cache_get.assert_not_called()
309322
mock_task.assert_called_once()
310323

311324

325+
def test_compute_analysis_view_skips_dispatch_when_already_computing(rf, mocker):
326+
user, conference, review_session, submissions = _create_recap_setup()
327+
328+
mock_cache_get, mock_cache_add, mock_task, mock_check = _mock_analysis_deps(
329+
mocker, cache_return=None
330+
)
331+
# Simulate lock already held — cache.add returns False
332+
mock_cache_add.return_value = False
333+
334+
request = rf.get("/")
335+
request.user = user
336+
337+
admin = ReviewSessionAdmin(ReviewSession, AdminSite())
338+
response = admin.review_recap_compute_analysis_view(request, review_session.id)
339+
340+
data = json.loads(response.content)
341+
assert data == {"status": "processing"}
342+
343+
# Task should NOT be dispatched since lock was already held
344+
mock_task.assert_not_called()
345+
mock_check.assert_not_called()
346+
347+
312348
def test_compute_analysis_view_permission_denied_for_non_reviewer(rf):
313349
user = UserFactory(is_staff=True, is_superuser=False)
314350
conference = ConferenceFactory()
@@ -342,3 +378,101 @@ def test_compute_analysis_view_permission_denied_when_shortlist_not_visible(rf):
342378

343379
with pytest.raises(PermissionDenied):
344380
admin.review_recap_compute_analysis_view(request, review_session.id)
381+
382+
383+
# --- compute_recap_analysis task tests ---
384+
385+
386+
LOCMEM_CACHE = {
387+
"default": {
388+
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
389+
"LOCATION": "test-recap-analysis",
390+
}
391+
}
392+
393+
394+
@pytest.mark.django_db
395+
@override_settings(CACHES=LOCMEM_CACHE)
396+
def test_task_populates_cache_with_results(mocker):
397+
from django.core.cache import cache
398+
399+
from reviews.tasks import compute_recap_analysis
400+
401+
user, conference, review_session, submissions = _create_recap_setup()
402+
sub1, sub2 = submissions
403+
404+
mocker.patch(
405+
"reviews.similar_talks.compute_similar_talks",
406+
return_value={
407+
sub1.id: [{"id": sub2.id, "title": str(sub2.title), "similarity": 75.0}],
408+
sub2.id: [],
409+
},
410+
)
411+
mocker.patch(
412+
"reviews.similar_talks.compute_topic_clusters",
413+
return_value={
414+
"topics": [
415+
{"name": "ML", "count": 2, "keywords": ["ml"], "submissions": []}
416+
],
417+
"outliers": [],
418+
"submission_topics": {},
419+
},
420+
)
421+
422+
cache_key = "recap_analysis:conf_test:integration"
423+
# Set computing lock to verify it gets cleaned up
424+
cache.set(f"{cache_key}:computing", True)
425+
426+
result = compute_recap_analysis(conference.id, cache_key)
427+
428+
assert len(result["submissions_list"]) == 2
429+
assert result["submissions_list"][0]["id"] == sub1.id
430+
assert result["submissions_list"][0]["similar"][0]["similarity"] == 75.0
431+
assert result["topic_clusters"]["topics"][0]["name"] == "ML"
432+
433+
# Verify cache was populated
434+
cached = cache.get(cache_key)
435+
assert cached == result
436+
437+
# Verify computing lock was cleaned up
438+
assert cache.get(f"{cache_key}:computing") is None
439+
440+
441+
@pytest.mark.django_db
442+
@override_settings(CACHES=LOCMEM_CACHE)
443+
def test_task_caches_error_on_failure(mocker):
444+
from django.core.cache import cache
445+
446+
from reviews.tasks import compute_recap_analysis
447+
448+
user, conference, review_session, submissions = _create_recap_setup()
449+
450+
mocker.patch(
451+
"reviews.similar_talks.compute_similar_talks",
452+
side_effect=RuntimeError("ML model failed"),
453+
)
454+
455+
cache_key = "recap_analysis:conf_test:error"
456+
cache.set(f"{cache_key}:computing", True)
457+
458+
with pytest.raises(RuntimeError, match="ML model failed"):
459+
compute_recap_analysis(conference.id, cache_key)
460+
461+
# Verify error was cached
462+
cached = cache.get(cache_key)
463+
assert cached["status"] == "error"
464+
assert "failed" in cached["message"].lower()
465+
466+
# Verify computing lock was cleaned up
467+
assert cache.get(f"{cache_key}:computing") is None
468+
469+
470+
def test_task_handles_missing_conference(mocker):
471+
from reviews.tasks import compute_recap_analysis
472+
473+
mock_similar = mocker.patch("reviews.similar_talks.compute_similar_talks")
474+
475+
result = compute_recap_analysis(999999, "recap_analysis:conf_999999:key")
476+
477+
assert result is None
478+
mock_similar.assert_not_called()

0 commit comments

Comments (0)