Implement forward and reverse iterators for the find_{keys,key_values}_by_prefix#6202
Implement forward and reverse iterators for the find_{keys,key_values}_by_prefix#6202MathieuDutSik wants to merge 27 commits into
find_{keys,key_values}_by_prefix#6202Conversation
Instruction Count Benchmark Results
Deterministic metrics — reproducible across runs (34 benchmarks)
Cache-dependent metrics — expect fluctuations between runs (34 benchmarks)
|
8a2d3e7 to
ab7fa4b
Compare
find_{keys,key_values}_by_prefixfind_{keys,key_values}_by_prefix
|
If I had to choose, I think I prefer this one over #6183. To which extend do you the tests cover the new code? Especially value-splitting and LRU-caching? |
The test The test The Note that PR #6183 also implements some function for |
a461945 to
42adda2
Compare
| } | ||
| assert_eq!(set_key_value1, set_key_value2); | ||
| // Streaming variants must agree with the eager methods. | ||
| let keys_iter: Vec<Vec<u8>> = store |
There was a problem hiding this comment.
The policy is to put type annotations like these on the method instead (try_collect in this case); also below.
There was a problem hiding this comment.
Ok, corrected.
| Box::pin(async_stream::stream! { | ||
| let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); | ||
| while let Some(item) = stream.next().await { | ||
| yield item.map_err(JournalingError::Inner); | ||
| } | ||
| }) |
There was a problem hiding this comment.
Why not just stream.map(|item| item.map_err(JournalingError::Inner))?
(Also below.)
| } | ||
| let mut stream = self.store.find_keys_by_prefix_iter(key_prefix); | ||
| while let Some(item) = stream.next().await { | ||
| yield item; |
There was a problem hiding this comment.
The iterator can be prematurely ended (for example when doing a find_first_key_in_prefix). So, it may not be possible to cache it. But let me look at that, maybe we can do it.
There was a problem hiding this comment.
So, yes we can.
| /// uses a fixed-length prefix extractor; without it, `seek_for_prev` would | ||
| /// only search within the bloom-prefix scope and could miss keys whose | ||
| /// extractor-prefix differs from the seek target. | ||
| fn get_find_prefix_reverse_iterator( |
There was a problem hiding this comment.
(Elsewhere we use rev_iter rather than reverse_iterator.)
There was a problem hiding this comment.
So, what would you prefer?
For myself, rev_iter.
There was a problem hiding this comment.
I think I'd prefer the short form, too, yes. 👍
| Box::pin(async_stream::stream! { | ||
| if let Err(error) = check_key_size(&key_prefix) { | ||
| yield Err(error); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Does this work, too?
| Box::pin(async_stream::stream! { | |
| if let Err(error) = check_key_size(&key_prefix) { | |
| yield Err(error); | |
| return; | |
| } | |
| Box::pin(async_stream::try_stream! { | |
| check_key_size(&key_prefix)?; |
There was a problem hiding this comment.
Ok, done. Use try_stream when possible.
| None => false, | ||
| }; | ||
| if continues { | ||
| state.as_mut().unwrap().2.push(value); |
There was a problem hiding this comment.
Maybe this unwrap can be avoided if it's done inside the match arm.
| let mut big_value = Vec::new(); | ||
| for (rev_pos, val) in segs.iter().rev().enumerate() { | ||
| let idx = rev_pos as u32; | ||
| if idx == 0 { | ||
| big_value.extend_from_slice(&val[4..]); | ||
| } else if idx < count { | ||
| big_value.extend_from_slice(val); | ||
| } | ||
| } |
There was a problem hiding this comment.
| let mut big_value = Vec::new(); | |
| for (rev_pos, val) in segs.iter().rev().enumerate() { | |
| let idx = rev_pos as u32; | |
| if idx == 0 { | |
| big_value.extend_from_slice(&val[4..]); | |
| } else if idx < count { | |
| big_value.extend_from_slice(val); | |
| } | |
| } | |
| let big_value = vec![segment_zero_value[4..].to_vec()]; | |
| for val in segs[1..count].iter().rev() { | |
| big_value.extend_from_slice(val); | |
| } |
| fn find_keys_by_prefix_iter<'a>( | ||
| &'a self, | ||
| key_prefix: &'a [u8], | ||
| ) -> FindKeysStream<'a, Self::Error> { |
There was a problem hiding this comment.
I think @Twey had the idea to make the return type impl Stream<Item = Result<Vec<u8>>>; would that make it Send or !Send automatically as needed?
There was a problem hiding this comment.
With the design that I propose (and exercised first on the read_multi_values_bytes_iter) it compiles on Wasm32.
So, I am not sure what problem is left to resolve.
There was a problem hiding this comment.
Just that it would be simpler (if it works!) and use less conditional compilation.
| yield Ok(key_value); | ||
| } | ||
| }) | ||
| } |
There was a problem hiding this comment.
Could these all use map instead of async_stream::stream!?
There was a problem hiding this comment.
For the trait implementation, we can improve but that is not so simple since for a start the _iter function is not async.
| yield item; | ||
| } | ||
| let mut cache = cache.lock().unwrap(); | ||
| cache.insert_find_keys(key_prefix.to_vec(), &accumulated); |
There was a problem hiding this comment.
OK, maybe that was a bad idea: A lot of time could have passed since we loaded all those values; couldn't they have changed in the meantime?
There was a problem hiding this comment.
The caching of find_key_values_by_prefix is only done when we have exclusive access. So, not with blobs, events, and similar.
So, could the state have changed meanwhile? The answer is yes: The construction of the iterator does not create a lock, in general. It certainly does not for DynamoDB and ScyllaDB. But if some keys are inserted or deleted during the operations then your iterator will be unstable. It cannot guarantee that it works correctly.
So, my verdict: Yes, it is fine to save the keys because yes, if that has changed, then your code would anyway not have worked correctly.
But this makes me realize another concern. Could it be that establishing an iterator prevents other operations from working? In other words, is the following code valid:
let mut iter = store.find_keys_by_prefix_iter(&key)?;
let mut values = Vec::new();
while let Some(key) = iter.next().await? {
let value = store.read_value(&key).await?;
values.push(value);
}And the answer is that it is a problem for ScyllaDB. So, do not acquire a semaphore for the creation of an iterator so as not to introduce a deadlock. Correction done. I also added a test for that deadlocking in run_reads.
There was a problem hiding this comment.
Maybe if the iterator can be unstable when the underlying state changes while it iterates, it should actually prevent such changes? But I guess that depends on what the use cases are.
Would it make the most sense if only operations on the keys that match the iterator's prefix were blocked? But perhaps it is not easy to achieve.
…cks in run_reads.
| }; | ||
| if index == 0 { | ||
| let (key, top, segs) = state.take().unwrap(); | ||
| let count = Self::read_count_from_value(segs.last().unwrap())?; |
There was a problem hiding this comment.
Do we want these unwraps? I see a few
Motivation
The
find_{keys,key_values}_by_prefixfunctions are building sets that are intrinsically big.Async iterators are actually the right solution here.
Proposal
Add the functions in the trait and a default implementation. The default implementation is used for the
linera-storage-service,indexed-db,System-apiandViewContainer.The types are inspired from #4975
The reverse iterators are inspired from #6183 which is itself inspired by #6171
For
RocksDB,DynamoDB,ScyllaDB, the functionality essentially already exists in the code.The `ValueSplitting is the more challenging part.
Test Plan
CI
Tests have been added to the
run_readsfunctions.Release Plan
Can be backported to
testnet_conway.Links
None