-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathmemory.py
More file actions
119 lines (105 loc) · 4.02 KB
/
memory.py
File metadata and controls
119 lines (105 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""A memory-based caching core for cachier."""
import threading
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple
from .._types import HashFunc
from ..config import CacheEntry
from .base import _BaseCore, _get_func_str
class _MemoryCore(_BaseCore):
"""The memory core class for cachier."""
def __init__(
self,
hash_func: Optional[HashFunc],
wait_for_calc_timeout: Optional[int],
entry_size_limit: Optional[int] = None,
):
super().__init__(hash_func, wait_for_calc_timeout, entry_size_limit)
self.cache: Dict[str, CacheEntry] = {}
def _hash_func_key(self, key: str) -> str:
return f"{_get_func_str(self.func)}:{key}"
def get_entry_by_key(
self, key: str, reload=False
) -> Tuple[str, Optional[CacheEntry]]:
with self.lock:
return key, self.cache.get(self._hash_func_key(key), None)
def set_entry(self, key: str, func_res: Any) -> bool:
if not self._should_store(func_res):
return False
hash_key = self._hash_func_key(key)
with self.lock:
try:
# we need to retain the existing condition so that
# mark_entry_not_calculated can notify all possibly-waiting
# threads about it
cond = self.cache[hash_key]._condition
except KeyError: # pragma: no cover
cond = None
self.cache[hash_key] = CacheEntry(
value=func_res,
time=datetime.now(),
stale=False,
_processing=False,
_condition=cond,
_completed=True,
)
return True
def mark_entry_being_calculated(self, key: str) -> None:
with self.lock:
condition = threading.Condition()
hash_key = self._hash_func_key(key)
if hash_key in self.cache:
self.cache[hash_key]._processing = True
self.cache[hash_key]._condition = condition
# condition.acquire()
else:
self.cache[hash_key] = CacheEntry(
value=None,
time=datetime.now(),
stale=False,
_processing=True,
_condition=condition,
)
def mark_entry_not_calculated(self, key: str) -> None:
hash_key = self._hash_func_key(key)
with self.lock:
if hash_key not in self.cache:
return # that's ok, we don't need an entry in that case
entry = self.cache[hash_key]
entry._processing = False
cond = entry._condition
if cond:
cond.acquire()
cond.notify_all()
cond.release()
entry._condition = None
def wait_on_entry_calc(self, key: str) -> Any:
hash_key = self._hash_func_key(key)
with self.lock: # pragma: no cover
entry = self.cache[hash_key]
if entry is None:
return None
if not entry._processing:
return entry.value
if entry._condition is None:
raise RuntimeError("No condition set for entry")
entry._condition.acquire()
entry._condition.wait()
entry._condition.release()
return self.cache[hash_key].value
def clear_cache(self) -> None:
with self.lock:
self.cache.clear()
def clear_being_calculated(self) -> None:
with self.lock:
for entry in self.cache.values():
entry._processing = False
entry._condition = None
def delete_stale_entries(self, stale_after: timedelta) -> None:
"""Remove stale entries from the in-memory cache."""
now = datetime.now()
with self.lock:
keys_to_delete = [
k for k, v in self.cache.items() if now - v.time > stale_after
]
for key in keys_to_delete:
del self.cache[key]