-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.py
More file actions
304 lines (236 loc) · 9.82 KB
/
cache.py
File metadata and controls
304 lines (236 loc) · 9.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
Caching utilities for anyplot API.
Centralized cache management with consistent key patterns.
Includes stampede protection (per-key asyncio.Lock) and
stale-while-revalidate (background refresh before TTL expiry).
"""
import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar, cast
from cachetools import TTLCache
from core.config import settings
T = TypeVar("T")
logger = logging.getLogger(__name__)
class _LockPruningTTLCache(TTLCache):
    """TTLCache that prunes the per-key asyncio.Lock when an entry is evicted.

    Binding lock lifecycle to cache-entry lifecycle gives us two guarantees
    a separate bounded `_locks` collection cannot:

    1. **No unbounded growth** — every lock that gets created either ends up
       in the cache (and is later pruned via TTL/LRU/explicit-delete) or is
       cleaned up by clear_cache().
    2. **No race between lock-eviction and lock-holder** — a lock can only
       disappear once its cache entry has been written, which means any
       in-flight `factory()` will have called `set_cache(key, ...)` before
       the lock entry can be reaped. New callers either find the cached
       value (no factory re-run) or take a fresh lock (which is fine,
       because at that point nobody else is in the critical section).

    NOTE(review): some cachetools releases implement ``TTLCache.expire()``
    by calling ``Cache.__delitem__`` directly, which bypasses this override
    for TTL-expired entries — verify against the pinned cachetools version
    that expiry actually routes through ``__delitem__`` (explicit deletes
    and ``clear_cache()`` prune locks regardless).
    """

    def __delitem__(self, key, *args, **kwargs):
        # try/finally: prune the companion lock even when the superclass
        # raises (e.g. KeyError for a missing or already-expired key).
        try:
            super().__delitem__(key, *args, **kwargs)
        finally:
            _locks.pop(key, None)
# Global cache instance. Stores `(value, monotonic_set_at)` tuples — folding
# the timestamp into the entry keeps cache age + payload on a single lifecycle
# (was a separate _timestamps dict that grew unbounded under high-cardinality
# traffic such as /plots/filter or /og/* keys).
_cache: _LockPruningTTLCache = _LockPruningTTLCache(maxsize=settings.cache_maxsize, ttl=settings.cache_ttl)

# Per-key locks for stampede protection. Plain dict; lifecycle is bound to
# `_cache` via `_LockPruningTTLCache.__delitem__`. Refresh-locks stored under
# `_refresh:<key>` never have a corresponding cache entry and are pruned by
# `_background_refresh` on completion.
_locks: dict[str, asyncio.Lock] = {}
def _get_lock(key: str) -> asyncio.Lock:
    """Return the lock guarding *key*, creating it on first use.

    No synchronization is needed around the dict access itself: the event
    loop runs callbacks one at a time and there is no ``await`` between the
    lookup and the insert, so the check-then-set cannot interleave.
    """
    lock = _locks.get(key)
    if lock is None:
        lock = asyncio.Lock()
        _locks[key] = lock
    return lock
def cache_key(*parts: str) -> str:
    """
    Join non-empty key components with ":" into one cache key.

    Args:
        *parts: Key components; falsy components are dropped.

    Returns:
        The joined cache key string.

    Example:
        >>> cache_key("spec", "scatter-basic")
        'spec:scatter-basic'
    """
    segments = [str(part) for part in parts if part]
    return ":".join(segments)
def get_cache(key: str) -> Any | None:
    """
    Look up *key* in the cache.

    Args:
        key: Cache key.

    Returns:
        The cached payload, or None on a miss. Note this cannot
        distinguish "absent" from a cached value that is literally None.
    """
    entry = _cache.get(key)
    if entry is None:
        return None
    value, _set_at = entry
    return value
def set_cache(key: str, value: Any) -> None:
    """
    Store *value* under *key*, stamped with the current monotonic time.

    The timestamp travels with the payload so cache_age() shares the
    entry's lifecycle (no separate timestamp dict to leak).

    Args:
        key: Cache key.
        value: Value to cache.
    """
    stamped_at = time.monotonic()
    _cache[key] = (value, stamped_at)
def cache_age(key: str) -> float | None:
    """Return seconds since *key* was last set, or None if absent/expired."""
    entry = _cache.get(key)
    if entry is None:
        return None
    _value, set_at = entry
    return time.monotonic() - set_at
def clear_cache() -> None:
    """
    Drop every cached entry and every per-key lock.

    Invalidates all cached data; called automatically after database
    synchronization.

    Example:
        >>> clear_cache()  # Invalidates all cached responses
    """
    _cache.clear()
    # Entry locks are pruned as the cache empties (via
    # _LockPruningTTLCache.__delitem__), but refresh-locks (`_refresh:*`)
    # and locks for cold-miss attempts whose factory never reached
    # set_cache have no cache entry — wipe the whole dict.
    _locks.clear()
def clear_cache_by_pattern(pattern: str) -> int:
    """
    Clear cache entries whose key contains *pattern* (substring match).

    Args:
        pattern: String pattern to match (substring match)

    Returns:
        Number of cache entries cleared

    Example:
        >>> clear_cache_by_pattern("spec:")  # Clears all spec-related cache
        15
        >>> clear_cache_by_pattern("filter:")  # Clears all filter cache
        42
    """
    # Snapshot first — deleting while iterating a TTLCache is unsafe.
    keys_to_delete = [key for key in _cache.keys() if pattern in key]
    # _LockPruningTTLCache.__delitem__ also prunes _locks[key].
    for key in keys_to_delete:
        try:
            del _cache[key]
        except KeyError:
            # Entry TTL-expired between the snapshot and the delete; it is
            # gone either way (and the lock was still pruned by the
            # try/finally in __delitem__), so count it as cleared and
            # keep sweeping instead of aborting the loop.
            pass
    return len(keys_to_delete)
def clear_spec_cache(spec_id: str) -> int:
    """
    Clear all cache entries related to a specific spec.

    Clears spec detail, spec images, spec list, filters, and stats caches.

    Args:
        spec_id: The specification ID

    Returns:
        Total count across all cleared patterns (may count overlapping keys multiple times)

    Example:
        >>> clear_spec_cache("scatter-basic")
        5
    """
    patterns = (
        f"spec:{spec_id}",         # spec detail
        f"spec_images:{spec_id}",  # spec images
        "specs_list",              # list might have changed
        "specs_map",               # map page payload might have changed
        "filter:",                 # filters might be affected
        "stats",                   # stats might have changed
        "sitemap",                 # sitemap includes spec URLs
        f"seo:{spec_id}",          # SEO proxy pages for this spec
        f"og:{spec_id}",           # OG images for this spec
    )
    return sum(clear_cache_by_pattern(p) for p in patterns)
def clear_library_cache(library_id: str) -> int:
    """
    Clear all cache entries for a specific library.

    Args:
        library_id: The library ID

    Returns:
        Number of cache entries cleared

    Example:
        >>> clear_library_cache("matplotlib")
        3
    """
    patterns = (
        f"lib_images:{library_id}",  # library images
        "libraries",                 # list might have changed
        "filter:",                   # filters might be affected
        "stats",                     # stats might have changed
        "sitemap",                   # sitemap includes library URLs
    )
    return sum(clear_cache_by_pattern(p) for p in patterns)
def get_cache_stats() -> dict:
    """
    Report current cache occupancy and configuration.

    Returns:
        Dict with cache size, maxsize, and TTL

    Example:
        >>> get_cache_stats()
        {"size": 42, "maxsize": 1000, "ttl": 600}
    """
    stats = {
        "size": len(_cache),
        "maxsize": _cache.maxsize,
        "ttl": _cache.ttl,
    }
    return stats
# ---------------------------------------------------------------------------
# Stampede protection + stale-while-revalidate
# ---------------------------------------------------------------------------
async def get_or_set_cache(
    key: str,
    factory: Callable[[], Awaitable[T]],
    *,
    refresh_after: float | None = None,
    refresh_factory: Callable[[], Awaitable[T]] | None = None,
) -> T:
    """Return the cached value for *key*, computing it on a cold miss.

    A per-key lock serializes cold-miss computation so concurrent callers
    run *factory* only once (stampede protection). When *refresh_after* is
    set and the cached entry is older than that many seconds, the stale
    value is returned immediately and a background refresh is scheduled
    (stale-while-revalidate).

    Note: ``get_cache`` signals a miss with ``None``, so a factory that
    returns ``None`` is effectively uncacheable and re-runs on every call.

    Args:
        key: Cache key.
        factory: Async callable that produces the value (e.g. DB query).
            Used inline on a cold miss; may capture a request-scoped DB
            session.
        refresh_after: Age in seconds beyond which a background refresh is
            triggered.
        refresh_factory: Standalone async callable for background refresh.
            Must create its own DB session (via get_db_context). Only used
            when refresh_after is set; defaults to *factory*.
    """
    hit = get_cache(key)
    if hit is not None:
        # Stale-while-revalidate: serve stale now, refresh in background.
        if refresh_after is not None:
            age = cache_age(key)
            if age is not None and age > refresh_after:
                _schedule_refresh(key, refresh_factory or factory)
        return cast(T, hit)

    # Cold miss — compute inline, serialized behind the per-key lock.
    async with _get_lock(key):
        hit = get_cache(key)  # another coroutine may have filled it meanwhile
        if hit is not None:
            return cast(T, hit)
        value = await factory()
        set_cache(key, value)
        return value
# Strong references to in-flight refresh tasks. The event loop keeps only
# weak references to tasks, so a create_task() result that is dropped can be
# garbage-collected before the refresh completes; each task removes itself
# on completion via the done-callback.
_refresh_tasks: set[asyncio.Task] = set()


def _schedule_refresh(key: str, factory: Callable[[], Awaitable[Any]]) -> None:
    """Schedule a background cache refresh if one isn't already running.

    Args:
        key: Cache key whose entry should be refreshed.
        factory: Async callable producing the fresh value.
    """
    refresh_key = f"_refresh:{key}"
    lock = _get_lock(refresh_key)
    if lock.locked():
        return  # refresh already in progress
    task = asyncio.create_task(_background_refresh(key, refresh_key, factory, lock))
    # Retain the task so it cannot be garbage-collected mid-refresh.
    _refresh_tasks.add(task)
    task.add_done_callback(_refresh_tasks.discard)
async def _background_refresh(
    key: str, refresh_key: str, factory: Callable[[], Awaitable[Any]], lock: asyncio.Lock
) -> None:
    """Run *factory* off the request path and overwrite the cache entry.

    Errors are logged, never raised. Refresh-locks (`_refresh:<key>`) have
    no corresponding cache entry, so the `_LockPruningTTLCache.__delitem__`
    hook never reaps them — they would accumulate one-per-refreshed-key
    indefinitely. Popping in the outer `finally` bounds `_locks` growth.
    A duplicate-task race (a second caller taking the same released lock
    before the pop runs) only costs duplicated factory work, not
    correctness, since `set_cache` is last-write-wins.
    """
    try:
        async with lock:
            try:
                set_cache(key, await factory())
            except Exception:
                logger.warning("Background cache refresh failed for key: %s", key, exc_info=True)
    finally:
        _locks.pop(refresh_key, None)