Skip to content

Commit dbe70e3

Browse files
ma2bdclaude
andcommitted
Fix cancellation safety of LruCachingStore::write_batch
The previous implementation wrote to the DB first, then updated the LRU cache. If the future was cancelled between the two (e.g. by the RollbackGuard introduced in linera-io#5790), the DB would have the new data but the cache would retain stale entries. Subsequent reads would hit the stale cache, and the next save would overwrite the DB with old data, causing silent data loss. Fix: invalidate cache entries BEFORE writing to the DB, then repopulate after success. If cancelled at any point after invalidation, subsequent reads go directly to the DB and see the correct state. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bd52363 commit dbe70e3

2 files changed

Lines changed: 44 additions & 10 deletions

File tree

linera-views/src/backends/lru_caching.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,29 @@ where
353353
const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;
354354

355355
async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> {
356+
// Invalidate cache entries BEFORE writing to the store. This ensures
357+
// cancellation safety: if the write is cancelled after the store commits
358+
// but before this function returns, subsequent reads will go to the
359+
// store and see the correct (committed) data instead of stale cache.
360+
// We use `invalidate_key` (not `delete_key`) to avoid recording a
361+
// negative `DoesNotExist` entry — we don't know the outcome yet.
362+
if let Some(cache) = &self.cache {
363+
let mut cache = cache.lock().unwrap();
364+
for operation in &batch.operations {
365+
match operation {
366+
WriteOperation::Put { key, .. } | WriteOperation::Delete { key } => {
367+
cache.invalidate_key(key);
368+
}
369+
WriteOperation::DeletePrefix { key_prefix } => {
370+
cache.invalidate_prefix(key_prefix);
371+
}
372+
}
373+
}
374+
}
356375
self.store.write_batch(batch.clone()).await?;
376+
// Repopulate the cache with the written values. If cancelled here,
377+
// the cache is simply empty for these keys (invalidated above) and
378+
// subsequent reads will go to the store — which is correct.
357379
if let Some(cache) = &self.cache {
358380
let mut cache = cache.lock().unwrap();
359381
for operation in &batch.operations {

linera-views/src/lru_prefix_cache.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,13 @@ impl LruPrefixCache {
555555
}
556556
}
557557

558+
/// Removes a key's entry from the cache without recording a negative result.
559+
/// Used for invalidation before a write, where we don't yet know the outcome.
560+
pub(crate) fn invalidate_key(&mut self, key: &[u8]) {
561+
let cache_key = CacheKey::Value(key.to_vec());
562+
self.remove_cache_key_if_exists(&cache_key);
563+
}
564+
558565
/// Deletes a key from the cache.
559566
pub(crate) fn delete_key(&mut self, key: &[u8]) {
560567
if self.has_exclusive_access {
@@ -731,12 +738,9 @@ impl LruPrefixCache {
731738

732739
/// Marks cached keys that match the prefix as deleted. Importantly, this does not
733740
/// create new entries in the cache.
734-
pub(crate) fn delete_prefix(&mut self, key_prefix: &[u8]) {
735-
// When using delete_prefix, we do not insert `ValueEntry::DoesNotExist`
736-
// and instead drop the entries from the value-cache
737-
// This is because:
738-
// * In non-exclusive access, this could be added by another user.
739-
// * In exclusive access, we do this via the `FindKeyValues`.
741+
/// Removes all cache entries matching a prefix without recording negative results.
742+
/// Used for invalidation before a write, where we don't yet know the outcome.
743+
pub(crate) fn invalidate_prefix(&mut self, key_prefix: &[u8]) {
740744
let mut keys = Vec::new();
741745
for (key, _) in self
742746
.value_map
@@ -792,7 +796,7 @@ impl LruPrefixCache {
792796
let cache_key = CacheKey::FindKeys(lower_bound.clone());
793797
self.update_cache_key_sizes(&cache_key, new_cache_size);
794798
}
795-
// Finding a containing FindKeyValues. If existing update, if not insert.
799+
// Finding a containing FindKeyValues. If existing update.
796800
let lower_bound = self.get_existing_find_key_values_entry_mut(key_prefix);
797801
let result = if let Some((lower_bound, find_entry)) = lower_bound {
798802
// Delete the keys (or key/values) in the entry
@@ -807,9 +811,17 @@ impl LruPrefixCache {
807811
// Update the size without changing the position.
808812
let cache_key = CacheKey::FindKeyValues(lower_bound.clone());
809813
self.update_cache_key_sizes(&cache_key, new_cache_size);
810-
} else {
811-
// There is no lower bound. Therefore we can insert
812-
// the deleted prefix as a FindKeyValues.
814+
}
815+
}
816+
}
817+
818+
pub(crate) fn delete_prefix(&mut self, key_prefix: &[u8]) {
819+
self.invalidate_prefix(key_prefix);
820+
if self.has_exclusive_access {
821+
// Record the deletion as an empty FindKeyValues entry if there
822+
// is no containing entry already.
823+
let lower_bound = self.get_existing_find_key_values_entry_mut(key_prefix);
824+
if lower_bound.is_none() {
813825
let size = key_prefix.len();
814826
let cache_key = CacheKey::FindKeyValues(key_prefix.to_vec());
815827
let find_key_values_entry = FindKeyValuesEntry(BTreeMap::new());

0 commit comments

Comments
 (0)