Skip to content

Commit 5829ed4

Browse files
committed
Make the is_finished precise when possible.
1 parent 9c1f5b2 commit 5829ed4

7 files changed

Lines changed: 157 additions & 60 deletions

File tree

linera-views/src/backends/dynamo_db.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -692,14 +692,20 @@ impl DynamoDbStoreInternal {
692692
)),
693693
};
694694
if let Some(limit) = user_limit {
695+
// Ask DynamoDB for one row past the user's remaining quota
696+
// so we can tell whether more matches exist without an extra
697+
// round-trip. `last_evaluated_key` alone is unreliable —
698+
// DynamoDB can set it even when the scan range is exhausted.
695699
let remaining = limit - items.len();
696-
query = query.limit(i32::try_from(remaining).unwrap_or(i32::MAX));
700+
let request_limit = remaining.saturating_add(1);
701+
query = query.limit(i32::try_from(request_limit).unwrap_or(i32::MAX));
697702
}
698703

699704
let response = query.send().boxed_sync().await?;
700705
let last_evaluated_key = response.last_evaluated_key;
701706
let response_items = response.items.unwrap_or_default();
702707

708+
let mut hit_extra = false;
703709
for item in response_items {
704710
if drop_upper {
705711
if let Some(upper_bytes) = &upper {
@@ -714,10 +720,17 @@ impl DynamoDbStoreInternal {
714720
}
715721
}
716722
}
717-
items.push(item);
718723
if user_limit.is_some_and(|limit| items.len() >= limit) {
719-
return Ok((items, false));
724+
// We already have the user-requested count; this is the
725+
// extra item, which proves more matches exist.
726+
hit_extra = true;
727+
break;
720728
}
729+
items.push(item);
730+
}
731+
732+
if hit_extra {
733+
return Ok((items, false));
721734
}
722735

723736
match last_evaluated_key {

linera-views/src/backends/indexed_db.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -192,10 +192,14 @@ impl ReadableKeyValueStore for IndexedDbStore {
192192
key_interval: KeyInterval,
193193
) -> Result<(Vec<Vec<u8>>, bool)> {
194194
let range = interval_to_range(&self.start_key, &key_interval);
195-
let keys = self
196-
.with_object_store(move |o| {
197-
o.get_all_keys_in(range, key_interval.limit.map(|limit| limit as u32))
198-
})
195+
// Ask for one extra key past the user limit so we can decide
196+
// `is_finished` precisely without an extra round-trip.
197+
let user_limit = key_interval.limit;
198+
let fetch_limit = user_limit.map(|limit| {
199+
u32::try_from(limit.saturating_add(1).min(u32::MAX as usize)).unwrap_or(u32::MAX)
200+
});
201+
let mut keys = self
202+
.with_object_store(move |o| o.get_all_keys_in(range, fetch_limit))
199203
.await??
200204
.into_iter()
201205
.map(|key| {
@@ -204,7 +208,17 @@ impl ReadableKeyValueStore for IndexedDbStore {
204208
.to_vec()
205209
})
206210
.collect::<Vec<_>>();
207-
let is_finished = key_interval.limit.is_none_or(|limit| keys.len() < limit);
211+
let is_finished = match user_limit {
212+
Some(limit) => {
213+
if keys.len() > limit {
214+
keys.truncate(limit);
215+
false
216+
} else {
217+
true
218+
}
219+
}
220+
None => true,
221+
};
208222
Ok((keys, is_finished))
209223
}
210224

@@ -214,7 +228,12 @@ impl ReadableKeyValueStore for IndexedDbStore {
214228
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool)> {
215229
let range = interval_to_range(&self.start_key, &key_interval);
216230
let prefix_len = self.start_key.len() as u32;
217-
let key_values = self
231+
let user_limit = key_interval.limit;
232+
// Walk one cursor step past the user limit so we know whether the
233+
// database has more matches — the cursor advance is the only extra
234+
// cost.
235+
let fetch_target = user_limit.map(|limit| limit.saturating_add(1));
236+
let mut key_values = self
218237
.with_object_store(move |object_store| async move {
219238
let mut key_values = vec![];
220239
let mut cursor = object_store.cursor().range(range)?.open().await?;
@@ -230,10 +249,7 @@ impl ReadableKeyValueStore for IndexedDbStore {
230249
)
231250
.to_vec(),
232251
));
233-
if key_interval
234-
.limit
235-
.is_some_and(|limit| key_values.len() >= limit)
236-
{
252+
if fetch_target.is_some_and(|target| key_values.len() >= target) {
237253
break;
238254
}
239255
cursor.advance(1).await?;
@@ -242,9 +258,17 @@ impl ReadableKeyValueStore for IndexedDbStore {
242258
Ok::<_, IndexedDbStoreError>(key_values)
243259
})
244260
.await??;
245-
let is_finished = key_interval
246-
.limit
247-
.is_none_or(|limit| key_values.len() < limit);
261+
let is_finished = match user_limit {
262+
Some(limit) => {
263+
if key_values.len() > limit {
264+
key_values.truncate(limit);
265+
false
266+
} else {
267+
true
268+
}
269+
}
270+
None => true,
271+
};
248272
Ok((key_values, is_finished))
249273
}
250274
}

linera-views/src/backends/memory.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,17 +190,26 @@ impl ReadableKeyValueStore for MemoryStore {
190190
.map
191191
.read()
192192
.expect("MemoryStore lock should not be poisoned");
193+
let mut iter = map.range(get_interval_range(key_interval.start, key_interval.end));
193194
let mut values = Vec::new();
194-
for (key, _value) in map.range(get_interval_range(key_interval.start, key_interval.end)) {
195+
let mut hit_limit = false;
196+
while let Some((key, _value)) = iter.next() {
195197
values.push(key.to_vec());
196198
if key_interval
197199
.limit
198200
.is_some_and(|limit| values.len() >= limit)
199201
{
202+
hit_limit = true;
200203
break;
201204
}
202205
}
203-
let is_finished = key_interval.limit.is_none_or(|limit| values.len() < limit);
206+
// Precise `is_finished`: when we stopped at the limit, peek one
207+
// BTreeMap step ahead — `next()` is the only extra cost.
208+
let is_finished = if hit_limit {
209+
iter.next().is_none()
210+
} else {
211+
true
212+
};
204213
Ok((values, is_finished))
205214
}
206215

@@ -215,20 +224,24 @@ impl ReadableKeyValueStore for MemoryStore {
215224
.map
216225
.read()
217226
.expect("MemoryStore lock should not be poisoned");
227+
let mut iter = map.range(get_interval_range(key_interval.start, key_interval.end));
218228
let mut key_values = Vec::new();
219-
for (key, value) in map.range(get_interval_range(key_interval.start, key_interval.end)) {
220-
let key_value = (key.to_vec(), value.to_vec());
221-
key_values.push(key_value);
229+
let mut hit_limit = false;
230+
while let Some((key, value)) = iter.next() {
231+
key_values.push((key.to_vec(), value.to_vec()));
222232
if key_interval
223233
.limit
224234
.is_some_and(|limit| key_values.len() >= limit)
225235
{
236+
hit_limit = true;
226237
break;
227238
}
228239
}
229-
let is_finished = key_interval
230-
.limit
231-
.is_none_or(|limit| key_values.len() < limit);
240+
let is_finished = if hit_limit {
241+
iter.next().is_none()
242+
} else {
243+
true
244+
};
232245
Ok((key_values, is_finished))
233246
}
234247
}

linera-views/src/backends/rocks_db.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,14 +273,23 @@ impl RocksDbStoreExecutor {
273273
let len = self.start_key.len();
274274
let mut iter = self.get_interval_iterator(&start, &end);
275275
let mut keys = Vec::new();
276+
let mut hit_limit = false;
276277
while let Some(key) = iter.key() {
277278
keys.push(key[len..].to_vec());
278279
if key_interval.limit.is_some_and(|limit| keys.len() >= limit) {
280+
hit_limit = true;
279281
break;
280282
}
281283
iter.next();
282284
}
283-
let is_finished = key_interval.limit.is_none_or(|limit| keys.len() < limit);
285+
// Precise `is_finished`: when we stopped at the limit, advance the
286+
// iterator once more and check whether anything remains.
287+
let is_finished = if hit_limit {
288+
iter.next();
289+
iter.key().is_none()
290+
} else {
291+
true
292+
};
284293
Ok((keys, is_finished))
285294
}
286295

@@ -293,20 +302,24 @@ impl RocksDbStoreExecutor {
293302
let len = self.start_key.len();
294303
let mut iter = self.get_interval_iterator(&start, &end);
295304
let mut key_values = Vec::new();
305+
let mut hit_limit = false;
296306
while let Some((key, value)) = iter.item() {
297-
let key_value = (key[len..].to_vec(), value.to_vec());
298-
key_values.push(key_value);
307+
key_values.push((key[len..].to_vec(), value.to_vec()));
299308
if key_interval
300309
.limit
301310
.is_some_and(|limit| key_values.len() >= limit)
302311
{
312+
hit_limit = true;
303313
break;
304314
}
305315
iter.next();
306316
}
307-
let is_finished = key_interval
308-
.limit
309-
.is_none_or(|limit| key_values.len() < limit);
317+
let is_finished = if hit_limit {
318+
iter.next();
319+
iter.item().is_none()
320+
} else {
321+
true
322+
};
310323
Ok((key_values, is_finished))
311324
}
312325

linera-views/src/backends/scylla_db.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -599,9 +599,10 @@ impl ScyllaDbClient {
599599
key_interval: KeyInterval,
600600
) -> Result<(Vec<Vec<u8>>, bool), ScyllaDbStoreInternalError> {
601601
let session = &self.session;
602-
let limit = key_interval
603-
.limit
604-
.map(|limit| limit.min(i32::MAX as usize) as i32);
602+
// Ask for one row past the user-requested limit so we can tell
603+
// whether the database has more matches without an extra round-trip.
604+
let user_limit = key_interval.limit;
605+
let limit = user_limit.map(|limit| limit.saturating_add(1).min(i32::MAX as usize) as i32);
605606
let rows = match (key_interval.start, key_interval.end) {
606607
(KeyIntervalStart::Included(start), Included(end)) => {
607608
Self::check_key_size(&start)?;
@@ -740,7 +741,19 @@ impl ScyllaDbClient {
740741
let (key,) = row?;
741742
keys.push(key);
742743
}
743-
let is_finished = key_interval.limit.is_none_or(|limit| keys.len() < limit);
744+
// We asked for one extra row above. If we got it, drop it and report
745+
// unfinished; otherwise the scan is exhausted within the user limit.
746+
let is_finished = match user_limit {
747+
Some(limit) => {
748+
if keys.len() > limit {
749+
keys.truncate(limit);
750+
false
751+
} else {
752+
true
753+
}
754+
}
755+
None => true,
756+
};
744757
Ok((keys, is_finished))
745758
}
746759

@@ -750,9 +763,8 @@ impl ScyllaDbClient {
750763
key_interval: KeyInterval,
751764
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), ScyllaDbStoreInternalError> {
752765
let session = &self.session;
753-
let limit = key_interval
754-
.limit
755-
.map(|limit| limit.min(i32::MAX as usize) as i32);
766+
let user_limit = key_interval.limit;
767+
let limit = user_limit.map(|limit| limit.saturating_add(1).min(i32::MAX as usize) as i32);
756768
let rows = match (key_interval.start, key_interval.end) {
757769
(KeyIntervalStart::Included(start), Included(end)) => {
758770
Self::check_key_size(&start)?;
@@ -909,9 +921,17 @@ impl ScyllaDbClient {
909921
let (key, value) = row?;
910922
key_values.push((key, value));
911923
}
912-
let is_finished = key_interval
913-
.limit
914-
.is_none_or(|limit| key_values.len() < limit);
924+
let is_finished = match user_limit {
925+
Some(limit) => {
926+
if key_values.len() > limit {
927+
key_values.truncate(limit);
928+
false
929+
} else {
930+
true
931+
}
932+
}
933+
None => true,
934+
};
915935
Ok((key_values, is_finished))
916936
}
917937
}

linera-views/src/backends/value_splitting.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ where
230230
})
231231
.await?;
232232
let mut last_big_key = None;
233-
for big_key in big_keys {
233+
let n_big_keys = big_keys.len();
234+
for (i, big_key) in big_keys.into_iter().enumerate() {
234235
let len = big_key.len();
235236
last_big_key = Some(big_key.clone());
236237
if Self::read_index_from_key(&big_key)? != 0 {
@@ -246,7 +247,11 @@ where
246247
}
247248
keys.push(key);
248249
if key_interval.limit.is_some_and(|limit| keys.len() >= limit) {
249-
return Ok((keys, false));
250+
// Precise `is_finished`: if we just consumed the last
251+
// big-key of the batch and the inner store reports it is
252+
// exhausted, no more matches are possible.
253+
let is_finished = i + 1 == n_big_keys && is_big_finished;
254+
return Ok((keys, is_finished));
250255
}
251256
}
252257
if is_big_finished {
@@ -344,7 +349,14 @@ where
344349
}
345350
}
346351
if limit_reached {
347-
return Ok((key_values, false));
352+
// Precise `is_finished`: if the batch iterator has no more
353+
// items (the tail segments of the current user key were all
354+
// consumed in the inner while-loop, so the next item would
355+
// be segment 0 of a fresh user key) and the inner store is
356+
// exhausted, no more matches are possible.
357+
let batch_exhausted = iter.next().is_none();
358+
let is_finished = batch_exhausted && is_big_finished;
359+
return Ok((key_values, is_finished));
348360
}
349361
if is_big_finished {
350362
return Ok((key_values, true));

0 commit comments

Comments
 (0)