Skip to content

Commit 9c1f5b2

Browse files
committed
Address some bugs.
1 parent f7ae835 commit 9c1f5b2

5 files changed

Lines changed: 145 additions & 102 deletions

File tree

linera-storage-service/src/client.rs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -265,20 +265,22 @@ impl ReadableKeyValueStore for StorageServiceStoreInternal {
265265
is_finished,
266266
} = response;
267267
let prefix_len = self.start_key.len();
268-
if num_chunks == 0 {
269-
let keys = keys
270-
.into_iter()
271-
.map(|key| key[prefix_len..].to_vec())
272-
.collect();
273-
Ok((keys, is_finished))
268+
let keys = if num_chunks == 0 {
269+
keys
274270
} else {
275-
let keys: Vec<Vec<u8>> = self.read_entries(message_index, num_chunks).await?;
276-
let keys = keys
277-
.into_iter()
278-
.map(|key| key[prefix_len..].to_vec())
279-
.collect();
280-
Ok((keys, is_finished))
281-
}
271+
self.read_entries(message_index, num_chunks).await?
272+
};
273+
let keys = keys
274+
.into_iter()
275+
.map(|key| {
276+
ensure!(
277+
key.starts_with(&self.start_key),
278+
StorageServiceStoreError::KeyOutsidePartition
279+
);
280+
Ok::<_, StorageServiceStoreError>(key[prefix_len..].to_vec())
281+
})
282+
.collect::<Result<Vec<_>, _>>()?;
283+
Ok((keys, is_finished))
282284
}
283285

284286
async fn find_key_values_in_interval(
@@ -333,21 +335,22 @@ impl ReadableKeyValueStore for StorageServiceStoreInternal {
333335
is_finished,
334336
} = response;
335337
let prefix_len = self.start_key.len();
336-
if num_chunks == 0 {
337-
let key_values = key_values
338-
.into_iter()
339-
.map(|x| (x.key[prefix_len..].to_vec(), x.value))
340-
.collect::<Vec<_>>();
341-
Ok((key_values, is_finished))
338+
let key_values: Vec<(Vec<u8>, Vec<u8>)> = if num_chunks == 0 {
339+
key_values.into_iter().map(|x| (x.key, x.value)).collect()
342340
} else {
343-
let key_values: Vec<(Vec<u8>, Vec<u8>)> =
344-
self.read_entries(message_index, num_chunks).await?;
345-
let key_values = key_values
346-
.into_iter()
347-
.map(|(key, value)| (key[prefix_len..].to_vec(), value))
348-
.collect();
349-
Ok((key_values, is_finished))
350-
}
341+
self.read_entries(message_index, num_chunks).await?
342+
};
343+
let key_values = key_values
344+
.into_iter()
345+
.map(|(key, value)| {
346+
ensure!(
347+
key.starts_with(&self.start_key),
348+
StorageServiceStoreError::KeyOutsidePartition
349+
);
350+
Ok::<_, StorageServiceStoreError>((key[prefix_len..].to_vec(), value))
351+
})
352+
.collect::<Result<Vec<_>, _>>()?;
353+
Ok((key_values, is_finished))
351354
}
352355
}
353356

linera-storage-service/src/common.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ pub enum StorageServiceStoreError {
5151
#[error("The key size must be at most 1 MB")]
5252
KeyTooLong,
5353

54+
/// The server returned a key that does not lie within the requesting
55+
/// client's partition. This indicates either a malformed request (e.g. an
56+
/// `Unbounded` end with an all-`0xFF` `start_key`, which leaves the
57+
/// underlying scan unbounded) or a server-side bug.
58+
#[error("server returned a key outside this client's partition")]
59+
KeyOutsidePartition,
60+
5461
/// Transport error
5562
#[error(transparent)]
5663
TransportError(#[from] tonic::transport::Error),

linera-views/src/backends/lru_caching.rs

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,12 @@ use serde::{Deserialize, Serialize};
1111
use crate::memory::MemoryDatabase;
1212
#[cfg(with_testing)]
1313
use crate::store::TestKeyValueDatabase;
14-
use std::ops::Bound;
15-
1614
use crate::{
1715
batch::{Batch, WriteOperation},
1816
lru_prefix_cache::{LruPrefixCache, StorageCacheConfig},
19-
store::{
20-
KeyInterval, KeyIntervalStart, KeyValueDatabase, ReadableKeyValueStore, WithError,
21-
WritableKeyValueStore,
22-
},
17+
store::{KeyInterval, KeyValueDatabase, ReadableKeyValueStore, WithError, WritableKeyValueStore},
2318
};
2419

25-
/// Returns the longest common byte prefix shared by `start` and `end`. Any key
26-
/// inside `[start, end]` (any inclusivity) must start with this prefix. For
27-
/// `Unbounded` ends the LCP is empty (only an empty-prefix cache entry could
28-
/// cover the request).
29-
fn lcp_of_interval(key_interval: &KeyInterval) -> Vec<u8> {
30-
let start = match &key_interval.start {
31-
KeyIntervalStart::Included(k) | KeyIntervalStart::Excluded(k) => k.as_slice(),
32-
};
33-
let end = match &key_interval.end {
34-
Bound::Included(k) | Bound::Excluded(k) => k.as_slice(),
35-
Bound::Unbounded => return Vec::new(),
36-
};
37-
let n = start
38-
.iter()
39-
.zip(end.iter())
40-
.take_while(|(a, b)| a == b)
41-
.count();
42-
start[..n].to_vec()
43-
}
44-
4520
#[cfg(with_metrics)]
4621
mod metrics {
4722
use std::sync::LazyLock;
@@ -368,7 +343,7 @@ where
368343
// Any key in the user interval starts with `lcp(start, end)`. The
369344
// prefix cache walks back from this prefix to any shorter cached
370345
// prefix that covers it, so a single probe is sufficient.
371-
let lcp = lcp_of_interval(&key_interval);
346+
let lcp = key_interval.common_prefix();
372347
let cached = {
373348
let mut cache = cache.lock().unwrap();
374349
cache.query_find_keys(&lcp)
@@ -438,7 +413,7 @@ where
438413
let Some(cache) = self.get_exclusive_cache() else {
439414
return self.store.find_key_values_in_interval(key_interval).await;
440415
};
441-
let lcp = lcp_of_interval(&key_interval);
416+
let lcp = key_interval.common_prefix();
442417
let cached = {
443418
let mut cache = cache.lock().unwrap();
444419
cache.query_find_key_values(&lcp)

linera-views/src/store.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,26 @@ impl KeyInterval {
8282
}
8383
}
8484

85+
/// Returns the longest common byte prefix shared by the start and end
86+
/// keys of this interval. Any key inside the interval (any inclusivity)
87+
/// must start with this prefix. Returns the empty vector when the end
88+
/// is `Unbounded` (only an empty-prefix scan can cover such a request).
89+
pub fn common_prefix(&self) -> Vec<u8> {
90+
let start = match &self.start {
91+
KeyIntervalStart::Included(k) | KeyIntervalStart::Excluded(k) => k.as_slice(),
92+
};
93+
let end = match &self.end {
94+
Included(k) | Excluded(k) => k.as_slice(),
95+
Unbounded => return Vec::new(),
96+
};
97+
let n = start
98+
.iter()
99+
.zip(end.iter())
100+
.take_while(|(a, b)| a == b)
101+
.count();
102+
start[..n].to_vec()
103+
}
104+
85105
/// Returns `true` if `key` falls within this interval's bounds. The
86106
/// `limit` field is ignored.
87107
pub fn contains(&self, key: &[u8]) -> bool {

linera-views/src/views/key_value_store_view.rs

Lines changed: 85 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
batch::{Batch, WriteOperation},
2626
common::{
2727
from_bytes_option, from_bytes_option_or_default, get_key_range_for_prefix, get_upper_bound,
28-
key_matches_interval, DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
28+
DeletionSet, HasherOutput, SuffixClosedSetIterator, Update,
2929
},
3030
context::Context,
3131
hashable_wrapper::WrappedHashableContainerView,
@@ -1012,42 +1012,63 @@ impl<C: Context> KeyValueStoreView<C> {
10121012
&self,
10131013
key_interval: KeyInterval,
10141014
) -> Result<(Vec<Vec<u8>>, bool), ViewError> {
1015-
let key_values = self.find_key_values_by_prefix(&[]).await?;
1015+
if key_interval.is_empty() {
1016+
return Ok((Vec::new(), true));
1017+
}
1018+
// Any key in the interval starts with the longest common prefix of
1019+
// its bounds, so the prefix-scan path (which already merges in-memory
1020+
// updates with the underlying store and respects the deletion set)
1021+
// is a sufficient superset. We then trim it to the interval and limit.
1022+
let prefix = key_interval.common_prefix();
1023+
let prefix_keys = self.find_keys_by_prefix(&prefix).await?;
10161024
let mut keys = Vec::new();
1017-
for (key, _value) in key_values {
1018-
if key_matches_interval(&key, key_interval.start_bound(), key_interval.end_bound()) {
1019-
keys.push(key);
1020-
if key_interval.limit.is_some_and(|limit| keys.len() >= limit) {
1021-
break;
1022-
}
1025+
let mut more_after_limit = false;
1026+
for stripped in prefix_keys {
1027+
let mut full = prefix.clone();
1028+
full.extend(&stripped);
1029+
if !key_interval.contains(&full) {
1030+
continue;
1031+
}
1032+
if key_interval
1033+
.limit
1034+
.is_some_and(|limit| keys.len() >= limit)
1035+
{
1036+
more_after_limit = true;
1037+
break;
10231038
}
1039+
keys.push(full);
10241040
}
1025-
let is_finished = key_interval.limit.is_none_or(|limit| keys.len() < limit);
1026-
Ok((keys, is_finished))
1041+
Ok((keys, !more_after_limit))
10271042
}
10281043

10291044
/// Iterates over all key-value pairs matching the given interval.
10301045
pub async fn find_key_values_in_interval(
10311046
&self,
10321047
key_interval: KeyInterval,
10331048
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), ViewError> {
1034-
let entries = self.find_key_values_by_prefix(&[]).await?;
1049+
if key_interval.is_empty() {
1050+
return Ok((Vec::new(), true));
1051+
}
1052+
let prefix = key_interval.common_prefix();
1053+
let prefix_entries = self.find_key_values_by_prefix(&prefix).await?;
10351054
let mut key_values = Vec::new();
1036-
for (key, value) in entries {
1037-
if key_matches_interval(&key, key_interval.start_bound(), key_interval.end_bound()) {
1038-
key_values.push((key, value));
1039-
if key_interval
1040-
.limit
1041-
.is_some_and(|limit| key_values.len() >= limit)
1042-
{
1043-
break;
1044-
}
1055+
let mut more_after_limit = false;
1056+
for (stripped, value) in prefix_entries {
1057+
let mut full = prefix.clone();
1058+
full.extend(&stripped);
1059+
if !key_interval.contains(&full) {
1060+
continue;
1061+
}
1062+
if key_interval
1063+
.limit
1064+
.is_some_and(|limit| key_values.len() >= limit)
1065+
{
1066+
more_after_limit = true;
1067+
break;
10451068
}
1069+
key_values.push((full, value));
10461070
}
1047-
let is_finished = key_interval
1048-
.limit
1049-
.is_none_or(|limit| key_values.len() < limit);
1050-
Ok((key_values, is_finished))
1071+
Ok((key_values, !more_after_limit))
10511072
}
10521073

10531074
/// Iterates over all the keys matching the given prefix. The prefix is not included in the returned keys.
@@ -1330,43 +1351,60 @@ impl<C: Context> ReadableKeyValueStore for ViewContainer<C> {
13301351
&self,
13311352
key_interval: KeyInterval,
13321353
) -> Result<(Vec<Vec<u8>>, bool), ViewContainerError> {
1354+
if key_interval.is_empty() {
1355+
return Ok((Vec::new(), true));
1356+
}
13331357
let view = self.view.read().await;
1334-
let key_values = view.find_key_values_by_prefix(&[]).await?;
1358+
let prefix = key_interval.common_prefix();
1359+
let prefix_keys = view.find_keys_by_prefix(&prefix).await?;
13351360
let mut keys = Vec::new();
1336-
for (key, _value) in key_values {
1337-
if key_matches_interval(&key, key_interval.start_bound(), key_interval.end_bound()) {
1338-
keys.push(key);
1339-
if key_interval.limit.is_some_and(|limit| keys.len() >= limit) {
1340-
break;
1341-
}
1361+
let mut more_after_limit = false;
1362+
for stripped in prefix_keys {
1363+
let mut full = prefix.clone();
1364+
full.extend(&stripped);
1365+
if !key_interval.contains(&full) {
1366+
continue;
1367+
}
1368+
if key_interval
1369+
.limit
1370+
.is_some_and(|limit| keys.len() >= limit)
1371+
{
1372+
more_after_limit = true;
1373+
break;
13421374
}
1375+
keys.push(full);
13431376
}
1344-
let is_finished = key_interval.limit.is_none_or(|limit| keys.len() < limit);
1345-
Ok((keys, is_finished))
1377+
Ok((keys, !more_after_limit))
13461378
}
13471379

13481380
async fn find_key_values_in_interval(
13491381
&self,
13501382
key_interval: KeyInterval,
13511383
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), ViewContainerError> {
1384+
if key_interval.is_empty() {
1385+
return Ok((Vec::new(), true));
1386+
}
13521387
let view = self.view.read().await;
1353-
let entries = view.find_key_values_by_prefix(&[]).await?;
1388+
let prefix = key_interval.common_prefix();
1389+
let prefix_entries = view.find_key_values_by_prefix(&prefix).await?;
13541390
let mut key_values = Vec::new();
1355-
for (key, value) in entries {
1356-
if key_matches_interval(&key, key_interval.start_bound(), key_interval.end_bound()) {
1357-
key_values.push((key, value));
1358-
if key_interval
1359-
.limit
1360-
.is_some_and(|limit| key_values.len() >= limit)
1361-
{
1362-
break;
1363-
}
1391+
let mut more_after_limit = false;
1392+
for (stripped, value) in prefix_entries {
1393+
let mut full = prefix.clone();
1394+
full.extend(&stripped);
1395+
if !key_interval.contains(&full) {
1396+
continue;
1397+
}
1398+
if key_interval
1399+
.limit
1400+
.is_some_and(|limit| key_values.len() >= limit)
1401+
{
1402+
more_after_limit = true;
1403+
break;
13641404
}
1405+
key_values.push((full, value));
13651406
}
1366-
let is_finished = key_interval
1367-
.limit
1368-
.is_none_or(|limit| key_values.len() < limit);
1369-
Ok((key_values, is_finished))
1407+
Ok((key_values, !more_after_limit))
13701408
}
13711409
}
13721410

0 commit comments

Comments
 (0)