Skip to content

Commit ebd4dc0

Browse files
committed
RSCBC-32: Reduce allocations on KV hot paths
Changes: - Get result values use Bytes instead of Vec<u8>, eliminating a full document-body memcpy on every get/get_and_lock/get_and_touch/get_meta by propagating zero-copy Bytes from the codec through to the SDK layer - RetryRequest.retry_reasons is now Option<HashSet>, only allocating on the first retry attempt instead of every operation - Compressor.compress returns Cow<[u8]> instead of borrowing from self, removing heap state from StdCompressor entirely - MutationToken bucket_name uses Arc<str> instead of String, replacing a per-mutation String::clone with an atomic ref-count bump - Add unit tests for compression manager
1 parent cf5ae22 commit ebd4dc0

9 files changed

Lines changed: 199 additions & 69 deletions

File tree

sdk/couchbase-core/src/compressionmanager.rs

Lines changed: 133 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*
1717
*/
1818

19+
use std::borrow::Cow;
1920
use std::fmt::Debug;
2021
use std::marker::PhantomData;
2122

@@ -28,13 +29,12 @@ use crate::options::agent::{CompressionConfig, CompressionMode};
2829

2930
pub(crate) trait Compressor: Send + Sync + Debug {
3031
fn new(compression_config: &CompressionConfig) -> Self;
31-
// This is a bit of a weird signature, but it allows us to avoid allocations when no compression occurs.
3232
fn compress<'a>(
33-
&'a mut self,
33+
&mut self,
3434
connection_supports_snappy: bool,
3535
datatype: DataTypeFlag,
3636
input: &'a [u8],
37-
) -> error::Result<(&'a [u8], u8)>;
37+
) -> error::Result<(Cow<'a, [u8]>, u8)>;
3838
}
3939

4040
#[derive(Debug)]
@@ -64,12 +64,8 @@ pub(crate) struct StdCompressor {
6464
compression_enabled: bool,
6565
compression_min_size: usize,
6666
compression_min_ratio: f64,
67-
68-
compressed_value: Vec<u8>,
6967
}
7068

71-
impl StdCompressor {}
72-
7369
impl Compressor for StdCompressor {
7470
fn new(compression_config: &CompressionConfig) -> Self {
7571
let (compression_enabled, compression_min_size, compression_min_ratio) =
@@ -85,33 +81,31 @@ impl Compressor for StdCompressor {
8581
compression_enabled,
8682
compression_min_size,
8783
compression_min_ratio,
88-
89-
compressed_value: Vec::new(),
9084
}
9185
}
9286

9387
fn compress<'a>(
94-
&'a mut self,
88+
&mut self,
9589
connection_supports_snappy: bool,
9690
datatype: DataTypeFlag,
9791
input: &'a [u8],
98-
) -> error::Result<(&'a [u8], u8)> {
92+
) -> error::Result<(Cow<'a, [u8]>, u8)> {
9993
if !connection_supports_snappy || !self.compression_enabled {
100-
return Ok((input, u8::from(datatype)));
94+
return Ok((Cow::Borrowed(input), u8::from(datatype)));
10195
}
10296

10397
let datatype = u8::from(datatype);
10498

10599
// If the packet is already compressed then we don't want to compress it again.
106100
if datatype & u8::from(DataTypeFlag::Compressed) != 0 {
107-
return Ok((input, datatype));
101+
return Ok((Cow::Borrowed(input), datatype));
108102
}
109103

110104
let packet_size = input.len();
111105

112106
// Only compress values that are large enough to be worthwhile.
113107
if packet_size <= self.compression_min_size {
114-
return Ok((input, datatype));
108+
return Ok((Cow::Borrowed(input), datatype));
115109
}
116110

117111
let mut encoder = Encoder::new();
@@ -121,14 +115,135 @@ impl Compressor for StdCompressor {
121115

122116
// Only return the compressed value if the ratio of compressed:original is small enough.
123117
if compressed_value.len() as f64 / packet_size as f64 > self.compression_min_ratio {
124-
return Ok((input, datatype));
118+
return Ok((Cow::Borrowed(input), datatype));
125119
}
126120

127-
self.compressed_value = compressed_value;
128-
129121
Ok((
130-
&self.compressed_value,
122+
Cow::Owned(compressed_value),
131123
datatype | u8::from(DataTypeFlag::Compressed),
132124
))
133125
}
134126
}
127+
128+
#[cfg(test)]
129+
mod tests {
130+
use super::*;
131+
use std::borrow::Cow;
132+
133+
fn enabled_config(min_size: usize, min_ratio: f64) -> CompressionConfig {
134+
CompressionConfig::new(CompressionMode::Enabled {
135+
min_size,
136+
min_ratio,
137+
})
138+
}
139+
140+
fn disabled_config() -> CompressionConfig {
141+
CompressionConfig::new(CompressionMode::Disabled)
142+
}
143+
144+
#[test]
145+
fn disabled_compression_returns_input_unchanged() {
146+
let mut compressor = StdCompressor::new(&disabled_config());
147+
let input = b"hello world";
148+
149+
let (output, dt) = compressor
150+
.compress(true, DataTypeFlag::Json, input)
151+
.unwrap();
152+
153+
assert!(matches!(output, Cow::Borrowed(_)));
154+
assert_eq!(&*output, input.as_slice());
155+
assert_eq!(dt, u8::from(DataTypeFlag::Json));
156+
}
157+
158+
#[test]
159+
fn connection_without_snappy_returns_input_unchanged() {
160+
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
161+
let input = b"hello world";
162+
163+
let (output, dt) = compressor
164+
.compress(false, DataTypeFlag::Json, input)
165+
.unwrap();
166+
167+
assert!(matches!(output, Cow::Borrowed(_)));
168+
assert_eq!(&*output, input.as_slice());
169+
}
170+
171+
#[test]
172+
fn already_compressed_returns_input_unchanged() {
173+
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
174+
let input = b"already compressed data";
175+
176+
let (output, dt) = compressor
177+
.compress(true, DataTypeFlag::Compressed, input)
178+
.unwrap();
179+
180+
assert!(matches!(output, Cow::Borrowed(_)));
181+
assert_eq!(&*output, input.as_slice());
182+
assert_eq!(dt, u8::from(DataTypeFlag::Compressed));
183+
}
184+
185+
#[test]
186+
fn input_below_min_size_returns_input_unchanged() {
187+
let mut compressor = StdCompressor::new(&enabled_config(1024, 1.0));
188+
let input = b"small";
189+
190+
let (output, dt) = compressor
191+
.compress(true, DataTypeFlag::Json, input)
192+
.unwrap();
193+
194+
assert!(matches!(output, Cow::Borrowed(_)));
195+
assert_eq!(&*output, input.as_slice());
196+
assert_eq!(dt, u8::from(DataTypeFlag::Json));
197+
}
198+
199+
#[test]
200+
fn compressible_input_returns_owned_with_compressed_flag() {
201+
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
202+
// Highly compressible: repeated bytes.
203+
let input = vec![b'a'; 4096];
204+
205+
let (output, dt) = compressor
206+
.compress(true, DataTypeFlag::Json, &input)
207+
.unwrap();
208+
209+
assert!(matches!(output, Cow::Owned(_)));
210+
assert!(output.len() < input.len());
211+
assert_eq!(
212+
dt,
213+
u8::from(DataTypeFlag::Json) | u8::from(DataTypeFlag::Compressed)
214+
);
215+
216+
// Verify it round-trips through snappy.
217+
let decompressed = snap::raw::Decoder::new().decompress_vec(&output).unwrap();
218+
assert_eq!(decompressed, input);
219+
}
220+
221+
#[test]
222+
fn poor_ratio_returns_input_unchanged() {
223+
// Set a very aggressive ratio that compressed output can't beat.
224+
let mut compressor = StdCompressor::new(&enabled_config(0, 0.01));
225+
let input = vec![b'a'; 256];
226+
227+
let (output, dt) = compressor
228+
.compress(true, DataTypeFlag::Json, &input)
229+
.unwrap();
230+
231+
assert!(matches!(output, Cow::Borrowed(_)));
232+
assert_eq!(&*output, input.as_slice());
233+
assert_eq!(dt, u8::from(DataTypeFlag::Json));
234+
}
235+
236+
#[test]
237+
fn compression_manager_creates_compressor() {
238+
let manager = CompressionManager::<StdCompressor>::new(enabled_config(0, 1.0));
239+
let mut compressor = manager.compressor();
240+
let input = vec![b'x'; 4096];
241+
242+
let (output, _dt) = compressor
243+
.compress(true, DataTypeFlag::None, &input)
244+
.unwrap();
245+
246+
assert!(matches!(output, Cow::Owned(_)));
247+
assert!(output.len() < input.len());
248+
}
249+
}

sdk/couchbase-core/src/crudcomponent.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl<
130130
key: opts.key,
131131
vbucket_id,
132132
flags: opts.flags,
133-
value,
133+
value: &value,
134134
datatype,
135135
expiry: opts.expiry,
136136
preserve_expiry: opts.preserve_expiry,
@@ -195,7 +195,7 @@ impl<
195195
Error::new_contextual_memdx_error(e)
196196
})
197197
.map_ok(|resp| GetResult {
198-
value: resp.value.to_vec(),
198+
value: resp.value,
199199
datatype: resp.datatype,
200200
cas: resp.cas,
201201
flags: resp.flags,
@@ -233,7 +233,7 @@ impl<
233233
Error::new_contextual_memdx_error(e)
234234
})
235235
.map_ok(|resp| GetMetaResult {
236-
value: resp.value.to_vec(),
236+
value: resp.value,
237237
datatype: resp.datatype,
238238
server_duration: resp.server_duration,
239239
expiry: resp.expiry,
@@ -323,7 +323,7 @@ impl<
323323
Error::new_contextual_memdx_error(e)
324324
})
325325
.map_ok(|resp| GetAndLockResult {
326-
value: resp.value.to_vec(),
326+
value: resp.value,
327327
datatype: resp.datatype,
328328
cas: resp.cas,
329329
flags: resp.flags,
@@ -362,7 +362,7 @@ impl<
362362
Error::new_contextual_memdx_error(e)
363363
})
364364
.map_ok(|resp| GetAndTouchResult {
365-
value: resp.value.to_vec(),
365+
value: resp.value,
366366
datatype: resp.datatype,
367367
cas: resp.cas,
368368
flags: resp.flags,
@@ -469,7 +469,7 @@ impl<
469469
key: opts.key,
470470
vbucket_id,
471471
flags: opts.flags,
472-
value,
472+
value: &value,
473473
datatype,
474474
expiry: opts.expiry,
475475
on_behalf_of: None,
@@ -531,7 +531,7 @@ impl<
531531
key: opts.key,
532532
vbucket_id,
533533
flags: opts.flags,
534-
value,
534+
value: &value,
535535
datatype,
536536
expiry: opts.expiry,
537537
preserve_expiry: opts.preserve_expiry,
@@ -594,7 +594,7 @@ impl<
594594
collection_id,
595595
key: opts.key,
596596
vbucket_id,
597-
value,
597+
value: &value,
598598
datatype,
599599
cas: opts.cas,
600600
on_behalf_of: None,
@@ -655,7 +655,7 @@ impl<
655655
collection_id,
656656
key: opts.key,
657657
vbucket_id,
658-
value,
658+
value: &value,
659659
datatype,
660660
cas: opts.cas,
661661
on_behalf_of: None,

sdk/couchbase-core/src/results/kv.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
*
1717
*/
1818

19+
use bytes::Bytes;
20+
1921
use crate::error;
2022
use crate::mutationtoken::MutationToken;
2123
use std::time::Duration;
2224

2325
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
2426
pub struct GetResult {
25-
pub value: Vec<u8>,
27+
pub value: Bytes,
2628
pub flags: u32,
2729
pub datatype: u8,
2830
pub cas: u64,
@@ -32,7 +34,7 @@ pub struct GetResult {
3234
pub struct GetMetaResult {
3335
pub cas: u64,
3436
pub flags: u32,
35-
pub value: Vec<u8>,
37+
pub value: Bytes,
3638
pub datatype: u8,
3739
pub server_duration: Option<Duration>,
3840
pub expiry: u32,
@@ -54,15 +56,15 @@ pub struct DeleteResult {
5456

5557
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
5658
pub struct GetAndLockResult {
57-
pub value: Vec<u8>,
59+
pub value: Bytes,
5860
pub flags: u32,
5961
pub datatype: u8,
6062
pub cas: u64,
6163
}
6264

6365
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
6466
pub struct GetAndTouchResult {
65-
pub value: Vec<u8>,
67+
pub value: Bytes,
6668
pub flags: u32,
6769
pub datatype: u8,
6870
pub cas: u64,

0 commit comments

Comments
 (0)