Skip to content

Commit 9d08e8a

Browse files
committed
Support CacheStore.with_read_only
1 parent 2affa68 commit 9d08e8a

2 files changed

Lines changed: 138 additions & 63 deletions

File tree

src/zarr/experimental/cache_store.py

Lines changed: 72 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@
1515
from zarr.core.buffer.core import Buffer, BufferPrototype
1616

1717

18+
class _CacheState:
19+
_cache_order: OrderedDict[str, None] # Track access order for LRU
20+
_current_size: int # Track current cache size
21+
_key_sizes: dict[str, int] # Track size of each cached key
22+
_lock: asyncio.Lock
23+
_hits: int # Cache hit counter
24+
_misses: int # Cache miss counter
25+
_evictions: int # Cache eviction counter
26+
27+
1828
class CacheStore(WrapperStore[Store]):
1929
"""
2030
A dual-store caching implementation for Zarr stores.
@@ -71,13 +81,7 @@ class CacheStore(WrapperStore[Store]):
7181
max_size: int | None
7282
key_insert_times: dict[str, float]
7383
cache_set_data: bool
74-
_cache_order: OrderedDict[str, None] # Track access order for LRU
75-
_current_size: int # Track current cache size
76-
_key_sizes: dict[str, int] # Track size of each cached key
77-
_lock: asyncio.Lock
78-
_hits: int # Cache hit counter
79-
_misses: int # Cache miss counter
80-
_evictions: int # Cache eviction counter
84+
_state: _CacheState
8185

8286
def __init__(
8387
self,
@@ -107,24 +111,39 @@ def __init__(
107111
else:
108112
self.max_age_seconds = max_age_seconds
109113
self.max_size = max_size
114+
self.cache_set_data = cache_set_data
115+
self._state = _CacheState()
116+
110117
if key_insert_times is None:
111118
self.key_insert_times = {}
112119
else:
113120
self.key_insert_times = key_insert_times
114-
self.cache_set_data = cache_set_data
115-
self._cache_order = OrderedDict()
116-
self._current_size = 0
117-
self._key_sizes = {}
118-
self._lock = asyncio.Lock()
119-
self._hits = 0
120-
self._misses = 0
121-
self._evictions = 0
121+
self._state._cache_order = OrderedDict()
122+
self._state._current_size = 0
123+
self._state._key_sizes = {}
124+
self._state._lock = asyncio.Lock()
125+
self._state._hits = 0
126+
self._state._misses = 0
127+
self._state._evictions = 0
122128

123129
def _with_store(self, store: Store) -> Self:
124130
# Cannot support this operation because it would share a cache, but have a new store
125131
# So cache keys would conflict
126132
raise NotImplementedError("CacheStore does not support this operation.")
127133

134+
def with_read_only(self, read_only: bool = False) -> Self:
135+
# Create a new cache store that shares the same cache and mutable state
136+
store = type(self)(
137+
store=self._store.with_read_only(read_only),
138+
cache_store=self._cache,
139+
max_age_seconds=self.max_age_seconds,
140+
max_size=self.max_size,
141+
key_insert_times=self.key_insert_times,
142+
cache_set_data=self.cache_set_data,
143+
)
144+
store._state = self._state
145+
return store
146+
128147
def _is_key_fresh(self, key: str) -> bool:
129148
"""Check if a cached key is still fresh based on max_age_seconds.
130149
@@ -145,9 +164,9 @@ async def _accommodate_value(self, value_size: int) -> None:
145164
return
146165

147166
# Remove least recently used items until we have enough space
148-
while self._current_size + value_size > self.max_size and self._cache_order:
167+
while self._state._current_size + value_size > self.max_size and self._state._cache_order:
149168
# Get the least recently used key (first in OrderedDict)
150-
lru_key = next(iter(self._cache_order))
169+
lru_key = next(iter(self._state._cache_order))
151170
await self._evict_key(lru_key)
152171

153172
async def _evict_key(self, key: str) -> None:
@@ -157,15 +176,15 @@ async def _evict_key(self, key: str) -> None:
157176
Updates size tracking atomically with deletion.
158177
"""
159178
try:
160-
key_size = self._key_sizes.get(key, 0)
179+
key_size = self._state._key_sizes.get(key, 0)
161180

162181
# Delete from cache store
163182
await self._cache.delete(key)
164183

165184
# Update tracking after successful deletion
166185
self._remove_from_tracking(key)
167-
self._current_size = max(0, self._current_size - key_size)
168-
self._evictions += 1
186+
self._state._current_size = max(0, self._state._current_size - key_size)
187+
self._state._evictions += 1
169188

170189
logger.debug("_evict_key: evicted key %s, freed %d bytes", key, key_size)
171190
except Exception:
@@ -188,39 +207,39 @@ async def _cache_value(self, key: str, value: Buffer) -> None:
188207
)
189208
return
190209

191-
async with self._lock:
210+
async with self._state._lock:
192211
# If key already exists, subtract old size first
193-
if key in self._key_sizes:
194-
old_size = self._key_sizes[key]
195-
self._current_size -= old_size
212+
if key in self._state._key_sizes:
213+
old_size = self._state._key_sizes[key]
214+
self._state._current_size -= old_size
196215
logger.debug("_cache_value: updating existing key %s, old size %d", key, old_size)
197216

198217
# Make room for the new value (this calls _evict_key internally)
199218
await self._accommodate_value(value_size)
200219

201220
# Update tracking atomically
202-
self._cache_order[key] = None # OrderedDict to track access order
203-
self._current_size += value_size
204-
self._key_sizes[key] = value_size
221+
self._state._cache_order[key] = None # OrderedDict to track access order
222+
self._state._current_size += value_size
223+
self._state._key_sizes[key] = value_size
205224
self.key_insert_times[key] = time.monotonic()
206225

207226
logger.debug("_cache_value: cached key %s with size %d bytes", key, value_size)
208227

209228
async def _update_access_order(self, key: str) -> None:
210229
"""Update the access order for LRU tracking."""
211-
if key in self._cache_order:
212-
async with self._lock:
230+
if key in self._state._cache_order:
231+
async with self._state._lock:
213232
# Move to end (most recently used)
214-
self._cache_order.move_to_end(key)
233+
self._state._cache_order.move_to_end(key)
215234

216235
def _remove_from_tracking(self, key: str) -> None:
217236
"""Remove a key from all tracking structures.
218237
219-
Must be called while holding self._lock.
238+
Must be called while holding self._state._lock.
220239
"""
221-
self._cache_order.pop(key, None)
240+
self._state._cache_order.pop(key, None)
222241
self.key_insert_times.pop(key, None)
223-
self._key_sizes.pop(key, None)
242+
self._state._key_sizes.pop(key, None)
224243

225244
async def _get_try_cache(
226245
self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
@@ -229,20 +248,20 @@ async def _get_try_cache(
229248
maybe_cached_result = await self._cache.get(key, prototype, byte_range)
230249
if maybe_cached_result is not None:
231250
logger.debug("_get_try_cache: key %s found in cache (HIT)", key)
232-
self._hits += 1
251+
self._state._hits += 1
233252
# Update access order for LRU
234253
await self._update_access_order(key)
235254
return maybe_cached_result
236255
else:
237256
logger.debug(
238257
"_get_try_cache: key %s not found in cache (MISS), fetching from store", key
239258
)
240-
self._misses += 1
259+
self._state._misses += 1
241260
maybe_fresh_result = await super().get(key, prototype, byte_range)
242261
if maybe_fresh_result is None:
243262
# Key doesn't exist in source store
244263
await self._cache.delete(key)
245-
async with self._lock:
264+
async with self._state._lock:
246265
self._remove_from_tracking(key)
247266
else:
248267
# Cache the newly fetched value
@@ -254,12 +273,12 @@ async def _get_no_cache(
254273
self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
255274
) -> Buffer | None:
256275
"""Get data directly from source store and update cache."""
257-
self._misses += 1
276+
self._state._misses += 1
258277
maybe_fresh_result = await super().get(key, prototype, byte_range)
259278
if maybe_fresh_result is None:
260279
# Key doesn't exist in source, remove from cache and tracking
261280
await self._cache.delete(key)
262-
async with self._lock:
281+
async with self._state._lock:
263282
self._remove_from_tracking(key)
264283
else:
265284
logger.debug("_get_no_cache: key %s found in store, setting in cache", key)
@@ -317,7 +336,7 @@ async def set(self, key: str, value: Buffer) -> None:
317336
else:
318337
logger.debug("set: deleting key %s from cache", key)
319338
await self._cache.delete(key)
320-
async with self._lock:
339+
async with self._state._lock:
321340
self._remove_from_tracking(key)
322341

323342
async def delete(self, key: str) -> None:
@@ -333,7 +352,7 @@ async def delete(self, key: str) -> None:
333352
await super().delete(key)
334353
logger.debug("delete: deleting key %s from cache", key)
335354
await self._cache.delete(key)
336-
async with self._lock:
355+
async with self._state._lock:
337356
self._remove_from_tracking(key)
338357

339358
def cache_info(self) -> dict[str, Any]:
@@ -344,20 +363,20 @@ def cache_info(self) -> dict[str, Any]:
344363
if self.max_age_seconds == "infinity"
345364
else self.max_age_seconds,
346365
"max_size": self.max_size,
347-
"current_size": self._current_size,
366+
"current_size": self._state._current_size,
348367
"cache_set_data": self.cache_set_data,
349368
"tracked_keys": len(self.key_insert_times),
350-
"cached_keys": len(self._cache_order),
369+
"cached_keys": len(self._state._cache_order),
351370
}
352371

353372
def cache_stats(self) -> dict[str, Any]:
354373
"""Return cache performance statistics."""
355-
total_requests = self._hits + self._misses
356-
hit_rate = self._hits / total_requests if total_requests > 0 else 0.0
374+
total_requests = self._state._hits + self._state._misses
375+
hit_rate = self._state._hits / total_requests if total_requests > 0 else 0.0
357376
return {
358-
"hits": self._hits,
359-
"misses": self._misses,
360-
"evictions": self._evictions,
377+
"hits": self._state._hits,
378+
"misses": self._state._misses,
379+
"evictions": self._state._evictions,
361380
"total_requests": total_requests,
362381
"hit_rate": hit_rate,
363382
}
@@ -369,11 +388,11 @@ async def clear_cache(self) -> None:
369388
await self._cache.clear()
370389

371390
# Reset tracking
372-
async with self._lock:
391+
async with self._state._lock:
373392
self.key_insert_times.clear()
374-
self._cache_order.clear()
375-
self._key_sizes.clear()
376-
self._current_size = 0
393+
self._state._cache_order.clear()
394+
self._state._key_sizes.clear()
395+
self._state._current_size = 0
377396
logger.debug("clear_cache: cleared all cache data")
378397

379398
def __repr__(self) -> str:
@@ -384,6 +403,6 @@ def __repr__(self) -> str:
384403
f"cache_store={self._cache!r}, "
385404
f"max_age_seconds={self.max_age_seconds}, "
386405
f"max_size={self.max_size}, "
387-
f"current_size={self._current_size}, "
388-
f"cached_keys={len(self._cache_order)})"
406+
f"current_size={self._state._current_size}, "
407+
f"cached_keys={len(self._state._cache_order)})"
389408
)

0 commit comments

Comments
 (0)