From ecb7f339b15b7b28430b2b658f95e86af96da063 Mon Sep 17 00:00:00 2001 From: Shay Palachy-Affek Date: Tue, 15 Jul 2025 13:40:48 +0300 Subject: [PATCH 1/5] feat: add automatic stale cache cleanup --- README.rst | 13 ++++++++ src/cachier/config.py | 2 ++ src/cachier/core.py | 28 ++++++++++++++++- src/cachier/cores/base.py | 5 ++++ src/cachier/cores/memory.py | 12 +++++++- src/cachier/cores/mongo.py | 9 +++++- src/cachier/cores/pickle.py | 60 ++++++++++++++++++++++++++----------- src/cachier/cores/redis.py | 27 ++++++++++++++++- src/cachier/cores/sql.py | 24 +++++++++++---- tests/test_cleanup.py | 37 +++++++++++++++++++++++ 10 files changed, 190 insertions(+), 27 deletions(-) create mode 100644 tests/test_cleanup.py diff --git a/README.rst b/README.rst index 025659a8..5ec99e37 100644 --- a/README.rst +++ b/README.rst @@ -146,6 +146,8 @@ These parameters can be changed at any time and they will apply to all decorator * `stale_after` * `next_time` * `wait_for_calc_timeout` +* `cleanup_stale` +* `cleanup_interval` The current defaults can be fetched by calling `get_default_params`. @@ -192,6 +194,17 @@ Sometimes you may want your function to trigger a calculation when it encounters Further function calls made while the calculation is being performed will not trigger redundant calculations. +Automatic Cleanup of Stale Values +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Setting ``cleanup_stale=True`` on a decorator will spawn a background thread that periodically removes stale cache entries. The interval between cleanup runs is controlled by ``cleanup_interval`` and defaults to one day. + +.. code-block:: python + + @cachier(stale_after=timedelta(seconds=30), cleanup_stale=True) + def compute(): + ... + + Working with unhashable arguments --------------------------------- diff --git a/src/cachier/config.py b/src/cachier/config.py index cb908a4a..a6054fe9 100644 --- a/src/cachier/config.py +++ b/src/cachier/config.py @@ -63,6 +63,8 @@ class Params: separate_files: bool = False wait_for_calc_timeout: int = 0 allow_none: bool = False + cleanup_stale: bool = False + cleanup_interval: timedelta = timedelta(days=1) _global_params = Params() diff --git a/src/cachier/core.py b/src/cachier/core.py index 10431206..4db5e329 100644 --- a/src/cachier/core.py +++ b/src/cachier/core.py @@ -9,6 +9,7 @@ import inspect import os +import threading import warnings from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor @@ -120,6 +121,8 @@ def cachier( separate_files: Optional[bool] = None, wait_for_calc_timeout: Optional[int] = None, allow_none: Optional[bool] = None, + cleanup_stale: Optional[bool] = None, + cleanup_interval: Optional[timedelta] = None, ): """Wrap as a persistent, stale-free memoization decorator. @@ -183,6 +186,11 @@ def cachier( allow_none: bool, optional Allows storing None values in the cache. If False, functions returning None will not be cached and are recalculated every call. + cleanup_stale: bool, optional + If True, stale cache entries are periodically deleted in a background + thread. Defaults to False. + cleanup_interval: datetime.timedelta, optional + Minimum time between automatic cleanup runs. Defaults to one day. """ # Check for deprecated parameters @@ -236,6 +244,9 @@ def cachier( def _cachier_decorator(func): core.set_func(func) + last_cleanup = datetime.min + cleanup_lock = threading.Lock() + # --- # MAINTAINER NOTE: max_age parameter # @@ -261,7 +272,7 @@ def _cachier_decorator(func): # --- def _call(*args, max_age: Optional[timedelta] = None, **kwds): - nonlocal allow_none + nonlocal allow_none, last_cleanup _allow_none = _update_with_defaults(allow_none, "allow_none", kwds) # print('Inside general wrapper for {}.'.format(func.__name__)) ignore_cache = _pop_kwds_with_deprecation( @@ -280,11 +291,26 @@ def _call(*args, max_age: Optional[timedelta] = None, **kwds): stale_after, "stale_after", kwds ) _next_time = _update_with_defaults(next_time, "next_time", kwds) + _cleanup_flag = _update_with_defaults( + cleanup_stale, "cleanup_stale", kwds + ) + _cleanup_interval_val = _update_with_defaults( + cleanup_interval, "cleanup_interval", kwds + ) # merge args expanded as kwargs and the original kwds kwargs = _convert_args_kwargs( func, _is_method=core.func_is_method, args=args, kwds=kwds ) + if _cleanup_flag: + now = datetime.now() + with cleanup_lock: + if now - last_cleanup >= _cleanup_interval_val: + last_cleanup = now + _get_executor().submit( + core.delete_stale_entries, _stale_after + ) + _print = print if verbose else lambda x: None # Check current global caching state dynamically diff --git a/src/cachier/cores/base.py b/src/cachier/cores/base.py index f4dbbced..edb8e7ed 100644 --- a/src/cachier/cores/base.py +++ b/src/cachier/cores/base.py @@ -10,6 +10,7 @@ import abc # for the _BaseCore abstract base class import inspect import threading +from datetime import timedelta from typing import Callable, Optional, Tuple from .._types import HashFunc @@ -112,3 +113,7 @@ def clear_cache(self) -> None: @abc.abstractmethod def clear_being_calculated(self) -> None: """Mark all entries in this cache as not being calculated.""" + + @abc.abstractmethod + def delete_stale_entries(self, stale_after: timedelta) -> None: + """Delete cache entries older than ``stale_after``.""" diff --git a/src/cachier/cores/memory.py b/src/cachier/cores/memory.py index 19e3718c..ddd0acdb 100644 --- a/src/cachier/cores/memory.py +++ b/src/cachier/cores/memory.py @@ -1,7 +1,7 @@ """A memory-based caching core for cachier.""" import threading -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Dict, Optional, Tuple from .._types import HashFunc @@ -103,3 +103,13 @@ def clear_being_calculated(self) -> None: for entry in self.cache.values(): entry._processing = False entry._condition = None + + def delete_stale_entries(self, stale_after: timedelta) -> None: + """Remove stale entries from the in-memory cache.""" + now = datetime.now() + with self.lock: + keys_to_delete = [ + k for k, v in self.cache.items() if now - v.time > stale_after + ] + for key in keys_to_delete: + del self.cache[key] diff --git a/src/cachier/cores/mongo.py b/src/cachier/cores/mongo.py index 05edf4bd..fbc93711 100644 --- a/src/cachier/cores/mongo.py +++ b/src/cachier/cores/mongo.py @@ -12,7 +12,7 @@ import time # to sleep when waiting on Mongo cache\ import warnings # to warn if pymongo is missing from contextlib import suppress -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Optional, Tuple from .._types import HashFunc, Mongetter @@ -146,3 +146,10 @@ def clear_being_calculated(self) -> None: }, update={"$set": {"processing": False}}, ) + + def delete_stale_entries(self, stale_after: timedelta) -> None: + """Delete stale entries from the MongoDB cache.""" + threshold = datetime.now() - stale_after + self.mongo_collection.delete_many( + filter={"func": self._func_str, "time": {"$lt": threshold}} + ) diff --git a/src/cachier/cores/pickle.py b/src/cachier/cores/pickle.py index 82650ba9..4d851a4a 100644 --- a/src/cachier/cores/pickle.py +++ b/src/cachier/cores/pickle.py @@ -10,7 +10,7 @@ import os import pickle # for local caching import time -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Dict, Optional, Tuple, Union import portalocker # to lock on pickle cache IO @@ -117,8 +117,8 @@ def _convert_legacy_cache_entry( def _load_cache_dict(self) -> Dict[str, CacheEntry]: try: - with portalocker.Lock(self.cache_fpath, mode="rb") as cf: - cache = pickle.load(cf) + with portalocker.Lock(self.cache_fpath, mode="rb") as cf: # type: ignore[arg-type] + cache = pickle.load(cf) # type: ignore[arg-type] self._cache_used_fpath = str(self.cache_fpath) except (FileNotFoundError, EOFError): cache = {} @@ -144,8 +144,8 @@ def _load_cache_by_key( fpath = self.cache_fpath fpath += f"_{hash_str or key}" try: - with portalocker.Lock(fpath, mode="rb") as cache_file: - entry = pickle.load(cache_file) + with portalocker.Lock(fpath, mode="rb") as cache_file: # type: ignore[arg-type] + entry = pickle.load(cache_file) # type: ignore[arg-type] return _PickleCore._convert_legacy_cache_entry(entry) except (FileNotFoundError, EOFError): return None @@ -183,8 +183,8 @@ def _save_cache( elif hash_str is not None: fpath += f"_{hash_str}" with self.lock: - with portalocker.Lock(fpath, mode="wb") as cf: - pickle.dump(cache, cf, protocol=4) + with portalocker.Lock(fpath, mode="wb") as cf: # type: ignore[arg-type] + pickle.dump(cache, cf, protocol=4) # type: ignore[arg-type] # the same as check for separate_file, but changed for typing if isinstance(cache, dict): self._cache_dict = cache @@ -260,16 +260,16 @@ def mark_entry_not_calculated(self, key: str) -> None: cache[key]._processing = False self._save_cache(cache) - def _create_observer(self) -> Observer: + def _create_observer(self) -> Observer: # type: ignore[valid-type] """Create a new observer instance.""" return Observer() - def _cleanup_observer(self, observer: Observer) -> None: + def _cleanup_observer(self, observer: Observer) -> None: # type: ignore[valid-type] """Clean up observer properly.""" try: - if observer.is_alive(): - observer.stop() - observer.join(timeout=1.0) + if observer.is_alive(): # type: ignore[attr-defined] + observer.stop() # type: ignore[attr-defined] + observer.join(timeout=1.0) # type: ignore[attr-defined] except Exception as e: logging.debug("Observer cleanup failed: %s", e) @@ -296,7 +296,7 @@ def wait_on_entry_calc(self, key: str) -> Any: else: raise - def _wait_with_inotify(self, key: str, filename: str) -> Any: + def _wait_with_inotify(self, key: str, filename: str) -> Any: # type: ignore[valid-type] """Wait for calculation using inotify with proper cleanup.""" event_handler = _PickleCore.CacheChangeHandler( filename=filename, core=self, key=key @@ -306,14 +306,14 @@ def _wait_with_inotify(self, key: str, filename: str) -> Any: event_handler.inject_observer(observer) try: - observer.schedule( + observer.schedule( # type: ignore[attr-defined] event_handler, path=self.cache_dir, recursive=True ) - observer.start() + observer.start() # type: ignore[attr-defined] time_spent = 0 - while observer.is_alive(): - observer.join(timeout=1.0) + while observer.is_alive(): # type: ignore[attr-defined] + observer.join(timeout=1.0) # type: ignore[attr-defined] time_spent += 1 self.check_calc_timeout(time_spent) @@ -324,7 +324,7 @@ def _wait_with_inotify(self, key: str, filename: str) -> Any: return event_handler.value finally: # Always cleanup the observer - self._cleanup_observer(observer) + self._cleanup_observer(observer) # type: ignore[attr-defined] def _wait_with_polling(self, key: str) -> Any: """Fallback method using polling instead of inotify.""" @@ -364,3 +364,27 @@ def clear_being_calculated(self) -> None: for key in cache: cache[key]._processing = False self._save_cache(cache) + + def delete_stale_entries(self, stale_after: timedelta) -> None: + """Delete stale cache entries from the pickle cache.""" + now = datetime.now() + if self.separate_files: + path, name = os.path.split(self.cache_fpath) + for subpath in os.listdir(path): + if not subpath.startswith(f"{name}_"): + continue + entry = self._load_cache_by_key( + hash_str=subpath.split("_")[-1] + ) + if entry is not None and (now - entry.time > stale_after): + os.remove(os.path.join(path, subpath)) + return + + with self.lock: + cache = self.get_cache_dict(reload=True) + keys_to_delete = [ + k for k, v in cache.items() if now - v.time > stale_after + ] + for key in keys_to_delete: + del cache[key] + self._save_cache(cache) diff --git a/src/cachier/cores/redis.py b/src/cachier/cores/redis.py index 4b85a146..ccd0ffe0 100644 --- a/src/cachier/cores/redis.py +++ b/src/cachier/cores/redis.py @@ -3,7 +3,7 @@ import pickle import time import warnings -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Callable, Optional, Tuple, Union try: @@ -223,3 +223,28 @@ def clear_being_calculated(self) -> None: warnings.warn( f"Redis clear_being_calculated failed: {e}", stacklevel=2 ) + + def delete_stale_entries(self, stale_after: timedelta) -> None: + """Remove stale entries from the Redis cache.""" + redis_client = self._resolve_redis_client() + pattern = f"{self.key_prefix}:{self._func_str}:*" + try: + keys = redis_client.keys(pattern) + threshold = datetime.now() - stale_after + for key in keys: + ts = redis_client.hget(key, "timestamp") + if ts is None: + continue + try: + ts_val = datetime.fromisoformat(ts.decode("utf-8")) + except Exception as exc: + warnings.warn( + f"Redis timestamp parse failed: {exc}", stacklevel=2 + ) + continue + if ts_val < threshold: + redis_client.delete(key) + except Exception as e: + warnings.warn( + f"Redis delete_stale_entries failed: {e}", stacklevel=2 + ) diff --git a/src/cachier/cores/sql.py b/src/cachier/cores/sql.py index ad4364cf..d46c1544 100644 --- a/src/cachier/cores/sql.py +++ b/src/cachier/cores/sql.py @@ -2,7 +2,7 @@ import pickle import threading -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Callable, Optional, Tuple, Union try: @@ -109,10 +109,10 @@ def get_entry_by_key(self, key: str) -> Tuple[str, Optional[CacheEntry]]: value = pickle.loads(row.value) if row.value is not None else None entry = CacheEntry( value=value, - time=row.timestamp, - stale=row.stale, - _processing=row.processing, - _completed=row.completed, + time=row.timestamp, # type: ignore[arg-type] + stale=row.stale, # type: ignore[arg-type] + _processing=row.processing, # type: ignore[arg-type] + _completed=row.completed, # type: ignore[arg-type] ) return key, entry @@ -286,3 +286,17 @@ def clear_being_calculated(self) -> None: .values(processing=False) ) session.commit() + + def delete_stale_entries(self, stale_after: timedelta) -> None: + """Delete stale entries from the SQL cache.""" + threshold = datetime.now() - stale_after + with self._lock, self._Session() as session: + session.execute( + delete(CacheTable).where( + and_( + CacheTable.function_id == self._func_str, + CacheTable.timestamp < threshold, + ) + ) + ) + session.commit() diff --git a/tests/test_cleanup.py b/tests/test_cleanup.py new file mode 100644 index 00000000..fe8bcabe --- /dev/null +++ b/tests/test_cleanup.py @@ -0,0 +1,37 @@ +import os +import pickle +import time +from datetime import timedelta + +import pytest + +from cachier import cachier + + +@pytest.mark.pickle +def test_cleanup_stale_entries(tmp_path): + @cachier( + cache_dir=tmp_path, + stale_after=timedelta(seconds=1), + cleanup_stale=True, + cleanup_interval=timedelta(seconds=0), + ) + def add(x): + return x + 1 + + add.clear_cache() + add(1) + add(2) + fname = f".{add.__module__}.{add.__qualname__}".replace("<", "_").replace( + ">", "_" + ) + cache_path = os.path.join(add.cache_dpath(), fname) + with open(cache_path, "rb") as fh: + data = pickle.load(fh) + assert len(data) == 2 + time.sleep(1.1) + add(1) + time.sleep(0.2) + with open(cache_path, "rb") as fh: + data = pickle.load(fh) + assert len(data) == 1 From 637aab2b0cda24f200bc14c833d138f5af1ffee5 Mon Sep 17 00:00:00 2001 From: Shay Palachy-Affek Date: Tue, 15 Jul 2025 14:11:05 +0300 Subject: [PATCH 2/5] Fix test expectations for new cleanup defaults --- tests/test_core_lookup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_core_lookup.py b/tests/test_core_lookup.py index 877b683f..2b2a9191 100644 --- a/tests/test_core_lookup.py +++ b/tests/test_core_lookup.py @@ -12,6 +12,8 @@ def test_get_default_params(): "backend", "cache_dir", "caching_enabled", + "cleanup_interval", + "cleanup_stale", "hash_func", "mongetter", "next_time", From c04a4c57395aa4f0fdeaaf206416ff22c7739c15 Mon Sep 17 00:00:00 2001 From: Shay Palachy-Affek Date: Tue, 15 Jul 2025 14:27:28 +0300 Subject: [PATCH 3/5] Handle missing pickle files during stale cleanup --- src/cachier/cores/pickle.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cachier/cores/pickle.py b/src/cachier/cores/pickle.py index 4d851a4a..225366b1 100644 --- a/src/cachier/cores/pickle.py +++ b/src/cachier/cores/pickle.py @@ -10,6 +10,7 @@ import os import pickle # for local caching import time +from contextlib import suppress from datetime import datetime, timedelta from typing import Any, Dict, Optional, Tuple, Union @@ -377,7 +378,8 @@ def delete_stale_entries(self, stale_after: timedelta) -> None: hash_str=subpath.split("_")[-1] ) if entry is not None and (now - entry.time > stale_after): - os.remove(os.path.join(path, subpath)) + with suppress(FileNotFoundError): + os.remove(os.path.join(path, subpath)) return with self.lock: From 6759f5435a4dcd22a389b96994e5c32cc99ff31e Mon Sep 17 00:00:00 2001 From: Shay Palachy-Affek Date: Tue, 15 Jul 2025 14:57:28 +0300 Subject: [PATCH 4/5] Reset global params in cleanup test --- tests/test_cleanup.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests/test_cleanup.py b/tests/test_cleanup.py index fe8bcabe..1613a33c 100644 --- a/tests/test_cleanup.py +++ b/tests/test_cleanup.py @@ -1,16 +1,28 @@ import os import pickle import time +from dataclasses import replace from datetime import timedelta import pytest -from cachier import cachier +import cachier +from cachier import cachier as cachier_dec + +_copied_defaults = replace(cachier.get_global_params()) + + +def setup_function() -> None: + cachier.set_global_params(**vars(_copied_defaults)) + + +def teardown_function() -> None: + cachier.set_global_params(**vars(_copied_defaults)) @pytest.mark.pickle def test_cleanup_stale_entries(tmp_path): - @cachier( + @cachier_dec( cache_dir=tmp_path, stale_after=timedelta(seconds=1), cleanup_stale=True, From 1134deb9d7ddc74c4f6b0b1b868c3bc53934791f Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 15 Jul 2025 23:25:00 +0300 Subject: [PATCH 5/5] remove mypy ignore comments --- src/cachier/config.py | 2 +- src/cachier/cores/pickle.py | 12 ++++++------ src/cachier/cores/sql.py | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/cachier/config.py b/src/cachier/config.py index a6054fe9..53dfbe82 100644 --- a/src/cachier/config.py +++ b/src/cachier/config.py @@ -132,7 +132,7 @@ def set_global_params(**params: Any) -> None: } cachier.config._global_params = replace( cachier.config._global_params, - **valid_params, # type: ignore[arg-type] + **valid_params, ) diff --git a/src/cachier/cores/pickle.py b/src/cachier/cores/pickle.py index 225366b1..344fcba7 100644 --- a/src/cachier/cores/pickle.py +++ b/src/cachier/cores/pickle.py @@ -118,8 +118,8 @@ def _convert_legacy_cache_entry( def _load_cache_dict(self) -> Dict[str, CacheEntry]: try: - with portalocker.Lock(self.cache_fpath, mode="rb") as cf: # type: ignore[arg-type] - cache = pickle.load(cf) # type: ignore[arg-type] + with portalocker.Lock(self.cache_fpath, mode="rb") as cf: + cache = pickle.load(cf) self._cache_used_fpath = str(self.cache_fpath) except (FileNotFoundError, EOFError): cache = {} @@ -145,8 +145,8 @@ def _load_cache_by_key( fpath = self.cache_fpath fpath += f"_{hash_str or key}" try: - with portalocker.Lock(fpath, mode="rb") as cache_file: # type: ignore[arg-type] - entry = pickle.load(cache_file) # type: ignore[arg-type] + with portalocker.Lock(fpath, mode="rb") as cache_file: + entry = pickle.load(cache_file) return _PickleCore._convert_legacy_cache_entry(entry) except (FileNotFoundError, EOFError): return None @@ -184,8 +184,8 @@ def _save_cache( elif hash_str is not None: fpath += f"_{hash_str}" with self.lock: - with portalocker.Lock(fpath, mode="wb") as cf: # type: ignore[arg-type] - pickle.dump(cache, cf, protocol=4) # type: ignore[arg-type] + with portalocker.Lock(fpath, mode="wb") as cf: + pickle.dump(cache, cf, protocol=4) # the same as check for separate_file, but changed for typing if isinstance(cache, dict): self._cache_dict = cache diff --git a/src/cachier/cores/sql.py b/src/cachier/cores/sql.py index d46c1544..543531ef 100644 --- a/src/cachier/cores/sql.py +++ b/src/cachier/cores/sql.py @@ -109,10 +109,10 @@ def get_entry_by_key(self, key: str) -> Tuple[str, Optional[CacheEntry]]: value = pickle.loads(row.value) if row.value is not None else None entry = CacheEntry( value=value, - time=row.timestamp, # type: ignore[arg-type] - stale=row.stale, # type: ignore[arg-type] - _processing=row.processing, # type: ignore[arg-type] - _completed=row.completed, # type: ignore[arg-type] + time=row.timestamp, + stale=row.stale, + _processing=row.processing, + _completed=row.completed, ) return key, entry