Skip to content

Commit 348ab37

Browse files
committed
Refactoring and --force option on affected mgmt commands
1 parent 04f3099 commit 348ab37

8 files changed

Lines changed: 229 additions & 69 deletions

File tree

learning_resources/management/commands/backpopulate_mit_edx_data.py

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,12 @@ def add_arguments(self, parser):
3737
help="If provided, use this file as the source of program API data",
3838
default=None,
3939
)
40+
parser.add_argument(
41+
"--force",
42+
dest="force",
43+
action="store_true",
44+
help="Bypass the task cooldown and run immediately",
45+
)
4046
super().add_arguments(parser)
4147

4248
def handle(self, *args, **options): # noqa: ARG002
@@ -51,7 +57,9 @@ def handle(self, *args, **options): # noqa: ARG002
5157
resource_delete_actions(learning_resource)
5258
else:
5359
task = get_mit_edx_data.delay(
54-
options["api_course_datafile"], options["api_program_datafile"]
60+
options["api_course_datafile"],
61+
options["api_program_datafile"],
62+
_cooldown_force=options.get("force", False),
5563
)
5664
self.stdout.write(f"Started task {task} to get MIT edX course data")
5765
self.stdout.write("Waiting on task...")
@@ -62,11 +70,4 @@ def handle(self, *args, **options): # noqa: ARG002
6270
self.stdout.write(
6371
f"Population of MIT edX data finished, took {total_seconds} seconds"
6472
)
65-
if count is None:
66-
self.stdout.write(
67-
"Task skipped (rate-limited). See celery logs for details."
68-
)
69-
else:
70-
self.stdout.write(
71-
f"Populated {count} resources. See celery logs for details."
72-
)
73+
self.write_population_result(count)

learning_resources/management/commands/backpopulate_mitxonline_data.py

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,12 @@ def add_arguments(self, parser):
2525
action="store_true",
2626
help="Delete all existing records first",
2727
)
28+
parser.add_argument(
29+
"--force",
30+
dest="force",
31+
action="store_true",
32+
help="Bypass the task cooldown and run immediately",
33+
)
2834
super().add_arguments(parser)
2935

3036
def handle(self, *args, **options): # noqa: ARG002
@@ -38,7 +44,9 @@ def handle(self, *args, **options): # noqa: ARG002
3844
):
3945
resource_delete_actions(learning_resource)
4046
else:
41-
task = get_mitxonline_data.delay()
47+
task = get_mitxonline_data.delay(
48+
_cooldown_force=options.get("force", False),
49+
)
4250
self.stdout.write(f"Started task {task} to get MITx Online course data")
4351
self.stdout.write("Waiting on task...")
4452
start = now_in_utc()
@@ -48,11 +56,4 @@ def handle(self, *args, **options): # noqa: ARG002
4856
self.stdout.write(
4957
f"Population of MITX Online data finished, took {total_seconds} seconds"
5058
)
51-
if count is None:
52-
self.stdout.write(
53-
"Task skipped (rate-limited). See celery logs for details."
54-
)
55-
else:
56-
self.stdout.write(
57-
f"Populated {count} resources. See celery logs for details."
58-
)
59+
self.write_population_result(count)

learning_resources/management/commands/backpopulate_oll_data.py

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,12 @@ def add_arguments(self, parser):
3131
help="If provided, use this google sheets id to get the data",
3232
default=None,
3333
)
34+
parser.add_argument(
35+
"--force",
36+
dest="force",
37+
action="store_true",
38+
help="Bypass the task cooldown and run immediately",
39+
)
3440
super().add_arguments(parser)
3541

3642
def handle(self, *args, **options): # noqa: ARG002
@@ -45,7 +51,10 @@ def handle(self, *args, **options): # noqa: ARG002
4551
):
4652
resource_delete_actions(learning_resource)
4753
else:
48-
task = get_oll_data.delay(sheets_id=options["sheets_id"])
54+
task = get_oll_data.delay(
55+
sheets_id=options["sheets_id"],
56+
_cooldown_force=options.get("force", False),
57+
)
4958
self.stdout.write(f"Started task {task} to get oll course data")
5059
self.stdout.write("Waiting on task...")
5160
start = now_in_utc()
@@ -55,11 +64,4 @@ def handle(self, *args, **options): # noqa: ARG002
5564
self.stdout.write(
5665
f"Population of oll data finished, took {total_seconds} seconds"
5766
)
58-
if count is None:
59-
self.stdout.write(
60-
"Task skipped (rate-limited). See celery logs for details."
61-
)
62-
else:
63-
self.stdout.write(
64-
f"Populated {count} resources. See celery logs for details."
65-
)
67+
self.write_population_result(count)

learning_resources/management/commands/backpopulate_xpro_data.py

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,12 @@ def add_arguments(self, parser):
2525
action="store_true",
2626
help="Delete all existing records first",
2727
)
28+
parser.add_argument(
29+
"--force",
30+
dest="force",
31+
action="store_true",
32+
help="Bypass the task cooldown and run immediately",
33+
)
2834
super().add_arguments(parser)
2935

3036
def handle(self, *args, **options): # noqa: ARG002
@@ -39,7 +45,9 @@ def handle(self, *args, **options): # noqa: ARG002
3945
):
4046
resource_delete_actions(learning_resource)
4147
else:
42-
task = get_xpro_data.delay()
48+
task = get_xpro_data.delay(
49+
_cooldown_force=options.get("force", False),
50+
)
4351
self.stdout.write(f"Started task {task} to get xpro course data")
4452
self.stdout.write("Waiting on task...")
4553
start = now_in_utc()
@@ -49,11 +57,4 @@ def handle(self, *args, **options): # noqa: ARG002
4957
self.stdout.write(
5058
f"Population of xpro data finished, took {total_seconds} seconds"
5159
)
52-
if count is None:
53-
self.stdout.write(
54-
"Task skipped (rate-limited). See celery logs for details."
55-
)
56-
else:
57-
self.stdout.write(
58-
f"Populated {count} resources. See celery logs for details."
59-
)
60+
self.write_population_result(count)

learning_resources/management/commands/mixins.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,17 @@ def configure_test_resources(self, options):
2323
test_mode=True, published=False
2424
)
2525

26+
def write_population_result(self, count):
27+
"""Write the final result line for a backpopulate command."""
28+
if count is None:
29+
self.stdout.write(
30+
"Task skipped (cooldown active). See celery logs for details."
31+
)
32+
else:
33+
self.stdout.write(
34+
f"Populated {count} resources. See celery logs for details."
35+
)
36+
2637

2738
class ConfirmDeleteMixin:
2839
"""

learning_resources/tasks.py

Lines changed: 15 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@
5757
from learning_resources_search.exceptions import RetryError
5858
from main.celery import app
5959
from main.constants import ISOFORMAT
60-
from main.decorators import rate_limited_task
60+
from main.decorators import cooldown_task
6161
from main.utils import chunks, clear_views_cache, now_in_utc
6262

6363
log = logging.getLogger(__name__)
@@ -118,7 +118,12 @@ def get_micromasters_data():
118118

119119

120120
@app.task
121-
@rate_limited_task(wait_time=3600)
121+
@cooldown_task(
122+
wait_time=3600,
123+
key_func=lambda *, api_course_datafile=None, api_program_datafile=None: (
124+
f"course={api_course_datafile}:program={api_program_datafile}"
125+
),
126+
)
122127
def get_mit_edx_data(
123128
api_course_datafile: str | None = None,
124129
api_program_datafile: str | None = None,
@@ -142,7 +147,7 @@ def get_mit_edx_data(
142147

143148

144149
@app.task
145-
@rate_limited_task(wait_time=900)
150+
@cooldown_task(wait_time=900)
146151
def get_mitxonline_data() -> int | None:
147152
"""Execute the MITX Online ETL pipeline"""
148153
courses = pipelines.mitxonline_courses_etl()
@@ -152,8 +157,11 @@ def get_mitxonline_data() -> int | None:
152157

153158

154159
@app.task
155-
@rate_limited_task(wait_time=900)
156-
def get_oll_data(sheets_id=None):
160+
@cooldown_task(
161+
wait_time=900,
162+
key_func=lambda *, sheets_id=None: f"sheets_id={sheets_id}",
163+
)
164+
def get_oll_data(sheets_id=None) -> int | None:
157165
"""Execute the OLL ETL pipeline.
158166
159167
Args:
@@ -181,8 +189,8 @@ def get_sloan_data():
181189

182190

183191
@app.task
184-
@rate_limited_task(wait_time=900)
185-
def get_xpro_data():
192+
@cooldown_task(wait_time=900)
193+
def get_xpro_data() -> int | None:
186194
"""Execute the xPro ETL pipeline"""
187195
courses = pipelines.xpro_courses_etl()
188196
programs = pipelines.xpro_programs_etl()

main/decorators.py

Lines changed: 55 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,38 +1,85 @@
11
"""main decorators"""
22

3+
import inspect
34
import logging
5+
from collections.abc import Callable
46
from functools import wraps
57

68
from django.core.cache import caches
79

810
log = logging.getLogger(__name__)
911

12+
KEY_PREFIX = "cooldown"
1013

11-
def rate_limited_task(wait_time: int, key: str | None = None):
14+
15+
def cooldown_task(
16+
wait_time: int,
17+
key: str | None = None,
18+
key_func: Callable[..., str] | None = None,
19+
):
1220
"""
1321
Drop calls made within `wait_time` seconds of the previous invocation.
1422
1523
The lock is acquired before the wrapped function runs and is not released
16-
on exception — failures still count against the rate limit to prevent
17-
retry storms against upstream APIs. Uses an atomic `cache.add` so it is
18-
safe across Celery workers.
24+
on exception — failures count against the cooldown to prevent retry
25+
storms against upstream APIs. Uses an atomic ``cache.add`` so it is safe
26+
across Celery workers.
27+
28+
Place this *below* ``@app.task`` so the cooldown runs on the worker, not
29+
on the enqueuing process.
30+
31+
Dropped calls return ``None``. If the wrapped function may itself return
32+
``None``, callers cannot disambiguate — annotate as ``int | None`` (or
33+
similar) and treat ``None`` as "skipped".
34+
35+
To bypass the cooldown for a specific invocation (e.g., operator-forced
36+
recovery), pass ``_cooldown_force=True`` as a kwarg through ``delay()``
37+
or ``apply_async``. The wrapper consumes it before calling the wrapped
38+
function and refreshes the lock so subsequent calls are still gated.
39+
This is race-free relative to clearing the lock from outside, which has
40+
a window between clear and enqueue where another worker can reacquire.
41+
42+
The wrapper also exposes ``clear_cooldown(*args, **kwargs)`` which
43+
deletes the lock key. Useful for operational debugging from a shell;
44+
prefer ``_cooldown_force=True`` from the enqueuing path.
1945
2046
Args:
2147
wait_time: Lock duration in seconds.
22-
key: Optional cache key. Defaults to the wrapped function's
48+
key: Optional static cache key. Defaults to the wrapped function's
2349
fully-qualified name.
50+
key_func: Optional callable receiving the wrapped function's bound
51+
arguments as keyword args; returns a string suffix appended to
52+
the base key. Opt-in; use to scope the cooldown per
53+
argument-set.
2454
"""
2555

2656
def decorator(func):
27-
lock_key = f"ratelimit:{key or f'{func.__module__}.{func.__qualname__}'}"
57+
base_key = f"{KEY_PREFIX}:{key or f'{func.__module__}.{func.__qualname__}'}"
58+
sig = inspect.signature(func) if key_func else None
59+
60+
def _key_for(*args, **kwargs):
61+
if key_func is None:
62+
return base_key
63+
bound = sig.bind(*args, **kwargs)
64+
bound.apply_defaults()
65+
return f"{base_key}:{key_func(**bound.arguments)}"
2866

2967
@wraps(func)
3068
def wrapper(*args, **kwargs):
31-
if not caches["redis"].add(lock_key, "1", timeout=wait_time):
32-
log.info("Skipping %s: rate-limited (%ss)", lock_key, wait_time)
69+
force = kwargs.pop("_cooldown_force", False)
70+
lock_key = _key_for(*args, **kwargs)
71+
if force:
72+
log.info("Force-overriding cooldown for %s", lock_key)
73+
caches["redis"].set(lock_key, "1", timeout=wait_time)
74+
elif not caches["redis"].add(lock_key, "1", timeout=wait_time):
75+
log.info("Skipping %s: cooldown active (%ss)", lock_key, wait_time)
3376
return None
3477
return func(*args, **kwargs)
3578

79+
def clear_cooldown(*args, **kwargs):
80+
caches["redis"].delete(_key_for(*args, **kwargs))
81+
82+
wrapper.clear_cooldown = clear_cooldown
3683
return wrapper
3784

3885
return decorator

0 commit comments

Comments
 (0)