Skip to content

Commit ee08c9b

Browse files
committed
fix(universaldb): preserve plain set values
1 parent 4799acd commit ee08c9b

5 files changed

Lines changed: 45 additions & 65 deletions

File tree

engine/packages/universaldb/src/driver/postgres/transaction_task.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ use crate::{
99
options::{ConflictRangeType, MutationType},
1010
tx_ops::Operation,
1111
value::{KeyValue, Slice, Values},
12-
versionstamp::{
13-
generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete,
14-
},
12+
versionstamp::{generate_versionstamp, substitute_raw_versionstamp},
1513
};
1614

1715
pub enum TransactionCommand {
@@ -385,10 +383,7 @@ impl TransactionTask {
385383

386384
for op in operations {
387385
match op {
388-
Operation::Set { key, value } => {
389-
// TODO: versionstamps need to be calculated on the sql side, not in rust
390-
let value = substitute_versionstamp_if_incomplete(value.clone(), 0);
391-
386+
Operation::SetValue { key, value } => {
392387
let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2";
393388
let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?;
394389

engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@ use crate::{
1414
options::{ConflictRangeType, MutationType},
1515
tx_ops::Operation,
1616
value::{KeyValue, Slice, Values},
17-
versionstamp::{
18-
generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete,
19-
},
17+
versionstamp::{generate_versionstamp, substitute_raw_versionstamp},
2018
};
2119

2220
pub enum TransactionCommand {
@@ -321,12 +319,7 @@ impl TransactionTask {
321319
// Apply all operations to the transaction
322320
for op in operations {
323321
match op {
324-
Operation::Set { key, value } => {
325-
// Substitute versionstamp if incomplete
326-
// For now, just use the simple substitution - we can improve this later
327-
// to ensure all versionstamps in a transaction have the same base timestamp
328-
let value = substitute_versionstamp_if_incomplete(value.clone(), 0);
329-
322+
Operation::SetValue { key, value } => {
330323
txn.put(key, &value)
331324
.context("failed to set key in rocksdb")?;
332325
}

engine/packages/universaldb/src/tx_ops.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616

1717
#[derive(Debug, Clone)]
1818
pub enum Operation {
19-
Set {
19+
SetValue {
2020
key: Vec<u8>,
2121
value: Vec<u8>,
2222
},
@@ -68,7 +68,7 @@ impl TransactionOperations {
6868
pub fn set(&self, key: &[u8], value: &[u8]) {
6969
self.add_conflict_range(key, &end_of_key_range(key), ConflictRangeType::Write);
7070

71-
self.add_operation(Operation::Set {
71+
self.add_operation(Operation::SetValue {
7272
key: key.to_vec(),
7373
value: value.to_vec(),
7474
});
@@ -109,7 +109,7 @@ impl TransactionOperations {
109109
// Iterate through operations in reverse order to find the most recent operation for this key
110110
for op in self.operations().iter().rev() {
111111
match op {
112-
Operation::Set {
112+
Operation::SetValue {
113113
key: set_key,
114114
value,
115115
} if set_key.as_slice() == key => {
@@ -231,7 +231,7 @@ impl TransactionOperations {
231231

232232
for op in &*self.operations() {
233233
match op {
234-
Operation::Set { key, .. } => {
234+
Operation::SetValue { key, .. } => {
235235
local_keys.insert(key.clone(), ());
236236
}
237237
Operation::Clear { key } => {
@@ -356,7 +356,7 @@ impl TransactionOperations {
356356
// Apply local operations
357357
for op in &*self.operations() {
358358
match op {
359-
Operation::Set { key, value } => {
359+
Operation::SetValue { key, value } => {
360360
if key.as_slice() >= begin && key.as_slice() < end {
361361
result_map.insert(key.clone(), value.clone());
362362
}

engine/packages/universaldb/src/versionstamp.rs

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -164,47 +164,3 @@ pub fn pack_and_substitute_versionstamp<T: TuplePack>(
164164

165165
Ok(packed_data)
166166
}
167-
168-
/// Checks if a value might contain an incomplete versionstamp and attempts to substitute it
169-
///
170-
/// This is a helper function for database drivers that want to support versionstamp substitution.
171-
/// It detects if a value contains an incomplete versionstamp by checking for the offset marker
172-
/// at the end, and substitutes it with a generated versionstamp.
173-
///
174-
/// Returns the potentially modified value. If the value doesn't contain a versionstamp marker
175-
/// or substitution fails, returns the original value.
176-
pub fn substitute_versionstamp_if_incomplete(mut value: Vec<u8>, user_version: u16) -> Vec<u8> {
177-
// Check if the value contains an incomplete versionstamp
178-
// An incomplete versionstamp has a 4-byte offset at the end
179-
if value.len() >= 4 {
180-
// Try to read the offset from the last 4 bytes
181-
let offset_bytes = &value[value.len() - 4..];
182-
let offset = u32::from_le_bytes([
183-
offset_bytes[0],
184-
offset_bytes[1],
185-
offset_bytes[2],
186-
offset_bytes[3],
187-
]) as usize;
188-
189-
// Check if this could be a valid versionstamp offset
190-
// The offset should point within the value (excluding the 4 offset bytes)
191-
if offset < value.len() - 4 {
192-
// Check for versionstamp marker at or near the offset
193-
const VERSIONSTAMP_MARKER: u8 = 0x33;
194-
let has_marker = value.get(offset) == Some(&VERSIONSTAMP_MARKER)
195-
|| (offset > 0 && value.get(offset - 1) == Some(&VERSIONSTAMP_MARKER));
196-
197-
if has_marker {
198-
// This looks like it contains an incomplete versionstamp
199-
// Generate a versionstamp and substitute it
200-
let versionstamp = generate_versionstamp(user_version);
201-
202-
// Substitute the versionstamp
203-
// This will do nothing if the versionstamp is already complete
204-
let _ = substitute_versionstamp(&mut value, versionstamp);
205-
}
206-
}
207-
}
208-
209-
value
210-
}

engine/packages/universaldb/tests/integration.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ async fn run_all_tests(db: universaldb::Database) {
7474
test_basic_operations(&db).await;
7575
clear_test_namespace(&db).await.unwrap();
7676

77+
// Test that plain Set stores arbitrary binary values verbatim.
78+
test_plain_set_preserves_versionstamp_like_binary(&db).await;
79+
clear_test_namespace(&db).await.unwrap();
80+
7781
// Test range operations
7882
test_range_operations(&db).await;
7983
clear_test_namespace(&db).await.unwrap();
@@ -210,8 +214,40 @@ async fn clear_test_namespace(db: &Database) -> Result<()> {
210214
let (begin, end) = test_subspace.range();
211215
tx.clear_range(&begin, &end);
212216
Ok(())
217+
})
218+
.await
219+
}
220+
221+
async fn test_plain_set_preserves_versionstamp_like_binary(db: &Database) {
222+
let test_subspace = Subspace::from("test");
223+
let key = test_subspace.pack(&("plain-set-versionstamp-like-binary",));
224+
let mut value = Vec::from([
225+
0x33, 0x5a, 0x33, 0x59, 0x41, 0x37, 0x6c, 0x1d, 0x00, 0x09, 0x1f, 0x6b, 0x1d, 0x00,
226+
0x09, 0x1f,
227+
]);
228+
value.extend_from_slice(&[0; 64]);
229+
230+
db.run(|tx| {
231+
let key = key.clone();
232+
let value = value.clone();
233+
async move {
234+
tx.set(&key, &value);
235+
Ok(())
236+
}
213237
})
214238
.await
239+
.unwrap();
240+
241+
let actual = db
242+
.run(|tx| {
243+
let key = key.clone();
244+
async move { tx.get(&key, Serializable).await }
245+
})
246+
.await
247+
.unwrap()
248+
.unwrap();
249+
250+
assert_eq!(actual.as_slice(), value.as_slice());
215251
}
216252

217253
async fn test_basic_operations(db: &Database) {

0 commit comments

Comments
 (0)