|
1 | 1 | """main decorators""" |
2 | 2 |
|
| 3 | +import inspect |
3 | 4 | import logging |
| 5 | +from collections.abc import Callable |
4 | 6 | from functools import wraps |
5 | 7 |
|
6 | 8 | from django.core.cache import caches |
7 | 9 |
|
8 | 10 | log = logging.getLogger(__name__) |
9 | 11 |
|
| 12 | +KEY_PREFIX = "cooldown" |
10 | 13 |
|
11 | | -def rate_limited_task(wait_time: int, key: str | None = None): |
| 14 | + |
| 15 | +def cooldown_task( |
| 16 | + wait_time: int, |
| 17 | + key: str | None = None, |
| 18 | + key_func: Callable[..., str] | None = None, |
| 19 | +): |
12 | 20 | """ |
13 | 21 | Drop calls made within `wait_time` seconds of the previous invocation. |
14 | 22 |
|
15 | 23 | The lock is acquired before the wrapped function runs and is not released |
16 | | - on exception — failures still count against the rate limit to prevent |
17 | | - retry storms against upstream APIs. Uses an atomic `cache.add` so it is |
18 | | - safe across Celery workers. |
| 24 | + on exception — failures count against the cooldown to prevent retry |
| 25 | + storms against upstream APIs. Uses an atomic ``cache.add`` so it is safe |
| 26 | + across Celery workers. |
| 27 | +
|
| 28 | + Place this *below* ``@app.task`` so the cooldown runs on the worker, not |
| 29 | + on the enqueuing process. |
| 30 | +
|
| 31 | + Dropped calls return ``None``. If the wrapped function may itself return |
| 32 | + ``None``, callers cannot disambiguate — annotate as ``int | None`` (or |
| 33 | + similar) and treat ``None`` as "skipped". |
| 34 | +
|
| 35 | + To bypass the cooldown for a specific invocation (e.g., operator-forced |
| 36 | + recovery), pass ``_cooldown_force=True`` as a kwarg through ``delay()`` |
| 37 | + or ``apply_async``. The wrapper consumes it before calling the wrapped |
| 38 | + function and refreshes the lock so subsequent calls are still gated. |
| 39 | + This is race-free relative to clearing the lock from outside, which has |
| 40 | + a window between clear and enqueue where another worker can reacquire. |
| 41 | +
|
| 42 | + The wrapper also exposes ``clear_cooldown(*args, **kwargs)`` which |
| 43 | + deletes the lock key. Useful for operational debugging from a shell; |
| 44 | + prefer ``_cooldown_force=True`` from the enqueuing path. |
19 | 45 |
|
20 | 46 | Args: |
21 | 47 | wait_time: Lock duration in seconds. |
22 | | - key: Optional cache key. Defaults to the wrapped function's |
| 48 | + key: Optional static cache key. Defaults to the wrapped function's |
23 | 49 | fully-qualified name. |
| 50 | + key_func: Optional callable receiving the wrapped function's bound |
| 51 | + arguments as keyword args; returns a string suffix appended to |
| 52 | + the base key. Opt-in; use to scope the cooldown per |
| 53 | + argument-set. |
24 | 54 | """ |
25 | 55 |
|
26 | 56 | def decorator(func): |
27 | | - lock_key = f"ratelimit:{key or f'{func.__module__}.{func.__qualname__}'}" |
| 57 | + base_key = f"{KEY_PREFIX}:{key or f'{func.__module__}.{func.__qualname__}'}" |
| 58 | + sig = inspect.signature(func) if key_func else None |
| 59 | + |
| 60 | + def _key_for(*args, **kwargs): |
| 61 | + if key_func is None: |
| 62 | + return base_key |
| 63 | + bound = sig.bind(*args, **kwargs) |
| 64 | + bound.apply_defaults() |
| 65 | + return f"{base_key}:{key_func(**bound.arguments)}" |
28 | 66 |
|
29 | 67 | @wraps(func) |
30 | 68 | def wrapper(*args, **kwargs): |
31 | | - if not caches["redis"].add(lock_key, "1", timeout=wait_time): |
32 | | - log.info("Skipping %s: rate-limited (%ss)", lock_key, wait_time) |
| 69 | + force = kwargs.pop("_cooldown_force", False) |
| 70 | + lock_key = _key_for(*args, **kwargs) |
| 71 | + if force: |
| 72 | + log.info("Force-overriding cooldown for %s", lock_key) |
| 73 | + caches["redis"].set(lock_key, "1", timeout=wait_time) |
| 74 | + elif not caches["redis"].add(lock_key, "1", timeout=wait_time): |
| 75 | + log.info("Skipping %s: cooldown active (%ss)", lock_key, wait_time) |
33 | 76 | return None |
34 | 77 | return func(*args, **kwargs) |
35 | 78 |
|
| 79 | + def clear_cooldown(*args, **kwargs): |
| 80 | + caches["redis"].delete(_key_for(*args, **kwargs)) |
| 81 | + |
| 82 | + wrapper.clear_cooldown = clear_cooldown |
36 | 83 | return wrapper |
37 | 84 |
|
38 | 85 | return decorator |
0 commit comments