Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::{
options::{ConflictRangeType, MutationType},
tx_ops::Operation,
value::{KeyValue, Slice, Values},
versionstamp::{
generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete,
},
versionstamp::{generate_versionstamp, substitute_raw_versionstamp},
};

pub enum TransactionCommand {
Expand Down Expand Up @@ -385,10 +383,7 @@ impl TransactionTask {

for op in operations {
match op {
Operation::Set { key, value } => {
// TODO: versionstamps need to be calculated on the sql side, not in rust
let value = substitute_versionstamp_if_incomplete(value.clone(), 0);

Operation::SetValue { key, value } => {
let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2";
let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use crate::{
options::{ConflictRangeType, MutationType},
tx_ops::Operation,
value::{KeyValue, Slice, Values},
versionstamp::{
generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete,
},
versionstamp::{generate_versionstamp, substitute_raw_versionstamp},
};

pub enum TransactionCommand {
Expand Down Expand Up @@ -321,12 +319,7 @@ impl TransactionTask {
// Apply all operations to the transaction
for op in operations {
match op {
Operation::Set { key, value } => {
// Substitute versionstamp if incomplete
// For now, just use the simple substitution - we can improve this later
// to ensure all versionstamps in a transaction have the same base timestamp
let value = substitute_versionstamp_if_incomplete(value.clone(), 0);

Operation::SetValue { key, value } => {
txn.put(key, &value)
.context("failed to set key in rocksdb")?;
}
Expand Down
10 changes: 5 additions & 5 deletions engine/packages/universaldb/src/tx_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{

#[derive(Debug, Clone)]
pub enum Operation {
Set {
SetValue {
key: Vec<u8>,
value: Vec<u8>,
},
Expand Down Expand Up @@ -68,7 +68,7 @@ impl TransactionOperations {
pub fn set(&self, key: &[u8], value: &[u8]) {
self.add_conflict_range(key, &end_of_key_range(key), ConflictRangeType::Write);

self.add_operation(Operation::Set {
self.add_operation(Operation::SetValue {
key: key.to_vec(),
value: value.to_vec(),
});
Expand Down Expand Up @@ -109,7 +109,7 @@ impl TransactionOperations {
// Iterate through operations in reverse order to find the most recent operation for this key
for op in self.operations().iter().rev() {
match op {
Operation::Set {
Operation::SetValue {
key: set_key,
value,
} if set_key.as_slice() == key => {
Expand Down Expand Up @@ -231,7 +231,7 @@ impl TransactionOperations {

for op in &*self.operations() {
match op {
Operation::Set { key, .. } => {
Operation::SetValue { key, .. } => {
local_keys.insert(key.clone(), ());
}
Operation::Clear { key } => {
Expand Down Expand Up @@ -356,7 +356,7 @@ impl TransactionOperations {
// Apply local operations
for op in &*self.operations() {
match op {
Operation::Set { key, value } => {
Operation::SetValue { key, value } => {
if key.as_slice() >= begin && key.as_slice() < end {
result_map.insert(key.clone(), value.clone());
}
Expand Down
44 changes: 0 additions & 44 deletions engine/packages/universaldb/src/versionstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
offset_bytes[3],
]) as usize;

if offset >= data_len {

Check warning on line 80 in engine/packages/universaldb/src/versionstamp.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/universaldb/src/versionstamp.rs
return Err(format!(
"Invalid versionstamp offset: {} exceeds data length {}",
offset,
Expand Down Expand Up @@ -138,7 +138,7 @@
.checked_add(versionstamp_len)
.ok_or_else(|| "Versionstamp offset overflowed".to_string())?;

if versionstamp_end > data_len {

Check warning on line 141 in engine/packages/universaldb/src/versionstamp.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/universaldb/src/versionstamp.rs
return Err(format!(
"Invalid versionstamp offset: {} exceeds data length {}",
offset,
Expand All @@ -164,47 +164,3 @@

Ok(packed_data)
}

/// Checks if a value might contain an incomplete versionstamp and attempts to substitute it
///
/// This is a helper function for database drivers that want to support versionstamp substitution.
/// It detects if a value contains an incomplete versionstamp by checking for the offset marker
/// at the end, and substitutes it with a generated versionstamp.
///
/// Returns the potentially modified value. If the value doesn't contain a versionstamp marker
/// or substitution fails, returns the original value.
pub fn substitute_versionstamp_if_incomplete(mut value: Vec<u8>, user_version: u16) -> Vec<u8> {
// Check if the value contains an incomplete versionstamp
// An incomplete versionstamp has a 4-byte offset at the end
if value.len() >= 4 {
// Try to read the offset from the last 4 bytes
let offset_bytes = &value[value.len() - 4..];
let offset = u32::from_le_bytes([
offset_bytes[0],
offset_bytes[1],
offset_bytes[2],
offset_bytes[3],
]) as usize;

// Check if this could be a valid versionstamp offset
// The offset should point within the value (excluding the 4 offset bytes)
if offset < value.len() - 4 {
// Check for versionstamp marker at or near the offset
const VERSIONSTAMP_MARKER: u8 = 0x33;
let has_marker = value.get(offset) == Some(&VERSIONSTAMP_MARKER)
|| (offset > 0 && value.get(offset - 1) == Some(&VERSIONSTAMP_MARKER));

if has_marker {
// This looks like it contains an incomplete versionstamp
// Generate a versionstamp and substitute it
let versionstamp = generate_versionstamp(user_version);

// Substitute the versionstamp
// This will do nothing if the versionstamp is already complete
let _ = substitute_versionstamp(&mut value, versionstamp);
}
}
}

value
}
36 changes: 36 additions & 0 deletions engine/packages/universaldb/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
test_basic_operations(&db).await;
clear_test_namespace(&db).await.unwrap();

// Test that plain Set stores arbitrary binary values verbatim.
test_plain_set_preserves_versionstamp_like_binary(&db).await;
clear_test_namespace(&db).await.unwrap();

// Test range operations
test_range_operations(&db).await;
clear_test_namespace(&db).await.unwrap();
Expand Down Expand Up @@ -208,10 +212,42 @@
db.run(|tx| async move {
let test_subspace = Subspace::from("test");
let (begin, end) = test_subspace.range();
tx.clear_range(&begin, &end);

Check warning on line 215 in engine/packages/universaldb/tests/integration.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/universaldb/tests/integration.rs
Ok(())
})
.await
}

async fn test_plain_set_preserves_versionstamp_like_binary(db: &Database) {
let test_subspace = Subspace::from("test");

Check warning on line 222 in engine/packages/universaldb/tests/integration.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/universaldb/tests/integration.rs
let key = test_subspace.pack(&("plain-set-versionstamp-like-binary",));
let mut value = Vec::from([
0x33, 0x5a, 0x33, 0x59, 0x41, 0x37, 0x6c, 0x1d, 0x00, 0x09, 0x1f, 0x6b, 0x1d, 0x00,
0x09, 0x1f,
]);
value.extend_from_slice(&[0; 64]);

db.run(|tx| {
let key = key.clone();
let value = value.clone();
async move {
tx.set(&key, &value);
Ok(())
}
})
.await
.unwrap();

let actual = db
.run(|tx| {
let key = key.clone();
async move { tx.get(&key, Serializable).await }
})
.await
.unwrap()
.unwrap();

assert_eq!(actual.as_slice(), value.as_slice());
}

async fn test_basic_operations(db: &Database) {
Expand Down
Loading