Skip to content

Commit ecb7f33

Browse files
committed
feat: add automatic stale cache cleanup
1 parent 8d0102c commit ecb7f33

10 files changed

Lines changed: 190 additions & 27 deletions

File tree

README.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ These parameters can be changed at any time and they will apply to all decorator
146146
* `stale_after`
147147
* `next_time`
148148
* `wait_for_calc_timeout`
149+
* `cleanup_stale`
150+
* `cleanup_interval`
149151

150152
The current defaults can be fetched by calling `get_default_params`.
151153

@@ -192,6 +194,17 @@ Sometimes you may want your function to trigger a calculation when it encounters
192194
193195
Further function calls made while the calculation is being performed will not trigger redundant calculations.
194196

197+
Automatic Cleanup of Stale Values
198+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
199+
Setting ``cleanup_stale=True`` on a decorator enables automatic removal of stale cache entries. Cleanup is triggered lazily: when the decorated function is called and at least ``cleanup_interval`` has elapsed since the last cleanup, a removal pass is submitted to a background worker. ``cleanup_interval`` defaults to one day.
200+
201+
.. code-block:: python
202+
203+
@cachier(stale_after=timedelta(seconds=30), cleanup_stale=True)
204+
def compute():
205+
...
206+
207+
195208
196209
Working with unhashable arguments
197210
---------------------------------

src/cachier/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class Params:
6363
separate_files: bool = False
6464
wait_for_calc_timeout: int = 0
6565
allow_none: bool = False
66+
cleanup_stale: bool = False
67+
cleanup_interval: timedelta = timedelta(days=1)
6668

6769

6870
_global_params = Params()

src/cachier/core.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import inspect
1111
import os
12+
import threading
1213
import warnings
1314
from collections import OrderedDict
1415
from concurrent.futures import ThreadPoolExecutor
@@ -120,6 +121,8 @@ def cachier(
120121
separate_files: Optional[bool] = None,
121122
wait_for_calc_timeout: Optional[int] = None,
122123
allow_none: Optional[bool] = None,
124+
cleanup_stale: Optional[bool] = None,
125+
cleanup_interval: Optional[timedelta] = None,
123126
):
124127
"""Wrap as a persistent, stale-free memoization decorator.
125128
@@ -183,6 +186,11 @@ def cachier(
183186
allow_none: bool, optional
184187
Allows storing None values in the cache. If False, functions returning
185188
None will not be cached and are recalculated every call.
189+
cleanup_stale: bool, optional
190+
If True, stale cache entries are deleted by a background task that is
191+
triggered on calls to the decorated function, at most once per
``cleanup_interval``. Defaults to False.
192+
cleanup_interval: datetime.timedelta, optional
193+
Minimum time between automatic cleanup runs. Defaults to one day.
186194
187195
"""
188196
# Check for deprecated parameters
@@ -236,6 +244,9 @@ def cachier(
236244
def _cachier_decorator(func):
237245
core.set_func(func)
238246

247+
last_cleanup = datetime.min
248+
cleanup_lock = threading.Lock()
249+
239250
# ---
240251
# MAINTAINER NOTE: max_age parameter
241252
#
@@ -261,7 +272,7 @@ def _cachier_decorator(func):
261272
# ---
262273

263274
def _call(*args, max_age: Optional[timedelta] = None, **kwds):
264-
nonlocal allow_none
275+
nonlocal allow_none, last_cleanup
265276
_allow_none = _update_with_defaults(allow_none, "allow_none", kwds)
266277
# print('Inside general wrapper for {}.'.format(func.__name__))
267278
ignore_cache = _pop_kwds_with_deprecation(
@@ -280,11 +291,26 @@ def _call(*args, max_age: Optional[timedelta] = None, **kwds):
280291
stale_after, "stale_after", kwds
281292
)
282293
_next_time = _update_with_defaults(next_time, "next_time", kwds)
294+
_cleanup_flag = _update_with_defaults(
295+
cleanup_stale, "cleanup_stale", kwds
296+
)
297+
_cleanup_interval_val = _update_with_defaults(
298+
cleanup_interval, "cleanup_interval", kwds
299+
)
283300
# merge args expanded as kwargs and the original kwds
284301
kwargs = _convert_args_kwargs(
285302
func, _is_method=core.func_is_method, args=args, kwds=kwds
286303
)
287304

305+
if _cleanup_flag:
306+
now = datetime.now()
307+
with cleanup_lock:
308+
if now - last_cleanup >= _cleanup_interval_val:
309+
last_cleanup = now
310+
_get_executor().submit(
311+
core.delete_stale_entries, _stale_after
312+
)
313+
288314
_print = print if verbose else lambda x: None
289315

290316
# Check current global caching state dynamically

src/cachier/cores/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import abc # for the _BaseCore abstract base class
1111
import inspect
1212
import threading
13+
from datetime import timedelta
1314
from typing import Callable, Optional, Tuple
1415

1516
from .._types import HashFunc
@@ -112,3 +113,7 @@ def clear_cache(self) -> None:
112113
@abc.abstractmethod
113114
def clear_being_calculated(self) -> None:
114115
"""Mark all entries in this cache as not being calculated."""
116+
117+
@abc.abstractmethod
118+
def delete_stale_entries(self, stale_after: timedelta) -> None:
119+
"""Delete cache entries older than ``stale_after``."""

src/cachier/cores/memory.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""A memory-based caching core for cachier."""
22

33
import threading
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
from typing import Any, Dict, Optional, Tuple
66

77
from .._types import HashFunc
@@ -103,3 +103,13 @@ def clear_being_calculated(self) -> None:
103103
for entry in self.cache.values():
104104
entry._processing = False
105105
entry._condition = None
106+
107+
def delete_stale_entries(self, stale_after: timedelta) -> None:
108+
"""Remove stale entries from the in-memory cache."""
109+
now = datetime.now()
110+
with self.lock:
111+
keys_to_delete = [
112+
k for k, v in self.cache.items() if now - v.time > stale_after
113+
]
114+
for key in keys_to_delete:
115+
del self.cache[key]

src/cachier/cores/mongo.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import time # to sleep when waiting on Mongo cache\
1313
import warnings # to warn if pymongo is missing
1414
from contextlib import suppress
15-
from datetime import datetime
15+
from datetime import datetime, timedelta
1616
from typing import Any, Optional, Tuple
1717

1818
from .._types import HashFunc, Mongetter
@@ -146,3 +146,10 @@ def clear_being_calculated(self) -> None:
146146
},
147147
update={"$set": {"processing": False}},
148148
)
149+
150+
def delete_stale_entries(self, stale_after: timedelta) -> None:
151+
"""Delete stale entries from the MongoDB cache."""
152+
threshold = datetime.now() - stale_after
153+
self.mongo_collection.delete_many(
154+
filter={"func": self._func_str, "time": {"$lt": threshold}}
155+
)

src/cachier/cores/pickle.py

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import os
1111
import pickle # for local caching
1212
import time
13-
from datetime import datetime
13+
from datetime import datetime, timedelta
1414
from typing import Any, Dict, Optional, Tuple, Union
1515

1616
import portalocker # to lock on pickle cache IO
@@ -117,8 +117,8 @@ def _convert_legacy_cache_entry(
117117

118118
def _load_cache_dict(self) -> Dict[str, CacheEntry]:
119119
try:
120-
with portalocker.Lock(self.cache_fpath, mode="rb") as cf:
121-
cache = pickle.load(cf)
120+
with portalocker.Lock(self.cache_fpath, mode="rb") as cf: # type: ignore[arg-type]
121+
cache = pickle.load(cf) # type: ignore[arg-type]
122122
self._cache_used_fpath = str(self.cache_fpath)
123123
except (FileNotFoundError, EOFError):
124124
cache = {}
@@ -144,8 +144,8 @@ def _load_cache_by_key(
144144
fpath = self.cache_fpath
145145
fpath += f"_{hash_str or key}"
146146
try:
147-
with portalocker.Lock(fpath, mode="rb") as cache_file:
148-
entry = pickle.load(cache_file)
147+
with portalocker.Lock(fpath, mode="rb") as cache_file: # type: ignore[arg-type]
148+
entry = pickle.load(cache_file) # type: ignore[arg-type]
149149
return _PickleCore._convert_legacy_cache_entry(entry)
150150
except (FileNotFoundError, EOFError):
151151
return None
@@ -183,8 +183,8 @@ def _save_cache(
183183
elif hash_str is not None:
184184
fpath += f"_{hash_str}"
185185
with self.lock:
186-
with portalocker.Lock(fpath, mode="wb") as cf:
187-
pickle.dump(cache, cf, protocol=4)
186+
with portalocker.Lock(fpath, mode="wb") as cf: # type: ignore[arg-type]
187+
pickle.dump(cache, cf, protocol=4) # type: ignore[arg-type]
188188
# the same as check for separate_file, but changed for typing
189189
if isinstance(cache, dict):
190190
self._cache_dict = cache
@@ -260,16 +260,16 @@ def mark_entry_not_calculated(self, key: str) -> None:
260260
cache[key]._processing = False
261261
self._save_cache(cache)
262262

263-
def _create_observer(self) -> Observer:
263+
def _create_observer(self) -> Observer: # type: ignore[valid-type]
264264
"""Create a new observer instance."""
265265
return Observer()
266266

267-
def _cleanup_observer(self, observer: Observer) -> None:
267+
def _cleanup_observer(self, observer: Observer) -> None: # type: ignore[valid-type]
268268
"""Clean up observer properly."""
269269
try:
270-
if observer.is_alive():
271-
observer.stop()
272-
observer.join(timeout=1.0)
270+
if observer.is_alive(): # type: ignore[attr-defined]
271+
observer.stop() # type: ignore[attr-defined]
272+
observer.join(timeout=1.0) # type: ignore[attr-defined]
273273
except Exception as e:
274274
logging.debug("Observer cleanup failed: %s", e)
275275

@@ -296,7 +296,7 @@ def wait_on_entry_calc(self, key: str) -> Any:
296296
else:
297297
raise
298298

299-
def _wait_with_inotify(self, key: str, filename: str) -> Any:
299+
def _wait_with_inotify(self, key: str, filename: str) -> Any: # type: ignore[valid-type]
300300
"""Wait for calculation using inotify with proper cleanup."""
301301
event_handler = _PickleCore.CacheChangeHandler(
302302
filename=filename, core=self, key=key
@@ -306,14 +306,14 @@ def _wait_with_inotify(self, key: str, filename: str) -> Any:
306306
event_handler.inject_observer(observer)
307307

308308
try:
309-
observer.schedule(
309+
observer.schedule( # type: ignore[attr-defined]
310310
event_handler, path=self.cache_dir, recursive=True
311311
)
312-
observer.start()
312+
observer.start() # type: ignore[attr-defined]
313313

314314
time_spent = 0
315-
while observer.is_alive():
316-
observer.join(timeout=1.0)
315+
while observer.is_alive(): # type: ignore[attr-defined]
316+
observer.join(timeout=1.0) # type: ignore[attr-defined]
317317
time_spent += 1
318318
self.check_calc_timeout(time_spent)
319319

@@ -324,7 +324,7 @@ def _wait_with_inotify(self, key: str, filename: str) -> Any:
324324
return event_handler.value
325325
finally:
326326
# Always cleanup the observer
327-
self._cleanup_observer(observer)
327+
self._cleanup_observer(observer) # type: ignore[attr-defined]
328328

329329
def _wait_with_polling(self, key: str) -> Any:
330330
"""Fallback method using polling instead of inotify."""
@@ -364,3 +364,27 @@ def clear_being_calculated(self) -> None:
364364
for key in cache:
365365
cache[key]._processing = False
366366
self._save_cache(cache)
367+
368+
def delete_stale_entries(self, stale_after: timedelta) -> None:
369+
"""Delete stale cache entries from the pickle cache."""
370+
now = datetime.now()
371+
if self.separate_files:
372+
path, name = os.path.split(self.cache_fpath)
373+
for subpath in os.listdir(path):
374+
if not subpath.startswith(f"{name}_"):
375+
continue
376+
entry = self._load_cache_by_key(
377+
hash_str=subpath.split("_")[-1]
378+
)
379+
if entry is not None and (now - entry.time > stale_after):
380+
os.remove(os.path.join(path, subpath))
381+
return
382+
383+
with self.lock:
384+
cache = self.get_cache_dict(reload=True)
385+
keys_to_delete = [
386+
k for k, v in cache.items() if now - v.time > stale_after
387+
]
388+
for key in keys_to_delete:
389+
del cache[key]
390+
self._save_cache(cache)

src/cachier/cores/redis.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pickle
44
import time
55
import warnings
6-
from datetime import datetime
6+
from datetime import datetime, timedelta
77
from typing import Any, Callable, Optional, Tuple, Union
88

99
try:
@@ -223,3 +223,28 @@ def clear_being_calculated(self) -> None:
223223
warnings.warn(
224224
f"Redis clear_being_calculated failed: {e}", stacklevel=2
225225
)
226+
227+
def delete_stale_entries(self, stale_after: timedelta) -> None:
228+
"""Remove stale entries from the Redis cache."""
229+
redis_client = self._resolve_redis_client()
230+
pattern = f"{self.key_prefix}:{self._func_str}:*"
231+
try:
232+
keys = redis_client.keys(pattern)
233+
threshold = datetime.now() - stale_after
234+
for key in keys:
235+
ts = redis_client.hget(key, "timestamp")
236+
if ts is None:
237+
continue
238+
try:
239+
ts_val = datetime.fromisoformat(ts.decode("utf-8"))
240+
except Exception as exc:
241+
warnings.warn(
242+
f"Redis timestamp parse failed: {exc}", stacklevel=2
243+
)
244+
continue
245+
if ts_val < threshold:
246+
redis_client.delete(key)
247+
except Exception as e:
248+
warnings.warn(
249+
f"Redis delete_stale_entries failed: {e}", stacklevel=2
250+
)

src/cachier/cores/sql.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pickle
44
import threading
5-
from datetime import datetime
5+
from datetime import datetime, timedelta
66
from typing import Any, Callable, Optional, Tuple, Union
77

88
try:
@@ -109,10 +109,10 @@ def get_entry_by_key(self, key: str) -> Tuple[str, Optional[CacheEntry]]:
109109
value = pickle.loads(row.value) if row.value is not None else None
110110
entry = CacheEntry(
111111
value=value,
112-
time=row.timestamp,
113-
stale=row.stale,
114-
_processing=row.processing,
115-
_completed=row.completed,
112+
time=row.timestamp, # type: ignore[arg-type]
113+
stale=row.stale, # type: ignore[arg-type]
114+
_processing=row.processing, # type: ignore[arg-type]
115+
_completed=row.completed, # type: ignore[arg-type]
116116
)
117117
return key, entry
118118

@@ -286,3 +286,17 @@ def clear_being_calculated(self) -> None:
286286
.values(processing=False)
287287
)
288288
session.commit()
289+
290+
def delete_stale_entries(self, stale_after: timedelta) -> None:
291+
"""Delete stale entries from the SQL cache."""
292+
threshold = datetime.now() - stale_after
293+
with self._lock, self._Session() as session:
294+
session.execute(
295+
delete(CacheTable).where(
296+
and_(
297+
CacheTable.function_id == self._func_str,
298+
CacheTable.timestamp < threshold,
299+
)
300+
)
301+
)
302+
session.commit()

0 commit comments

Comments
 (0)