From bedefae39b623213d69916db3cb236828f121dae Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 19:56:48 +0200 Subject: [PATCH 01/27] Implement the find_keys_by_prefix_iter (first step) --- Cargo.lock | 1 + Cargo.toml | 1 + linera-views/Cargo.toml | 1 + linera-views/src/backends/dual.rs | 49 ++++++- linera-views/src/backends/dynamo_db.rs | 62 ++++++++- linera-views/src/backends/journaling.rs | 29 +++- linera-views/src/backends/lru_caching.rs | 71 +++++++++- linera-views/src/backends/metering.rs | 27 +++- linera-views/src/backends/rocks_db.rs | 137 ++++++++++++++++++- linera-views/src/backends/scylla_db.rs | 64 ++++++++- linera-views/src/backends/value_splitting.rs | 56 +++++++- linera-views/src/store.rs | 56 +++++++- 12 files changed, 539 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 771b561710cb..e80280a2bcb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6357,6 +6357,7 @@ dependencies = [ "anyhow", "async-graphql", "async-lock", + "async-stream", "aws-config", "aws-sdk-dynamodb", "aws-smithy-types", diff --git a/Cargo.toml b/Cargo.toml index 069029d969bd..21dbd2f908c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ async-graphql-axum = "=7.0.17" async-graphql-derive = "=7.0.17" async-graphql-value = { version = "=7.0.17", features = ["raw_value"] } async-lock = "3.3.0" +async-stream = "0.3" async-trait = "0.1.77" async-tungstenite = { version = "0.22", features = ["tokio-runtime"] } aws-smithy-types = "1.3.1" diff --git a/linera-views/Cargo.toml b/linera-views/Cargo.toml index 2fc1852081de..8e75d1c999bc 100644 --- a/linera-views/Cargo.toml +++ b/linera-views/Cargo.toml @@ -36,6 +36,7 @@ allocative.workspace = true anyhow.workspace = true async-graphql.workspace = true async-lock.workspace = true +async-stream.workspace = true aws-config = { workspace = true, optional = true } aws-sdk-dynamodb = { workspace = true, optional = true } aws-smithy-types = { workspace = true, optional = true } diff --git a/linera-views/src/backends/dual.rs b/linera-views/src/backends/dual.rs index 5a1c2ac8d912..0ff2df7dd3bd 100644 --- a/linera-views/src/backends/dual.rs +++ b/linera-views/src/backends/dual.rs @@ -3,6 +3,7 @@ //! Implements [`crate::store::KeyValueStore`] by combining two existing stores. +use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -11,8 +12,8 @@ use crate::store::TestKeyValueDatabase; use crate::{ batch::Batch, store::{ - KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError, - WritableKeyValueStore, + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, }, }; @@ -175,6 +176,28 @@ where Ok(result) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + match self { + Self::First(store) => { + let mut stream = store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::First); + } + } + Self::Second(store) => { + let mut stream = store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::Second); + } + } + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -191,6 +214,28 @@ where }; Ok(result) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + match self { + Self::First(store) => { + let mut stream = store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::First); + } + } + Self::Second(store) => { + let mut stream = store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::Second); + } + } + } + }) + } } impl WritableKeyValueStore for DualStore diff --git a/linera-views/src/backends/dynamo_db.rs b/linera-views/src/backends/dynamo_db.rs index d62ed2318296..7dc9c9201587 100644 --- a/linera-views/src/backends/dynamo_db.rs +++ b/linera-views/src/backends/dynamo_db.rs @@ -47,8 +47,8 @@ use crate::{ journaling::JournalingKeyValueDatabase, lru_caching::{LruCachingConfig, LruCachingDatabase}, store::{ - DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, - WithError, + DirectWritableKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase, + KeyValueStoreError, ReadableKeyValueStore, WithError, }, value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; @@ -886,6 +886,32 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { .collect() } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + check_key_size(key_prefix)?; + let prefix_len = key_prefix.len(); + let mut start_key_map = None; + loop { + let response = self + .get_query_output(KEY_ATTRIBUTE, &self.start_key, key_prefix, start_key_map) + .await?; + let last_evaluated = response.last_evaluated_key.clone(); + for item in response.items.iter().flatten() { + yield extract_key(prefix_len, item).map(|k| k.to_vec()); + } + match last_evaluated { + None => break, + Some(value) => { + start_key_map = Some(value); + } + } + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -898,6 +924,38 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec()))) .collect() } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + check_key_size(key_prefix)?; + let prefix_len = key_prefix.len(); + let mut start_key_map = None; + loop { + let response = self + .get_query_output( + KEY_VALUE_ATTRIBUTE, + &self.start_key, + key_prefix, + start_key_map, + ) + .await?; + let last_evaluated = response.last_evaluated_key.clone(); + for item in response.items.iter().flatten() { + yield extract_key_value(prefix_len, item) + .map(|(k, v)| (k.to_vec(), v.to_vec())); + } + match last_evaluated { + None => break, + Some(value) => { + start_key_map = Some(value); + } + } + } + }) + } } impl DirectWritableKeyValueStore for DynamoDbStoreInternal { diff --git a/linera-views/src/backends/journaling.rs b/linera-views/src/backends/journaling.rs index 45cfe7d1ac4d..730c92cdc1d9 100644 --- a/linera-views/src/backends/journaling.rs +++ b/linera-views/src/backends/journaling.rs @@ -19,6 +19,7 @@ //! time the data in a block are written, the journal header is updated in the same //! transaction to mark the block as processed. +use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; use static_assertions as sa; use thiserror::Error; @@ -26,8 +27,8 @@ use thiserror::Error; use crate::{ batch::{Batch, BatchValueWriter, DeletePrefixExpander, SimplifiedBatch}, store::{ - DirectKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, - WithError, WritableKeyValueStore, + DirectKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase, + KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore, }, views::MIN_VIEW_TAG, }; @@ -196,12 +197,36 @@ where Ok(self.store.find_keys_by_prefix(key_prefix).await?) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(JournalingError::Inner); + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], ) -> Result, Vec)>, Self::Error> { Ok(self.store.find_key_values_by_prefix(key_prefix).await?) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(JournalingError::Inner); + } + }) + } } impl KeyValueDatabase for JournalingKeyValueDatabase diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index 98e75b8b54f5..55b695f24f3a 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -11,10 +11,15 @@ use serde::{Deserialize, Serialize}; use crate::memory::MemoryDatabase; #[cfg(with_testing)] use crate::store::TestKeyValueDatabase; +use futures::stream::StreamExt; + use crate::{ batch::{Batch, WriteOperation}, lru_prefix_cache::{LruPrefixCache, StorageCacheConfig}, - store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore}, + store::{ + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, ReadableKeyValueStore, WithError, + WritableKeyValueStore, + }, }; #[cfg(with_metrics)] @@ -317,6 +322,38 @@ where Ok(keys) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + if let Some(cache) = self.get_exclusive_cache() { + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_keys(key_prefix) + }; + if let Some(keys) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + for key in keys { + yield Ok(key); + } + return; + } + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + } + let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item; + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -343,6 +380,38 @@ where cache.insert_find_key_values(key_prefix.to_vec(), &key_values); Ok(key_values) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + if let Some(cache) = self.get_exclusive_cache() { + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_key_values(key_prefix) + }; + if let Some(key_values) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + for key_value in key_values { + yield Ok(key_value); + } + return; + } + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + } + let mut stream = self.store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item; + } + }) + } } impl WritableKeyValueStore for LruCachingStore diff --git a/linera-views/src/backends/metering.rs b/linera-views/src/backends/metering.rs index f63c18cefe7e..d1388983a6a0 100644 --- a/linera-views/src/backends/metering.rs +++ b/linera-views/src/backends/metering.rs @@ -19,7 +19,10 @@ use prometheus::{exponential_buckets, HistogramVec, IntCounterVec}; use crate::store::TestKeyValueDatabase; use crate::{ batch::Batch, - store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore}, + store::{ + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, ReadableKeyValueStore, WithError, + WritableKeyValueStore, + }, }; #[derive(Clone)] @@ -438,6 +441,17 @@ where Ok(result) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.counter + .find_keys_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_keys_by_prefix_iter(key_prefix) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -465,6 +479,17 @@ where .observe(key_values_size as f64); Ok(result) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.counter + .find_key_values_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_key_values_by_prefix_iter(key_prefix) + } } impl WritableKeyValueStore for MeteredStore diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 720e892df87e..55a658cc52e2 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -29,12 +29,16 @@ use crate::{ common::get_upper_bound_option, lru_caching::{LruCachingConfig, LruCachingDatabase}, store::{ - KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError, - WritableKeyValueStore, + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, }, value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; +/// The size of batches in `find_keys_by_prefix_iter` and +/// `find_key_values_by_prefix_iter`. +const FIND_BATCH_SIZE: usize = 256; + /// The prefixes being used in the system static ROOT_KEY_DOMAIN: [u8; 1] = [0]; static STORED_ROOT_KEYS_PREFIX: u8 = 1; @@ -217,6 +221,40 @@ impl RocksDbStoreExecutor { Ok(keys) } + /// Reads up to `batch_size` keys for the given full prefix, starting at `seek` + /// (which must already include `start_key + key_prefix`). Returns the keys + /// (with the prefix already stripped) and the full key to seek next, or + /// `None` when the iterator is exhausted. + fn find_keys_chunk_internal( + &self, + full_prefix: Vec, + seek: Vec, + batch_size: usize, + ) -> (Vec>, Option>) { + let len = full_prefix.len(); + + let mut read_opts = rocksdb::ReadOptions::default(); + read_opts.set_async_io(true); + if let Some(upper_bound) = get_upper_bound_option(&full_prefix) { + read_opts.set_iterate_upper_bound(upper_bound); + } + let mut iter = self.db.raw_iterator_opt(read_opts); + iter.seek(&seek); + + let mut keys = Vec::with_capacity(batch_size); + while keys.len() < batch_size { + match iter.key() { + Some(key) => { + keys.push(key[len..].to_vec()); + iter.next(); + } + None => return (keys, None), + } + } + let next = iter.key().map(|k| k.to_vec()); + (keys, next) + } + #[expect(clippy::type_complexity)] fn find_key_values_by_prefix_internal( &self, @@ -237,6 +275,37 @@ impl RocksDbStoreExecutor { Ok(key_values) } + #[expect(clippy::type_complexity)] + fn find_key_values_chunk_internal( + &self, + full_prefix: Vec, + seek: Vec, + batch_size: usize, + ) -> (Vec<(Vec, Vec)>, Option>) { + let len = full_prefix.len(); + + let mut read_opts = rocksdb::ReadOptions::default(); + read_opts.set_async_io(true); + if let Some(upper_bound) = get_upper_bound_option(&full_prefix) { + read_opts.set_iterate_upper_bound(upper_bound); + } + let mut iter = self.db.raw_iterator_opt(read_opts); + iter.seek(&seek); + + let mut key_values = Vec::with_capacity(batch_size); + while key_values.len() < batch_size { + match iter.item() { + Some((key, value)) => { + key_values.push((key[len..].to_vec(), value.to_vec())); + iter.next(); + } + None => return (key_values, None), + } + } + let next = iter.key().map(|k| k.to_vec()); + (key_values, next) + } + fn write_batch_internal( &self, batch: Batch, @@ -526,6 +595,36 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { .await } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::stream! { + check_key_size(&key_prefix)?; + let mut full_prefix = self.executor.start_key.clone(); + full_prefix.extend(&key_prefix); + let mut next_seek = Some(full_prefix.clone()); + while let Some(seek) = next_seek.take() { + let executor = self.executor.clone(); + let prefix = full_prefix.clone(); + let (keys, resume) = self + .spawn_mode + .spawn( + move |(prefix, seek)| { + Ok(executor.find_keys_chunk_internal(prefix, seek, FIND_BATCH_SIZE)) + }, + (prefix, seek), + ) + .await?; + for key in keys { + yield Ok(key); + } + next_seek = resume; + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -539,6 +638,40 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { ) .await } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::stream! { + check_key_size(&key_prefix)?; + let mut full_prefix = self.executor.start_key.clone(); + full_prefix.extend(&key_prefix); + let mut next_seek = Some(full_prefix.clone()); + while let Some(seek) = next_seek.take() { + let executor = self.executor.clone(); + let prefix = full_prefix.clone(); + let (key_values, resume) = self + .spawn_mode + .spawn( + move |(prefix, seek)| { + Ok(executor.find_key_values_chunk_internal( + prefix, + seek, + FIND_BATCH_SIZE, + )) + }, + (prefix, seek), + ) + .await?; + for kv in key_values { + yield Ok(kv); + } + next_seek = resume; + } + }) + } } impl WritableKeyValueStore for RocksDbStoreInternal { diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 28ee7eca7a98..5ee005255fce 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -47,8 +47,8 @@ use crate::{ journaling::{JournalingError, JournalingKeyValueDatabase}, lru_caching::{LruCachingConfig, LruCachingDatabase}, store::{ - DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, - WithError, + DirectWritableKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase, + KeyValueStoreError, ReadableKeyValueStore, WithError, }, value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; @@ -667,6 +667,36 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let key_prefix = key_prefix.to_vec(); + ScyllaDbClient::check_key_size(&key_prefix)?; + let _guard = self.acquire().await; + let session = &self.store.session; + let len = key_prefix.len(); + let query_unbounded = &self.store.find_keys_by_prefix_unbounded; + let query_bounded = &self.store.find_keys_by_prefix_bounded; + let rows = match get_upper_bound_option(&key_prefix) { + None => { + let values = (self.root_key.clone(), key_prefix.clone()); + Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? + } + Some(upper_bound) => { + let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); + Box::pin(session.execute_iter(query_bounded.clone(), values)).await? + } + }; + let mut rows = rows.rows_stream::<(Vec,)>()?; + while let Some(row) = rows.next().await { + let (key,) = row?; + yield Ok(key[len..].to_vec()); + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -676,6 +706,36 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())) .await } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let key_prefix = key_prefix.to_vec(); + ScyllaDbClient::check_key_size(&key_prefix)?; + let _guard = self.acquire().await; + let session = &self.store.session; + let len = key_prefix.len(); + let query_unbounded = &self.store.find_key_values_by_prefix_unbounded; + let query_bounded = &self.store.find_key_values_by_prefix_bounded; + let rows = match get_upper_bound_option(&key_prefix) { + None => { + let values = (self.root_key.clone(), key_prefix.clone()); + Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? + } + Some(upper_bound) => { + let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); + Box::pin(session.execute_iter(query_bounded.clone(), values)).await? + } + }; + let mut rows = rows.rows_stream::<(Vec, Vec)>()?; + while let Some(row) = rows.next().await { + let (key, value) = row?; + yield Ok((key[len..].to_vec(), value)); + } + }) + } } impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 841c8a6b3cc9..401e30e87a09 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -3,14 +3,15 @@ //! Adds support for large values to a given store by splitting them between several keys. +use futures::stream::StreamExt; use linera_base::ensure; use thiserror::Error; use crate::{ batch::{Batch, WriteOperation}, store::{ - KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError, - WritableKeyValueStore, + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, }, }; #[cfg(with_testing)] @@ -221,6 +222,22 @@ where Ok(keys) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + let big_key = item.map_err(ValueSplittingError::InnerStoreError)?; + let len = big_key.len(); + if Self::read_index_from_key(&big_key)? == 0 { + yield Ok(big_key[0..len - 4].to_vec()); + } + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -252,6 +269,41 @@ where } Ok(key_values) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + let (mut big_key, value) = + item.map_err(ValueSplittingError::InnerStoreError)?; + if Self::read_index_from_key(&big_key)? != 0 { + continue; // Leftover segment from an earlier value. + } + big_key.truncate(big_key.len() - 4); + let key = big_key; + let count = Self::read_count_from_value(&value)?; + let mut big_value = value[4..].to_vec(); + for idx in 1..count { + let next = stream.next().await + .ok_or(ValueSplittingError::MissingSegment)?; + let (big_key, value) = + next.map_err(ValueSplittingError::InnerStoreError)?; + if !(Self::read_index_from_key(&big_key)? == idx + && big_key.starts_with(&key) + && big_key.len() == key.len() + 4) + { + yield Err(ValueSplittingError::MissingSegment); + return; + } + big_value.extend(value); + } + yield Ok((key, big_value)); + } + }) + } } impl WritableKeyValueStore for ValueSplittingStore diff --git a/linera-views/src/store.rs b/linera-views/src/store.rs index f449749f6d24..457c9f6331cd 100644 --- a/linera-views/src/store.rs +++ b/linera-views/src/store.rs @@ -3,8 +3,9 @@ //! This provides the trait definitions for the stores. -use std::{fmt::Debug, future::Future}; +use std::{fmt::Debug, future::Future, pin::Pin}; +use futures::stream::Stream; use serde::{de::DeserializeOwned, Serialize}; #[cfg(with_testing)] @@ -15,6 +16,24 @@ use crate::{ ViewError, }; +/// A boxed stream for iterating over keys returned by `find_keys_by_prefix_iter`. +#[cfg(not(web))] +pub type FindKeysStream<'a, E> = Pin, E>> + Send + 'a>>; + +/// A boxed stream for iterating over keys returned by `find_keys_by_prefix_iter`. +#[cfg(web)] +pub type FindKeysStream<'a, E> = Pin, E>> + 'a>>; + +/// A boxed stream for iterating over key/value pairs returned by `find_key_values_by_prefix_iter`. +#[cfg(not(web))] +pub type FindKeyValuesStream<'a, E> = + Pin, Vec), E>> + Send + 'a>>; + +/// A boxed stream for iterating over key/value pairs returned by `find_key_values_by_prefix_iter`. +#[cfg(web)] +pub type FindKeyValuesStream<'a, E> = + Pin, Vec), E>> + 'a>>; + /// The error type for the key-value stores. pub trait KeyValueStoreError: std::error::Error + From + Debug + Send + Sync + 'static @@ -76,12 +95,47 @@ pub trait ReadableKeyValueStore: WithError { /// Finds the `key` matching the prefix. The prefix is not included in the returned keys. async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result>, Self::Error>; + /// Returns a boxed stream that yields the keys matching the prefix one by one. + /// The prefix is not included in the returned keys. The stream may be dropped + /// early to stop the iteration. The default implementation downloads the full + /// list eagerly via `find_keys_by_prefix` and yields the entries afterwards; + /// backends should override this to stream incrementally whenever possible. + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let keys = self.find_keys_by_prefix(key_prefix).await?; + for key in keys { + yield Ok(key); + } + }) + } + /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys. async fn find_key_values_by_prefix( &self, key_prefix: &[u8], ) -> Result, Vec)>, Self::Error>; + /// Returns a boxed stream that yields the `(key, value)` pairs matching the + /// prefix one by one. The prefix is not included in the returned keys. The + /// stream may be dropped early to stop the iteration. The default + /// implementation downloads the full list eagerly via `find_key_values_by_prefix` + /// and yields the entries afterwards; backends should override this to stream + /// incrementally whenever possible. + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let key_values = self.find_key_values_by_prefix(key_prefix).await?; + for key_value in key_values { + yield Ok(key_value); + } + }) + } + // We can't use `async fn` here in the below implementations due to // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed // we can revert them to `async fn` syntax, which is neater. From dbc767ed31ba780397d859c879bc57c7c4562a2b Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 20:28:13 +0200 Subject: [PATCH 02/27] Correct the rocks_db implementation. --- linera-views/src/backends/rocks_db.rs | 198 ++++++++++---------------- 1 file changed, 73 insertions(+), 125 deletions(-) diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 55a658cc52e2..65921fb7278d 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -35,9 +35,9 @@ use crate::{ value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; -/// The size of batches in `find_keys_by_prefix_iter` and -/// `find_key_values_by_prefix_iter`. -const FIND_BATCH_SIZE: usize = 256; +/// Channel buffer for streaming results from the blocking RocksDB iterator +/// to the async stream consumer. +const FIND_STREAM_BUFFER: usize = 16; /// The prefixes being used in the system static ROOT_KEY_DOMAIN: [u8; 1] = [0]; @@ -221,40 +221,6 @@ impl RocksDbStoreExecutor { Ok(keys) } - /// Reads up to `batch_size` keys for the given full prefix, starting at `seek` - /// (which must already include `start_key + key_prefix`). Returns the keys - /// (with the prefix already stripped) and the full key to seek next, or - /// `None` when the iterator is exhausted. - fn find_keys_chunk_internal( - &self, - full_prefix: Vec, - seek: Vec, - batch_size: usize, - ) -> (Vec>, Option>) { - let len = full_prefix.len(); - - let mut read_opts = rocksdb::ReadOptions::default(); - read_opts.set_async_io(true); - if let Some(upper_bound) = get_upper_bound_option(&full_prefix) { - read_opts.set_iterate_upper_bound(upper_bound); - } - let mut iter = self.db.raw_iterator_opt(read_opts); - iter.seek(&seek); - - let mut keys = Vec::with_capacity(batch_size); - while keys.len() < batch_size { - match iter.key() { - Some(key) => { - keys.push(key[len..].to_vec()); - iter.next(); - } - None => return (keys, None), - } - } - let next = iter.key().map(|k| k.to_vec()); - (keys, next) - } - #[expect(clippy::type_complexity)] fn find_key_values_by_prefix_internal( &self, @@ -275,37 +241,6 @@ impl RocksDbStoreExecutor { Ok(key_values) } - #[expect(clippy::type_complexity)] - fn find_key_values_chunk_internal( - &self, - full_prefix: Vec, - seek: Vec, - batch_size: usize, - ) -> (Vec<(Vec, Vec)>, Option>) { - let len = full_prefix.len(); - - let mut read_opts = rocksdb::ReadOptions::default(); - read_opts.set_async_io(true); - if let Some(upper_bound) = get_upper_bound_option(&full_prefix) { - read_opts.set_iterate_upper_bound(upper_bound); - } - let mut iter = self.db.raw_iterator_opt(read_opts); - iter.seek(&seek); - - let mut key_values = Vec::with_capacity(batch_size); - while key_values.len() < batch_size { - match iter.item() { - Some((key, value)) => { - key_values.push((key[len..].to_vec(), value.to_vec())); - iter.next(); - } - None => return (key_values, None), - } - } - let next = iter.key().map(|k| k.to_vec()); - (key_values, next) - } - fn write_batch_internal( &self, batch: Batch, @@ -595,36 +530,6 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { .await } - fn find_keys_by_prefix_iter<'a>( - &'a self, - key_prefix: &'a [u8], - ) -> FindKeysStream<'a, Self::Error> { - let key_prefix = key_prefix.to_vec(); - Box::pin(async_stream::stream! { - check_key_size(&key_prefix)?; - let mut full_prefix = self.executor.start_key.clone(); - full_prefix.extend(&key_prefix); - let mut next_seek = Some(full_prefix.clone()); - while let Some(seek) = next_seek.take() { - let executor = self.executor.clone(); - let prefix = full_prefix.clone(); - let (keys, resume) = self - .spawn_mode - .spawn( - move |(prefix, seek)| { - Ok(executor.find_keys_chunk_internal(prefix, seek, FIND_BATCH_SIZE)) - }, - (prefix, seek), - ) - .await?; - for key in keys { - yield Ok(key); - } - next_seek = resume; - } - }) - } - async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -639,36 +544,79 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { .await } - fn find_key_values_by_prefix_iter<'a>( - &'a self, - key_prefix: &'a [u8], - ) -> FindKeyValuesStream<'a, Self::Error> { + fn find_keys_by_prefix_iter( + &self, + key_prefix: &[u8], + ) -> FindKeysStream<'_, Self::Error> { + let executor = self.executor.clone(); let key_prefix = key_prefix.to_vec(); Box::pin(async_stream::stream! { - check_key_size(&key_prefix)?; - let mut full_prefix = self.executor.start_key.clone(); - full_prefix.extend(&key_prefix); - let mut next_seek = Some(full_prefix.clone()); - while let Some(seek) = next_seek.take() { - let executor = self.executor.clone(); - let prefix = full_prefix.clone(); - let (key_values, resume) = self - .spawn_mode - .spawn( - move |(prefix, seek)| { - Ok(executor.find_key_values_chunk_internal( - prefix, - seek, - FIND_BATCH_SIZE, - )) - }, - (prefix, seek), - ) - .await?; - for kv in key_values { - yield Ok(kv); + if let Err(error) = check_key_size(&key_prefix) { + yield Err(error); + return; + } + let (tx, mut rx) = tokio::sync::mpsc::channel::< + Result, RocksDbStoreInternalError>, + >(FIND_STREAM_BUFFER); + let handle = tokio::task::spawn_blocking(move || { + let mut prefix = executor.start_key.clone(); + prefix.extend(&key_prefix); + let len = prefix.len(); + let mut iter = executor.get_find_prefix_iterator(&prefix); + while let Some(key) = iter.key() { + if tx.blocking_send(Ok(key[len..].to_vec())).is_err() { + return; + } + iter.next(); } - next_seek = resume; + if let Err(error) = iter.status() { + let _ = tx.blocking_send(Err(error.into())); + } + }); + while let Some(item) = rx.recv().await { + yield item; + } + if let Err(error) = handle.await { + yield Err(RocksDbStoreInternalError::TokioJoinError(error)); + } + }) + } + + fn find_key_values_by_prefix_iter( + &self, + key_prefix: &[u8], + ) -> FindKeyValuesStream<'_, Self::Error> { + let executor = self.executor.clone(); + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::stream! { + if let Err(error) = check_key_size(&key_prefix) { + yield Err(error); + return; + } + let (tx, mut rx) = tokio::sync::mpsc::channel::< + Result<(Vec, Vec), RocksDbStoreInternalError>, + >(FIND_STREAM_BUFFER); + let handle = tokio::task::spawn_blocking(move || { + let mut prefix = executor.start_key.clone(); + prefix.extend(&key_prefix); + let len = prefix.len(); + let mut iter = executor.get_find_prefix_iterator(&prefix); + while let Some((key, value)) = iter.item() { + let pair = (key[len..].to_vec(), value.to_vec()); + if tx.blocking_send(Ok(pair)).is_err() { + return; + } + iter.next(); + } + if let Err(error) = iter.status() { + let _ = tx.blocking_send(Err(error.into())); + } + }); + while let Some(item) = rx.recv().await { + yield item; + } + if let Err(error) = handle.await { + yield Err(RocksDbStoreInternalError::TokioJoinError(error)); } }) } From 56d7ec3b1c4f3888aade64228f398becd6f20f54 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 20:30:38 +0200 Subject: [PATCH 03/27] Some simplification. --- linera-views/src/backends/rocks_db.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 65921fb7278d..4499544455a0 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -35,10 +35,6 @@ use crate::{ value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; -/// Channel buffer for streaming results from the blocking RocksDB iterator -/// to the async stream consumer. -const FIND_STREAM_BUFFER: usize = 16; - /// The prefixes being used in the system static ROOT_KEY_DOMAIN: [u8; 1] = [0]; static STORED_ROOT_KEYS_PREFIX: u8 = 1; @@ -555,9 +551,8 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { yield Err(error); return; } - let (tx, mut rx) = tokio::sync::mpsc::channel::< - Result, RocksDbStoreInternalError>, - >(FIND_STREAM_BUFFER); + let (tx, mut rx) = + tokio::sync::mpsc::channel::, RocksDbStoreInternalError>>(1); let handle = tokio::task::spawn_blocking(move || { let mut prefix = executor.start_key.clone(); prefix.extend(&key_prefix); @@ -595,7 +590,7 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { } let (tx, mut rx) = tokio::sync::mpsc::channel::< Result<(Vec, Vec), RocksDbStoreInternalError>, - >(FIND_STREAM_BUFFER); + >(1); let handle = tokio::task::spawn_blocking(move || { let mut prefix = executor.start_key.clone(); prefix.extend(&key_prefix); From 5a073b94e9d4b2ce0ffc6a29ae027abc9e2d7347 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 20:48:30 +0200 Subject: [PATCH 04/27] Demonstrate the use of the find_keys_by_prefix to the functionality. --- linera-views/src/test_utils/mod.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 5a36fadc7f42..7393de463373 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -9,6 +9,7 @@ pub mod performance; use std::collections::{BTreeMap, BTreeSet, HashSet}; +use futures::TryStreamExt; use rand::{seq::SliceRandom, Rng}; use crate::{ @@ -162,6 +163,8 @@ pub fn span_random_reordering_put_delete( /// * `read_value_bytes` /// * `read_multi_values_bytes` /// * `find_keys_by_prefix` / `find_key_values_by_prefix` +/// * `find_keys_by_prefix_iter` / `find_key_values_by_prefix_iter` (streaming +/// variants, including early termination) /// * The ordering of keys returned by `find_keys_by_prefix` and `find_key_values_by_prefix` pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec)>) { // We need a nontrivial key_prefix because dynamo requires a non-trivial prefix @@ -184,7 +187,7 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec let mut set_key_value1 = HashSet::new(); let mut keys_request_deriv = Vec::new(); let key_values_by_prefix = store.find_key_values_by_prefix(key_prefix).await.unwrap(); - for (key, value) in key_values_by_prefix { + for (key, value) in key_values_by_prefix.clone() { keys_request_deriv.push(key.clone()); set_key_value1.insert((key, value)); } @@ -202,6 +205,19 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec } } assert_eq!(set_key_value1, set_key_value2); + // Streaming variants must agree with the eager methods. + let keys_iter: Vec> = store + .find_keys_by_prefix_iter(key_prefix) + .try_collect() + .await + .unwrap(); + assert_eq!(keys_iter, keys_request); + let key_values_iter: Vec<(Vec, Vec)> = store + .find_key_values_by_prefix_iter(key_prefix) + .try_collect() + .await + .unwrap(); + assert_eq!(key_values_iter, key_values_by_prefix); } // Now checking the read_multi_values_bytes let mut rng = make_deterministic_rng(); From 0b3fab658ec8f50149f830570f4716349ff3e08f Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 21:24:32 +0200 Subject: [PATCH 05/27] Implement the reverse iterators. --- linera-views/src/backends/dual.rs | 44 ++++++++ linera-views/src/backends/dynamo_db.rs | 84 +++++++++++++++ linera-views/src/backends/journaling.rs | 24 +++++ linera-views/src/backends/lru_caching.rs | 64 +++++++++++ linera-views/src/backends/metering.rs | 22 ++++ linera-views/src/backends/rocks_db.rs | 108 +++++++++++++++++++ linera-views/src/backends/scylla_db.rs | 96 +++++++++++++++++ linera-views/src/backends/value_splitting.rs | 65 +++++++++++ linera-views/src/store.rs | 36 +++++++ linera-views/src/test_utils/mod.rs | 17 +++ 10 files changed, 560 insertions(+) diff --git a/linera-views/src/backends/dual.rs b/linera-views/src/backends/dual.rs index 0ff2df7dd3bd..2856fd0fff8e 100644 --- a/linera-views/src/backends/dual.rs +++ b/linera-views/src/backends/dual.rs @@ -236,6 +236,50 @@ where } }) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + match self { + Self::First(store) => { + let mut stream = store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::First); + } + } + Self::Second(store) => { + let mut stream = store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::Second); + } + } + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + match self { + Self::First(store) => { + let mut stream = store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::First); + } + } + Self::Second(store) => { + let mut stream = store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(DualStoreError::Second); + } + } + } + }) + } } impl WritableKeyValueStore for DualStore diff --git a/linera-views/src/backends/dynamo_db.rs b/linera-views/src/backends/dynamo_db.rs index 7dc9c9201587..2312e4702394 100644 --- a/linera-views/src/backends/dynamo_db.rs +++ b/linera-views/src/backends/dynamo_db.rs @@ -608,6 +608,24 @@ impl DynamoDbStoreInternal { start_key: &[u8], key_prefix: &[u8], start_key_map: Option>, + ) -> Result { + self.get_query_output_with_direction( + attribute_str, + start_key, + key_prefix, + start_key_map, + true, + ) + .await + } + + async fn get_query_output_with_direction( + &self, + attribute_str: &str, + start_key: &[u8], + key_prefix: &[u8], + start_key_map: Option>, + forward: bool, ) -> Result { let _guard = self.acquire().await; let start_key = start_key.to_vec(); @@ -627,6 +645,7 @@ impl DynamoDbStoreInternal { AttributeValue::B(Blob::new(prefixed_key_prefix)), ) .set_exclusive_start_key(start_key_map) + .scan_index_forward(forward) .send() .boxed_sync() .await?; @@ -956,6 +975,71 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { } }) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + check_key_size(key_prefix)?; + let prefix_len = key_prefix.len(); + let mut start_key_map = None; + loop { + let response = self + .get_query_output_with_direction( + KEY_ATTRIBUTE, + &self.start_key, + key_prefix, + start_key_map, + false, + ) + .await?; + let last_evaluated = response.last_evaluated_key.clone(); + for item in response.items.iter().flatten() { + yield extract_key(prefix_len, item).map(|k| k.to_vec()); + } + match last_evaluated { + None => break, + Some(value) => { + start_key_map = Some(value); + } + } + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + check_key_size(key_prefix)?; + let prefix_len = key_prefix.len(); + let mut start_key_map = None; + loop { + let response = self + .get_query_output_with_direction( + KEY_VALUE_ATTRIBUTE, + &self.start_key, + key_prefix, + start_key_map, + false, + ) + .await?; + let last_evaluated = response.last_evaluated_key.clone(); + for item in response.items.iter().flatten() { + yield extract_key_value(prefix_len, item) + .map(|(k, v)| (k.to_vec(), v.to_vec())); + } + match last_evaluated { + None => break, + Some(value) => { + start_key_map = Some(value); + } + } + } + }) + } } impl DirectWritableKeyValueStore for DynamoDbStoreInternal { diff --git a/linera-views/src/backends/journaling.rs b/linera-views/src/backends/journaling.rs index 730c92cdc1d9..e8a4ca87a3ba 100644 --- a/linera-views/src/backends/journaling.rs +++ b/linera-views/src/backends/journaling.rs @@ -227,6 +227,30 @@ where } }) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(JournalingError::Inner); + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item.map_err(JournalingError::Inner); + } + }) + } } impl KeyValueDatabase for JournalingKeyValueDatabase diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index 55b695f24f3a..96ba2a5b1e5e 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -412,6 +412,70 @@ where } }) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + if let Some(cache) = self.get_exclusive_cache() { + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_keys(key_prefix) + }; + if let Some(keys) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + for key in keys.into_iter().rev() { + yield Ok(key); + } + return; + } + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + } + let mut stream = self.store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item; + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + if let Some(cache) = self.get_exclusive_cache() { + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_key_values(key_prefix) + }; + if let Some(key_values) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + for key_value in key_values.into_iter().rev() { + yield Ok(key_value); + } + return; + } + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + } + let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + yield item; + } + }) + } } impl WritableKeyValueStore for LruCachingStore diff --git a/linera-views/src/backends/metering.rs b/linera-views/src/backends/metering.rs index d1388983a6a0..3751614f1f91 100644 --- a/linera-views/src/backends/metering.rs +++ b/linera-views/src/backends/metering.rs @@ -490,6 +490,28 @@ where .observe(key_prefix.len() as f64); self.store.find_key_values_by_prefix_iter(key_prefix) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.counter + .find_keys_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_keys_by_prefix_rev_iter(key_prefix) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.counter + .find_key_values_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_key_values_by_prefix_rev_iter(key_prefix) + } } impl WritableKeyValueStore for MeteredStore diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 4499544455a0..376a077545c4 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -198,6 +198,32 @@ impl RocksDbStoreExecutor { iter } + /// Returns an iterator positioned at the largest key starting with `prefix`, + /// iterating in reverse. `total_order_seek` is enabled because the database + /// uses a fixed-length prefix extractor; without it, `seek_for_prev` would + /// only search within the bloom-prefix scope and could miss keys whose + /// extractor-prefix differs from the seek target. + fn get_find_prefix_reverse_iterator( + &self, + prefix: &[u8], + ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> { + let mut read_opts = rocksdb::ReadOptions::default(); + read_opts.set_async_io(true); + read_opts.set_total_order_seek(true); + let upper_bound = get_upper_bound_option(prefix); + let mut iter = self.db.raw_iterator_opt(read_opts); + match upper_bound.as_deref() { + Some(ub) => { + iter.seek_for_prev(ub); + if iter.key().is_some_and(|k| k == ub) { + iter.prev(); + } + } + None => iter.seek_to_last(), + } + iter + } + fn find_keys_by_prefix_internal( &self, key_prefix: Vec, @@ -615,6 +641,88 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { } }) } + + fn find_keys_by_prefix_rev_iter( + &self, + key_prefix: &[u8], + ) -> FindKeysStream<'_, Self::Error> { + let executor = self.executor.clone(); + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::stream! { + if let Err(error) = check_key_size(&key_prefix) { + yield Err(error); + return; + } + let (tx, mut rx) = + tokio::sync::mpsc::channel::, RocksDbStoreInternalError>>(1); + let handle = tokio::task::spawn_blocking(move || { + let mut prefix = executor.start_key.clone(); + prefix.extend(&key_prefix); + let len = prefix.len(); + let mut iter = executor.get_find_prefix_reverse_iterator(&prefix); + while let Some(key) = iter.key() { + if !key.starts_with(&prefix) { + break; + } + if tx.blocking_send(Ok(key[len..].to_vec())).is_err() { + return; + } + iter.prev(); + } + if let Err(error) = iter.status() { + let _ = tx.blocking_send(Err(error.into())); + } + }); + while let Some(item) = rx.recv().await { + yield item; + } + if let Err(error) = handle.await { + yield Err(RocksDbStoreInternalError::TokioJoinError(error)); + } + }) + } + + fn find_key_values_by_prefix_rev_iter( + &self, + key_prefix: &[u8], + ) -> FindKeyValuesStream<'_, Self::Error> { + let executor = self.executor.clone(); + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::stream! { + if let Err(error) = check_key_size(&key_prefix) { + yield Err(error); + return; + } + let (tx, mut rx) = tokio::sync::mpsc::channel::< + Result<(Vec, Vec), RocksDbStoreInternalError>, + >(1); + let handle = tokio::task::spawn_blocking(move || { + let mut prefix = executor.start_key.clone(); + prefix.extend(&key_prefix); + let len = prefix.len(); + let mut iter = executor.get_find_prefix_reverse_iterator(&prefix); + while let Some((key, value)) = iter.item() { + if !key.starts_with(&prefix) { + break; + } + let pair = (key[len..].to_vec(), value.to_vec()); + if tx.blocking_send(Ok(pair)).is_err() { + return; + } + iter.prev(); + } + if let Err(error) = iter.status() { + let _ = tx.blocking_send(Err(error.into())); + } + }); + while let Some(item) = rx.recv().await { + yield item; + } + if let Err(error) = handle.await { + yield Err(RocksDbStoreInternalError::TokioJoinError(error)); + } + }) + } } impl WritableKeyValueStore for RocksDbStoreInternal { diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 5ee005255fce..8f61a4b50ebd 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -113,6 +113,10 @@ struct ScyllaDbClient { find_keys_by_prefix_bounded: PreparedStatement, find_key_values_by_prefix_unbounded: PreparedStatement, find_key_values_by_prefix_bounded: PreparedStatement, + find_keys_by_prefix_rev_unbounded: PreparedStatement, + find_keys_by_prefix_rev_bounded: PreparedStatement, + find_key_values_by_prefix_rev_unbounded: PreparedStatement, + find_key_values_by_prefix_rev_bounded: PreparedStatement, multi_key_values: papaya::HashMap, multi_keys: papaya::HashMap, } @@ -180,6 +184,34 @@ impl ScyllaDbClient { )) .await?; + let find_keys_by_prefix_rev_unbounded = session + .prepare(format!( + "SELECT k FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? ORDER BY k DESC" + )) + .await?; + + let find_keys_by_prefix_rev_bounded = session + .prepare(format!( + "SELECT k FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? AND k < ? ORDER BY k DESC" + )) + .await?; + + let find_key_values_by_prefix_rev_unbounded = session + .prepare(format!( + "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? ORDER BY k DESC" + )) + .await?; + + let find_key_values_by_prefix_rev_bounded = session + .prepare(format!( + "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? AND k < ? ORDER BY k DESC" + )) + .await?; + Ok(Self { session, namespace, @@ -193,6 +225,10 @@ impl ScyllaDbClient { find_keys_by_prefix_bounded, find_key_values_by_prefix_unbounded, find_key_values_by_prefix_bounded, + find_keys_by_prefix_rev_unbounded, + find_keys_by_prefix_rev_bounded, + find_key_values_by_prefix_rev_unbounded, + find_key_values_by_prefix_rev_bounded, multi_key_values: papaya::HashMap::new(), multi_keys: papaya::HashMap::new(), }) @@ -736,6 +772,66 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { } }) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let key_prefix = key_prefix.to_vec(); + ScyllaDbClient::check_key_size(&key_prefix)?; + let _guard = self.acquire().await; + let session = &self.store.session; + let len = key_prefix.len(); + let query_unbounded = &self.store.find_keys_by_prefix_rev_unbounded; + let query_bounded = &self.store.find_keys_by_prefix_rev_bounded; + let rows = match get_upper_bound_option(&key_prefix) { + None => { + let values = (self.root_key.clone(), key_prefix.clone()); + Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? + } + Some(upper_bound) => { + let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); + Box::pin(session.execute_iter(query_bounded.clone(), values)).await? + } + }; + let mut rows = rows.rows_stream::<(Vec,)>()?; + while let Some(row) = rows.next().await { + let (key,) = row?; + yield Ok(key[len..].to_vec()); + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let key_prefix = key_prefix.to_vec(); + ScyllaDbClient::check_key_size(&key_prefix)?; + let _guard = self.acquire().await; + let session = &self.store.session; + let len = key_prefix.len(); + let query_unbounded = &self.store.find_key_values_by_prefix_rev_unbounded; + let query_bounded = &self.store.find_key_values_by_prefix_rev_bounded; + let rows = match get_upper_bound_option(&key_prefix) { + None => { + let values = (self.root_key.clone(), key_prefix.clone()); + Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? + } + Some(upper_bound) => { + let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); + Box::pin(session.execute_iter(query_bounded.clone(), values)).await? + } + }; + let mut rows = rows.rows_stream::<(Vec, Vec)>()?; + while let Some(row) = rows.next().await { + let (key, value) = row?; + yield Ok((key[len..].to_vec(), value)); + } + }) + } } impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 401e30e87a09..2142fd351517 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -304,6 +304,71 @@ where } }) } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + let big_key = item.map_err(ValueSplittingError::InnerStoreError)?; + let len = big_key.len(); + if Self::read_index_from_key(&big_key)? == 0 { + yield Ok(big_key[0..len - 4].to_vec()); + } + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + let (mut big_key, value) = + item.map_err(ValueSplittingError::InnerStoreError)?; + let last_index = Self::read_index_from_key(&big_key)?; + big_key.truncate(big_key.len() - 4); + let key = big_key; + // Collect segments in reverse order: from last_index down to 0. + let mut segments: Vec<(u32, Vec)> = vec![(last_index, value)]; + while segments.last().unwrap().0 > 0 { + let expected = segments.last().unwrap().0 - 1; + let next = stream.next().await + .ok_or(ValueSplittingError::MissingSegment)?; + let (big_key2, value2) = + next.map_err(ValueSplittingError::InnerStoreError)?; + if !(Self::read_index_from_key(&big_key2)? == expected + && big_key2.starts_with(&key) + && big_key2.len() == key.len() + 4) + { + yield Err(ValueSplittingError::MissingSegment); + return; + } + segments.push((expected, value2)); + } + // Segment 0 carries the count; indices >= count are stranded leftovers. + let (_, segment_zero_value) = segments.last().unwrap(); + let count = Self::read_count_from_value(segment_zero_value)?; + if count == 0 || count > last_index + 1 { + yield Err(ValueSplittingError::MissingSegment); + return; + } + let mut big_value = Vec::new(); + for (idx, val) in segments.iter().rev() { + if *idx == 0 { + big_value.extend_from_slice(&val[4..]); + } else if *idx < count { + big_value.extend_from_slice(val); + } + } + yield Ok((key, big_value)); + } + }) + } } impl WritableKeyValueStore for ValueSplittingStore diff --git a/linera-views/src/store.rs b/linera-views/src/store.rs index 457c9f6331cd..1b4d96bbf2a7 100644 --- a/linera-views/src/store.rs +++ b/linera-views/src/store.rs @@ -136,6 +136,42 @@ pub trait ReadableKeyValueStore: WithError { }) } + /// Returns a boxed stream that yields the keys matching the prefix in + /// reverse (descending) order. The prefix is not included in the returned + /// keys. The stream may be dropped early to stop the iteration. The default + /// implementation downloads the full list eagerly via `find_keys_by_prefix` + /// and yields the entries in reverse afterwards; backends should override + /// this to stream incrementally whenever possible. + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let keys = self.find_keys_by_prefix(key_prefix).await?; + for key in keys.into_iter().rev() { + yield Ok(key); + } + }) + } + + /// Returns a boxed stream that yields the `(key, value)` pairs matching the + /// prefix in reverse (descending) order. The prefix is not included in the + /// returned keys. The stream may be dropped early to stop the iteration. + /// The default implementation downloads the full list eagerly via + /// `find_key_values_by_prefix` and yields the entries in reverse afterwards; + /// backends should override this to stream incrementally whenever possible. + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let key_values = self.find_key_values_by_prefix(key_prefix).await?; + for key_value in key_values.into_iter().rev() { + yield Ok(key_value); + } + }) + } + // We can't use `async fn` here in the below implementations due to // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed // we can revert them to `async fn` syntax, which is neater. diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 7393de463373..1d39aa072674 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -218,6 +218,23 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec .await .unwrap(); assert_eq!(key_values_iter, key_values_by_prefix); + // Reverse streaming variants must yield the eager results in reverse order. + let keys_rev_iter: Vec> = store + .find_keys_by_prefix_rev_iter(key_prefix) + .try_collect() + .await + .unwrap(); + let mut keys_request_rev = keys_request.clone(); + keys_request_rev.reverse(); + assert_eq!(keys_rev_iter, keys_request_rev); + let key_values_rev_iter: Vec<(Vec, Vec)> = store + .find_key_values_by_prefix_rev_iter(key_prefix) + .try_collect() + .await + .unwrap(); + let mut key_values_by_prefix_rev = key_values_by_prefix.clone(); + key_values_by_prefix_rev.reverse(); + assert_eq!(key_values_rev_iter, key_values_by_prefix_rev); } // Now checking the read_multi_values_bytes let mut rng = make_deterministic_rng(); From d0f173ecf6e815058626d1cc5d6808fd338c8a84 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 21:41:38 +0200 Subject: [PATCH 06/27] Simplify the code. --- linera-views/src/backends/rocks_db.rs | 115 ++++++++++--------------- linera-views/src/backends/scylla_db.rs | 103 +++++++++++----------- 2 files changed, 94 insertions(+), 124 deletions(-) diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 376a077545c4..ad1a53f29996 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -70,6 +70,13 @@ const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; // 8 KiB /// The RocksDB client that we use. type DB = rocksdb::DBWithThreadMode; +/// Iteration direction selector used by the streaming `find_*_iter` helpers. +#[derive(Clone, Copy)] +enum IterDirection { + Forward, + Reverse, +} + /// The choice of the spawning mode. /// `SpawnBlocking` always works and is the safest. /// `BlockInPlace` can only be used in multi-threaded environment. @@ -570,82 +577,37 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { &self, key_prefix: &[u8], ) -> FindKeysStream<'_, Self::Error> { - let executor = self.executor.clone(); - let key_prefix = key_prefix.to_vec(); - Box::pin(async_stream::stream! { - if let Err(error) = check_key_size(&key_prefix) { - yield Err(error); - return; - } - let (tx, mut rx) = - tokio::sync::mpsc::channel::, RocksDbStoreInternalError>>(1); - let handle = tokio::task::spawn_blocking(move || { - let mut prefix = executor.start_key.clone(); - prefix.extend(&key_prefix); - let len = prefix.len(); - let mut iter = executor.get_find_prefix_iterator(&prefix); - while let Some(key) = iter.key() { - if tx.blocking_send(Ok(key[len..].to_vec())).is_err() { - return; - } - iter.next(); - } - if let Err(error) = iter.status() { - let _ = tx.blocking_send(Err(error.into())); - } - }); - while let Some(item) = rx.recv().await { - yield item; - } - if let Err(error) = handle.await { - yield Err(RocksDbStoreInternalError::TokioJoinError(error)); - } - }) + self.find_keys_stream(key_prefix, IterDirection::Forward) } fn find_key_values_by_prefix_iter( &self, key_prefix: &[u8], ) -> FindKeyValuesStream<'_, Self::Error> { - let executor = self.executor.clone(); - let key_prefix = key_prefix.to_vec(); - Box::pin(async_stream::stream! { - if let Err(error) = check_key_size(&key_prefix) { - yield Err(error); - return; - } - let (tx, mut rx) = tokio::sync::mpsc::channel::< - Result<(Vec, Vec), RocksDbStoreInternalError>, - >(1); - let handle = tokio::task::spawn_blocking(move || { - let mut prefix = executor.start_key.clone(); - prefix.extend(&key_prefix); - let len = prefix.len(); - let mut iter = executor.get_find_prefix_iterator(&prefix); - while let Some((key, value)) = iter.item() { - let pair = (key[len..].to_vec(), value.to_vec()); - if tx.blocking_send(Ok(pair)).is_err() { - return; - } - iter.next(); - } - if let Err(error) = iter.status() { - let _ = tx.blocking_send(Err(error.into())); - } - }); - while let Some(item) = rx.recv().await { - yield item; - } - if let Err(error) = handle.await { - yield Err(RocksDbStoreInternalError::TokioJoinError(error)); - } - }) + self.find_key_values_stream(key_prefix, IterDirection::Forward) } fn find_keys_by_prefix_rev_iter( &self, key_prefix: &[u8], ) -> FindKeysStream<'_, Self::Error> { + self.find_keys_stream(key_prefix, IterDirection::Reverse) + } + + fn find_key_values_by_prefix_rev_iter( + &self, + key_prefix: &[u8], + ) -> FindKeyValuesStream<'_, Self::Error> { + self.find_key_values_stream(key_prefix, IterDirection::Reverse) + } +} + +impl RocksDbStoreInternal { + fn find_keys_stream( + &self, + key_prefix: &[u8], + direction: IterDirection, + ) -> FindKeysStream<'_, RocksDbStoreInternalError> { let executor = self.executor.clone(); let key_prefix = key_prefix.to_vec(); Box::pin(async_stream::stream! { @@ -659,7 +621,10 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { let mut prefix = executor.start_key.clone(); prefix.extend(&key_prefix); let len = prefix.len(); - let mut iter = executor.get_find_prefix_reverse_iterator(&prefix); + let mut iter = match direction { + IterDirection::Forward => executor.get_find_prefix_iterator(&prefix), + IterDirection::Reverse => executor.get_find_prefix_reverse_iterator(&prefix), + }; while let Some(key) = iter.key() { if !key.starts_with(&prefix) { break; @@ -667,7 +632,10 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { if tx.blocking_send(Ok(key[len..].to_vec())).is_err() { return; } - iter.prev(); + match direction { + IterDirection::Forward => iter.next(), + IterDirection::Reverse => iter.prev(), + } } if let Err(error) = iter.status() { let _ = tx.blocking_send(Err(error.into())); @@ -682,10 +650,11 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { }) } - fn find_key_values_by_prefix_rev_iter( + fn find_key_values_stream( &self, key_prefix: &[u8], - ) -> FindKeyValuesStream<'_, Self::Error> { + direction: IterDirection, + ) -> FindKeyValuesStream<'_, RocksDbStoreInternalError> { let executor = self.executor.clone(); let key_prefix = key_prefix.to_vec(); Box::pin(async_stream::stream! { @@ -700,7 +669,10 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { let mut prefix = executor.start_key.clone(); prefix.extend(&key_prefix); let len = prefix.len(); - let mut iter = executor.get_find_prefix_reverse_iterator(&prefix); + let mut iter = match direction { + IterDirection::Forward => executor.get_find_prefix_iterator(&prefix), + IterDirection::Reverse => executor.get_find_prefix_reverse_iterator(&prefix), + }; while let Some((key, value)) = iter.item() { if !key.starts_with(&prefix) { break; @@ -709,7 +681,10 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { if tx.blocking_send(Ok(pair)).is_err() { return; } - iter.prev(); + match direction { + IterDirection::Forward => iter.next(), + IterDirection::Reverse => iter.prev(), + } } if let Err(error) = iter.status() { let _ = tx.blocking_send(Err(error.into())); diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 8f61a4b50ebd..b72c4aa5fad7 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -96,6 +96,13 @@ const MAX_BATCH_SIZE: usize = 5000; /// The keyspace to use for the ScyllaDB database. const KEYSPACE: &str = "kv"; +/// Iteration direction selector used by the streaming `find_*_iter` helpers. +#[derive(Clone, Copy)] +enum IterDirection { + Forward, + Reverse, +} + /// The client for ScyllaDB: /// * The session allows to pass queries /// * The namespace that is being assigned to the database @@ -707,30 +714,7 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let key_prefix = key_prefix.to_vec(); - ScyllaDbClient::check_key_size(&key_prefix)?; - let _guard = self.acquire().await; - let session = &self.store.session; - let len = key_prefix.len(); - let query_unbounded = &self.store.find_keys_by_prefix_unbounded; - let query_bounded = &self.store.find_keys_by_prefix_bounded; - let rows = match get_upper_bound_option(&key_prefix) { - None => { - let values = (self.root_key.clone(), key_prefix.clone()); - Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? - } - Some(upper_bound) => { - let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); - Box::pin(session.execute_iter(query_bounded.clone(), values)).await? - } - }; - let mut rows = rows.rows_stream::<(Vec,)>()?; - while let Some(row) = rows.next().await { - let (key,) = row?; - yield Ok(key[len..].to_vec()); - } - }) + self.find_keys_stream(key_prefix, IterDirection::Forward) } async fn find_key_values_by_prefix( @@ -747,44 +731,46 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let key_prefix = key_prefix.to_vec(); - ScyllaDbClient::check_key_size(&key_prefix)?; - let _guard = self.acquire().await; - let session = &self.store.session; - let len = key_prefix.len(); - let query_unbounded = &self.store.find_key_values_by_prefix_unbounded; - let query_bounded = &self.store.find_key_values_by_prefix_bounded; - let rows = match get_upper_bound_option(&key_prefix) { - None => { - let values = (self.root_key.clone(), key_prefix.clone()); - Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? - } - Some(upper_bound) => { - let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); - Box::pin(session.execute_iter(query_bounded.clone(), values)).await? - } - }; - let mut rows = rows.rows_stream::<(Vec, Vec)>()?; - while let Some(row) = rows.next().await { - let (key, value) = row?; - yield Ok((key[len..].to_vec(), value)); - } - }) + self.find_key_values_stream(key_prefix, IterDirection::Forward) } fn find_keys_by_prefix_rev_iter<'a>( &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { + self.find_keys_stream(key_prefix, IterDirection::Reverse) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.find_key_values_stream(key_prefix, IterDirection::Reverse) + } +} + +impl ScyllaDbStoreInternal { + fn find_keys_stream<'a>( + &'a self, + key_prefix: &'a [u8], + direction: IterDirection, + ) -> FindKeysStream<'a, ScyllaDbStoreInternalError> { Box::pin(async_stream::stream! { let key_prefix = key_prefix.to_vec(); ScyllaDbClient::check_key_size(&key_prefix)?; let _guard = self.acquire().await; let session = &self.store.session; let len = key_prefix.len(); - let query_unbounded = &self.store.find_keys_by_prefix_rev_unbounded; - let query_bounded = &self.store.find_keys_by_prefix_rev_bounded; + let (query_unbounded, query_bounded) = match direction { + IterDirection::Forward => ( + &self.store.find_keys_by_prefix_unbounded, + &self.store.find_keys_by_prefix_bounded, + ), + IterDirection::Reverse => ( + &self.store.find_keys_by_prefix_rev_unbounded, + &self.store.find_keys_by_prefix_rev_bounded, + ), + }; let rows = match get_upper_bound_option(&key_prefix) { None => { let values = (self.root_key.clone(), key_prefix.clone()); @@ -803,18 +789,27 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { }) } - fn find_key_values_by_prefix_rev_iter<'a>( + fn find_key_values_stream<'a>( &'a self, key_prefix: &'a [u8], - ) -> FindKeyValuesStream<'a, Self::Error> { + direction: IterDirection, + ) -> FindKeyValuesStream<'a, ScyllaDbStoreInternalError> { Box::pin(async_stream::stream! { let key_prefix = key_prefix.to_vec(); ScyllaDbClient::check_key_size(&key_prefix)?; let _guard = self.acquire().await; let session = &self.store.session; let len = key_prefix.len(); - let query_unbounded = &self.store.find_key_values_by_prefix_rev_unbounded; - let query_bounded = &self.store.find_key_values_by_prefix_rev_bounded; + let (query_unbounded, query_bounded) = match direction { + IterDirection::Forward => ( + &self.store.find_key_values_by_prefix_unbounded, + &self.store.find_key_values_by_prefix_bounded, + ), + IterDirection::Reverse => ( + &self.store.find_key_values_by_prefix_rev_unbounded, + &self.store.find_key_values_by_prefix_rev_bounded, + ), + }; let rows = match get_upper_bound_option(&key_prefix) { None => { let values = (self.root_key.clone(), key_prefix.clone()); From 99869fa1f01c3e4ce016b1682e9ccfce7356b7c0 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 21:50:54 +0200 Subject: [PATCH 07/27] Simplify the DynamoDB code for the iterators. --- linera-views/src/backends/dynamo_db.rs | 97 +++++++------------------- 1 file changed, 27 insertions(+), 70 deletions(-) diff --git a/linera-views/src/backends/dynamo_db.rs b/linera-views/src/backends/dynamo_db.rs index 2312e4702394..5627654bbfef 100644 --- a/linera-views/src/backends/dynamo_db.rs +++ b/linera-views/src/backends/dynamo_db.rs @@ -608,23 +608,6 @@ impl DynamoDbStoreInternal { start_key: &[u8], key_prefix: &[u8], start_key_map: Option>, - ) -> Result { - self.get_query_output_with_direction( - attribute_str, - start_key, - key_prefix, - start_key_map, - true, - ) - .await - } - - async fn get_query_output_with_direction( - &self, - attribute_str: &str, - start_key: &[u8], - key_prefix: &[u8], - start_key_map: Option>, forward: bool, ) -> Result { let _guard = self.acquire().await; @@ -704,7 +687,7 @@ impl DynamoDbStoreInternal { let mut start_key_map = None; loop { let response = self - .get_query_output(attribute, start_key, key_prefix, start_key_map) + .get_query_output(attribute, start_key, key_prefix, start_key_map, true) .await?; let last_evaluated = response.last_evaluated_key.clone(); responses.push(response); @@ -909,26 +892,7 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - check_key_size(key_prefix)?; - let prefix_len = key_prefix.len(); - let mut start_key_map = None; - loop { - let response = self - .get_query_output(KEY_ATTRIBUTE, &self.start_key, key_prefix, start_key_map) - .await?; - let last_evaluated = response.last_evaluated_key.clone(); - for item in response.items.iter().flatten() { - yield extract_key(prefix_len, item).map(|k| k.to_vec()); - } - match last_evaluated { - None => break, - Some(value) => { - start_key_map = Some(value); - } - } - } - }) + self.find_keys_stream(key_prefix, true) } async fn find_key_values_by_prefix( @@ -948,50 +912,42 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - check_key_size(key_prefix)?; - let prefix_len = key_prefix.len(); - let mut start_key_map = None; - loop { - let response = self - .get_query_output( - KEY_VALUE_ATTRIBUTE, - &self.start_key, - key_prefix, - start_key_map, - ) - .await?; - let last_evaluated = response.last_evaluated_key.clone(); - for item in response.items.iter().flatten() { - yield extract_key_value(prefix_len, item) - .map(|(k, v)| (k.to_vec(), v.to_vec())); - } - match last_evaluated { - None => break, - Some(value) => { - start_key_map = Some(value); - } - } - } - }) + self.find_key_values_stream(key_prefix, true) } fn find_keys_by_prefix_rev_iter<'a>( &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { + self.find_keys_stream(key_prefix, false) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.find_key_values_stream(key_prefix, false) + } +} + +impl DynamoDbStoreInternal { + fn find_keys_stream<'a>( + &'a self, + key_prefix: &'a [u8], + forward: bool, + ) -> FindKeysStream<'a, DynamoDbStoreInternalError> { Box::pin(async_stream::stream! { check_key_size(key_prefix)?; let prefix_len = key_prefix.len(); let mut start_key_map = None; loop { let response = self - .get_query_output_with_direction( + .get_query_output( KEY_ATTRIBUTE, &self.start_key, key_prefix, start_key_map, - false, + forward, ) .await?; let last_evaluated = response.last_evaluated_key.clone(); @@ -1008,22 +964,23 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { }) } - fn find_key_values_by_prefix_rev_iter<'a>( + fn find_key_values_stream<'a>( &'a self, key_prefix: &'a [u8], - ) -> FindKeyValuesStream<'a, Self::Error> { + forward: bool, + ) -> FindKeyValuesStream<'a, DynamoDbStoreInternalError> { Box::pin(async_stream::stream! { check_key_size(key_prefix)?; let prefix_len = key_prefix.len(); let mut start_key_map = None; loop { let response = self - .get_query_output_with_direction( + .get_query_output( KEY_VALUE_ATTRIBUTE, &self.start_key, key_prefix, start_key_map, - false, + forward, ) .await?; let last_evaluated = response.last_evaluated_key.clone(); From 69c23a7e2c86a7e1f889ee1171ca523c966d70ec Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 21:51:34 +0200 Subject: [PATCH 08/27] Reformat. --- linera-views/src/backends/lru_caching.rs | 3 +-- linera-views/src/backends/rocks_db.rs | 10 ++-------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index 96ba2a5b1e5e..c0093eb9bf55 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -5,14 +5,13 @@ use std::sync::{Arc, Mutex}; +use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; #[cfg(with_testing)] use crate::memory::MemoryDatabase; #[cfg(with_testing)] use crate::store::TestKeyValueDatabase; -use futures::stream::StreamExt; - use crate::{ batch::{Batch, WriteOperation}, lru_prefix_cache::{LruPrefixCache, StorageCacheConfig}, diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index ad1a53f29996..43a8a98cbcb1 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -573,10 +573,7 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { .await } - fn find_keys_by_prefix_iter( - &self, - key_prefix: &[u8], - ) -> FindKeysStream<'_, Self::Error> { + fn find_keys_by_prefix_iter(&self, key_prefix: &[u8]) -> FindKeysStream<'_, Self::Error> { self.find_keys_stream(key_prefix, IterDirection::Forward) } @@ -587,10 +584,7 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { self.find_key_values_stream(key_prefix, IterDirection::Forward) } - fn find_keys_by_prefix_rev_iter( - &self, - key_prefix: &[u8], - ) -> FindKeysStream<'_, Self::Error> { + fn find_keys_by_prefix_rev_iter(&self, key_prefix: &[u8]) -> FindKeysStream<'_, Self::Error> { self.find_keys_stream(key_prefix, IterDirection::Reverse) } From 5af87f1d70d176dd8df9872d3df17546f4081e76 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Fri, 1 May 2026 22:14:29 +0200 Subject: [PATCH 09/27] Resolve the bug in value splitting. --- linera-views/src/backends/value_splitting.rs | 62 +++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 2142fd351517..271a2030cbbe 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -327,45 +327,49 @@ where ) -> FindKeyValuesStream<'a, Self::Error> { Box::pin(async_stream::stream! { let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + // Accumulator: (current_key, top_index, segments_descending). + // segments[i] is the segment at index `top_index - i`. + // An accumulation that never reaches index 0 is composed of + // leftover segments (from a previously deleted big value, since + // delete_key only removes the master) and is silently dropped. + let mut state: Option<(Vec, u32, Vec>)> = None; while let Some(item) = stream.next().await { let (mut big_key, value) = item.map_err(ValueSplittingError::InnerStoreError)?; - let last_index = Self::read_index_from_key(&big_key)?; + let index = Self::read_index_from_key(&big_key)?; big_key.truncate(big_key.len() - 4); let key = big_key; - // Collect segments in reverse order: from last_index down to 0. - let mut segments: Vec<(u32, Vec)> = vec![(last_index, value)]; - while segments.last().unwrap().0 > 0 { - let expected = segments.last().unwrap().0 - 1; - let next = stream.next().await - .ok_or(ValueSplittingError::MissingSegment)?; - let (big_key2, value2) = - next.map_err(ValueSplittingError::InnerStoreError)?; - if !(Self::read_index_from_key(&big_key2)? == expected - && big_key2.starts_with(&key) - && big_key2.len() == key.len() + 4) - { + let continues = match &state { + Some((current_key, top, segs)) => { + let segs_len = segs.len() as u32; + *current_key == key && segs_len <= *top && index == *top - segs_len + } + None => false, + }; + if continues { + state.as_mut().unwrap().2.push(value); + } else { + state = Some((key, index, vec![value])); + } + if index == 0 { + let (key, top, segs) = state.take().unwrap(); + let segment_zero_value = segs.last().unwrap(); + let count = Self::read_count_from_value(segment_zero_value)?; + if count == 0 || count > top + 1 { yield Err(ValueSplittingError::MissingSegment); return; } - segments.push((expected, value2)); - } - // Segment 0 carries the count; indices >= count are stranded leftovers. - let (_, segment_zero_value) = segments.last().unwrap(); - let count = Self::read_count_from_value(segment_zero_value)?; - if count == 0 || count > last_index + 1 { - yield Err(ValueSplittingError::MissingSegment); - return; - } - let mut big_value = Vec::new(); - for (idx, val) in segments.iter().rev() { - if *idx == 0 { - big_value.extend_from_slice(&val[4..]); - } else if *idx < count { - big_value.extend_from_slice(val); + let mut big_value = Vec::new(); + for (rev_pos, val) in segs.iter().rev().enumerate() { + let idx = rev_pos as u32; + if idx == 0 { + big_value.extend_from_slice(&val[4..]); + } else if idx < count { + big_value.extend_from_slice(val); + } } + yield Ok((key, big_value)); } - yield Ok((key, big_value)); } }) } From a04cfa863ce15d3056e332a19803e80f243e8736 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Sat, 2 May 2026 08:24:00 +0200 Subject: [PATCH 10/27] Correct the Cargo.lock --- examples/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/Cargo.lock b/examples/Cargo.lock index a3b42efea752..3b00bcc6f68d 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -4121,6 +4121,7 @@ dependencies = [ "anyhow", "async-graphql", "async-lock", + "async-stream", "bcs", "cfg_aliases", "convert_case", From 39ab1a4d3aba55c1390bb8f6796a56bc43731c36 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Sat, 2 May 2026 08:30:30 +0200 Subject: [PATCH 11/27] Some corrections for CI. --- linera-views/src/backends/rocks_db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 43a8a98cbcb1..17bbaed917c3 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -632,7 +632,7 @@ impl RocksDbStoreInternal { } } if let Err(error) = iter.status() { - let _ = tx.blocking_send(Err(error.into())); + _ = tx.blocking_send(Err(error.into())); } }); while let Some(item) = rx.recv().await { @@ -681,7 +681,7 @@ impl RocksDbStoreInternal { } } if let Err(error) = iter.status() { - let _ = tx.blocking_send(Err(error.into())); + _ = tx.blocking_send(Err(error.into())); } }); while let Some(item) = rx.recv().await { From 0c19c8296ddcda37b6f79f54463710c134cb190b Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Sat, 2 May 2026 11:05:50 +0200 Subject: [PATCH 12/27] A forgotten entry in the Cargo.lock --- linera-sdk/tests/fixtures/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/linera-sdk/tests/fixtures/Cargo.lock b/linera-sdk/tests/fixtures/Cargo.lock index 83e09659f0d0..9a0af87028ac 100644 --- a/linera-sdk/tests/fixtures/Cargo.lock +++ b/linera-sdk/tests/fixtures/Cargo.lock @@ -2475,6 +2475,7 @@ dependencies = [ "anyhow", "async-graphql", "async-lock", + "async-stream", "bcs", "cfg_aliases", "convert_case", From 65c36dd8b478d58bb9159ff9690ea94fd8e5f376 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Sat, 2 May 2026 18:51:42 +0200 Subject: [PATCH 13/27] Add a test that exercise the ValueSplitting part of the code. --- linera-views/tests/store_tests.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/linera-views/tests/store_tests.rs b/linera-views/tests/store_tests.rs index 8ea1ab421d73..27a0cebb13ce 100644 --- a/linera-views/tests/store_tests.rs +++ b/linera-views/tests/store_tests.rs @@ -9,8 +9,8 @@ use linera_views::{ random::make_deterministic_rng, store::{ReadableKeyValueStore as _, TestKeyValueDatabase as _, WritableKeyValueStore as _}, test_utils::{ - big_read_multi_values, get_random_test_scenarios, run_big_write_read, run_reads, - run_writes_from_blank, run_writes_from_state, + big_read_multi_values, get_random_byte_vector, get_random_test_scenarios, + run_big_write_read, run_reads, run_writes_from_blank, run_writes_from_state, }, value_splitting::create_value_splitting_memory_store, }; @@ -56,6 +56,32 @@ async fn test_reads_test_memory() { } } +#[tokio::test] +async fn test_reads_value_splitting_varying_value_sizes() { + use std::collections::HashSet; + + use rand::Rng; + let mut rng = make_deterministic_rng(); + let key_prefix = vec![0]; + let mut key_values = Vec::new(); + let mut unique_keys = HashSet::new(); + let num_entries = 30; + let len_key = 4; + while key_values.len() < num_entries { + let mut key = key_prefix.clone(); + for _ in 0..len_key { + key.push(rng.gen()); + } + if unique_keys.insert(key.clone()) { + let len_value = rng.gen_range(0..=500); + let value = get_random_byte_vector(&mut rng, &[], len_value); + key_values.push((key, value)); + } + } + let key_value_store = create_value_splitting_memory_store(); + run_reads(key_value_store, key_values).await; +} + #[tokio::test] async fn test_reads_memory() { for scenario in get_random_test_scenarios() { From 42adda2cf68c32907658eda87f69ad06190df912 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 10:58:41 +0200 Subject: [PATCH 14/27] Update the tests. --- linera-views/src/backends/lru_caching.rs | 92 +++++++++++++++++++ linera-views/src/backends/value_splitting.rs | 96 +++++++++++++++++++- 2 files changed, 187 insertions(+), 1 deletion(-) diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index c0093eb9bf55..e08b2fdadb78 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -617,6 +617,98 @@ impl LruCachingStore { #[cfg(with_testing)] pub type LruCachingMemoryDatabase = LruCachingDatabase; +#[cfg(test)] +mod tests { + // The strategy of every test below: build an `LruCachingStore` over a + // shared inner `MemoryStore`, populate the cache through reads/writes done + // via the LRU layer, then mutate the inner store *directly* (bypassing the + // cache). A subsequent read through the LRU layer that still observes the + // original value proves the cache actually served the request — if it + // hadn't, the read would have hit the inner store and seen the new value. + + use futures::stream::TryStreamExt; + + use crate::{ + backends::lru_caching::{LruCachingStore, DEFAULT_STORAGE_CACHE_CONFIG}, + batch::Batch, + memory::MemoryStore, + store::{ReadableKeyValueStore, WritableKeyValueStore}, + }; + + fn make_lru() -> (MemoryStore, LruCachingStore) { + let inner = MemoryStore::new_for_testing(); + let lru = LruCachingStore::new( + inner.clone(), + DEFAULT_STORAGE_CACHE_CONFIG, + /* has_exclusive_access */ true, + ); + (inner, lru) + } + + async fn put_direct(inner: &MemoryStore, key: &[u8], value: &[u8]) { + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.to_vec(), value.to_vec()); + inner.write_batch(batch).await.unwrap(); + } + + #[tokio::test] + async fn test_lru_cache_serves_find_by_prefix() { + let (inner, lru) = make_lru(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(vec![1, 0], vec![10]); + batch.put_key_value_bytes(vec![1, 1], vec![11]); + lru.write_batch(batch).await.unwrap(); + + // Populate the find-keys / find-key-values caches for prefix [1]. + let keys = lru.find_keys_by_prefix(&[1]).await.unwrap(); + assert_eq!(keys, vec![vec![0], vec![1]]); + let kv = lru.find_key_values_by_prefix(&[1]).await.unwrap(); + assert_eq!(kv, vec![(vec![0], vec![10]), (vec![1], vec![11])]); + + // Diverge the inner store: add a new key under the prefix and mutate + // an existing one — neither should be visible through the LRU layer. + put_direct(&inner, &[1, 2], &[12]).await; + put_direct(&inner, &[1, 0], &[99]).await; + + assert_eq!( + lru.find_keys_by_prefix(&[1]).await.unwrap(), + vec![vec![0], vec![1]] + ); + assert_eq!( + lru.find_key_values_by_prefix(&[1]).await.unwrap(), + vec![(vec![0], vec![10]), (vec![1], vec![11])] + ); + + // The streaming variants share the same cache. + let iter_keys: Vec> = lru + .find_keys_by_prefix_iter(&[1]) + .try_collect() + .await + .unwrap(); + assert_eq!(iter_keys, vec![vec![0], vec![1]]); + let iter_kv: Vec<(Vec, Vec)> = lru + .find_key_values_by_prefix_iter(&[1]) + .try_collect() + .await + .unwrap(); + assert_eq!(iter_kv, vec![(vec![0], vec![10]), (vec![1], vec![11])]); + + // The reverse streaming variants serve the cached entries in reverse. + let rev_keys: Vec> = lru + .find_keys_by_prefix_rev_iter(&[1]) + .try_collect() + .await + .unwrap(); + assert_eq!(rev_keys, vec![vec![1], vec![0]]); + let rev_kv: Vec<(Vec, Vec)> = lru + .find_key_values_by_prefix_rev_iter(&[1]) + .try_collect() + .await + .unwrap(); + assert_eq!(rev_kv, vec![(vec![1], vec![11]), (vec![0], vec![10])]); + } +} + #[cfg(with_testing)] impl TestKeyValueDatabase for LruCachingDatabase where diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 271a2030cbbe..17ed235cf9c7 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -634,10 +634,13 @@ pub fn create_value_splitting_memory_store() -> ValueSplittingStore> = (1u8..=8u8).map(|b| vec![b, 0, 0]).collect(); + + // Mix of small (single-segment) and large (multi-segment) values, so both + // code paths in the iterators are exercised. + let lengths = [ + 10, + MAX_LEN + 5, + MAX_LEN - 10, + 2 * MAX_LEN + 7, + 3 * MAX_LEN - 4, + 50, + 2 * MAX_LEN, + 4 * MAX_LEN + 1, + ]; + let mut rng = crate::random::make_deterministic_rng(); + let values: Vec> = lengths + .iter() + .map(|&len| (0..len).map(|_| rng.gen::()).collect()) + .collect(); + + // Write all values. + let mut batch = Batch::new(); + for (key, value) in keys.iter().zip(values.iter()) { + batch.put_key_value_bytes(key.clone(), value.clone()); + } + big_store.write_batch(batch).await.unwrap(); + + // Delete a few via DeleteKey: for the multi-segment ones only the master + // segment is removed, so leftover continuation segments remain in the + // underlying store and the iterators must skip them. + let to_delete = [1usize, 3, 6]; + let mut batch = Batch::new(); + for &i in &to_delete { + batch.delete_key(keys[i].clone()); + } + big_store.write_batch(batch).await.unwrap(); + + // Surviving logical entries, in ascending key order. + let expected_kv: Vec<(Vec, Vec)> = (0..keys.len()) + .filter(|i| !to_delete.contains(i)) + .map(|i| (keys[i].clone(), values[i].clone())) + .collect(); + let expected_keys: Vec> = + expected_kv.iter().map(|(k, _)| k.clone()).collect(); + let expected_kv_rev: Vec<(Vec, Vec)> = + expected_kv.iter().rev().cloned().collect(); + let expected_keys_rev: Vec> = + expected_keys.iter().rev().cloned().collect(); + + // 1. find_keys_by_prefix_iter + let mut stream = big_store.find_keys_by_prefix_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_keys); + + // 2. find_key_values_by_prefix_iter + let mut stream = big_store.find_key_values_by_prefix_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_kv); + + // 3. find_keys_by_prefix_rev_iter + let mut stream = big_store.find_keys_by_prefix_rev_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_keys_rev); + + // 4. find_key_values_by_prefix_rev_iter + let mut stream = big_store.find_key_values_by_prefix_rev_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_kv_rev); + } } From 65b9b7a643dc03c03ee67873d768a6988c6cca9e Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 13:09:08 +0200 Subject: [PATCH 15/27] Correction from CI. --- linera-views/src/backends/lru_caching.rs | 30 ++++++++++---------- linera-views/src/backends/value_splitting.rs | 9 ++---- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index e08b2fdadb78..310f998b31e9 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -617,6 +617,21 @@ impl LruCachingStore { #[cfg(with_testing)] pub type LruCachingMemoryDatabase = LruCachingDatabase; +#[cfg(with_testing)] +impl TestKeyValueDatabase for LruCachingDatabase +where + D: TestKeyValueDatabase, +{ + async fn new_test_config() -> Result, D::Error> { + let inner_config = D::new_test_config().await?; + let storage_cache_config = DEFAULT_STORAGE_CACHE_CONFIG; + Ok(LruCachingConfig { + inner_config, + storage_cache_config, + }) + } +} + #[cfg(test)] mod tests { // The strategy of every test below: build an `LruCachingStore` over a @@ -708,18 +723,3 @@ mod tests { assert_eq!(rev_kv, vec![(vec![1], vec![11]), (vec![0], vec![10])]); } } - -#[cfg(with_testing)] -impl TestKeyValueDatabase for LruCachingDatabase -where - D: TestKeyValueDatabase, -{ - async fn new_test_config() -> Result, D::Error> { - let inner_config = D::new_test_config().await?; - let storage_cache_config = DEFAULT_STORAGE_CACHE_CONFIG; - Ok(LruCachingConfig { - inner_config, - storage_cache_config, - }) - } -} diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 17ed235cf9c7..b352dbd34139 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -789,12 +789,9 @@ mod tests { .filter(|i| !to_delete.contains(i)) .map(|i| (keys[i].clone(), values[i].clone())) .collect(); - let expected_keys: Vec> = - expected_kv.iter().map(|(k, _)| k.clone()).collect(); - let expected_kv_rev: Vec<(Vec, Vec)> = - expected_kv.iter().rev().cloned().collect(); - let expected_keys_rev: Vec> = - expected_keys.iter().rev().cloned().collect(); + let expected_keys: Vec> = expected_kv.iter().map(|(k, _)| k.clone()).collect(); + let expected_kv_rev: Vec<(Vec, Vec)> = expected_kv.iter().rev().cloned().collect(); + let expected_keys_rev: Vec> = expected_keys.iter().rev().cloned().collect(); // 1. find_keys_by_prefix_iter let mut stream = big_store.find_keys_by_prefix_iter(&[]); From 46bed0583bfcdca9ee38f5bcd7672494a8a03e9e Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 14:59:22 +0200 Subject: [PATCH 16/27] Add another test for an entry forgotten from the coverage. --- linera-views/src/backends/value_splitting.rs | 38 ++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index b352dbd34139..37389c5fae0e 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -825,4 +825,42 @@ mod tests { } assert_eq!(got, expected_kv_rev); } + + // Overwriting a long value with a shorter one only rewrites the segments up + // to the new count — the high-index segments of the original write remain in + // the underlying store as leftovers. The reverse iterator collects them all + // (it walks indices top..0 to find the master) and then must drop the + // ones whose index is >= the new count when reconstructing the value. + #[tokio::test] + async fn test_value_splitting5_rev_iter_drops_overwrite_leftovers() { + let big_store = create_value_splitting_memory_store(); + let key = vec![0, 0]; + + // Write a 250-byte value: with MAX_VALUE_SIZE = 100 this becomes 3 + // segments (96 bytes at index 0 plus the count, then 100 + 54 bytes + // at indices 1 and 2). + let mut rng = crate::random::make_deterministic_rng(); + let long_value: Vec = (0..250).map(|_| rng.gen::()).collect(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.clone(), long_value); + big_store.write_batch(batch).await.unwrap(); + + // Overwrite with a single-segment value. Only index 0 is rewritten; + // indices 1 and 2 stay in the underlying store as orphan segments. + let short_value: Vec = (0..10).map(|_| rng.gen::()).collect(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.clone(), short_value.clone()); + big_store.write_batch(batch).await.unwrap(); + + // The reverse iterator must accumulate all three segments (it sees them + // in order index 2, 1, 0) and then, when reconstructing, skip the two + // leftovers because their position in the reversed segs list is >= the + // new count of 1. + let mut stream = big_store.find_key_values_by_prefix_rev_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, vec![(key, short_value)]); + } } From e463c9a8b1d2746c51cd3241b3ed0a194dc8f03e Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 15:32:02 +0200 Subject: [PATCH 17/27] Changes to dual and journaling. --- linera-views/src/backends/dual.rs | 112 ++++++++++-------------- linera-views/src/backends/journaling.rs | 44 +++++----- 2 files changed, 68 insertions(+), 88 deletions(-) diff --git a/linera-views/src/backends/dual.rs b/linera-views/src/backends/dual.rs index 2856fd0fff8e..7ea50d840d9e 100644 --- a/linera-views/src/backends/dual.rs +++ b/linera-views/src/backends/dual.rs @@ -180,22 +180,18 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - match self { - Self::First(store) => { - let mut stream = store.find_keys_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::First); - } - } - Self::Second(store) => { - let mut stream = store.find_keys_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::Second); - } - } - } - }) + match self { + Self::First(store) => Box::pin( + store + .find_keys_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_keys_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } } async fn find_key_values_by_prefix( @@ -219,66 +215,54 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - match self { - Self::First(store) => { - let mut stream = store.find_key_values_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::First); - } - } - Self::Second(store) => { - let mut stream = store.find_key_values_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::Second); - } - } - } - }) + match self { + Self::First(store) => Box::pin( + store + .find_key_values_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_key_values_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } } fn find_keys_by_prefix_rev_iter<'a>( &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - match self { - Self::First(store) => { - let mut stream = store.find_keys_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::First); - } - } - Self::Second(store) => { - let mut stream = store.find_keys_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::Second); - } - } - } - }) + match self { + Self::First(store) => Box::pin( + store + .find_keys_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_keys_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } } fn find_key_values_by_prefix_rev_iter<'a>( &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - match self { - Self::First(store) => { - let mut stream = store.find_key_values_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::First); - } - } - Self::Second(store) => { - let mut stream = store.find_key_values_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(DualStoreError::Second); - } - } - } - }) + match self { + Self::First(store) => Box::pin( + store + .find_key_values_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_key_values_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } } } diff --git a/linera-views/src/backends/journaling.rs b/linera-views/src/backends/journaling.rs index e8a4ca87a3ba..ff32c26479d8 100644 --- a/linera-views/src/backends/journaling.rs +++ b/linera-views/src/backends/journaling.rs @@ -201,12 +201,11 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(JournalingError::Inner); - } - }) + Box::pin( + self.store + .find_keys_by_prefix_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) } async fn find_key_values_by_prefix( @@ -220,36 +219,33 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let mut stream = self.store.find_key_values_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(JournalingError::Inner); - } - }) + Box::pin( + self.store + .find_key_values_by_prefix_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) } fn find_keys_by_prefix_rev_iter<'a>( &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let mut stream = self.store.find_keys_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(JournalingError::Inner); - } - }) + Box::pin( + self.store + .find_keys_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) } fn find_key_values_by_prefix_rev_iter<'a>( &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item.map_err(JournalingError::Inner); - } - }) + Box::pin( + self.store + .find_key_values_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) } } From d867a2eaa42f70584d099ee7a08072cd2a6c0601 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 15:51:47 +0200 Subject: [PATCH 18/27] Add the insertion of the values in the cache if the iterator ends. --- linera-views/src/backends/lru_caching.rs | 211 ++++++++++++++--------- 1 file changed, 128 insertions(+), 83 deletions(-) diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index 310f998b31e9..ee74ed2e3f5f 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; -use futures::stream::StreamExt; +use futures::stream::{self, StreamExt}; use serde::{Deserialize, Serialize}; #[cfg(with_testing)] @@ -325,31 +325,43 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_keys_by_prefix_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_keys(key_prefix) + }; + if let Some(keys) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(keys.into_iter().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + // Forward the inner stream while accumulating; if (and only if) the + // consumer drains it fully without errors, populate the cache. Box::pin(async_stream::stream! { - if let Some(cache) = self.get_exclusive_cache() { - let cached = { - let mut cache = cache.lock().unwrap(); - cache.query_find_keys(key_prefix) - }; - if let Some(keys) = cached { - #[cfg(with_metrics)] - metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT - .with_label_values(&[]) - .inc(); - for key in keys { + let mut accumulated = Vec::new(); + let mut inner = self.store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = inner.next().await { + match item { + Ok(key) => { + accumulated.push(key.clone()); yield Ok(key); } - return; + Err(e) => { + yield Err(e); + return; + } } - #[cfg(with_metrics)] - metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT - .with_label_values(&[]) - .inc(); - } - let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item; } + let mut cache = cache.lock().unwrap(); + cache.insert_find_keys(key_prefix.to_vec(), &accumulated); }) } @@ -384,31 +396,41 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_key_values_by_prefix_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_key_values(key_prefix) + }; + if let Some(key_values) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(key_values.into_iter().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); Box::pin(async_stream::stream! { - if let Some(cache) = self.get_exclusive_cache() { - let cached = { - let mut cache = cache.lock().unwrap(); - cache.query_find_key_values(key_prefix) - }; - if let Some(key_values) = cached { - #[cfg(with_metrics)] - metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT - .with_label_values(&[]) - .inc(); - for key_value in key_values { - yield Ok(key_value); + let mut accumulated = Vec::new(); + let mut inner = self.store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = inner.next().await { + match item { + Ok(kv) => { + accumulated.push(kv.clone()); + yield Ok(kv); + } + Err(e) => { + yield Err(e); + return; } - return; } - #[cfg(with_metrics)] - metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT - .with_label_values(&[]) - .inc(); - } - let mut stream = self.store.find_key_values_by_prefix_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item; } + let mut cache = cache.lock().unwrap(); + cache.insert_find_key_values(key_prefix.to_vec(), &accumulated); }) } @@ -416,31 +438,43 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_keys_by_prefix_rev_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_keys(key_prefix) + }; + if let Some(keys) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(keys.into_iter().rev().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); Box::pin(async_stream::stream! { - if let Some(cache) = self.get_exclusive_cache() { - let cached = { - let mut cache = cache.lock().unwrap(); - cache.query_find_keys(key_prefix) - }; - if let Some(keys) = cached { - #[cfg(with_metrics)] - metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT - .with_label_values(&[]) - .inc(); - for key in keys.into_iter().rev() { + let mut accumulated = Vec::new(); + let mut inner = self.store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = inner.next().await { + match item { + Ok(key) => { + accumulated.push(key.clone()); yield Ok(key); } - return; + Err(e) => { + yield Err(e); + return; + } } - #[cfg(with_metrics)] - metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT - .with_label_values(&[]) - .inc(); - } - let mut stream = self.store.find_keys_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item; } + // Cache stores keys in ascending order, shared with the forward iter. + accumulated.reverse(); + let mut cache = cache.lock().unwrap(); + cache.insert_find_keys(key_prefix.to_vec(), &accumulated); }) } @@ -448,31 +482,42 @@ where &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_key_values_by_prefix_rev_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_key_values(key_prefix) + }; + if let Some(key_values) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(key_values.into_iter().rev().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); Box::pin(async_stream::stream! { - if let Some(cache) = self.get_exclusive_cache() { - let cached = { - let mut cache = cache.lock().unwrap(); - cache.query_find_key_values(key_prefix) - }; - if let Some(key_values) = cached { - #[cfg(with_metrics)] - metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT - .with_label_values(&[]) - .inc(); - for key_value in key_values.into_iter().rev() { - yield Ok(key_value); + let mut accumulated = Vec::new(); + let mut inner = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = inner.next().await { + match item { + Ok(kv) => { + accumulated.push(kv.clone()); + yield Ok(kv); + } + Err(e) => { + yield Err(e); + return; } - return; } - #[cfg(with_metrics)] - metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT - .with_label_values(&[]) - .inc(); - } - let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); - while let Some(item) = stream.next().await { - yield item; } + accumulated.reverse(); + let mut cache = cache.lock().unwrap(); + cache.insert_find_key_values(key_prefix.to_vec(), &accumulated); }) } } From 3fce0d5e8c69aaa45fadbc4b0e88b33e9b7f6458 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:13:14 +0200 Subject: [PATCH 19/27] Simplify the LRU caching code. --- linera-views/src/backends/lru_caching.rs | 60 +++++++----------------- 1 file changed, 16 insertions(+), 44 deletions(-) diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index ee74ed2e3f5f..e10150082962 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -345,20 +345,13 @@ where .inc(); // Forward the inner stream while accumulating; if (and only if) the // consumer drains it fully without errors, populate the cache. - Box::pin(async_stream::stream! { + Box::pin(async_stream::try_stream! { let mut accumulated = Vec::new(); let mut inner = self.store.find_keys_by_prefix_iter(key_prefix); while let Some(item) = inner.next().await { - match item { - Ok(key) => { - accumulated.push(key.clone()); - yield Ok(key); - } - Err(e) => { - yield Err(e); - return; - } - } + let key = item?; + accumulated.push(key.clone()); + yield key; } let mut cache = cache.lock().unwrap(); cache.insert_find_keys(key_prefix.to_vec(), &accumulated); @@ -414,20 +407,13 @@ where metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT .with_label_values(&[]) .inc(); - Box::pin(async_stream::stream! { + Box::pin(async_stream::try_stream! { let mut accumulated = Vec::new(); let mut inner = self.store.find_key_values_by_prefix_iter(key_prefix); while let Some(item) = inner.next().await { - match item { - Ok(kv) => { - accumulated.push(kv.clone()); - yield Ok(kv); - } - Err(e) => { - yield Err(e); - return; - } - } + let kv = item?; + accumulated.push(kv.clone()); + yield kv; } let mut cache = cache.lock().unwrap(); cache.insert_find_key_values(key_prefix.to_vec(), &accumulated); @@ -456,20 +442,13 @@ where metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT .with_label_values(&[]) .inc(); - Box::pin(async_stream::stream! { + Box::pin(async_stream::try_stream! { let mut accumulated = Vec::new(); let mut inner = self.store.find_keys_by_prefix_rev_iter(key_prefix); while let Some(item) = inner.next().await { - match item { - Ok(key) => { - accumulated.push(key.clone()); - yield Ok(key); - } - Err(e) => { - yield Err(e); - return; - } - } + let key = item?; + accumulated.push(key.clone()); + yield key; } // Cache stores keys in ascending order, shared with the forward iter. accumulated.reverse(); @@ -500,20 +479,13 @@ where metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT .with_label_values(&[]) .inc(); - Box::pin(async_stream::stream! { + Box::pin(async_stream::try_stream! { let mut accumulated = Vec::new(); let mut inner = self.store.find_key_values_by_prefix_rev_iter(key_prefix); while let Some(item) = inner.next().await { - match item { - Ok(kv) => { - accumulated.push(kv.clone()); - yield Ok(kv); - } - Err(e) => { - yield Err(e); - return; - } - } + let kv = item?; + accumulated.push(kv.clone()); + yield kv; } accumulated.reverse(); let mut cache = cache.lock().unwrap(); From 41075b964770f295ab352a02d7e8a0870831e723 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:23:36 +0200 Subject: [PATCH 20/27] Some additional cleanups by using try_stream. --- linera-views/src/backends/lru_caching.rs | 16 ++++++------- linera-views/src/backends/rocks_db.rs | 30 ++++++++++-------------- linera-views/src/backends/scylla_db.rs | 8 +++---- 3 files changed, 24 insertions(+), 30 deletions(-) diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index e10150082962..523778e62a26 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -712,29 +712,29 @@ mod tests { ); // The streaming variants share the same cache. - let iter_keys: Vec> = lru + let iter_keys = lru .find_keys_by_prefix_iter(&[1]) - .try_collect() + .try_collect::>() .await .unwrap(); assert_eq!(iter_keys, vec![vec![0], vec![1]]); - let iter_kv: Vec<(Vec, Vec)> = lru + let iter_kv = lru .find_key_values_by_prefix_iter(&[1]) - .try_collect() + .try_collect::>() .await .unwrap(); assert_eq!(iter_kv, vec![(vec![0], vec![10]), (vec![1], vec![11])]); // The reverse streaming variants serve the cached entries in reverse. - let rev_keys: Vec> = lru + let rev_keys = lru .find_keys_by_prefix_rev_iter(&[1]) - .try_collect() + .try_collect::>() .await .unwrap(); assert_eq!(rev_keys, vec![vec![1], vec![0]]); - let rev_kv: Vec<(Vec, Vec)> = lru + let rev_kv = lru .find_key_values_by_prefix_rev_iter(&[1]) - .try_collect() + .try_collect::>() .await .unwrap(); assert_eq!(rev_kv, vec![(vec![1], vec![11]), (vec![0], vec![10])]); diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 17bbaed917c3..609f211b88c6 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -604,11 +604,8 @@ impl RocksDbStoreInternal { ) -> FindKeysStream<'_, RocksDbStoreInternalError> { let executor = self.executor.clone(); let key_prefix = key_prefix.to_vec(); - Box::pin(async_stream::stream! { - if let Err(error) = check_key_size(&key_prefix) { - yield Err(error); - return; - } + Box::pin(async_stream::try_stream! { + check_key_size(&key_prefix)?; let (tx, mut rx) = tokio::sync::mpsc::channel::, RocksDbStoreInternalError>>(1); let handle = tokio::task::spawn_blocking(move || { @@ -635,12 +632,14 @@ impl RocksDbStoreInternal { _ = tx.blocking_send(Err(error.into())); } }); + // The blocking task sends 0+ Ok items followed by at most one + // terminal Err; `?` short-circuits on that Err. If the stream ends + // cleanly we still await the join handle so a panic in the task is + // surfaced as a TokioJoinError. while let Some(item) = rx.recv().await { - yield item; - } - if let Err(error) = handle.await { - yield Err(RocksDbStoreInternalError::TokioJoinError(error)); + yield item?; } + handle.await.map_err(RocksDbStoreInternalError::TokioJoinError)?; }) } @@ -651,11 +650,8 @@ impl RocksDbStoreInternal { ) -> FindKeyValuesStream<'_, RocksDbStoreInternalError> { let executor = self.executor.clone(); let key_prefix = key_prefix.to_vec(); - Box::pin(async_stream::stream! { - if let Err(error) = check_key_size(&key_prefix) { - yield Err(error); - return; - } + Box::pin(async_stream::try_stream! { + check_key_size(&key_prefix)?; let (tx, mut rx) = tokio::sync::mpsc::channel::< Result<(Vec, Vec), RocksDbStoreInternalError>, >(1); @@ -685,11 +681,9 @@ impl RocksDbStoreInternal { } }); while let Some(item) = rx.recv().await { - yield item; - } - if let Err(error) = handle.await { - yield Err(RocksDbStoreInternalError::TokioJoinError(error)); + yield item?; } + handle.await.map_err(RocksDbStoreInternalError::TokioJoinError)?; }) } } diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index b72c4aa5fad7..c8a111639a04 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -755,7 +755,7 @@ impl ScyllaDbStoreInternal { key_prefix: &'a [u8], direction: IterDirection, ) -> FindKeysStream<'a, ScyllaDbStoreInternalError> { - Box::pin(async_stream::stream! { + Box::pin(async_stream::try_stream! { let key_prefix = key_prefix.to_vec(); ScyllaDbClient::check_key_size(&key_prefix)?; let _guard = self.acquire().await; @@ -784,7 +784,7 @@ impl ScyllaDbStoreInternal { let mut rows = rows.rows_stream::<(Vec,)>()?; while let Some(row) = rows.next().await { let (key,) = row?; - yield Ok(key[len..].to_vec()); + yield key[len..].to_vec(); } }) } @@ -794,7 +794,7 @@ impl ScyllaDbStoreInternal { key_prefix: &'a [u8], direction: IterDirection, ) -> FindKeyValuesStream<'a, ScyllaDbStoreInternalError> { - Box::pin(async_stream::stream! { + Box::pin(async_stream::try_stream! { let key_prefix = key_prefix.to_vec(); ScyllaDbClient::check_key_size(&key_prefix)?; let _guard = self.acquire().await; @@ -823,7 +823,7 @@ impl ScyllaDbStoreInternal { let mut rows = rows.rows_stream::<(Vec, Vec)>()?; while let Some(row) = rows.next().await { let (key, value) = row?; - yield Ok((key[len..].to_vec(), value)); + yield (key[len..].to_vec(), value); } }) } From a46c0a5edf4c0382b8c85ca40456cdeeeb8ffd2c Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:31:33 +0200 Subject: [PATCH 21/27] More type annotations eliminated. --- linera-views/src/backends/value_splitting.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 37389c5fae0e..da18f7d84682 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -762,10 +762,10 @@ mod tests { 4 * MAX_LEN + 1, ]; let mut rng = crate::random::make_deterministic_rng(); - let values: Vec> = lengths + let values = lengths .iter() .map(|&len| (0..len).map(|_| rng.gen::()).collect()) - .collect(); + .collect::>>(); // Write all values. let mut batch = Batch::new(); @@ -785,13 +785,13 @@ mod tests { big_store.write_batch(batch).await.unwrap(); // Surviving logical entries, in ascending key order. - let expected_kv: Vec<(Vec, Vec)> = (0..keys.len()) + let expected_kv = (0..keys.len()) .filter(|i| !to_delete.contains(i)) .map(|i| (keys[i].clone(), values[i].clone())) - .collect(); - let expected_keys: Vec> = expected_kv.iter().map(|(k, _)| k.clone()).collect(); - let expected_kv_rev: Vec<(Vec, Vec)> = expected_kv.iter().rev().cloned().collect(); - let expected_keys_rev: Vec> = expected_keys.iter().rev().cloned().collect(); + .collect::>(); + let expected_keys = expected_kv.iter().map(|(k, _)| k.clone()).collect::>(); + let expected_kv_rev = expected_kv.iter().rev().cloned().collect::>(); + let expected_keys_rev = expected_keys.iter().rev().cloned().collect::>(); // 1. find_keys_by_prefix_iter let mut stream = big_store.find_keys_by_prefix_iter(&[]); @@ -840,14 +840,14 @@ mod tests { // segments (96 bytes at index 0 plus the count, then 100 + 54 bytes // at indices 1 and 2). let mut rng = crate::random::make_deterministic_rng(); - let long_value: Vec = (0..250).map(|_| rng.gen::()).collect(); + let long_value = (0..250).map(|_| rng.gen::()).collect::>(); let mut batch = Batch::new(); batch.put_key_value_bytes(key.clone(), long_value); big_store.write_batch(batch).await.unwrap(); // Overwrite with a single-segment value. Only index 0 is rewritten; // indices 1 and 2 stay in the underlying store as orphan segments. - let short_value: Vec = (0..10).map(|_| rng.gen::()).collect(); + let short_value = (0..10).map(|_| rng.gen::()).collect::>(); let mut batch = Batch::new(); batch.put_key_value_bytes(key.clone(), short_value.clone()); big_store.write_batch(batch).await.unwrap(); From 3b14200db1e5154548432618ec2a640f6bed508e Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:34:10 +0200 Subject: [PATCH 22/27] More type annotation eliminated. --- linera-views/src/test_utils/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 1d39aa072674..87b5d1301e2e 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -206,30 +206,30 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec } assert_eq!(set_key_value1, set_key_value2); // Streaming variants must agree with the eager methods. - let keys_iter: Vec> = store + let keys_iter = store .find_keys_by_prefix_iter(key_prefix) - .try_collect() + .try_collect::>() .await .unwrap(); assert_eq!(keys_iter, keys_request); - let key_values_iter: Vec<(Vec, Vec)> = store + let key_values_iter = store .find_key_values_by_prefix_iter(key_prefix) - .try_collect() + .try_collect::>() .await .unwrap(); assert_eq!(key_values_iter, key_values_by_prefix); // Reverse streaming variants must yield the eager results in reverse order. - let keys_rev_iter: Vec> = store + let keys_rev_iter = store .find_keys_by_prefix_rev_iter(key_prefix) - .try_collect() + .try_collect::>() .await .unwrap(); let mut keys_request_rev = keys_request.clone(); keys_request_rev.reverse(); assert_eq!(keys_rev_iter, keys_request_rev); - let key_values_rev_iter: Vec<(Vec, Vec)> = store + let key_values_rev_iter = store .find_key_values_by_prefix_rev_iter(key_prefix) - .try_collect() + .try_collect::>() .await .unwrap(); let mut key_values_by_prefix_rev = key_values_by_prefix.clone(); From d46cfc49db5c3320c17e27ada1bbc59e64e05ef2 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:48:37 +0200 Subject: [PATCH 23/27] Simplify the trait implementation and rename _reverse_iterator as _rev_iter. --- linera-views/src/backends/rocks_db.rs | 6 ++-- linera-views/src/store.rs | 46 ++++++++++++--------------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 609f211b88c6..a77fa2d30685 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -210,7 +210,7 @@ impl RocksDbStoreExecutor { /// uses a fixed-length prefix extractor; without it, `seek_for_prev` would /// only search within the bloom-prefix scope and could miss keys whose /// extractor-prefix differs from the seek target. - fn get_find_prefix_reverse_iterator( + fn get_find_prefix_rev_iter( &self, prefix: &[u8], ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> { @@ -614,7 +614,7 @@ impl RocksDbStoreInternal { let len = prefix.len(); let mut iter = match direction { IterDirection::Forward => executor.get_find_prefix_iterator(&prefix), - IterDirection::Reverse => executor.get_find_prefix_reverse_iterator(&prefix), + IterDirection::Reverse => executor.get_find_prefix_rev_iter(&prefix), }; while let Some(key) = iter.key() { if !key.starts_with(&prefix) { @@ -661,7 +661,7 @@ impl RocksDbStoreInternal { let len = prefix.len(); let mut iter = match direction { IterDirection::Forward => executor.get_find_prefix_iterator(&prefix), - IterDirection::Reverse => executor.get_find_prefix_reverse_iterator(&prefix), + IterDirection::Reverse => executor.get_find_prefix_rev_iter(&prefix), }; while let Some((key, value)) = iter.item() { if !key.starts_with(&prefix) { diff --git a/linera-views/src/store.rs b/linera-views/src/store.rs index 1b4d96bbf2a7..76cc248d7a99 100644 --- a/linera-views/src/store.rs +++ b/linera-views/src/store.rs @@ -5,7 +5,7 @@ use std::{fmt::Debug, future::Future, pin::Pin}; -use futures::stream::Stream; +use futures::stream::{self, Stream, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; #[cfg(with_testing)] @@ -104,12 +104,11 @@ pub trait ReadableKeyValueStore: WithError { &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let keys = self.find_keys_by_prefix(key_prefix).await?; - for key in keys { - yield Ok(key); - } - }) + Box::pin( + stream::once(self.find_keys_by_prefix(key_prefix)) + .map_ok(|keys| stream::iter(keys.into_iter().map(Ok))) + .try_flatten(), + ) } /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys. @@ -128,12 +127,11 @@ pub trait ReadableKeyValueStore: WithError { &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let key_values = self.find_key_values_by_prefix(key_prefix).await?; - for key_value in key_values { - yield Ok(key_value); - } - }) + Box::pin( + stream::once(self.find_key_values_by_prefix(key_prefix)) + .map_ok(|kvs| stream::iter(kvs.into_iter().map(Ok))) + .try_flatten(), + ) } /// Returns a boxed stream that yields the keys matching the prefix in @@ -146,12 +144,11 @@ pub trait ReadableKeyValueStore: WithError { &'a self, key_prefix: &'a [u8], ) -> FindKeysStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let keys = self.find_keys_by_prefix(key_prefix).await?; - for key in keys.into_iter().rev() { - yield Ok(key); - } - }) + Box::pin( + stream::once(self.find_keys_by_prefix(key_prefix)) + .map_ok(|keys| stream::iter(keys.into_iter().rev().map(Ok))) + .try_flatten(), + ) } /// Returns a boxed stream that yields the `(key, value)` pairs matching the @@ -164,12 +161,11 @@ pub trait ReadableKeyValueStore: WithError { &'a self, key_prefix: &'a [u8], ) -> FindKeyValuesStream<'a, Self::Error> { - Box::pin(async_stream::stream! { - let key_values = self.find_key_values_by_prefix(key_prefix).await?; - for key_value in key_values.into_iter().rev() { - yield Ok(key_value); - } - }) + Box::pin( + stream::once(self.find_key_values_by_prefix(key_prefix)) + .map_ok(|kvs| stream::iter(kvs.into_iter().rev().map(Ok))) + .try_flatten(), + ) } // We can't use `async fn` here in the below implementations due to From 6bb04f04a4d9afc152984bf6c737591bf0978d47 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:58:19 +0200 Subject: [PATCH 24/27] Some edit. --- linera-views/src/backends/value_splitting.rs | 23 +++++++++++--------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index da18f7d84682..16fea9388467 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -339,18 +339,21 @@ where let index = Self::read_index_from_key(&big_key)?; big_key.truncate(big_key.len() - 4); let key = big_key; - let continues = match &state { - Some((current_key, top, segs)) => { - let segs_len = segs.len() as u32; - *current_key == key && segs_len <= *top && index == *top - segs_len + state = match state.take() { + Some((current_key, top, mut segs)) + if current_key == key + && (segs.len() as u32) <= top + && index == top - segs.len() as u32 => + { + segs.push(value); + Some((current_key, top, segs)) } - None => false, + // Either no prior state, or the prior state belongs to a + // different key / a non-contiguous index — drop it (those + // segments are leftovers from a deleted big value) and + // start a fresh accumulation. + _ => Some((key, index, vec![value])), }; - if continues { - state.as_mut().unwrap().2.push(value); - } else { - state = Some((key, index, vec![value])); - } if index == 0 { let (key, top, segs) = state.take().unwrap(); let segment_zero_value = segs.last().unwrap(); From 4483dc886e2911e5840c9b3ce19dbbc1c6ab98d3 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 16:58:40 +0200 Subject: [PATCH 25/27] Reformat. --- linera-views/src/backends/value_splitting.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 16fea9388467..b39ea80810ec 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -792,7 +792,10 @@ mod tests { .filter(|i| !to_delete.contains(i)) .map(|i| (keys[i].clone(), values[i].clone())) .collect::>(); - let expected_keys = expected_kv.iter().map(|(k, _)| k.clone()).collect::>(); + let expected_keys = expected_kv + .iter() + .map(|(k, _)| k.clone()) + .collect::>(); let expected_kv_rev = expected_kv.iter().rev().cloned().collect::>(); let expected_keys_rev = expected_keys.iter().rev().cloned().collect::>(); From 2dafd783e615db1f61f10358943a65b1438c06e6 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 17:01:34 +0200 Subject: [PATCH 26/27] Restructure the code. --- linera-views/src/backends/value_splitting.rs | 21 ++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index b39ea80810ec..5466ed0590d4 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -356,20 +356,21 @@ where }; if index == 0 { let (key, top, segs) = state.take().unwrap(); - let segment_zero_value = segs.last().unwrap(); - let count = Self::read_count_from_value(segment_zero_value)?; + let count = Self::read_count_from_value(segs.last().unwrap())?; if count == 0 || count > top + 1 { yield Err(ValueSplittingError::MissingSegment); return; } - let mut big_value = Vec::new(); - for (rev_pos, val) in segs.iter().rev().enumerate() { - let idx = rev_pos as u32; - if idx == 0 { - big_value.extend_from_slice(&val[4..]); - } else if idx < count { - big_value.extend_from_slice(val); - } + // `segs` is in push order — descending index with the master + // (index 0) at the end. Take the master's data (skipping its + // count prefix), then the `count - 1` lowest-indexed segments + // in ascending order. Any segments above index `count - 1` + // are leftovers from a previous longer write to this key and + // are silently dropped. + let (master, others) = segs.split_last().unwrap(); + let mut big_value = master[4..].to_vec(); + for val in others.iter().rev().take(count as usize - 1) { + big_value.extend_from_slice(val); } yield Ok((key, big_value)); } From 1dc8cadf82a663d165aa37ba60c37111faa4f212 Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 4 May 2026 18:38:36 +0200 Subject: [PATCH 27/27] Remove a possible deadlock in ScyllaDB and add a test for such deadlocks in run_reads. --- linera-views/src/backends/scylla_db.rs | 8 ++++++-- linera-views/src/test_utils/mod.rs | 28 +++++++++++++++++++------- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index c8a111639a04..23aedc8c0a19 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -758,7 +758,11 @@ impl ScyllaDbStoreInternal { Box::pin(async_stream::try_stream! { let key_prefix = key_prefix.to_vec(); ScyllaDbClient::check_key_size(&key_prefix)?; - let _guard = self.acquire().await; + // Intentionally no `self.acquire()` here: streaming iterators run + // for an unbounded duration and would otherwise hold a semaphore + // permit across user-controlled `await` points, leading to + // potential deadlocks when callers interleave other store + // operations (e.g. `read_value`) with iterator polling. let session = &self.store.session; let len = key_prefix.len(); let (query_unbounded, query_bounded) = match direction { @@ -797,7 +801,7 @@ impl ScyllaDbStoreInternal { Box::pin(async_stream::try_stream! { let key_prefix = key_prefix.to_vec(); ScyllaDbClient::check_key_size(&key_prefix)?; - let _guard = self.acquire().await; + // See `find_keys_stream` for why no semaphore permit is acquired. let session = &self.store.session; let len = key_prefix.len(); let (query_unbounded, query_bounded) = match direction { diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 87b5d1301e2e..929e69741408 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -9,7 +9,7 @@ pub mod performance; use std::collections::{BTreeMap, BTreeSet, HashSet}; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; use rand::{seq::SliceRandom, Rng}; use crate::{ @@ -205,13 +205,27 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec } } assert_eq!(set_key_value1, set_key_value2); - // Streaming variants must agree with the eager methods. - let keys_iter = store - .find_keys_by_prefix_iter(key_prefix) - .try_collect::>() - .await - .unwrap(); + // Streaming variants must agree with the eager methods. The forward + // `find_keys_by_prefix_iter` is drained by interleaving each yielded + // key with a `read_value_bytes` call against the same store. Beyond + // checking the iter result, this exercises that holding the streaming + // iterator does not deadlock other operations on the same store + // (e.g. via a semaphore permit held across the stream body). + let mut keys_iter = Vec::new(); + let mut key_values_via_iter = Vec::new(); + { + let mut stream = store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + let key = item.unwrap(); + let mut full_key = key_prefix.to_vec(); + full_key.extend(&key); + let value = store.read_value_bytes(&full_key).await.unwrap().unwrap(); + keys_iter.push(key.clone()); + key_values_via_iter.push((key, value)); + } + } assert_eq!(keys_iter, keys_request); + assert_eq!(key_values_via_iter, key_values_by_prefix); let key_values_iter = store .find_key_values_by_prefix_iter(key_prefix) .try_collect::>()