Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 134 additions & 20 deletions sdk/couchbase-core/src/compressionmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,24 @@
*
*/

use snap::raw::Encoder;
use std::borrow::Cow;
use std::fmt::Debug;
use std::marker::PhantomData;

use snap::raw::Encoder;

use crate::error;
use crate::error::ErrorKind;
use crate::memdx::datatype::DataTypeFlag;
use crate::options::agent::{CompressionConfig, CompressionMode};

pub(crate) trait Compressor: Send + Sync + Debug {
fn new(compression_config: &CompressionConfig) -> Self;
// This is a bit of a weird signature, but it allows us to avoid allocations when no compression occurs.
fn compress<'a>(
&'a mut self,
&mut self,
connection_supports_snappy: bool,
datatype: DataTypeFlag,
input: &'a [u8],
) -> error::Result<(&'a [u8], u8)>;
) -> error::Result<(Cow<'a, [u8]>, u8)>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -64,12 +63,8 @@ pub(crate) struct StdCompressor {
compression_enabled: bool,
compression_min_size: usize,
compression_min_ratio: f64,

compressed_value: Vec<u8>,
}

impl StdCompressor {}

impl Compressor for StdCompressor {
fn new(compression_config: &CompressionConfig) -> Self {
let (compression_enabled, compression_min_size, compression_min_ratio) =
Expand All @@ -85,33 +80,31 @@ impl Compressor for StdCompressor {
compression_enabled,
compression_min_size,
compression_min_ratio,

compressed_value: Vec::new(),
}
}

fn compress<'a>(
&'a mut self,
&mut self,
connection_supports_snappy: bool,
datatype: DataTypeFlag,
input: &'a [u8],
) -> error::Result<(&'a [u8], u8)> {
) -> error::Result<(Cow<'a, [u8]>, u8)> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're considering leveraging Bytes elsewhere, why not use it here as well? One issue with using Cow when you use Bytes elsewhere is that you do not get the ability to maintain the same Bytes object through a function like this without resorting to some hackery.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for Cow here is that we return actually either a borrow or an owned value. In the case of Bytes i think we'd need to allocate in the borrow case.

if !connection_supports_snappy || !self.compression_enabled {
return Ok((input, u8::from(datatype)));
return Ok((Cow::Borrowed(input), u8::from(datatype)));
}

let datatype = u8::from(datatype);

// If the packet is already compressed then we don't want to compress it again.
if datatype & u8::from(DataTypeFlag::Compressed) != 0 {
return Ok((input, datatype));
return Ok((Cow::Borrowed(input), datatype));
}

let packet_size = input.len();

// Only compress values that are large enough to be worthwhile.
if packet_size <= self.compression_min_size {
return Ok((input, datatype));
return Ok((Cow::Borrowed(input), datatype));
}

let mut encoder = Encoder::new();
Expand All @@ -121,14 +114,135 @@ impl Compressor for StdCompressor {

// Only return the compressed value if the ratio of compressed:original is small enough.
if compressed_value.len() as f64 / packet_size as f64 > self.compression_min_ratio {
return Ok((input, datatype));
return Ok((Cow::Borrowed(input), datatype));
}

self.compressed_value = compressed_value;

Ok((
&self.compressed_value,
Cow::Owned(compressed_value),
datatype | u8::from(DataTypeFlag::Compressed),
))
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::borrow::Cow;

fn enabled_config(min_size: usize, min_ratio: f64) -> CompressionConfig {
CompressionConfig::new(CompressionMode::Enabled {
min_size,
min_ratio,
})
}

fn disabled_config() -> CompressionConfig {
CompressionConfig::new(CompressionMode::Disabled)
}

#[test]
fn disabled_compression_returns_input_unchanged() {
let mut compressor = StdCompressor::new(&disabled_config());
let input = b"hello world";

let (output, dt) = compressor
.compress(true, DataTypeFlag::Json, input)
.unwrap();

assert!(matches!(output, Cow::Borrowed(_)));
assert_eq!(&*output, input.as_slice());
assert_eq!(dt, u8::from(DataTypeFlag::Json));
}

#[test]
fn connection_without_snappy_returns_input_unchanged() {
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
let input = b"hello world";

let (output, dt) = compressor
.compress(false, DataTypeFlag::Json, input)
.unwrap();

assert!(matches!(output, Cow::Borrowed(_)));
assert_eq!(&*output, input.as_slice());
}

#[test]
fn already_compressed_returns_input_unchanged() {
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
let input = b"already compressed data";

let (output, dt) = compressor
.compress(true, DataTypeFlag::Compressed, input)
.unwrap();

assert!(matches!(output, Cow::Borrowed(_)));
assert_eq!(&*output, input.as_slice());
assert_eq!(dt, u8::from(DataTypeFlag::Compressed));
}

#[test]
fn input_below_min_size_returns_input_unchanged() {
let mut compressor = StdCompressor::new(&enabled_config(1024, 1.0));
let input = b"small";

let (output, dt) = compressor
.compress(true, DataTypeFlag::Json, input)
.unwrap();

assert!(matches!(output, Cow::Borrowed(_)));
assert_eq!(&*output, input.as_slice());
assert_eq!(dt, u8::from(DataTypeFlag::Json));
}

#[test]
fn compressible_input_returns_owned_with_compressed_flag() {
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
// Highly compressible: repeated bytes.
let input = vec![b'a'; 4096];

let (output, dt) = compressor
.compress(true, DataTypeFlag::Json, &input)
.unwrap();

assert!(matches!(output, Cow::Owned(_)));
assert!(output.len() < input.len());
assert_eq!(
dt,
u8::from(DataTypeFlag::Json) | u8::from(DataTypeFlag::Compressed)
);

// Verify it round-trips through snappy.
let decompressed = snap::raw::Decoder::new().decompress_vec(&output).unwrap();
assert_eq!(decompressed, input);
}

#[test]
fn poor_ratio_returns_input_unchanged() {
// Set a very aggressive ratio that compressed output can't beat.
let mut compressor = StdCompressor::new(&enabled_config(0, 0.01));
let input = vec![b'a'; 256];

let (output, dt) = compressor
.compress(true, DataTypeFlag::Json, &input)
.unwrap();

assert!(matches!(output, Cow::Borrowed(_)));
assert_eq!(&*output, input.as_slice());
assert_eq!(dt, u8::from(DataTypeFlag::Json));
}

#[test]
fn compression_manager_creates_compressor() {
let manager = CompressionManager::<StdCompressor>::new(enabled_config(0, 1.0));
let mut compressor = manager.compressor();
let input = vec![b'x'; 4096];

let (output, _dt) = compressor
.compress(true, DataTypeFlag::None, &input)
.unwrap();

assert!(matches!(output, Cow::Owned(_)));
assert!(output.len() < input.len());
}
}
18 changes: 9 additions & 9 deletions sdk/couchbase-core/src/crudcomponent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<
key: opts.key,
vbucket_id,
flags: opts.flags,
value,
value: &value,
datatype,
expiry: opts.expiry,
preserve_expiry: opts.preserve_expiry,
Expand Down Expand Up @@ -195,7 +195,7 @@ impl<
Error::new_contextual_memdx_error(e)
})
.map_ok(|resp| GetResult {
value: resp.value.to_vec(),
value: resp.value,
datatype: resp.datatype,
cas: resp.cas,
flags: resp.flags,
Expand Down Expand Up @@ -233,7 +233,7 @@ impl<
Error::new_contextual_memdx_error(e)
})
.map_ok(|resp| GetMetaResult {
value: resp.value.to_vec(),
value: resp.value,
datatype: resp.datatype,
server_duration: resp.server_duration,
expiry: resp.expiry,
Expand Down Expand Up @@ -323,7 +323,7 @@ impl<
Error::new_contextual_memdx_error(e)
})
.map_ok(|resp| GetAndLockResult {
value: resp.value.to_vec(),
value: resp.value,
datatype: resp.datatype,
cas: resp.cas,
flags: resp.flags,
Expand Down Expand Up @@ -362,7 +362,7 @@ impl<
Error::new_contextual_memdx_error(e)
})
.map_ok(|resp| GetAndTouchResult {
value: resp.value.to_vec(),
value: resp.value,
datatype: resp.datatype,
cas: resp.cas,
flags: resp.flags,
Expand Down Expand Up @@ -469,7 +469,7 @@ impl<
key: opts.key,
vbucket_id,
flags: opts.flags,
value,
value: &value,
datatype,
expiry: opts.expiry,
on_behalf_of: None,
Expand Down Expand Up @@ -531,7 +531,7 @@ impl<
key: opts.key,
vbucket_id,
flags: opts.flags,
value,
value: &value,
datatype,
expiry: opts.expiry,
preserve_expiry: opts.preserve_expiry,
Expand Down Expand Up @@ -594,7 +594,7 @@ impl<
collection_id,
key: opts.key,
vbucket_id,
value,
value: &value,
datatype,
cas: opts.cas,
on_behalf_of: None,
Expand Down Expand Up @@ -655,7 +655,7 @@ impl<
collection_id,
key: opts.key,
vbucket_id,
value,
value: &value,
datatype,
cas: opts.cas,
on_behalf_of: None,
Expand Down
10 changes: 6 additions & 4 deletions sdk/couchbase-core/src/results/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*
*/

use bytes::Bytes;

use crate::error;
use crate::mutationtoken::MutationToken;
use std::time::Duration;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct GetResult {
pub value: Vec<u8>,
pub value: Bytes,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a serious breaking change. I'm not sure you can change this without a v2 release?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in core, which is basically labelled as "do not use" so should be very low risk.

pub flags: u32,
pub datatype: u8,
pub cas: u64,
Expand All @@ -32,7 +34,7 @@ pub struct GetResult {
pub struct GetMetaResult {
pub cas: u64,
pub flags: u32,
pub value: Vec<u8>,
pub value: Bytes,
pub datatype: u8,
pub server_duration: Option<Duration>,
pub expiry: u32,
Expand All @@ -54,15 +56,15 @@ pub struct DeleteResult {

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct GetAndLockResult {
pub value: Vec<u8>,
pub value: Bytes,
pub flags: u32,
pub datatype: u8,
pub cas: u64,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct GetAndTouchResult {
pub value: Vec<u8>,
pub value: Bytes,
pub flags: u32,
pub datatype: u8,
pub cas: u64,
Expand Down
Loading
Loading