Skip to content

Commit 40be986

Browse files
committed
fix(universaldb): bound rocksdb range scans
1 parent 133b3e1 commit 40be986

3 files changed

Lines changed: 74 additions & 76 deletions

File tree

engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs

Lines changed: 8 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
error::DatabaseError,
1313
key_selector::KeySelector,
1414
options::{ConflictRangeType, MutationType},
15-
tx_ops::Operation,
15+
tx_ops::{Operation, range_begin_contains, range_end_contains},
1616
value::{KeyValue, Slice, Values},
1717
versionstamp::{
1818
generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete,
@@ -438,17 +438,8 @@ impl TransactionTask {
438438
let txn = self.create_transaction();
439439
let read_opts = ReadOptions::default();
440440

441-
// Resolve the begin selector
442-
let resolved_begin =
443-
self.resolve_key_selector_for_range(&txn, &begin, begin_or_equal, begin_offset)?;
444-
445-
// Resolve the end selector
446-
let resolved_end =
447-
self.resolve_key_selector_for_range(&txn, &end, end_or_equal, end_offset)?;
448-
449-
// Now execute the range query with resolved keys
450441
let iter = txn.iterator_opt(
451-
rocksdb::IteratorMode::From(&resolved_begin, rocksdb::Direction::Forward),
442+
rocksdb::IteratorMode::From(&begin, rocksdb::Direction::Forward),
452443
read_opts,
453444
);
454445

@@ -457,8 +448,12 @@ impl TransactionTask {
457448

458449
for item in iter {
459450
let (k, v) = item.context("failed to iterate rocksdb for get range")?;
460-
// Check if we've reached the end key
461-
if k.as_ref() >= resolved_end.as_slice() {
451+
452+
if !range_begin_contains(k.as_ref(), &begin, begin_or_equal, begin_offset) {
453+
continue;
454+
}
455+
456+
if !range_end_contains(k.as_ref(), &end, end_or_equal, end_offset) {
462457
break;
463458
}
464459

@@ -477,64 +472,6 @@ impl TransactionTask {
477472
Ok(Values::new(results))
478473
}
479474

480-
fn resolve_key_selector_for_range(
481-
&self,
482-
txn: &RocksDbTransaction<OptimisticTransactionDB>,
483-
key: &[u8],
484-
or_equal: bool,
485-
offset: i32,
486-
) -> Result<Vec<u8>> {
487-
// Based on PostgreSQL's interpretation:
488-
// (false, 1) => first_greater_or_equal
489-
// (true, 1) => first_greater_than
490-
// (false, 0) => last_less_than
491-
// (true, 0) => last_less_or_equal
492-
493-
let read_opts = ReadOptions::default();
494-
495-
match (or_equal, offset) {
496-
(false, 1) => {
497-
// first_greater_or_equal: find first key >= search_key
498-
let iter = txn.iterator_opt(
499-
rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
500-
read_opts,
501-
);
502-
for item in iter {
503-
let (k, _v) = item.context(
504-
"failed to iterate rocksdb for range selector first_greater_or_equal",
505-
)?;
506-
return Ok(k.to_vec());
507-
}
508-
// If no key found, return a key that will make the range empty
509-
Ok(vec![0xff; 255])
510-
}
511-
(true, 1) => {
512-
// first_greater_than: find first key > search_key
513-
let iter = txn.iterator_opt(
514-
rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward),
515-
read_opts,
516-
);
517-
for item in iter {
518-
let (k, _v) = item.context(
519-
"failed to iterate rocksdb for range selector first_greater_than",
520-
)?;
521-
// Skip if it's the exact key
522-
if k.as_ref() == key {
523-
continue;
524-
}
525-
return Ok(k.to_vec());
526-
}
527-
// If no key found, return a key that will make the range empty
528-
Ok(vec![0xff; 255])
529-
}
530-
_ => {
531-
// For other cases, just use the key as-is for now
532-
// This is a simplification - full implementation would handle all cases
533-
Ok(key.to_vec())
534-
}
535-
}
536-
}
537-
538475
async fn handle_get_estimated_range_size(&mut self, begin: &[u8], end: &[u8]) -> Result<i64> {
539476
let range = rocksdb::Range::new(begin, end);
540477

engine/packages/universaldb/src/tx_ops.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,9 +342,6 @@ impl TransactionOperations {
342342
return Ok(db_values);
343343
}
344344

345-
let begin = opt.begin.key();
346-
let end = opt.end.key();
347-
348345
// Start with database results in a map
349346
let mut result_map = BTreeMap::new();
350347
for kv in db_values.into_iter() {
@@ -357,7 +354,7 @@ impl TransactionOperations {
357354
for op in &*self.operations() {
358355
match op {
359356
Operation::Set { key, value } => {
360-
if key.as_slice() >= begin && key.as_slice() < end {
357+
if range_contains(key.as_slice(), opt) {
361358
result_map.insert(key.clone(), value.clone());
362359
}
363360
}
@@ -382,7 +379,7 @@ impl TransactionOperations {
382379
param,
383380
op_type,
384381
} => {
385-
if key.as_slice() >= begin && key.as_slice() < end {
382+
if range_contains(key.as_slice(), opt) {
386383
// Get current value for this key (from result_map or empty if not exists)
387384
let current_value = result_map.get(key);
388385
let current_slice = current_value.map(|v| &**v);
@@ -423,3 +420,32 @@ impl TransactionOperations {
423420
.push((begin.to_vec(), end.to_vec(), conflict_type));
424421
}
425422
}
423+
424+
fn range_contains(key: &[u8], opt: &RangeOption<'_>) -> bool {
425+
range_begin_contains(
426+
key,
427+
opt.begin.key(),
428+
opt.begin.or_equal(),
429+
opt.begin.offset(),
430+
) && range_end_contains(key, opt.end.key(), opt.end.or_equal(), opt.end.offset())
431+
}
432+
433+
pub(crate) fn range_begin_contains(key: &[u8], begin: &[u8], or_equal: bool, offset: i32) -> bool {
434+
match (or_equal, offset) {
435+
(false, 1) => key >= begin,
436+
(true, 1) => key > begin,
437+
(false, 0) => key > begin,
438+
(true, 0) => key >= begin,
439+
_ => key >= begin,
440+
}
441+
}
442+
443+
pub(crate) fn range_end_contains(key: &[u8], end: &[u8], or_equal: bool, offset: i32) -> bool {
444+
match (or_equal, offset) {
445+
(false, 1) => key < end,
446+
(true, 1) => key <= end,
447+
(false, 0) => key < end,
448+
(true, 0) => key <= end,
449+
_ => key < end,
450+
}
451+
}

engine/packages/universaldb/tests/integration.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,41 @@ async fn test_range_options(db: &Database) {
705705
assert_eq!(results[2].key(), test_subspace.pack(&("range_e",)));
706706
assert_eq!(results[2].value(), b"val_e");
707707

708+
// Test 5: local writes outside the range should not be merged into the result
709+
let results = db
710+
.run(|tx| async move {
711+
let test_subspace = Subspace::from("test");
712+
let key_b = test_subspace.pack(&("range_b",));
713+
let key_d = test_subspace.pack(&("range_d",));
714+
let key_z = test_subspace.pack(&("range_z",));
715+
716+
tx.set(&key_z, b"val_z");
717+
718+
let range = RangeOption {
719+
begin: KeySelector::first_greater_or_equal(Cow::Owned(key_b)),
720+
end: KeySelector::first_greater_or_equal(Cow::Owned(key_d)),
721+
limit: None,
722+
reverse: false,
723+
mode: StreamingMode::WantAll,
724+
target_bytes: 0,
725+
..RangeOption::default()
726+
};
727+
728+
let vals = tx.get_range(&range, 1, Serializable).await?;
729+
Ok(vals.into_vec())
730+
})
731+
.await
732+
.unwrap();
733+
734+
assert_eq!(
735+
results.len(),
736+
2,
737+
"Expected local key outside [b, d) to be excluded"
738+
);
739+
assert!(!results
740+
.iter()
741+
.any(|r| r.key() == test_subspace.pack(&("range_z",))));
742+
708743
// Clear test data
709744
db.run(|tx| async move {
710745
let test_subspace = Subspace::from("test");

0 commit comments

Comments
 (0)