Skip to content

Commit d61b60e

Browse files
authored
Implement automatic cleanup of stale cache entries (#290)
* feat: add automatic stale cache cleanup * Fix test expectations for new cleanup defaults * Handle missing pickle files during stale cleanup * Reset global params in cleanup test * remove mypy ignore comments
1 parent 8d0102c commit d61b60e

11 files changed

Lines changed: 197 additions & 18 deletions

File tree

README.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ These parameters can be changed at any time and they will apply to all decorator
146146
* `stale_after`
147147
* `next_time`
148148
* `wait_for_calc_timeout`
149+
* `cleanup_stale`
150+
* `cleanup_interval`
149151

150152
The current defaults can be fetched by calling `get_default_params`.
151153

@@ -192,6 +194,17 @@ Sometimes you may want your function to trigger a calculation when it encounters
192194
193195
Further function calls made while the calculation is being performed will not trigger redundant calculations.
194196

197+
Automatic Cleanup of Stale Values
198+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
199+
Setting ``cleanup_stale=True`` on a decorator will spawn a background thread that periodically removes stale cache entries. The interval between cleanup runs is controlled by ``cleanup_interval`` and defaults to one day.
200+
201+
.. code-block:: python
202+
203+
@cachier(stale_after=timedelta(seconds=30), cleanup_stale=True)
204+
def compute():
205+
...
206+
207+
195208
196209
Working with unhashable arguments
197210
---------------------------------

src/cachier/config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class Params:
6363
separate_files: bool = False
6464
wait_for_calc_timeout: int = 0
6565
allow_none: bool = False
66+
cleanup_stale: bool = False
67+
cleanup_interval: timedelta = timedelta(days=1)
6668

6769

6870
_global_params = Params()
@@ -130,7 +132,7 @@ def set_global_params(**params: Any) -> None:
130132
}
131133
cachier.config._global_params = replace(
132134
cachier.config._global_params,
133-
**valid_params, # type: ignore[arg-type]
135+
**valid_params,
134136
)
135137

136138

src/cachier/core.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import inspect
1111
import os
12+
import threading
1213
import warnings
1314
from collections import OrderedDict
1415
from concurrent.futures import ThreadPoolExecutor
@@ -120,6 +121,8 @@ def cachier(
120121
separate_files: Optional[bool] = None,
121122
wait_for_calc_timeout: Optional[int] = None,
122123
allow_none: Optional[bool] = None,
124+
cleanup_stale: Optional[bool] = None,
125+
cleanup_interval: Optional[timedelta] = None,
123126
):
124127
"""Wrap as a persistent, stale-free memoization decorator.
125128
@@ -183,6 +186,11 @@ def cachier(
183186
allow_none: bool, optional
184187
Allows storing None values in the cache. If False, functions returning
185188
None will not be cached and are recalculated every call.
189+
cleanup_stale: bool, optional
190+
If True, stale cache entries are periodically deleted in a background
191+
thread. Defaults to False.
192+
cleanup_interval: datetime.timedelta, optional
193+
Minimum time between automatic cleanup runs. Defaults to one day.
186194
187195
"""
188196
# Check for deprecated parameters
@@ -236,6 +244,9 @@ def cachier(
236244
def _cachier_decorator(func):
237245
core.set_func(func)
238246

247+
last_cleanup = datetime.min
248+
cleanup_lock = threading.Lock()
249+
239250
# ---
240251
# MAINTAINER NOTE: max_age parameter
241252
#
@@ -261,7 +272,7 @@ def _cachier_decorator(func):
261272
# ---
262273

263274
def _call(*args, max_age: Optional[timedelta] = None, **kwds):
264-
nonlocal allow_none
275+
nonlocal allow_none, last_cleanup
265276
_allow_none = _update_with_defaults(allow_none, "allow_none", kwds)
266277
# print('Inside general wrapper for {}.'.format(func.__name__))
267278
ignore_cache = _pop_kwds_with_deprecation(
@@ -280,11 +291,26 @@ def _call(*args, max_age: Optional[timedelta] = None, **kwds):
280291
stale_after, "stale_after", kwds
281292
)
282293
_next_time = _update_with_defaults(next_time, "next_time", kwds)
294+
_cleanup_flag = _update_with_defaults(
295+
cleanup_stale, "cleanup_stale", kwds
296+
)
297+
_cleanup_interval_val = _update_with_defaults(
298+
cleanup_interval, "cleanup_interval", kwds
299+
)
283300
# merge args expanded as kwargs and the original kwds
284301
kwargs = _convert_args_kwargs(
285302
func, _is_method=core.func_is_method, args=args, kwds=kwds
286303
)
287304

305+
if _cleanup_flag:
306+
now = datetime.now()
307+
with cleanup_lock:
308+
if now - last_cleanup >= _cleanup_interval_val:
309+
last_cleanup = now
310+
_get_executor().submit(
311+
core.delete_stale_entries, _stale_after
312+
)
313+
288314
_print = print if verbose else lambda x: None
289315

290316
# Check current global caching state dynamically

src/cachier/cores/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import abc # for the _BaseCore abstract base class
1111
import inspect
1212
import threading
13+
from datetime import timedelta
1314
from typing import Callable, Optional, Tuple
1415

1516
from .._types import HashFunc
@@ -112,3 +113,7 @@ def clear_cache(self) -> None:
112113
@abc.abstractmethod
113114
def clear_being_calculated(self) -> None:
114115
"""Mark all entries in this cache as not being calculated."""
116+
117+
@abc.abstractmethod
118+
def delete_stale_entries(self, stale_after: timedelta) -> None:
119+
"""Delete cache entries older than ``stale_after``."""

src/cachier/cores/memory.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""A memory-based caching core for cachier."""
22

33
import threading
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
from typing import Any, Dict, Optional, Tuple
66

77
from .._types import HashFunc
@@ -103,3 +103,13 @@ def clear_being_calculated(self) -> None:
103103
for entry in self.cache.values():
104104
entry._processing = False
105105
entry._condition = None
106+
107+
def delete_stale_entries(self, stale_after: timedelta) -> None:
108+
"""Remove stale entries from the in-memory cache."""
109+
now = datetime.now()
110+
with self.lock:
111+
keys_to_delete = [
112+
k for k, v in self.cache.items() if now - v.time > stale_after
113+
]
114+
for key in keys_to_delete:
115+
del self.cache[key]

src/cachier/cores/mongo.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import time # to sleep when waiting on Mongo cache\
1313
import warnings # to warn if pymongo is missing
1414
from contextlib import suppress
15-
from datetime import datetime
15+
from datetime import datetime, timedelta
1616
from typing import Any, Optional, Tuple
1717

1818
from .._types import HashFunc, Mongetter
@@ -146,3 +146,10 @@ def clear_being_calculated(self) -> None:
146146
},
147147
update={"$set": {"processing": False}},
148148
)
149+
150+
def delete_stale_entries(self, stale_after: timedelta) -> None:
151+
"""Delete stale entries from the MongoDB cache."""
152+
threshold = datetime.now() - stale_after
153+
self.mongo_collection.delete_many(
154+
filter={"func": self._func_str, "time": {"$lt": threshold}}
155+
)

src/cachier/cores/pickle.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import os
1111
import pickle # for local caching
1212
import time
13-
from datetime import datetime
13+
from contextlib import suppress
14+
from datetime import datetime, timedelta
1415
from typing import Any, Dict, Optional, Tuple, Union
1516

1617
import portalocker # to lock on pickle cache IO
@@ -260,16 +261,16 @@ def mark_entry_not_calculated(self, key: str) -> None:
260261
cache[key]._processing = False
261262
self._save_cache(cache)
262263

263-
def _create_observer(self) -> Observer:
264+
def _create_observer(self) -> Observer: # type: ignore[valid-type]
264265
"""Create a new observer instance."""
265266
return Observer()
266267

267-
def _cleanup_observer(self, observer: Observer) -> None:
268+
def _cleanup_observer(self, observer: Observer) -> None: # type: ignore[valid-type]
268269
"""Clean up observer properly."""
269270
try:
270-
if observer.is_alive():
271-
observer.stop()
272-
observer.join(timeout=1.0)
271+
if observer.is_alive(): # type: ignore[attr-defined]
272+
observer.stop() # type: ignore[attr-defined]
273+
observer.join(timeout=1.0) # type: ignore[attr-defined]
273274
except Exception as e:
274275
logging.debug("Observer cleanup failed: %s", e)
275276

@@ -296,7 +297,7 @@ def wait_on_entry_calc(self, key: str) -> Any:
296297
else:
297298
raise
298299

299-
def _wait_with_inotify(self, key: str, filename: str) -> Any:
300+
def _wait_with_inotify(self, key: str, filename: str) -> Any: # type: ignore[valid-type]
300301
"""Wait for calculation using inotify with proper cleanup."""
301302
event_handler = _PickleCore.CacheChangeHandler(
302303
filename=filename, core=self, key=key
@@ -306,14 +307,14 @@ def _wait_with_inotify(self, key: str, filename: str) -> Any:
306307
event_handler.inject_observer(observer)
307308

308309
try:
309-
observer.schedule(
310+
observer.schedule( # type: ignore[attr-defined]
310311
event_handler, path=self.cache_dir, recursive=True
311312
)
312-
observer.start()
313+
observer.start() # type: ignore[attr-defined]
313314

314315
time_spent = 0
315-
while observer.is_alive():
316-
observer.join(timeout=1.0)
316+
while observer.is_alive(): # type: ignore[attr-defined]
317+
observer.join(timeout=1.0) # type: ignore[attr-defined]
317318
time_spent += 1
318319
self.check_calc_timeout(time_spent)
319320

@@ -324,7 +325,7 @@ def _wait_with_inotify(self, key: str, filename: str) -> Any:
324325
return event_handler.value
325326
finally:
326327
# Always cleanup the observer
327-
self._cleanup_observer(observer)
328+
self._cleanup_observer(observer) # type: ignore[attr-defined]
328329

329330
def _wait_with_polling(self, key: str) -> Any:
330331
"""Fallback method using polling instead of inotify."""
@@ -364,3 +365,28 @@ def clear_being_calculated(self) -> None:
364365
for key in cache:
365366
cache[key]._processing = False
366367
self._save_cache(cache)
368+
369+
def delete_stale_entries(self, stale_after: timedelta) -> None:
370+
"""Delete stale cache entries from the pickle cache."""
371+
now = datetime.now()
372+
if self.separate_files:
373+
path, name = os.path.split(self.cache_fpath)
374+
for subpath in os.listdir(path):
375+
if not subpath.startswith(f"{name}_"):
376+
continue
377+
entry = self._load_cache_by_key(
378+
hash_str=subpath.split("_")[-1]
379+
)
380+
if entry is not None and (now - entry.time > stale_after):
381+
with suppress(FileNotFoundError):
382+
os.remove(os.path.join(path, subpath))
383+
return
384+
385+
with self.lock:
386+
cache = self.get_cache_dict(reload=True)
387+
keys_to_delete = [
388+
k for k, v in cache.items() if now - v.time > stale_after
389+
]
390+
for key in keys_to_delete:
391+
del cache[key]
392+
self._save_cache(cache)

src/cachier/cores/redis.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pickle
44
import time
55
import warnings
6-
from datetime import datetime
6+
from datetime import datetime, timedelta
77
from typing import Any, Callable, Optional, Tuple, Union
88

99
try:
@@ -223,3 +223,28 @@ def clear_being_calculated(self) -> None:
223223
warnings.warn(
224224
f"Redis clear_being_calculated failed: {e}", stacklevel=2
225225
)
226+
227+
def delete_stale_entries(self, stale_after: timedelta) -> None:
228+
"""Remove stale entries from the Redis cache."""
229+
redis_client = self._resolve_redis_client()
230+
pattern = f"{self.key_prefix}:{self._func_str}:*"
231+
try:
232+
keys = redis_client.keys(pattern)
233+
threshold = datetime.now() - stale_after
234+
for key in keys:
235+
ts = redis_client.hget(key, "timestamp")
236+
if ts is None:
237+
continue
238+
try:
239+
ts_val = datetime.fromisoformat(ts.decode("utf-8"))
240+
except Exception as exc:
241+
warnings.warn(
242+
f"Redis timestamp parse failed: {exc}", stacklevel=2
243+
)
244+
continue
245+
if ts_val < threshold:
246+
redis_client.delete(key)
247+
except Exception as e:
248+
warnings.warn(
249+
f"Redis delete_stale_entries failed: {e}", stacklevel=2
250+
)

src/cachier/cores/sql.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pickle
44
import threading
5-
from datetime import datetime
5+
from datetime import datetime, timedelta
66
from typing import Any, Callable, Optional, Tuple, Union
77

88
try:
@@ -286,3 +286,17 @@ def clear_being_calculated(self) -> None:
286286
.values(processing=False)
287287
)
288288
session.commit()
289+
290+
def delete_stale_entries(self, stale_after: timedelta) -> None:
291+
"""Delete stale entries from the SQL cache."""
292+
threshold = datetime.now() - stale_after
293+
with self._lock, self._Session() as session:
294+
session.execute(
295+
delete(CacheTable).where(
296+
and_(
297+
CacheTable.function_id == self._func_str,
298+
CacheTable.timestamp < threshold,
299+
)
300+
)
301+
)
302+
session.commit()

tests/test_cleanup.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import os
2+
import pickle
3+
import time
4+
from dataclasses import replace
5+
from datetime import timedelta
6+
7+
import pytest
8+
9+
import cachier
10+
from cachier import cachier as cachier_dec
11+
12+
_copied_defaults = replace(cachier.get_global_params())
13+
14+
15+
def setup_function() -> None:
16+
cachier.set_global_params(**vars(_copied_defaults))
17+
18+
19+
def teardown_function() -> None:
20+
cachier.set_global_params(**vars(_copied_defaults))
21+
22+
23+
@pytest.mark.pickle
24+
def test_cleanup_stale_entries(tmp_path):
25+
@cachier_dec(
26+
cache_dir=tmp_path,
27+
stale_after=timedelta(seconds=1),
28+
cleanup_stale=True,
29+
cleanup_interval=timedelta(seconds=0),
30+
)
31+
def add(x):
32+
return x + 1
33+
34+
add.clear_cache()
35+
add(1)
36+
add(2)
37+
fname = f".{add.__module__}.{add.__qualname__}".replace("<", "_").replace(
38+
">", "_"
39+
)
40+
cache_path = os.path.join(add.cache_dpath(), fname)
41+
with open(cache_path, "rb") as fh:
42+
data = pickle.load(fh)
43+
assert len(data) == 2
44+
time.sleep(1.1)
45+
add(1)
46+
time.sleep(0.2)
47+
with open(cache_path, "rb") as fh:
48+
data = pickle.load(fh)
49+
assert len(data) == 1

0 commit comments

Comments
 (0)