diff --git a/Cargo.lock b/Cargo.lock index 771b561710cb..e80280a2bcb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6357,6 +6357,7 @@ dependencies = [ "anyhow", "async-graphql", "async-lock", + "async-stream", "aws-config", "aws-sdk-dynamodb", "aws-smithy-types", diff --git a/Cargo.toml b/Cargo.toml index 069029d969bd..21dbd2f908c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ async-graphql-axum = "=7.0.17" async-graphql-derive = "=7.0.17" async-graphql-value = { version = "=7.0.17", features = ["raw_value"] } async-lock = "3.3.0" +async-stream = "0.3" async-trait = "0.1.77" async-tungstenite = { version = "0.22", features = ["tokio-runtime"] } aws-smithy-types = "1.3.1" diff --git a/examples/Cargo.lock b/examples/Cargo.lock index a3b42efea752..3b00bcc6f68d 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -4121,6 +4121,7 @@ dependencies = [ "anyhow", "async-graphql", "async-lock", + "async-stream", "bcs", "cfg_aliases", "convert_case", diff --git a/linera-sdk/tests/fixtures/Cargo.lock b/linera-sdk/tests/fixtures/Cargo.lock index 83e09659f0d0..9a0af87028ac 100644 --- a/linera-sdk/tests/fixtures/Cargo.lock +++ b/linera-sdk/tests/fixtures/Cargo.lock @@ -2475,6 +2475,7 @@ dependencies = [ "anyhow", "async-graphql", "async-lock", + "async-stream", "bcs", "cfg_aliases", "convert_case", diff --git a/linera-views/Cargo.toml b/linera-views/Cargo.toml index 2fc1852081de..8e75d1c999bc 100644 --- a/linera-views/Cargo.toml +++ b/linera-views/Cargo.toml @@ -36,6 +36,7 @@ allocative.workspace = true anyhow.workspace = true async-graphql.workspace = true async-lock.workspace = true +async-stream.workspace = true aws-config = { workspace = true, optional = true } aws-sdk-dynamodb = { workspace = true, optional = true } aws-smithy-types = { workspace = true, optional = true } diff --git a/linera-views/src/backends/dual.rs b/linera-views/src/backends/dual.rs index 5a1c2ac8d912..7ea50d840d9e 100644 --- a/linera-views/src/backends/dual.rs +++ b/linera-views/src/backends/dual.rs @@ -3,6 +3,7 @@ //! Implements [`crate::store::KeyValueStore`] by combining two existing stores. +use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -11,8 +12,8 @@ use crate::store::TestKeyValueDatabase; use crate::{ batch::Batch, store::{ - KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError, - WritableKeyValueStore, + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, }, }; @@ -175,6 +176,24 @@ where Ok(result) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + match self { + Self::First(store) => Box::pin( + store + .find_keys_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_keys_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -191,6 +210,60 @@ where }; Ok(result) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + match self { + Self::First(store) => Box::pin( + store + .find_key_values_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_key_values_by_prefix_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + match self { + Self::First(store) => Box::pin( + store + .find_keys_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_keys_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + match self { + Self::First(store) => Box::pin( + store + .find_key_values_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::First)), + ), + Self::Second(store) => Box::pin( + store + .find_key_values_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(DualStoreError::Second)), + ), + } + } } impl WritableKeyValueStore for DualStore diff --git a/linera-views/src/backends/dynamo_db.rs b/linera-views/src/backends/dynamo_db.rs index d62ed2318296..5627654bbfef 100644 --- a/linera-views/src/backends/dynamo_db.rs +++ b/linera-views/src/backends/dynamo_db.rs @@ -47,8 +47,8 @@ use crate::{ journaling::JournalingKeyValueDatabase, lru_caching::{LruCachingConfig, LruCachingDatabase}, store::{ - DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, - WithError, + DirectWritableKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase, + KeyValueStoreError, ReadableKeyValueStore, WithError, }, value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; @@ -608,6 +608,7 @@ impl DynamoDbStoreInternal { start_key: &[u8], key_prefix: &[u8], start_key_map: Option>, + forward: bool, ) -> Result { let _guard = self.acquire().await; let start_key = start_key.to_vec(); @@ -627,6 +628,7 @@ impl DynamoDbStoreInternal { AttributeValue::B(Blob::new(prefixed_key_prefix)), ) .set_exclusive_start_key(start_key_map) + .scan_index_forward(forward) .send() .boxed_sync() .await?; @@ -685,7 +687,7 @@ impl DynamoDbStoreInternal { let mut start_key_map = None; loop { let response = self - .get_query_output(attribute, start_key, key_prefix, start_key_map) + .get_query_output(attribute, start_key, key_prefix, start_key_map, true) .await?; let last_evaluated = response.last_evaluated_key.clone(); responses.push(response); @@ -886,6 +888,13 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { .collect() } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.find_keys_stream(key_prefix, true) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -898,6 +907,96 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal { .map(|entry| entry.map(|(key, value)| (key.to_vec(), value.to_vec()))) .collect() } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.find_key_values_stream(key_prefix, true) + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.find_keys_stream(key_prefix, false) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.find_key_values_stream(key_prefix, false) + } +} + +impl DynamoDbStoreInternal { + fn find_keys_stream<'a>( + &'a self, + key_prefix: &'a [u8], + forward: bool, + ) -> FindKeysStream<'a, DynamoDbStoreInternalError> { + Box::pin(async_stream::stream! { + check_key_size(key_prefix)?; + let prefix_len = key_prefix.len(); + let mut start_key_map = None; + loop { + let response = self + .get_query_output( + KEY_ATTRIBUTE, + &self.start_key, + key_prefix, + start_key_map, + forward, + ) + .await?; + let last_evaluated = response.last_evaluated_key.clone(); + for item in response.items.iter().flatten() { + yield extract_key(prefix_len, item).map(|k| k.to_vec()); + } + match last_evaluated { + None => break, + Some(value) => { + start_key_map = Some(value); + } + } + } + }) + } + + fn find_key_values_stream<'a>( + &'a self, + key_prefix: &'a [u8], + forward: bool, + ) -> FindKeyValuesStream<'a, DynamoDbStoreInternalError> { + Box::pin(async_stream::stream! { + check_key_size(key_prefix)?; + let prefix_len = key_prefix.len(); + let mut start_key_map = None; + loop { + let response = self + .get_query_output( + KEY_VALUE_ATTRIBUTE, + &self.start_key, + key_prefix, + start_key_map, + forward, + ) + .await?; + let last_evaluated = response.last_evaluated_key.clone(); + for item in response.items.iter().flatten() { + yield extract_key_value(prefix_len, item) + .map(|(k, v)| (k.to_vec(), v.to_vec())); + } + match last_evaluated { + None => break, + Some(value) => { + start_key_map = Some(value); + } + } + } + }) + } } impl DirectWritableKeyValueStore for DynamoDbStoreInternal { diff --git a/linera-views/src/backends/journaling.rs b/linera-views/src/backends/journaling.rs index 45cfe7d1ac4d..ff32c26479d8 100644 --- a/linera-views/src/backends/journaling.rs +++ b/linera-views/src/backends/journaling.rs @@ -19,6 +19,7 @@ //! time the data in a block are written, the journal header is updated in the same //! transaction to mark the block as processed. +use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; use static_assertions as sa; use thiserror::Error; @@ -26,8 +27,8 @@ use thiserror::Error; use crate::{ batch::{Batch, BatchValueWriter, DeletePrefixExpander, SimplifiedBatch}, store::{ - DirectKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, - WithError, WritableKeyValueStore, + DirectKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase, + KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore, }, views::MIN_VIEW_TAG, }; @@ -196,12 +197,56 @@ where Ok(self.store.find_keys_by_prefix(key_prefix).await?) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin( + self.store + .find_keys_by_prefix_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], ) -> Result, Vec)>, Self::Error> { Ok(self.store.find_key_values_by_prefix(key_prefix).await?) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin( + self.store + .find_key_values_by_prefix_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin( + self.store + .find_keys_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin( + self.store + .find_key_values_by_prefix_rev_iter(key_prefix) + .map(|item| item.map_err(JournalingError::Inner)), + ) + } } impl KeyValueDatabase for JournalingKeyValueDatabase diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index 98e75b8b54f5..523778e62a26 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex}; +use futures::stream::{self, StreamExt}; use serde::{Deserialize, Serialize}; #[cfg(with_testing)] @@ -14,7 +15,10 @@ use crate::store::TestKeyValueDatabase; use crate::{ batch::{Batch, WriteOperation}, lru_prefix_cache::{LruPrefixCache, StorageCacheConfig}, - store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore}, + store::{ + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, ReadableKeyValueStore, WithError, + WritableKeyValueStore, + }, }; #[cfg(with_metrics)] @@ -317,6 +321,43 @@ where Ok(keys) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_keys_by_prefix_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_keys(key_prefix) + }; + if let Some(keys) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(keys.into_iter().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + // Forward the inner stream while accumulating; if (and only if) the + // consumer drains it fully without errors, populate the cache. + Box::pin(async_stream::try_stream! { + let mut accumulated = Vec::new(); + let mut inner = self.store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = inner.next().await { + let key = item?; + accumulated.push(key.clone()); + yield key; + } + let mut cache = cache.lock().unwrap(); + cache.insert_find_keys(key_prefix.to_vec(), &accumulated); + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -343,6 +384,114 @@ where cache.insert_find_key_values(key_prefix.to_vec(), &key_values); Ok(key_values) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_key_values_by_prefix_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_key_values(key_prefix) + }; + if let Some(key_values) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(key_values.into_iter().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + Box::pin(async_stream::try_stream! { + let mut accumulated = Vec::new(); + let mut inner = self.store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = inner.next().await { + let kv = item?; + accumulated.push(kv.clone()); + yield kv; + } + let mut cache = cache.lock().unwrap(); + cache.insert_find_key_values(key_prefix.to_vec(), &accumulated); + }) + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_keys_by_prefix_rev_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_keys(key_prefix) + }; + if let Some(keys) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(keys.into_iter().rev().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEYS_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + Box::pin(async_stream::try_stream! { + let mut accumulated = Vec::new(); + let mut inner = self.store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = inner.next().await { + let key = item?; + accumulated.push(key.clone()); + yield key; + } + // Cache stores keys in ascending order, shared with the forward iter. + accumulated.reverse(); + let mut cache = cache.lock().unwrap(); + cache.insert_find_keys(key_prefix.to_vec(), &accumulated); + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + let Some(cache) = self.get_exclusive_cache().cloned() else { + return self.store.find_key_values_by_prefix_rev_iter(key_prefix); + }; + let cached = { + let mut cache = cache.lock().unwrap(); + cache.query_find_key_values(key_prefix) + }; + if let Some(key_values) = cached { + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_HIT_COUNT + .with_label_values(&[]) + .inc(); + return Box::pin(stream::iter(key_values.into_iter().rev().map(Ok))); + } + #[cfg(with_metrics)] + metrics::FIND_KEY_VALUES_BY_PREFIX_CACHE_MISS_COUNT + .with_label_values(&[]) + .inc(); + Box::pin(async_stream::try_stream! { + let mut accumulated = Vec::new(); + let mut inner = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + while let Some(item) = inner.next().await { + let kv = item?; + accumulated.push(kv.clone()); + yield kv; + } + accumulated.reverse(); + let mut cache = cache.lock().unwrap(); + cache.insert_find_key_values(key_prefix.to_vec(), &accumulated); + }) + } } impl WritableKeyValueStore for LruCachingStore @@ -499,3 +648,95 @@ where }) } } + +#[cfg(test)] +mod tests { + // The strategy of every test below: build an `LruCachingStore` over a + // shared inner `MemoryStore`, populate the cache through reads/writes done + // via the LRU layer, then mutate the inner store *directly* (bypassing the + // cache). A subsequent read through the LRU layer that still observes the + // original value proves the cache actually served the request — if it + // hadn't, the read would have hit the inner store and seen the new value. + + use futures::stream::TryStreamExt; + + use crate::{ + backends::lru_caching::{LruCachingStore, DEFAULT_STORAGE_CACHE_CONFIG}, + batch::Batch, + memory::MemoryStore, + store::{ReadableKeyValueStore, WritableKeyValueStore}, + }; + + fn make_lru() -> (MemoryStore, LruCachingStore) { + let inner = MemoryStore::new_for_testing(); + let lru = LruCachingStore::new( + inner.clone(), + DEFAULT_STORAGE_CACHE_CONFIG, + /* has_exclusive_access */ true, + ); + (inner, lru) + } + + async fn put_direct(inner: &MemoryStore, key: &[u8], value: &[u8]) { + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.to_vec(), value.to_vec()); + inner.write_batch(batch).await.unwrap(); + } + + #[tokio::test] + async fn test_lru_cache_serves_find_by_prefix() { + let (inner, lru) = make_lru(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(vec![1, 0], vec![10]); + batch.put_key_value_bytes(vec![1, 1], vec![11]); + lru.write_batch(batch).await.unwrap(); + + // Populate the find-keys / find-key-values caches for prefix [1]. + let keys = lru.find_keys_by_prefix(&[1]).await.unwrap(); + assert_eq!(keys, vec![vec![0], vec![1]]); + let kv = lru.find_key_values_by_prefix(&[1]).await.unwrap(); + assert_eq!(kv, vec![(vec![0], vec![10]), (vec![1], vec![11])]); + + // Diverge the inner store: add a new key under the prefix and mutate + // an existing one — neither should be visible through the LRU layer. + put_direct(&inner, &[1, 2], &[12]).await; + put_direct(&inner, &[1, 0], &[99]).await; + + assert_eq!( + lru.find_keys_by_prefix(&[1]).await.unwrap(), + vec![vec![0], vec![1]] + ); + assert_eq!( + lru.find_key_values_by_prefix(&[1]).await.unwrap(), + vec![(vec![0], vec![10]), (vec![1], vec![11])] + ); + + // The streaming variants share the same cache. + let iter_keys = lru + .find_keys_by_prefix_iter(&[1]) + .try_collect::>() + .await + .unwrap(); + assert_eq!(iter_keys, vec![vec![0], vec![1]]); + let iter_kv = lru + .find_key_values_by_prefix_iter(&[1]) + .try_collect::>() + .await + .unwrap(); + assert_eq!(iter_kv, vec![(vec![0], vec![10]), (vec![1], vec![11])]); + + // The reverse streaming variants serve the cached entries in reverse. + let rev_keys = lru + .find_keys_by_prefix_rev_iter(&[1]) + .try_collect::>() + .await + .unwrap(); + assert_eq!(rev_keys, vec![vec![1], vec![0]]); + let rev_kv = lru + .find_key_values_by_prefix_rev_iter(&[1]) + .try_collect::>() + .await + .unwrap(); + assert_eq!(rev_kv, vec![(vec![1], vec![11]), (vec![0], vec![10])]); + } +} diff --git a/linera-views/src/backends/metering.rs b/linera-views/src/backends/metering.rs index f63c18cefe7e..3751614f1f91 100644 --- a/linera-views/src/backends/metering.rs +++ b/linera-views/src/backends/metering.rs @@ -19,7 +19,10 @@ use prometheus::{exponential_buckets, HistogramVec, IntCounterVec}; use crate::store::TestKeyValueDatabase; use crate::{ batch::Batch, - store::{KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore}, + store::{ + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, ReadableKeyValueStore, WithError, + WritableKeyValueStore, + }, }; #[derive(Clone)] @@ -438,6 +441,17 @@ where Ok(result) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.counter + .find_keys_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_keys_by_prefix_iter(key_prefix) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -465,6 +479,39 @@ where .observe(key_values_size as f64); Ok(result) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.counter + .find_key_values_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_key_values_by_prefix_iter(key_prefix) + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.counter + .find_keys_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_keys_by_prefix_rev_iter(key_prefix) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.counter + .find_key_values_by_prefix_prefix_size + .with_label_values(&[]) + .observe(key_prefix.len() as f64); + self.store.find_key_values_by_prefix_rev_iter(key_prefix) + } } impl WritableKeyValueStore for MeteredStore diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 720e892df87e..a77fa2d30685 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -29,8 +29,8 @@ use crate::{ common::get_upper_bound_option, lru_caching::{LruCachingConfig, LruCachingDatabase}, store::{ - KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError, - WritableKeyValueStore, + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, }, value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; @@ -70,6 +70,13 @@ const HYPER_CLOCK_CACHE_BLOCK_SIZE: usize = 8 * 1024; // 8 KiB /// The RocksDB client that we use. type DB = rocksdb::DBWithThreadMode; +/// Iteration direction selector used by the streaming `find_*_iter` helpers. +#[derive(Clone, Copy)] +enum IterDirection { + Forward, + Reverse, +} + /// The choice of the spawning mode. /// `SpawnBlocking` always works and is the safest. /// `BlockInPlace` can only be used in multi-threaded environment. @@ -198,6 +205,32 @@ impl RocksDbStoreExecutor { iter } + /// Returns an iterator positioned at the largest key starting with `prefix`, + /// iterating in reverse. `total_order_seek` is enabled because the database + /// uses a fixed-length prefix extractor; without it, `seek_for_prev` would + /// only search within the bloom-prefix scope and could miss keys whose + /// extractor-prefix differs from the seek target. + fn get_find_prefix_rev_iter( + &self, + prefix: &[u8], + ) -> rocksdb::DBRawIteratorWithThreadMode<'_, DB> { + let mut read_opts = rocksdb::ReadOptions::default(); + read_opts.set_async_io(true); + read_opts.set_total_order_seek(true); + let upper_bound = get_upper_bound_option(prefix); + let mut iter = self.db.raw_iterator_opt(read_opts); + match upper_bound.as_deref() { + Some(ub) => { + iter.seek_for_prev(ub); + if iter.key().is_some_and(|k| k == ub) { + iter.prev(); + } + } + None => iter.seek_to_last(), + } + iter + } + fn find_keys_by_prefix_internal( &self, key_prefix: Vec, @@ -539,6 +572,120 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { ) .await } + + fn find_keys_by_prefix_iter(&self, key_prefix: &[u8]) -> FindKeysStream<'_, Self::Error> { + self.find_keys_stream(key_prefix, IterDirection::Forward) + } + + fn find_key_values_by_prefix_iter( + &self, + key_prefix: &[u8], + ) -> FindKeyValuesStream<'_, Self::Error> { + self.find_key_values_stream(key_prefix, IterDirection::Forward) + } + + fn find_keys_by_prefix_rev_iter(&self, key_prefix: &[u8]) -> FindKeysStream<'_, Self::Error> { + self.find_keys_stream(key_prefix, IterDirection::Reverse) + } + + fn find_key_values_by_prefix_rev_iter( + &self, + key_prefix: &[u8], + ) -> FindKeyValuesStream<'_, Self::Error> { + self.find_key_values_stream(key_prefix, IterDirection::Reverse) + } +} + +impl RocksDbStoreInternal { + fn find_keys_stream( + &self, + key_prefix: &[u8], + direction: IterDirection, + ) -> FindKeysStream<'_, RocksDbStoreInternalError> { + let executor = self.executor.clone(); + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::try_stream! { + check_key_size(&key_prefix)?; + let (tx, mut rx) = + tokio::sync::mpsc::channel::, RocksDbStoreInternalError>>(1); + let handle = tokio::task::spawn_blocking(move || { + let mut prefix = executor.start_key.clone(); + prefix.extend(&key_prefix); + let len = prefix.len(); + let mut iter = match direction { + IterDirection::Forward => executor.get_find_prefix_iterator(&prefix), + IterDirection::Reverse => executor.get_find_prefix_rev_iter(&prefix), + }; + while let Some(key) = iter.key() { + if !key.starts_with(&prefix) { + break; + } + if tx.blocking_send(Ok(key[len..].to_vec())).is_err() { + return; + } + match direction { + IterDirection::Forward => iter.next(), + IterDirection::Reverse => iter.prev(), + } + } + if let Err(error) = iter.status() { + _ = tx.blocking_send(Err(error.into())); + } + }); + // The blocking task sends 0+ Ok items followed by at most one + // terminal Err; `?` short-circuits on that Err. If the stream ends + // cleanly we still await the join handle so a panic in the task is + // surfaced as a TokioJoinError. + while let Some(item) = rx.recv().await { + yield item?; + } + handle.await.map_err(RocksDbStoreInternalError::TokioJoinError)?; + }) + } + + fn find_key_values_stream( + &self, + key_prefix: &[u8], + direction: IterDirection, + ) -> FindKeyValuesStream<'_, RocksDbStoreInternalError> { + let executor = self.executor.clone(); + let key_prefix = key_prefix.to_vec(); + Box::pin(async_stream::try_stream! { + check_key_size(&key_prefix)?; + let (tx, mut rx) = tokio::sync::mpsc::channel::< + Result<(Vec, Vec), RocksDbStoreInternalError>, + >(1); + let handle = tokio::task::spawn_blocking(move || { + let mut prefix = executor.start_key.clone(); + prefix.extend(&key_prefix); + let len = prefix.len(); + let mut iter = match direction { + IterDirection::Forward => executor.get_find_prefix_iterator(&prefix), + IterDirection::Reverse => executor.get_find_prefix_rev_iter(&prefix), + }; + while let Some((key, value)) = iter.item() { + if !key.starts_with(&prefix) { + break; + } + let pair = (key[len..].to_vec(), value.to_vec()); + if tx.blocking_send(Ok(pair)).is_err() { + return; + } + match direction { + IterDirection::Forward => iter.next(), + IterDirection::Reverse => iter.prev(), + } + } + if let Err(error) = iter.status() { + _ = tx.blocking_send(Err(error.into())); + } + }); + while let Some(item) = rx.recv().await { + yield item?; + } + handle.await.map_err(RocksDbStoreInternalError::TokioJoinError)?; + }) + } } impl WritableKeyValueStore for RocksDbStoreInternal { diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 28ee7eca7a98..23aedc8c0a19 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -47,8 +47,8 @@ use crate::{ journaling::{JournalingError, JournalingKeyValueDatabase}, lru_caching::{LruCachingConfig, LruCachingDatabase}, store::{ - DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, - WithError, + DirectWritableKeyValueStore, FindKeyValuesStream, FindKeysStream, KeyValueDatabase, + KeyValueStoreError, ReadableKeyValueStore, WithError, }, value_splitting::{ValueSplittingDatabase, ValueSplittingError}, }; @@ -96,6 +96,13 @@ const MAX_BATCH_SIZE: usize = 5000; /// The keyspace to use for the ScyllaDB database. const KEYSPACE: &str = "kv"; +/// Iteration direction selector used by the streaming `find_*_iter` helpers. +#[derive(Clone, Copy)] +enum IterDirection { + Forward, + Reverse, +} + /// The client for ScyllaDB: /// * The session allows to pass queries /// * The namespace that is being assigned to the database @@ -113,6 +120,10 @@ struct ScyllaDbClient { find_keys_by_prefix_bounded: PreparedStatement, find_key_values_by_prefix_unbounded: PreparedStatement, find_key_values_by_prefix_bounded: PreparedStatement, + find_keys_by_prefix_rev_unbounded: PreparedStatement, + find_keys_by_prefix_rev_bounded: PreparedStatement, + find_key_values_by_prefix_rev_unbounded: PreparedStatement, + find_key_values_by_prefix_rev_bounded: PreparedStatement, multi_key_values: papaya::HashMap, multi_keys: papaya::HashMap, } @@ -180,6 +191,34 @@ impl ScyllaDbClient { )) .await?; + let find_keys_by_prefix_rev_unbounded = session + .prepare(format!( + "SELECT k FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? ORDER BY k DESC" + )) + .await?; + + let find_keys_by_prefix_rev_bounded = session + .prepare(format!( + "SELECT k FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? AND k < ? ORDER BY k DESC" + )) + .await?; + + let find_key_values_by_prefix_rev_unbounded = session + .prepare(format!( + "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? ORDER BY k DESC" + )) + .await?; + + let find_key_values_by_prefix_rev_bounded = session + .prepare(format!( + "SELECT k,v FROM {KEYSPACE}.\"{namespace}\" \ + WHERE root_key = ? AND k >= ? AND k < ? ORDER BY k DESC" + )) + .await?; + Ok(Self { session, namespace, @@ -193,6 +232,10 @@ impl ScyllaDbClient { find_keys_by_prefix_bounded, find_key_values_by_prefix_unbounded, find_key_values_by_prefix_bounded, + find_keys_by_prefix_rev_unbounded, + find_keys_by_prefix_rev_bounded, + find_key_values_by_prefix_rev_unbounded, + find_key_values_by_prefix_rev_bounded, multi_key_values: papaya::HashMap::new(), multi_keys: papaya::HashMap::new(), }) @@ -667,6 +710,13 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { Box::pin(store.find_keys_by_prefix_internal(&self.root_key, key_prefix.to_vec())).await } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.find_keys_stream(key_prefix, IterDirection::Forward) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -676,6 +726,111 @@ impl ReadableKeyValueStore for ScyllaDbStoreInternal { Box::pin(store.find_key_values_by_prefix_internal(&self.root_key, key_prefix.to_vec())) .await } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.find_key_values_stream(key_prefix, IterDirection::Forward) + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + self.find_keys_stream(key_prefix, IterDirection::Reverse) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + self.find_key_values_stream(key_prefix, IterDirection::Reverse) + } +} + +impl ScyllaDbStoreInternal { + fn find_keys_stream<'a>( + &'a self, + key_prefix: &'a [u8], + direction: IterDirection, + ) -> FindKeysStream<'a, ScyllaDbStoreInternalError> { + Box::pin(async_stream::try_stream! { + let key_prefix = key_prefix.to_vec(); + ScyllaDbClient::check_key_size(&key_prefix)?; + // Intentionally no `self.acquire()` here: streaming iterators run + // for an unbounded duration and would otherwise hold a semaphore + // permit across user-controlled `await` points, leading to + // potential deadlocks when callers interleave other store + // operations (e.g. `read_value`) with iterator polling. + let session = &self.store.session; + let len = key_prefix.len(); + let (query_unbounded, query_bounded) = match direction { + IterDirection::Forward => ( + &self.store.find_keys_by_prefix_unbounded, + &self.store.find_keys_by_prefix_bounded, + ), + IterDirection::Reverse => ( + &self.store.find_keys_by_prefix_rev_unbounded, + &self.store.find_keys_by_prefix_rev_bounded, + ), + }; + let rows = match get_upper_bound_option(&key_prefix) { + None => { + let values = (self.root_key.clone(), key_prefix.clone()); + Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? + } + Some(upper_bound) => { + let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); + Box::pin(session.execute_iter(query_bounded.clone(), values)).await? + } + }; + let mut rows = rows.rows_stream::<(Vec,)>()?; + while let Some(row) = rows.next().await { + let (key,) = row?; + yield key[len..].to_vec(); + } + }) + } + + fn find_key_values_stream<'a>( + &'a self, + key_prefix: &'a [u8], + direction: IterDirection, + ) -> FindKeyValuesStream<'a, ScyllaDbStoreInternalError> { + Box::pin(async_stream::try_stream! { + let key_prefix = key_prefix.to_vec(); + ScyllaDbClient::check_key_size(&key_prefix)?; + // See `find_keys_stream` for why no semaphore permit is acquired. + let session = &self.store.session; + let len = key_prefix.len(); + let (query_unbounded, query_bounded) = match direction { + IterDirection::Forward => ( + &self.store.find_key_values_by_prefix_unbounded, + &self.store.find_key_values_by_prefix_bounded, + ), + IterDirection::Reverse => ( + &self.store.find_key_values_by_prefix_rev_unbounded, + &self.store.find_key_values_by_prefix_rev_bounded, + ), + }; + let rows = match get_upper_bound_option(&key_prefix) { + None => { + let values = (self.root_key.clone(), key_prefix.clone()); + Box::pin(session.execute_iter(query_unbounded.clone(), values)).await? + } + Some(upper_bound) => { + let values = (self.root_key.clone(), key_prefix.clone(), upper_bound); + Box::pin(session.execute_iter(query_bounded.clone(), values)).await? + } + }; + let mut rows = rows.rows_stream::<(Vec, Vec)>()?; + while let Some(row) = rows.next().await { + let (key, value) = row?; + yield (key[len..].to_vec(), value); + } + }) + } } impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { diff --git a/linera-views/src/backends/value_splitting.rs b/linera-views/src/backends/value_splitting.rs index 841c8a6b3cc9..5466ed0590d4 100644 --- a/linera-views/src/backends/value_splitting.rs +++ b/linera-views/src/backends/value_splitting.rs @@ -3,14 +3,15 @@ //! Adds support for large values to a given store by splitting them between several keys. +use futures::stream::StreamExt; use linera_base::ensure; use thiserror::Error; use crate::{ batch::{Batch, WriteOperation}, store::{ - KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore, WithError, - WritableKeyValueStore, + FindKeyValuesStream, FindKeysStream, KeyValueDatabase, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, }, }; #[cfg(with_testing)] @@ -221,6 +222,22 @@ where Ok(keys) } + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + let big_key = item.map_err(ValueSplittingError::InnerStoreError)?; + let len = big_key.len(); + if Self::read_index_from_key(&big_key)? == 0 { + yield Ok(big_key[0..len - 4].to_vec()); + } + } + }) + } + async fn find_key_values_by_prefix( &self, key_prefix: &[u8], @@ -252,6 +269,114 @@ where } Ok(key_values) } + + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_key_values_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + let (mut big_key, value) = + item.map_err(ValueSplittingError::InnerStoreError)?; + if Self::read_index_from_key(&big_key)? != 0 { + continue; // Leftover segment from an earlier value. + } + big_key.truncate(big_key.len() - 4); + let key = big_key; + let count = Self::read_count_from_value(&value)?; + let mut big_value = value[4..].to_vec(); + for idx in 1..count { + let next = stream.next().await + .ok_or(ValueSplittingError::MissingSegment)?; + let (big_key, value) = + next.map_err(ValueSplittingError::InnerStoreError)?; + if !(Self::read_index_from_key(&big_key)? == idx + && big_key.starts_with(&key) + && big_key.len() == key.len() + 4) + { + yield Err(ValueSplittingError::MissingSegment); + return; + } + big_value.extend(value); + } + yield Ok((key, big_value)); + } + }) + } + + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_keys_by_prefix_rev_iter(key_prefix); + while let Some(item) = stream.next().await { + let big_key = item.map_err(ValueSplittingError::InnerStoreError)?; + let len = big_key.len(); + if Self::read_index_from_key(&big_key)? == 0 { + yield Ok(big_key[0..len - 4].to_vec()); + } + } + }) + } + + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin(async_stream::stream! { + let mut stream = self.store.find_key_values_by_prefix_rev_iter(key_prefix); + // Accumulator: (current_key, top_index, segments_descending). + // segments[i] is the segment at index `top_index - i`. + // An accumulation that never reaches index 0 is composed of + // leftover segments (from a previously deleted big value, since + // delete_key only removes the master) and is silently dropped. + let mut state: Option<(Vec, u32, Vec>)> = None; + while let Some(item) = stream.next().await { + let (mut big_key, value) = + item.map_err(ValueSplittingError::InnerStoreError)?; + let index = Self::read_index_from_key(&big_key)?; + big_key.truncate(big_key.len() - 4); + let key = big_key; + state = match state.take() { + Some((current_key, top, mut segs)) + if current_key == key + && (segs.len() as u32) <= top + && index == top - segs.len() as u32 => + { + segs.push(value); + Some((current_key, top, segs)) + } + // Either no prior state, or the prior state belongs to a + // different key / a non-contiguous index — drop it (those + // segments are leftovers from a deleted big value) and + // start a fresh accumulation. + _ => Some((key, index, vec![value])), + }; + if index == 0 { + let (key, top, segs) = state.take().unwrap(); + let count = Self::read_count_from_value(segs.last().unwrap())?; + if count == 0 || count > top + 1 { + yield Err(ValueSplittingError::MissingSegment); + return; + } + // `segs` is in push order — descending index with the master + // (index 0) at the end. Take the master's data (skipping its + // count prefix), then the `count - 1` lowest-indexed segments + // in ascending order. Any segments above index `count - 1` + // are leftovers from a previous longer write to this key and + // are silently dropped. + let (master, others) = segs.split_last().unwrap(); + let mut big_value = master[4..].to_vec(); + for val in others.iter().rev().take(count as usize - 1) { + big_value.extend_from_slice(val); + } + yield Ok((key, big_value)); + } + } + }) + } } impl WritableKeyValueStore for ValueSplittingStore @@ -513,10 +638,13 @@ pub fn create_value_splitting_memory_store() -> ValueSplittingStore> = (1u8..=8u8).map(|b| vec![b, 0, 0]).collect(); + + // Mix of small (single-segment) and large (multi-segment) values, so both + // code paths in the iterators are exercised. + let lengths = [ + 10, + MAX_LEN + 5, + MAX_LEN - 10, + 2 * MAX_LEN + 7, + 3 * MAX_LEN - 4, + 50, + 2 * MAX_LEN, + 4 * MAX_LEN + 1, + ]; + let mut rng = crate::random::make_deterministic_rng(); + let values = lengths + .iter() + .map(|&len| (0..len).map(|_| rng.gen::()).collect()) + .collect::>>(); + + // Write all values. + let mut batch = Batch::new(); + for (key, value) in keys.iter().zip(values.iter()) { + batch.put_key_value_bytes(key.clone(), value.clone()); + } + big_store.write_batch(batch).await.unwrap(); + + // Delete a few via DeleteKey: for the multi-segment ones only the master + // segment is removed, so leftover continuation segments remain in the + // underlying store and the iterators must skip them. + let to_delete = [1usize, 3, 6]; + let mut batch = Batch::new(); + for &i in &to_delete { + batch.delete_key(keys[i].clone()); + } + big_store.write_batch(batch).await.unwrap(); + + // Surviving logical entries, in ascending key order. + let expected_kv = (0..keys.len()) + .filter(|i| !to_delete.contains(i)) + .map(|i| (keys[i].clone(), values[i].clone())) + .collect::>(); + let expected_keys = expected_kv + .iter() + .map(|(k, _)| k.clone()) + .collect::>(); + let expected_kv_rev = expected_kv.iter().rev().cloned().collect::>(); + let expected_keys_rev = expected_keys.iter().rev().cloned().collect::>(); + + // 1. find_keys_by_prefix_iter + let mut stream = big_store.find_keys_by_prefix_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_keys); + + // 2. find_key_values_by_prefix_iter + let mut stream = big_store.find_key_values_by_prefix_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_kv); + + // 3. find_keys_by_prefix_rev_iter + let mut stream = big_store.find_keys_by_prefix_rev_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_keys_rev); + + // 4. find_key_values_by_prefix_rev_iter + let mut stream = big_store.find_key_values_by_prefix_rev_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, expected_kv_rev); + } + + // Overwriting a long value with a shorter one only rewrites the segments up + // to the new count — the high-index segments of the original write remain in + // the underlying store as leftovers. The reverse iterator collects them all + // (it walks indices top..0 to find the master) and then must drop the + // ones whose index is >= the new count when reconstructing the value. + #[tokio::test] + async fn test_value_splitting5_rev_iter_drops_overwrite_leftovers() { + let big_store = create_value_splitting_memory_store(); + let key = vec![0, 0]; + + // Write a 250-byte value: with MAX_VALUE_SIZE = 100 this becomes 3 + // segments (96 bytes at index 0 plus the count, then 100 + 54 bytes + // at indices 1 and 2). + let mut rng = crate::random::make_deterministic_rng(); + let long_value = (0..250).map(|_| rng.gen::()).collect::>(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.clone(), long_value); + big_store.write_batch(batch).await.unwrap(); + + // Overwrite with a single-segment value. Only index 0 is rewritten; + // indices 1 and 2 stay in the underlying store as orphan segments. + let short_value = (0..10).map(|_| rng.gen::()).collect::>(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.clone(), short_value.clone()); + big_store.write_batch(batch).await.unwrap(); + + // The reverse iterator must accumulate all three segments (it sees them + // in order index 2, 1, 0) and then, when reconstructing, skip the two + // leftovers because their position in the reversed segs list is >= the + // new count of 1. + let mut stream = big_store.find_key_values_by_prefix_rev_iter(&[]); + let mut got = Vec::new(); + while let Some(item) = stream.next().await { + got.push(item.unwrap()); + } + assert_eq!(got, vec![(key, short_value)]); + } } diff --git a/linera-views/src/store.rs b/linera-views/src/store.rs index f449749f6d24..76cc248d7a99 100644 --- a/linera-views/src/store.rs +++ b/linera-views/src/store.rs @@ -3,8 +3,9 @@ //! This provides the trait definitions for the stores. -use std::{fmt::Debug, future::Future}; +use std::{fmt::Debug, future::Future, pin::Pin}; +use futures::stream::{self, Stream, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; #[cfg(with_testing)] @@ -15,6 +16,24 @@ use crate::{ ViewError, }; +/// A boxed stream for iterating over keys returned by `find_keys_by_prefix_iter`. +#[cfg(not(web))] +pub type FindKeysStream<'a, E> = Pin, E>> + Send + 'a>>; + +/// A boxed stream for iterating over keys returned by `find_keys_by_prefix_iter`. +#[cfg(web)] +pub type FindKeysStream<'a, E> = Pin, E>> + 'a>>; + +/// A boxed stream for iterating over key/value pairs returned by `find_key_values_by_prefix_iter`. +#[cfg(not(web))] +pub type FindKeyValuesStream<'a, E> = + Pin, Vec), E>> + Send + 'a>>; + +/// A boxed stream for iterating over key/value pairs returned by `find_key_values_by_prefix_iter`. +#[cfg(web)] +pub type FindKeyValuesStream<'a, E> = + Pin, Vec), E>> + 'a>>; + /// The error type for the key-value stores. pub trait KeyValueStoreError: std::error::Error + From + Debug + Send + Sync + 'static @@ -76,12 +95,79 @@ pub trait ReadableKeyValueStore: WithError { /// Finds the `key` matching the prefix. The prefix is not included in the returned keys. async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result>, Self::Error>; + /// Returns a boxed stream that yields the keys matching the prefix one by one. + /// The prefix is not included in the returned keys. The stream may be dropped + /// early to stop the iteration. The default implementation downloads the full + /// list eagerly via `find_keys_by_prefix` and yields the entries afterwards; + /// backends should override this to stream incrementally whenever possible. + fn find_keys_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin( + stream::once(self.find_keys_by_prefix(key_prefix)) + .map_ok(|keys| stream::iter(keys.into_iter().map(Ok))) + .try_flatten(), + ) + } + /// Finds the `(key,value)` pairs matching the prefix. The prefix is not included in the returned keys. async fn find_key_values_by_prefix( &self, key_prefix: &[u8], ) -> Result, Vec)>, Self::Error>; + /// Returns a boxed stream that yields the `(key, value)` pairs matching the + /// prefix one by one. The prefix is not included in the returned keys. The + /// stream may be dropped early to stop the iteration. The default + /// implementation downloads the full list eagerly via `find_key_values_by_prefix` + /// and yields the entries afterwards; backends should override this to stream + /// incrementally whenever possible. + fn find_key_values_by_prefix_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin( + stream::once(self.find_key_values_by_prefix(key_prefix)) + .map_ok(|kvs| stream::iter(kvs.into_iter().map(Ok))) + .try_flatten(), + ) + } + + /// Returns a boxed stream that yields the keys matching the prefix in + /// reverse (descending) order. The prefix is not included in the returned + /// keys. The stream may be dropped early to stop the iteration. The default + /// implementation downloads the full list eagerly via `find_keys_by_prefix` + /// and yields the entries in reverse afterwards; backends should override + /// this to stream incrementally whenever possible. + fn find_keys_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeysStream<'a, Self::Error> { + Box::pin( + stream::once(self.find_keys_by_prefix(key_prefix)) + .map_ok(|keys| stream::iter(keys.into_iter().rev().map(Ok))) + .try_flatten(), + ) + } + + /// Returns a boxed stream that yields the `(key, value)` pairs matching the + /// prefix in reverse (descending) order. The prefix is not included in the + /// returned keys. The stream may be dropped early to stop the iteration. + /// The default implementation downloads the full list eagerly via + /// `find_key_values_by_prefix` and yields the entries in reverse afterwards; + /// backends should override this to stream incrementally whenever possible. + fn find_key_values_by_prefix_rev_iter<'a>( + &'a self, + key_prefix: &'a [u8], + ) -> FindKeyValuesStream<'a, Self::Error> { + Box::pin( + stream::once(self.find_key_values_by_prefix(key_prefix)) + .map_ok(|kvs| stream::iter(kvs.into_iter().rev().map(Ok))) + .try_flatten(), + ) + } + // We can't use `async fn` here in the below implementations due to // https://github.com/rust-lang/impl-trait-utils/issues/17, but once that bug is fixed // we can revert them to `async fn` syntax, which is neater. diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 5a36fadc7f42..929e69741408 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -9,6 +9,7 @@ pub mod performance; use std::collections::{BTreeMap, BTreeSet, HashSet}; +use futures::{StreamExt, TryStreamExt}; use rand::{seq::SliceRandom, Rng}; use crate::{ @@ -162,6 +163,8 @@ pub fn span_random_reordering_put_delete( /// * `read_value_bytes` /// * `read_multi_values_bytes` /// * `find_keys_by_prefix` / `find_key_values_by_prefix` +/// * `find_keys_by_prefix_iter` / `find_key_values_by_prefix_iter` (streaming +/// variants, including early termination) /// * The ordering of keys returned by `find_keys_by_prefix` and `find_key_values_by_prefix` pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec)>) { // We need a nontrivial key_prefix because dynamo requires a non-trivial prefix @@ -184,7 +187,7 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec let mut set_key_value1 = HashSet::new(); let mut keys_request_deriv = Vec::new(); let key_values_by_prefix = store.find_key_values_by_prefix(key_prefix).await.unwrap(); - for (key, value) in key_values_by_prefix { + for (key, value) in key_values_by_prefix.clone() { keys_request_deriv.push(key.clone()); set_key_value1.insert((key, value)); } @@ -202,6 +205,50 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec } } assert_eq!(set_key_value1, set_key_value2); + // Streaming variants must agree with the eager methods. The forward + // `find_keys_by_prefix_iter` is drained by interleaving each yielded + // key with a `read_value_bytes` call against the same store. Beyond + // checking the iter result, this exercises that holding the streaming + // iterator does not deadlock other operations on the same store + // (e.g. via a semaphore permit held across the stream body). + let mut keys_iter = Vec::new(); + let mut key_values_via_iter = Vec::new(); + { + let mut stream = store.find_keys_by_prefix_iter(key_prefix); + while let Some(item) = stream.next().await { + let key = item.unwrap(); + let mut full_key = key_prefix.to_vec(); + full_key.extend(&key); + let value = store.read_value_bytes(&full_key).await.unwrap().unwrap(); + keys_iter.push(key.clone()); + key_values_via_iter.push((key, value)); + } + } + assert_eq!(keys_iter, keys_request); + assert_eq!(key_values_via_iter, key_values_by_prefix); + let key_values_iter = store + .find_key_values_by_prefix_iter(key_prefix) + .try_collect::>() + .await + .unwrap(); + assert_eq!(key_values_iter, key_values_by_prefix); + // Reverse streaming variants must yield the eager results in reverse order. + let keys_rev_iter = store + .find_keys_by_prefix_rev_iter(key_prefix) + .try_collect::>() + .await + .unwrap(); + let mut keys_request_rev = keys_request.clone(); + keys_request_rev.reverse(); + assert_eq!(keys_rev_iter, keys_request_rev); + let key_values_rev_iter = store + .find_key_values_by_prefix_rev_iter(key_prefix) + .try_collect::>() + .await + .unwrap(); + let mut key_values_by_prefix_rev = key_values_by_prefix.clone(); + key_values_by_prefix_rev.reverse(); + assert_eq!(key_values_rev_iter, key_values_by_prefix_rev); } // Now checking the read_multi_values_bytes let mut rng = make_deterministic_rng(); diff --git a/linera-views/tests/store_tests.rs b/linera-views/tests/store_tests.rs index 8ea1ab421d73..27a0cebb13ce 100644 --- a/linera-views/tests/store_tests.rs +++ b/linera-views/tests/store_tests.rs @@ -9,8 +9,8 @@ use linera_views::{ random::make_deterministic_rng, store::{ReadableKeyValueStore as _, TestKeyValueDatabase as _, WritableKeyValueStore as _}, test_utils::{ - big_read_multi_values, get_random_test_scenarios, run_big_write_read, run_reads, - run_writes_from_blank, run_writes_from_state, + big_read_multi_values, get_random_byte_vector, get_random_test_scenarios, + run_big_write_read, run_reads, run_writes_from_blank, run_writes_from_state, }, value_splitting::create_value_splitting_memory_store, }; @@ -56,6 +56,32 @@ async fn test_reads_test_memory() { } } +#[tokio::test] +async fn test_reads_value_splitting_varying_value_sizes() { + use std::collections::HashSet; + + use rand::Rng; + let mut rng = make_deterministic_rng(); + let key_prefix = vec![0]; + let mut key_values = Vec::new(); + let mut unique_keys = HashSet::new(); + let num_entries = 30; + let len_key = 4; + while key_values.len() < num_entries { + let mut key = key_prefix.clone(); + for _ in 0..len_key { + key.push(rng.gen()); + } + if unique_keys.insert(key.clone()) { + let len_value = rng.gen_range(0..=500); + let value = get_random_byte_vector(&mut rng, &[], len_value); + key_values.push((key, value)); + } + } + let key_value_store = create_value_splitting_memory_store(); + run_reads(key_value_store, key_values).await; +} + #[tokio::test] async fn test_reads_memory() { for scenario in get_random_test_scenarios() {