diff --git a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs index 132ffaa9d5..edb2febcb3 100644 --- a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs @@ -9,9 +9,7 @@ use crate::{ options::{ConflictRangeType, MutationType}, tx_ops::Operation, value::{KeyValue, Slice, Values}, - versionstamp::{ - generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete, - }, + versionstamp::{generate_versionstamp, substitute_raw_versionstamp}, }; pub enum TransactionCommand { @@ -385,10 +383,7 @@ impl TransactionTask { for op in operations { match op { - Operation::Set { key, value } => { - // TODO: versionstamps need to be calculated on the sql side, not in rust - let value = substitute_versionstamp_if_incomplete(value.clone(), 0); - + Operation::SetValue { key, value } => { let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"; let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs index 4d4d7fb3e3..5c8157f6a9 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs @@ -14,9 +14,7 @@ use crate::{ options::{ConflictRangeType, MutationType}, tx_ops::Operation, value::{KeyValue, Slice, Values}, - versionstamp::{ - generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete, - }, + versionstamp::{generate_versionstamp, substitute_raw_versionstamp}, }; pub enum TransactionCommand { @@ -321,12 +319,7 @@ impl TransactionTask { // Apply all operations to the transaction for op in operations { match op { - Operation::Set { key, value } => { - // Substitute versionstamp if incomplete - // For now, just use the simple substitution - we can improve this later - // to ensure all versionstamps in a transaction have the same base timestamp - let value = substitute_versionstamp_if_incomplete(value.clone(), 0); - + Operation::SetValue { key, value } => { txn.put(key, &value) .context("failed to set key in rocksdb")?; } diff --git a/engine/packages/universaldb/src/tx_ops.rs b/engine/packages/universaldb/src/tx_ops.rs index 878ae168e8..995315e00d 100644 --- a/engine/packages/universaldb/src/tx_ops.rs +++ b/engine/packages/universaldb/src/tx_ops.rs @@ -16,7 +16,7 @@ use crate::{ #[derive(Debug, Clone)] pub enum Operation { - Set { + SetValue { key: Vec, value: Vec, }, @@ -68,7 +68,7 @@ impl TransactionOperations { pub fn set(&self, key: &[u8], value: &[u8]) { self.add_conflict_range(key, &end_of_key_range(key), ConflictRangeType::Write); - self.add_operation(Operation::Set { + self.add_operation(Operation::SetValue { key: key.to_vec(), value: value.to_vec(), }); @@ -109,7 +109,7 @@ impl TransactionOperations { // Iterate through operations in reverse order to find the most recent operation for this key for op in self.operations().iter().rev() { match op { - Operation::Set { + Operation::SetValue { key: set_key, value, } if set_key.as_slice() == key => { @@ -231,7 +231,7 @@ impl TransactionOperations { for op in &*self.operations() { match op { - Operation::Set { key, .. } => { + Operation::SetValue { key, .. } => { local_keys.insert(key.clone(), ()); } Operation::Clear { key } => { @@ -356,7 +356,7 @@ impl TransactionOperations { // Apply local operations for op in &*self.operations() { match op { - Operation::Set { key, value } => { + Operation::SetValue { key, value } => { if key.as_slice() >= begin && key.as_slice() < end { result_map.insert(key.clone(), value.clone()); } diff --git a/engine/packages/universaldb/src/versionstamp.rs b/engine/packages/universaldb/src/versionstamp.rs index 1b84f8a2b4..53cb9107ea 100644 --- a/engine/packages/universaldb/src/versionstamp.rs +++ b/engine/packages/universaldb/src/versionstamp.rs @@ -164,47 +164,3 @@ pub fn pack_and_substitute_versionstamp( Ok(packed_data) } - -/// Checks if a value might contain an incomplete versionstamp and attempts to substitute it -/// -/// This is a helper function for database drivers that want to support versionstamp substitution. -/// It detects if a value contains an incomplete versionstamp by checking for the offset marker -/// at the end, and substitutes it with a generated versionstamp. -/// -/// Returns the potentially modified value. If the value doesn't contain a versionstamp marker -/// or substitution fails, returns the original value. -pub fn substitute_versionstamp_if_incomplete(mut value: Vec, user_version: u16) -> Vec { - // Check if the value contains an incomplete versionstamp - // An incomplete versionstamp has a 4-byte offset at the end - if value.len() >= 4 { - // Try to read the offset from the last 4 bytes - let offset_bytes = &value[value.len() - 4..]; - let offset = u32::from_le_bytes([ - offset_bytes[0], - offset_bytes[1], - offset_bytes[2], - offset_bytes[3], - ]) as usize; - - // Check if this could be a valid versionstamp offset - // The offset should point within the value (excluding the 4 offset bytes) - if offset < value.len() - 4 { - // Check for versionstamp marker at or near the offset - const VERSIONSTAMP_MARKER: u8 = 0x33; - let has_marker = value.get(offset) == Some(&VERSIONSTAMP_MARKER) - || (offset > 0 && value.get(offset - 1) == Some(&VERSIONSTAMP_MARKER)); - - if has_marker { - // This looks like it contains an incomplete versionstamp - // Generate a versionstamp and substitute it - let versionstamp = generate_versionstamp(user_version); - - // Substitute the versionstamp - // This will do nothing if the versionstamp is already complete - let _ = substitute_versionstamp(&mut value, versionstamp); - } - } - } - - value -} diff --git a/engine/packages/universaldb/tests/integration.rs b/engine/packages/universaldb/tests/integration.rs index ef990a8146..1d2504c418 100644 --- a/engine/packages/universaldb/tests/integration.rs +++ b/engine/packages/universaldb/tests/integration.rs @@ -74,6 +74,10 @@ async fn run_all_tests(db: universaldb::Database) { test_basic_operations(&db).await; clear_test_namespace(&db).await.unwrap(); + // Test that plain Set stores arbitrary binary values verbatim. + test_plain_set_preserves_versionstamp_like_binary(&db).await; + clear_test_namespace(&db).await.unwrap(); + // Test range operations test_range_operations(&db).await; clear_test_namespace(&db).await.unwrap(); @@ -210,8 +214,40 @@ async fn clear_test_namespace(db: &Database) -> Result<()> { let (begin, end) = test_subspace.range(); tx.clear_range(&begin, &end); Ok(()) + }) + .await +} + +async fn test_plain_set_preserves_versionstamp_like_binary(db: &Database) { + let test_subspace = Subspace::from("test"); + let key = test_subspace.pack(&("plain-set-versionstamp-like-binary",)); + let mut value = Vec::from([ + 0x33, 0x5a, 0x33, 0x59, 0x41, 0x37, 0x6c, 0x1d, 0x00, 0x09, 0x1f, 0x6b, 0x1d, 0x00, + 0x09, 0x1f, + ]); + value.extend_from_slice(&[0; 64]); + + db.run(|tx| { + let key = key.clone(); + let value = value.clone(); + async move { + tx.set(&key, &value); + Ok(()) + } }) .await + .unwrap(); + + let actual = db + .run(|tx| { + let key = key.clone(); + async move { tx.get(&key, Serializable).await } + }) + .await + .unwrap() + .unwrap(); + + assert_eq!(actual.as_slice(), value.as_slice()); } async fn test_basic_operations(db: &Database) {