Skip to content

Commit 8bc243f

Browse files
committed
Raise Reject if cooldown in effect (feedback) and force ignore cooldown in mgmt commands by default
1 parent 4007f9a commit 8bc243f

7 files changed

Lines changed: 65 additions & 50 deletions

File tree

learning_resources/management/commands/backpopulate_mit_edx_data.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@ def add_arguments(self, parser):
3737
help="If provided, use this file as the source of program API data",
3838
default=None,
3939
)
40-
parser.add_argument(
41-
"--force",
42-
dest="force",
43-
action="store_true",
44-
help="Bypass the task cooldown and run immediately",
45-
)
4640
super().add_arguments(parser)
4741

4842
def handle(self, *args, **options): # noqa: ARG002
@@ -59,7 +53,7 @@ def handle(self, *args, **options): # noqa: ARG002
5953
task = get_mit_edx_data.delay(
6054
options["api_course_datafile"],
6155
options["api_program_datafile"],
62-
_cooldown_force=options.get("force", False),
56+
_cooldown_force=True,
6357
)
6458
self.stdout.write(f"Started task {task} to get MIT edX course data")
6559
self.stdout.write("Waiting on task...")
@@ -70,4 +64,6 @@ def handle(self, *args, **options): # noqa: ARG002
7064
self.stdout.write(
7165
f"Population of MIT edX data finished, took {total_seconds} seconds"
7266
)
73-
self.write_population_result(count)
67+
self.stdout.write(
68+
f"Populated {count} resources. See celery logs for details."
69+
)

learning_resources/management/commands/backpopulate_mitxonline_data.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,6 @@ def add_arguments(self, parser):
2525
action="store_true",
2626
help="Delete all existing records first",
2727
)
28-
parser.add_argument(
29-
"--force",
30-
dest="force",
31-
action="store_true",
32-
help="Bypass the task cooldown and run immediately",
33-
)
3428
super().add_arguments(parser)
3529

3630
def handle(self, *args, **options): # noqa: ARG002
@@ -44,9 +38,7 @@ def handle(self, *args, **options): # noqa: ARG002
4438
):
4539
resource_delete_actions(learning_resource)
4640
else:
47-
task = get_mitxonline_data.delay(
48-
_cooldown_force=options.get("force", False),
49-
)
41+
task = get_mitxonline_data.delay(_cooldown_force=True)
5042
self.stdout.write(f"Started task {task} to get MITx Online course data")
5143
self.stdout.write("Waiting on task...")
5244
start = now_in_utc()
@@ -56,4 +48,6 @@ def handle(self, *args, **options): # noqa: ARG002
5648
self.stdout.write(
5749
f"Population of MITX Online data finished, took {total_seconds} seconds"
5850
)
59-
self.write_population_result(count)
51+
self.stdout.write(
52+
f"Populated {count} resources. See celery logs for details."
53+
)

learning_resources/management/commands/backpopulate_oll_data.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,6 @@ def add_arguments(self, parser):
3131
help="If provided, use this google sheets id to get the data",
3232
default=None,
3333
)
34-
parser.add_argument(
35-
"--force",
36-
dest="force",
37-
action="store_true",
38-
help="Bypass the task cooldown and run immediately",
39-
)
4034
super().add_arguments(parser)
4135

4236
def handle(self, *args, **options): # noqa: ARG002
@@ -53,7 +47,7 @@ def handle(self, *args, **options): # noqa: ARG002
5347
else:
5448
task = get_oll_data.delay(
5549
sheets_id=options["sheets_id"],
56-
_cooldown_force=options.get("force", False),
50+
_cooldown_force=True,
5751
)
5852
self.stdout.write(f"Started task {task} to get oll course data")
5953
self.stdout.write("Waiting on task...")
@@ -64,4 +58,6 @@ def handle(self, *args, **options): # noqa: ARG002
6458
self.stdout.write(
6559
f"Population of oll data finished, took {total_seconds} seconds"
6660
)
67-
self.write_population_result(count)
61+
self.stdout.write(
62+
f"Populated {count} resources. See celery logs for details."
63+
)

learning_resources/management/commands/backpopulate_xpro_data.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,6 @@ def add_arguments(self, parser):
2525
action="store_true",
2626
help="Delete all existing records first",
2727
)
28-
parser.add_argument(
29-
"--force",
30-
dest="force",
31-
action="store_true",
32-
help="Bypass the task cooldown and run immediately",
33-
)
3428
super().add_arguments(parser)
3529

3630
def handle(self, *args, **options): # noqa: ARG002
@@ -45,9 +39,7 @@ def handle(self, *args, **options): # noqa: ARG002
4539
):
4640
resource_delete_actions(learning_resource)
4741
else:
48-
task = get_xpro_data.delay(
49-
_cooldown_force=options.get("force", False),
50-
)
42+
task = get_xpro_data.delay(_cooldown_force=True)
5143
self.stdout.write(f"Started task {task} to get xpro course data")
5244
self.stdout.write("Waiting on task...")
5345
start = now_in_utc()
@@ -57,4 +49,6 @@ def handle(self, *args, **options): # noqa: ARG002
5749
self.stdout.write(
5850
f"Population of xpro data finished, took {total_seconds} seconds"
5951
)
60-
self.write_population_result(count)
52+
self.stdout.write(
53+
f"Populated {count} resources. See celery logs for details."
54+
)

learning_resources/management/commands/mixins.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,6 @@ def configure_test_resources(self, options):
2323
test_mode=True, published=False
2424
)
2525

26-
def write_population_result(self, count):
27-
"""Write the final result line for a backpopulate command."""
28-
if count is None:
29-
self.stdout.write(
30-
"Task skipped (cooldown active). See celery logs for details."
31-
)
32-
else:
33-
self.stdout.write(
34-
f"Populated {count} resources. See celery logs for details."
35-
)
36-
3726

3827
class ConfirmDeleteMixin:
3928
"""

main/decorators.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from collections.abc import Callable
66
from functools import wraps
77

8+
from celery import current_task, states
9+
from celery.exceptions import Reject
810
from django.core.cache import caches
911

1012
log = logging.getLogger(__name__)
@@ -28,9 +30,9 @@ def cooldown_task(
2830
Place this *below* ``@app.task`` so the cooldown runs on the worker, not
2931
on the enqueuing process.
3032
31-
Dropped calls return ``None``. If the wrapped function may itself return
32-
``None``, callers cannot disambiguate — annotate as ``int | None`` (or
33-
similar) and treat ``None`` as "skipped".
33+
When a call is skipped from inside a Celery worker, the task's state is
34+
explicitly set to ``REJECTED`` in the result backend and
35+
``Reject(requeue=False)`` is raised (should be ignored by sentry).
3436
3537
To bypass the cooldown for a specific invocation (e.g., operator-forced
3638
recovery), pass ``_cooldown_force=True`` as a kwarg through ``delay()``
@@ -73,6 +75,13 @@ def wrapper(*args, **kwargs):
7375
caches["redis"].set(lock_key, "1", timeout=wait_time)
7476
elif not caches["redis"].add(lock_key, "1", timeout=wait_time):
7577
log.info("Skipping %s: cooldown active (%ss)", lock_key, wait_time)
78+
if current_task and not current_task.request.called_directly:
79+
current_task.update_state(
80+
state=states.REJECTED,
81+
meta={"reason": "cooldown", "key": lock_key},
82+
)
83+
reason = "cooldown active"
84+
raise Reject(reason, requeue=False)
7685
return None
7786
return func(*args, **kwargs)
7887

main/decorators_test.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Tests for main.decorators"""
22

33
import pytest
4+
from celery.exceptions import Reject
45

56
from main.decorators import cooldown_task
67

@@ -144,3 +145,39 @@ def my_task(**kwargs):
144145
my_task(_cooldown_force=True)
145146
assert my_task() is None
146147
mock_redis.add.assert_called_once()
148+
149+
150+
def test_cooldown_task_raises_reject_inside_celery_worker(mock_redis, mocker):
151+
"""
152+
When a skip happens inside a real Celery task run, write REJECTED to the
153+
result backend and raise Reject so the run is observable as REJECTED
154+
rather than PENDING/SUCCESS.
155+
"""
156+
mock_redis.add.return_value = False
157+
mock_task = mocker.patch("main.decorators.current_task")
158+
mock_task.request.called_directly = False
159+
160+
@cooldown_task(wait_time=60)
161+
def my_task():
162+
return 1
163+
164+
with pytest.raises(Reject) as exc_info:
165+
my_task()
166+
assert exc_info.value.requeue is False
167+
mock_task.update_state.assert_called_once()
168+
assert mock_task.update_state.call_args.kwargs["state"] == "REJECTED"
169+
assert mock_task.update_state.call_args.kwargs["meta"]["reason"] == "cooldown"
170+
171+
172+
def test_cooldown_task_returns_none_when_called_directly(mock_redis, mocker):
173+
"""Direct (non-worker) skipped calls return None — no Reject, no state write."""
174+
mock_redis.add.return_value = False
175+
mock_task = mocker.patch("main.decorators.current_task")
176+
mock_task.request.called_directly = True
177+
178+
@cooldown_task(wait_time=60)
179+
def my_task():
180+
return 1
181+
182+
assert my_task() is None
183+
mock_task.update_state.assert_not_called()

0 commit comments

Comments
 (0)