diff --git a/sdk/couchbase-core/src/compressionmanager.rs b/sdk/couchbase-core/src/compressionmanager.rs index c814bf34..af650684 100644 --- a/sdk/couchbase-core/src/compressionmanager.rs +++ b/sdk/couchbase-core/src/compressionmanager.rs @@ -16,11 +16,11 @@ * */ +use snap::raw::Encoder; +use std::borrow::Cow; use std::fmt::Debug; use std::marker::PhantomData; -use snap::raw::Encoder; - use crate::error; use crate::error::ErrorKind; use crate::memdx::datatype::DataTypeFlag; @@ -28,13 +28,12 @@ use crate::options::agent::{CompressionConfig, CompressionMode}; pub(crate) trait Compressor: Send + Sync + Debug { fn new(compression_config: &CompressionConfig) -> Self; - // This is a bit of a weird signature, but it allows us to avoid allocations when no compression occurs. fn compress<'a>( - &'a mut self, + &mut self, connection_supports_snappy: bool, datatype: DataTypeFlag, input: &'a [u8], - ) -> error::Result<(&'a [u8], u8)>; + ) -> error::Result<(Cow<'a, [u8]>, u8)>; } #[derive(Debug)] @@ -64,12 +63,8 @@ pub(crate) struct StdCompressor { compression_enabled: bool, compression_min_size: usize, compression_min_ratio: f64, - - compressed_value: Vec<u8>, } -impl StdCompressor {} - impl Compressor for StdCompressor { fn new(compression_config: &CompressionConfig) -> Self { let (compression_enabled, compression_min_size, compression_min_ratio) = @@ -85,33 +80,31 @@ impl Compressor for StdCompressor { compression_enabled, compression_min_size, compression_min_ratio, - - compressed_value: Vec::new(), } } fn compress<'a>( - &'a mut self, + &mut self, connection_supports_snappy: bool, datatype: DataTypeFlag, input: &'a [u8], - ) -> error::Result<(&'a [u8], u8)> { + ) -> error::Result<(Cow<'a, [u8]>, u8)> { if !connection_supports_snappy || !self.compression_enabled { - return Ok((input, u8::from(datatype))); + return Ok((Cow::Borrowed(input), u8::from(datatype))); } let datatype = u8::from(datatype); // If the packet is already compressed then we don't want to compress it again. 
if datatype & u8::from(DataTypeFlag::Compressed) != 0 { - return Ok((input, datatype)); + return Ok((Cow::Borrowed(input), datatype)); } let packet_size = input.len(); // Only compress values that are large enough to be worthwhile. if packet_size <= self.compression_min_size { - return Ok((input, datatype)); + return Ok((Cow::Borrowed(input), datatype)); } let mut encoder = Encoder::new(); @@ -121,14 +114,135 @@ impl Compressor for StdCompressor { // Only return the compressed value if the ratio of compressed:original is small enough. if compressed_value.len() as f64 / packet_size as f64 > self.compression_min_ratio { - return Ok((input, datatype)); + return Ok((Cow::Borrowed(input), datatype)); } - self.compressed_value = compressed_value; - Ok(( - &self.compressed_value, + Cow::Owned(compressed_value), datatype | u8::from(DataTypeFlag::Compressed), )) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::borrow::Cow; + + fn enabled_config(min_size: usize, min_ratio: f64) -> CompressionConfig { + CompressionConfig::new(CompressionMode::Enabled { + min_size, + min_ratio, + }) + } + + fn disabled_config() -> CompressionConfig { + CompressionConfig::new(CompressionMode::Disabled) + } + + #[test] + fn disabled_compression_returns_input_unchanged() { + let mut compressor = StdCompressor::new(&disabled_config()); + let input = b"hello world"; + + let (output, dt) = compressor + .compress(true, DataTypeFlag::Json, input) + .unwrap(); + + assert!(matches!(output, Cow::Borrowed(_))); + assert_eq!(&*output, input.as_slice()); + assert_eq!(dt, u8::from(DataTypeFlag::Json)); + } + + #[test] + fn connection_without_snappy_returns_input_unchanged() { + let mut compressor = StdCompressor::new(&enabled_config(0, 1.0)); + let input = b"hello world"; + + let (output, dt) = compressor + .compress(false, DataTypeFlag::Json, input) + .unwrap(); + + assert!(matches!(output, Cow::Borrowed(_))); + assert_eq!(&*output, input.as_slice()); + } + + #[test] + fn 
already_compressed_returns_input_unchanged() { + let mut compressor = StdCompressor::new(&enabled_config(0, 1.0)); + let input = b"already compressed data"; + + let (output, dt) = compressor + .compress(true, DataTypeFlag::Compressed, input) + .unwrap(); + + assert!(matches!(output, Cow::Borrowed(_))); + assert_eq!(&*output, input.as_slice()); + assert_eq!(dt, u8::from(DataTypeFlag::Compressed)); + } + + #[test] + fn input_below_min_size_returns_input_unchanged() { + let mut compressor = StdCompressor::new(&enabled_config(1024, 1.0)); + let input = b"small"; + + let (output, dt) = compressor + .compress(true, DataTypeFlag::Json, input) + .unwrap(); + + assert!(matches!(output, Cow::Borrowed(_))); + assert_eq!(&*output, input.as_slice()); + assert_eq!(dt, u8::from(DataTypeFlag::Json)); + } + + #[test] + fn compressible_input_returns_owned_with_compressed_flag() { + let mut compressor = StdCompressor::new(&enabled_config(0, 1.0)); + // Highly compressible: repeated bytes. + let input = vec![b'a'; 4096]; + + let (output, dt) = compressor + .compress(true, DataTypeFlag::Json, &input) + .unwrap(); + + assert!(matches!(output, Cow::Owned(_))); + assert!(output.len() < input.len()); + assert_eq!( + dt, + u8::from(DataTypeFlag::Json) | u8::from(DataTypeFlag::Compressed) + ); + + // Verify it round-trips through snappy. + let decompressed = snap::raw::Decoder::new().decompress_vec(&output).unwrap(); + assert_eq!(decompressed, input); + } + + #[test] + fn poor_ratio_returns_input_unchanged() { + // Set a very aggressive ratio that compressed output can't beat. 
+ let mut compressor = StdCompressor::new(&enabled_config(0, 0.01)); + let input = vec![b'a'; 256]; + + let (output, dt) = compressor + .compress(true, DataTypeFlag::Json, &input) + .unwrap(); + + assert!(matches!(output, Cow::Borrowed(_))); + assert_eq!(&*output, input.as_slice()); + assert_eq!(dt, u8::from(DataTypeFlag::Json)); + } + + #[test] + fn compression_manager_creates_compressor() { + let manager = CompressionManager::<StdCompressor>::new(enabled_config(0, 1.0)); + let mut compressor = manager.compressor(); + let input = vec![b'x'; 4096]; + + let (output, _dt) = compressor + .compress(true, DataTypeFlag::None, &input) + .unwrap(); + + assert!(matches!(output, Cow::Owned(_))); + assert!(output.len() < input.len()); + } +} diff --git a/sdk/couchbase-core/src/crudcomponent.rs b/sdk/couchbase-core/src/crudcomponent.rs index 15ea9b08..feacfc0e 100644 --- a/sdk/couchbase-core/src/crudcomponent.rs +++ b/sdk/couchbase-core/src/crudcomponent.rs @@ -130,7 +130,7 @@ impl< key: opts.key, vbucket_id, flags: opts.flags, - value, + value: &value, datatype, expiry: opts.expiry, preserve_expiry: opts.preserve_expiry, @@ -195,7 +195,7 @@ impl< Error::new_contextual_memdx_error(e) }) .map_ok(|resp| GetResult { - value: resp.value.to_vec(), + value: resp.value, datatype: resp.datatype, cas: resp.cas, flags: resp.flags, @@ -233,7 +233,7 @@ impl< Error::new_contextual_memdx_error(e) }) .map_ok(|resp| GetMetaResult { - value: resp.value.to_vec(), + value: resp.value, datatype: resp.datatype, server_duration: resp.server_duration, expiry: resp.expiry, @@ -323,7 +323,7 @@ impl< Error::new_contextual_memdx_error(e) }) .map_ok(|resp| GetAndLockResult { - value: resp.value.to_vec(), + value: resp.value, datatype: resp.datatype, cas: resp.cas, flags: resp.flags, @@ -362,7 +362,7 @@ impl< Error::new_contextual_memdx_error(e) }) .map_ok(|resp| GetAndTouchResult { - value: resp.value.to_vec(), + value: resp.value, datatype: resp.datatype, cas: resp.cas, flags: resp.flags, @@ -469,7 +469,7 @@ impl<
 key: opts.key, vbucket_id, flags: opts.flags, - value, + value: &value, datatype, expiry: opts.expiry, on_behalf_of: None, @@ -531,7 +531,7 @@ impl< key: opts.key, vbucket_id, flags: opts.flags, - value, + value: &value, datatype, expiry: opts.expiry, preserve_expiry: opts.preserve_expiry, @@ -594,7 +594,7 @@ impl< collection_id, key: opts.key, vbucket_id, - value, + value: &value, datatype, cas: opts.cas, on_behalf_of: None, @@ -655,7 +655,7 @@ impl< collection_id, key: opts.key, vbucket_id, - value, + value: &value, datatype, cas: opts.cas, on_behalf_of: None, diff --git a/sdk/couchbase-core/src/results/kv.rs b/sdk/couchbase-core/src/results/kv.rs index ec455db1..cce9b05e 100644 --- a/sdk/couchbase-core/src/results/kv.rs +++ b/sdk/couchbase-core/src/results/kv.rs @@ -16,13 +16,15 @@ * */ +use bytes::Bytes; + use crate::error; use crate::mutationtoken::MutationToken; use std::time::Duration; #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct GetResult { - pub value: Vec<u8>, + pub value: Bytes, pub flags: u32, pub datatype: u8, pub cas: u64, @@ -32,7 +34,7 @@ pub struct GetMetaResult { pub cas: u64, pub flags: u32, - pub value: Vec<u8>, + pub value: Bytes, pub datatype: u8, pub server_duration: Option<Duration>, pub expiry: u32, @@ -54,7 +56,7 @@ pub struct DeleteResult { #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct GetAndLockResult { - pub value: Vec<u8>, + pub value: Bytes, pub flags: u32, pub datatype: u8, pub cas: u64, @@ -62,7 +64,7 @@ pub struct GetAndTouchResult { - pub value: Vec<u8>, + pub value: Bytes, pub flags: u32, pub datatype: u8, pub cas: u64, diff --git a/sdk/couchbase-core/src/retry.rs b/sdk/couchbase-core/src/retry.rs index 20657ee0..323d4c8d 100644 --- a/sdk/couchbase-core/src/retry.rs +++ b/sdk/couchbase-core/src/retry.rs @@ -213,7 +213,8 @@ pub struct RetryRequest { /// The number of retry attempts that have already been made. 
 pub retry_attempts: u32, /// The set of reasons this request has been retried so far. - pub retry_reasons: HashSet<RetryReason>, + /// `None` until the first retry attempt occurs. + pub retry_reasons: Option<HashSet<RetryReason>>, pub(crate) unique_id: Option<String>, } @@ -223,7 +224,7 @@ impl RetryRequest { operation, is_idempotent, retry_attempts: 0, - retry_reasons: Default::default(), + retry_reasons: None, unique_id: None, } } @@ -231,7 +232,9 @@ impl RetryRequest { pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) { self.retry_attempts += 1; tracing::Span::current().record(SPAN_ATTRIB_RETRIES, self.retry_attempts); - self.retry_reasons.insert(reason); + self.retry_reasons + .get_or_insert_with(HashSet::new) + .insert(reason); } pub fn is_idempotent(&self) -> bool { @@ -242,8 +245,8 @@ impl RetryRequest { self.retry_attempts } - pub fn retry_reasons(&self) -> &HashSet<RetryReason> { - &self.retry_reasons + pub fn retry_reasons(&self) -> Option<&HashSet<RetryReason>> { + self.retry_reasons.as_ref() } } @@ -253,14 +256,16 @@ impl Display for RetryRequest { f, "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}", self.operation, - self.unique_id.as_ref().unwrap_or(&"".to_string()), + self.unique_id.as_deref().unwrap_or(""), self.is_idempotent, self.retry_attempts, - self.retry_reasons - .iter() - .map(|r| r.to_string()) - .collect::<Vec<_>>() - .join(", ") + self.retry_reasons() + .map(|reasons| reasons + .iter() + .map(|r| r.to_string()) + .collect::<Vec<_>>() + .join(", ")) + .unwrap_or_default() ) } } diff --git a/sdk/couchbase-core/tests/agent_ops.rs b/sdk/couchbase-core/tests/agent_ops.rs index 9b17c0c9..8224faac 100644 --- a/sdk/couchbase-core/tests/agent_ops.rs +++ b/sdk/couchbase-core/tests/agent_ops.rs @@ -353,7 +353,7 @@ fn test_append_and_prepend() { .await .unwrap(); - assert_eq!("the answer is 42".as_bytes(), get_result.value.as_slice()); + assert_eq!("the answer is 42".as_bytes(), &get_result.value[..]); assert_eq!(append_result.cas, get_result.cas); }) } diff --git 
a/sdk/couchbase-core/tests/allocations.rs b/sdk/couchbase-core/tests/allocations.rs index da0c56f5..eeaa782d 100644 --- a/sdk/couchbase-core/tests/allocations.rs +++ b/sdk/couchbase-core/tests/allocations.rs @@ -181,9 +181,9 @@ fn get() { .retry_strategy(Arc::new(FailFastRetryStrategy::default())); let expected_allocs: u64 = if agent.test_setup_config.use_ssl { - 14 + 13 } else { - 12 + 11 }; ensure_agent_ready(&agent).await; @@ -230,8 +230,9 @@ where let total_allocs = stats2.total_blocks - stats1.total_blocks; - dhat::assert!( - total_allocs <= expected_allocs, + dhat::assert_eq!( + total_allocs, + expected_allocs, "Expected max {} allocations, was {}", expected_allocs, total_allocs diff --git a/sdk/couchbase/src/clients/couchbase_core_kv_client.rs b/sdk/couchbase/src/clients/couchbase_core_kv_client.rs index 8e8b2708..3970298a 100644 --- a/sdk/couchbase/src/clients/couchbase_core_kv_client.rs +++ b/sdk/couchbase/src/clients/couchbase_core_kv_client.rs @@ -34,6 +34,7 @@ use crate::results::projection::{build_from_full_doc, build_from_subdoc_entries}; use crate::retry::RetryStrategy; use crate::subdoc::lookup_in_specs::{GetSpecOptions, LookupInSpec}; use crate::subdoc::mutate_in_specs::MutateInSpec; +use bytes::Bytes; use chrono::{DateTime, Utc}; use couchbase_core::memdx::subdoc::{reorder_subdoc_ops, MutateInOp, SubdocDocFlag}; use std::sync::Arc; @@ -44,7 +45,7 @@ const SECS_IN_DAY: u64 = 24 * 60 * 60; #[derive(Clone)] pub(crate) struct CouchbaseCoreKvClient { agent_provider: CouchbaseAgentProvider, - bucket_name: String, + bucket_name: Arc<str>, scope_name: String, collection_name: String, @@ -61,7 +62,7 @@ impl CouchbaseCoreKvClient { ) -> Self { Self { agent_provider, - bucket_name, + bucket_name: Arc::from(bucket_name), scope_name, collection_name, default_retry_strategy, @@ -344,15 +345,14 @@ impl CouchbaseCoreKvClient { } } - let mut content: Vec<u8>; - if full_lookup { - content = build_from_full_doc(&res, options.projections.as_deref())?; + let content = if 
full_lookup { + build_from_full_doc(&res, options.projections.as_deref())? } else { - content = build_from_subdoc_entries(&specs, &res.entries)?; - } + build_from_subdoc_entries(&specs, &res.entries)? + }; Ok(GetResult { - content, + content: Bytes::from(content), flags, cas: res.cas, expiry_time: expires_at, diff --git a/sdk/couchbase/src/mutation_state.rs b/sdk/couchbase/src/mutation_state.rs index 0371b582..68e1fda8 100644 --- a/sdk/couchbase/src/mutation_state.rs +++ b/sdk/couchbase/src/mutation_state.rs @@ -30,6 +30,7 @@ use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::collections::HashMap; use std::fmt; use std::fmt::Write; +use std::sync::Arc; /// A token representing a specific mutation on a specific vBucket. /// @@ -39,22 +40,27 @@ use std::fmt::Write; #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct MutationToken { pub(crate) token: couchbase_core::mutationtoken::MutationToken, - pub(crate) bucket_name: String, + pub(crate) bucket_name: Arc<str>, } impl MutationToken { pub(crate) fn new( token: couchbase_core::mutationtoken::MutationToken, - bucket_name: String, + bucket_name: Arc<str>, ) -> Self { Self { token, bucket_name } } #[cfg(feature = "internal")] - pub fn from_parts(vbid: u16, vbuuid: u64, seqno: u64, bucket_name: String) -> Self { + pub fn from_parts( + vbid: u16, + vbuuid: u64, + seqno: u64, + bucket_name: impl Into<Arc<str>>, + ) -> Self { Self { token: couchbase_core::mutationtoken::MutationToken::new(vbid, vbuuid, seqno), - bucket_name, + bucket_name: bucket_name.into(), } } @@ -114,7 +120,7 @@ pub struct MutationState { #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] struct MutationStateKey { - bucket_name: String, + bucket_name: Arc<str>, vbid: u16, } @@ -124,7 +130,7 @@ impl Serialize for MutationStateKey { S: Serializer, { let mut map = serializer.serialize_map(Some(1))?; - map.serialize_entry(&self.bucket_name, &self.vbid)?; + map.serialize_entry(&*self.bucket_name, &self.vbid)?; map.end() } } 
@@ -187,7 +193,7 @@ impl From<MutationState> for HashMap<String, ScanVector> { fn from(value: MutationState) -> Self { let mut buckets: HashMap<String, ScanVector> = HashMap::default(); for (key, token) in value.tokens { - let bucket = buckets.entry(key.bucket_name.clone()).or_default(); + let bucket = buckets.entry(key.bucket_name.to_string()).or_default(); bucket.insert( key.vbid.to_string(), ScanVectorEntry::new(token.seqno(), token.vbuuid().to_string()), @@ -207,7 +213,7 @@ impl Serialize for MutationState { let mut buckets: HashMap<String, HashMap<String, (u64, String)>> = HashMap::new(); for (key, token) in &self.tokens { - let bucket = buckets.entry(key.bucket_name.clone()).or_default(); + let bucket = buckets.entry(key.bucket_name.to_string()).or_default(); bucket.insert( key.vbid.to_string(), (token.seqno(), token.vbuuid().to_string()), @@ -245,6 +251,7 @@ impl<'de> Deserialize<'de> for MutationState { while let Some((bucket_name, vbuckets)) = map.next_entry::<String, HashMap<u16, (u64, String)>>()? { + let bucket_name = Arc::<str>::from(bucket_name); for (vbid, (seqno, vbuuid)) in vbuckets { let key = MutationStateKey { bucket_name: bucket_name.clone(), @@ -284,17 +291,18 @@ macro_rules! mutation_state { mod tests { use crate::mutation_state::MutationState; use crate::mutation_state::MutationToken; + use std::sync::Arc; #[test] fn serialization() { let mutation_state = mutation_state! 
{ MutationToken::new( couchbase_core::mutationtoken::MutationToken::new(1, 1234, 1), - "default".to_string(), + Arc::from("default"), ), MutationToken::new( couchbase_core::mutationtoken::MutationToken::new(25, 5678, 10), - "beer-sample".to_string(), + Arc::from("beer-sample"), ) }; @@ -311,11 +319,11 @@ mod tests { let tokens = mutation_state.tokens(); assert!(tokens.contains(&MutationToken::new( couchbase_core::mutationtoken::MutationToken::new(1, 1234, 1), - "default".to_string(), + Arc::from("default"), ))); assert!(tokens.contains(&MutationToken::new( couchbase_core::mutationtoken::MutationToken::new(25, 5678, 10), - "beer-sample".to_string(), + Arc::from("beer-sample"), ))); } } diff --git a/sdk/couchbase/src/results/kv_results.rs b/sdk/couchbase/src/results/kv_results.rs index fcf485db..5c215104 100644 --- a/sdk/couchbase/src/results/kv_results.rs +++ b/sdk/couchbase/src/results/kv_results.rs @@ -49,7 +49,7 @@ use serde::de::DeserializeOwned; /// ``` #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] pub struct GetResult { - pub(crate) content: Vec<u8>, + pub(crate) content: Bytes, pub(crate) flags: u32, pub(crate) cas: u64, pub(crate) expiry_time: Option<DateTime<Utc>>,