55from collections .abc import Callable
66from functools import wraps
77
8+ from celery import current_task , states
9+ from celery .exceptions import Reject
810from django .core .cache import caches
911
1012log = logging .getLogger (__name__ )
@@ -28,16 +30,23 @@ def cooldown_task(
2830 Place this *below* ``@app.task`` so the cooldown runs on the worker, not
2931 on the enqueuing process.
3032
31- Dropped calls return ``None``. If the wrapped function may itself return
32- ``None``, callers cannot disambiguate — annotate as ``int | None`` (or
33- similar) and treat ``None`` as "skipped".
34-
35- To bypass the cooldown for a specific invocation (e.g., operator-forced
36- recovery), pass ``_cooldown_force=True`` as a kwarg through ``delay()``
37- or ``apply_async``. The wrapper consumes it before calling the wrapped
38- function and refreshes the lock so subsequent calls are still gated.
39- This is race-free relative to clearing the lock from outside, which has
40- a window between clear and enqueue where another worker can reacquire.
33+ When a call is skipped from inside a Celery worker, the task's state is
34+ explicitly set to ``REJECTED`` in the result backend and
35+ ``Reject(requeue=False)`` is raised. Celery's own reject handling does
36+ not write a state to the backend, so the ``update_state`` is what makes
37+ ``AsyncResult.status`` observable as ``REJECTED`` rather than
38+ ``PENDING``. ``Reject`` is a Celery control-flow exception and is not
39+ reported to Sentry. Direct (non-task) calls return ``None``.
40+
41+ The cooldown is intended to gate scheduled (Celery Beat) invocations.
42+ Operator-initiated calls (management commands, ad-hoc ``delay()`` from a
43+ shell) should pass ``_cooldown_force=True`` as a kwarg through ``delay()``
44+ or ``apply_async`` so they are not skipped — and so synchronous callers
45+ using ``task.get()`` do not block waiting on a skipped run. The wrapper
46+ consumes the kwarg before calling the wrapped function and refreshes the
47+ lock so subsequent scheduled calls are still gated. This is race-free
48+ relative to clearing the lock from outside, which has a window between
49+ clear and enqueue where another worker can reacquire.
4150
4251 The wrapper also exposes ``clear_cooldown(*args, **kwargs)`` which
4352 deletes the lock key. Useful for operational debugging from a shell;
@@ -73,6 +82,13 @@ def wrapper(*args, **kwargs):
7382 caches ["redis" ].set (lock_key , "1" , timeout = wait_time )
7483 elif not caches ["redis" ].add (lock_key , "1" , timeout = wait_time ):
7584 log .info ("Skipping %s: cooldown active (%ss)" , lock_key , wait_time )
85+ if current_task and not current_task .request .called_directly :
86+ current_task .update_state (
87+ state = states .REJECTED ,
88+ meta = {"reason" : "cooldown" , "key" : lock_key },
89+ )
90+ reason = "cooldown active"
91+ raise Reject (reason , requeue = False )
7692 return None
7793 return func (* args , ** kwargs )
7894
0 commit comments