Skip to content

Commit db718f9

Browse files
committed
feat(codec): add compression API, standard algorithms, and global registry
This change introduces the foundational compression abstractions and implementations for the grpc server component. - **Compression API (`grpc/src/codec/compression.rs`)**: Defines the `Compressor` and `Decompressor` traits for handling the compression and decompression of gRPC payloads via byte buffers. - **Simplified Design Philosophy**: The API is intentionally kept simple. It deliberately avoids handling concerns like limiting input and output buffer sizes, instead delegating these safety boundaries to the higher-level gRPC business logic. - **Performance Improvements**: By relying on direct buffer manipulation, this implementation avoids the hidden memory copying overhead present in the `tonic` crate (which utilizes 8KB intermediate buffers for I/O). - **Standard Algorithms**: Implements `gzip`, `deflate`, and `zstd` compression logic in their respective modules. These are conditionally compiled based on feature flags. - **Global Registry (`registry.rs`)**: Introduces a thread-safe `GlobalCompressionRegistry` (backed by `RwLock` and `Arc`) to manage registered compressors and decompressors. It handles safe concurrent access and automatically updates the broadcasted `grpc-accept-encoding` headers. - **Module Integration**: Exposes the new `codec` and `compression` modules to the library's root structure.
1 parent a55efae commit db718f9

8 files changed

Lines changed: 683 additions & 0 deletions

File tree

grpc/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,15 @@ tls-rustls = [
4141
"dep:rustls-platform-verifier",
4242
"dep:rustls-webpki",
4343
]
44+
deflate = ["dep:flate2"]
45+
gzip = ["dep:flate2"]
46+
zstd = ["dep:zstd"]
4447

4548
[dependencies]
4649
base64 = "0.22"
4750
bytes = "1.10.1"
51+
flate2 = { version = "1.0", optional = true }
52+
zstd = { version = "0.13", optional = true }
4853
futures = { version = "0.3", default-features = false, optional = true }
4954
hickory-resolver = { version = "0.25.1", optional = true }
5055
http = "1.1.0"

grpc/src/codec.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub(crate) mod compression;

grpc/src/codec/compression.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::io;
2+
3+
use bytes::Buf;
4+
use bytes::BufMut;
5+
6+
#[cfg(feature = "deflate")]
7+
mod deflate;
8+
#[cfg(feature = "gzip")]
9+
mod gzip;
10+
#[cfg(feature = "zstd")]
11+
mod zstd;
12+
13+
pub(crate) mod registry;
14+
15+
/// A trait for compressing outgoing gRPC payloads.
16+
pub trait Compressor: Send + Sync + 'static {
17+
/// The canonical gRPC content coding name (e.g., "gzip").
18+
fn name(&self) -> &'static str;
19+
20+
/// Compress data from `source` into `destination`.
21+
///
22+
/// # Errors
23+
///
24+
/// Returns an `io::Error` if compression fails. Implementations should gracefully
25+
/// handle constrained `destination` buffers by returning an error rather than panicking
26+
/// (e.g., by verifying `destination.remaining_mut()` is sufficient before writing).
27+
fn compress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut)
28+
-> Result<(), io::Error>;
29+
}
30+
31+
/// A trait for decompressing incoming gRPC payloads.
32+
pub trait Decompressor: Send + Sync + 'static {
33+
/// The canonical gRPC content coding name (e.g., "gzip").
34+
fn name(&self) -> &'static str;
35+
36+
/// Decompress data from `source` into `destination`.
37+
///
38+
/// # Errors
39+
///
40+
/// Returns an `io::Error` if decompression fails. Implementations should gracefully
41+
/// handle constrained `destination` buffers by returning an error rather than panicking
42+
/// (e.g., by verifying `destination.remaining_mut()` is sufficient before writing).
43+
fn decompress(
44+
&self,
45+
source: &mut dyn Buf,
46+
destination: &mut dyn BufMut,
47+
) -> Result<(), io::Error>;
48+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::io::Write;
2+
use std::io::{self};
3+
4+
use bytes::Buf;
5+
use bytes::BufMut;
6+
use flate2::Compression as FlateCompression;
7+
use flate2::write::ZlibDecoder;
8+
use flate2::write::ZlibEncoder;
9+
10+
use crate::codec::compression::Compressor;
11+
use crate::codec::compression::Decompressor;
12+
13+
/// A deflate compression implementation.
14+
#[derive(Debug, Clone, Copy)]
15+
pub struct Deflate {
16+
level: FlateCompression,
17+
}
18+
19+
impl Deflate {
20+
/// Creates a new deflate compression implementation.
21+
pub fn new() -> Self {
22+
Self {
23+
level: FlateCompression::new(6),
24+
}
25+
}
26+
}
27+
28+
impl Default for Deflate {
29+
fn default() -> Self {
30+
Self::new()
31+
}
32+
}
33+
34+
impl Compressor for Deflate {
35+
fn name(&self) -> &'static str {
36+
"deflate"
37+
}
38+
39+
fn compress(
40+
&self,
41+
source: &mut dyn Buf,
42+
destination: &mut dyn BufMut,
43+
) -> Result<(), io::Error> {
44+
let mut encoder = ZlibEncoder::new(destination.writer(), self.level);
45+
while source.has_remaining() {
46+
let chunk = source.chunk();
47+
encoder.write_all(chunk)?;
48+
source.advance(chunk.len());
49+
}
50+
encoder.finish()?;
51+
Ok(())
52+
}
53+
}
54+
55+
impl Decompressor for Deflate {
56+
fn name(&self) -> &'static str {
57+
"deflate"
58+
}
59+
60+
fn decompress(
61+
&self,
62+
source: &mut dyn Buf,
63+
destination: &mut dyn BufMut,
64+
) -> Result<(), io::Error> {
65+
let mut decoder = ZlibDecoder::new(destination.writer());
66+
while source.has_remaining() {
67+
let chunk = source.chunk();
68+
decoder.write_all(chunk)?;
69+
source.advance(chunk.len());
70+
}
71+
decoder.finish()?;
72+
Ok(())
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
mod tests {
78+
use bytes::Bytes;
79+
80+
use super::*;
81+
82+
#[test]
83+
fn deflate_compress_decompress() {
84+
let compressor = Deflate::new();
85+
let data = Bytes::from_static(b"hello world");
86+
let mut compressed = Vec::new();
87+
compressor
88+
.compress(&mut data.clone(), &mut compressed)
89+
.unwrap();
90+
let mut decompressed = Vec::new();
91+
compressor
92+
.decompress(&mut compressed.as_slice(), &mut decompressed)
93+
.unwrap();
94+
assert_eq!(data, decompressed.as_slice());
95+
}
96+
}

grpc/src/codec/compression/gzip.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::io::Write;
2+
use std::io::{self};
3+
4+
use bytes::Buf;
5+
use bytes::BufMut;
6+
use flate2::Compression as FlateCompression;
7+
use flate2::write::GzDecoder;
8+
use flate2::write::GzEncoder;
9+
10+
use crate::codec::compression::Compressor;
11+
use crate::codec::compression::Decompressor;
12+
13+
/// A gzip compression implementation.
14+
#[derive(Debug, Clone, Copy)]
15+
pub struct Gzip {
16+
level: FlateCompression,
17+
}
18+
19+
impl Gzip {
20+
/// Creates a new gzip compression implementation.
21+
pub fn new() -> Self {
22+
Self {
23+
level: FlateCompression::new(6),
24+
}
25+
}
26+
}
27+
28+
impl Default for Gzip {
29+
fn default() -> Self {
30+
Self::new()
31+
}
32+
}
33+
34+
impl Compressor for Gzip {
35+
fn name(&self) -> &'static str {
36+
"gzip"
37+
}
38+
39+
fn compress(
40+
&self,
41+
source: &mut dyn Buf,
42+
destination: &mut dyn BufMut,
43+
) -> Result<(), io::Error> {
44+
let mut encoder = GzEncoder::new(destination.writer(), self.level);
45+
while source.has_remaining() {
46+
let chunk = source.chunk();
47+
encoder.write_all(chunk)?;
48+
source.advance(chunk.len());
49+
}
50+
encoder.finish()?;
51+
Ok(())
52+
}
53+
}
54+
55+
impl Decompressor for Gzip {
56+
fn name(&self) -> &'static str {
57+
"gzip"
58+
}
59+
60+
fn decompress(
61+
&self,
62+
source: &mut dyn Buf,
63+
destination: &mut dyn BufMut,
64+
) -> Result<(), io::Error> {
65+
let mut decoder = GzDecoder::new(destination.writer());
66+
while source.has_remaining() {
67+
let chunk = source.chunk();
68+
decoder.write_all(chunk)?;
69+
source.advance(chunk.len());
70+
}
71+
decoder.finish()?;
72+
Ok(())
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
mod tests {
78+
use bytes::Bytes;
79+
80+
use super::*;
81+
82+
#[test]
83+
fn gzip_compress_decompress() {
84+
let compressor = Gzip::new();
85+
let data = Bytes::from_static(b"hello world");
86+
let mut compressed = Vec::new();
87+
compressor
88+
.compress(&mut data.clone(), &mut compressed)
89+
.unwrap();
90+
let mut decompressed = Vec::new();
91+
compressor
92+
.decompress(&mut compressed.as_slice(), &mut decompressed)
93+
.unwrap();
94+
assert_eq!(data, decompressed.as_slice());
95+
}
96+
}

0 commit comments

Comments
 (0)