Skip to content

Commit 2e65c81

Browse files
committed
feat(codec): add compression API, standard algorithms, and global registry
This change introduces the foundational compression abstractions and implementations for the grpc server component. - **Compression API (`grpc/src/codec/compression.rs`)**: Defines the `Compressor` and `Decompressor` traits for handling the compression and decompression of gRPC payloads via byte buffers. - **Simplified Design Philosophy**: The API is intentionally kept simple. It deliberately avoids handling concerns like limiting input and output buffer sizes, instead delegating these safety boundaries to the higher-level gRPC business logic. - **Performance Improvements**: By relying on direct buffer manipulation, this implementation avoids the hidden memory copying overhead present in the `tonic` crate (which utilizes 8KB intermediate buffers for I/O). - **Standard Algorithms**: Implements `gzip`, `deflate`, and `zstd` compression logic in their respective modules. These are conditionally compiled based on feature flags. - **Global Registry (`registry.rs`)**: Introduces a thread-safe `GlobalCompressionRegistry` (backed by `RwLock` and `Arc`) to manage registered compressors and decompressors. It handles safe concurrent access and automatically updates the broadcasted `grpc-accept-encoding` headers. - **Module Integration**: Exposes the new `codec` and `compression` modules to the library's root structure.
1 parent a55efae commit 2e65c81

8 files changed

Lines changed: 801 additions & 0 deletions

File tree

grpc/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,15 @@ tls-rustls = [
4141
"dep:rustls-platform-verifier",
4242
"dep:rustls-webpki",
4343
]
44+
deflate = ["dep:flate2"]
45+
gzip = ["dep:flate2"]
46+
zstd = ["dep:zstd"]
4447

4548
[dependencies]
4649
base64 = "0.22"
4750
bytes = "1.10.1"
51+
flate2 = { version = "1.0", optional = true }
52+
zstd = { version = "0.13", optional = true }
4853
futures = { version = "0.3", default-features = false, optional = true }
4954
hickory-resolver = { version = "0.25.1", optional = true }
5055
http = "1.1.0"

grpc/src/codec.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
*
3+
* Copyright 2026 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
pub(crate) mod compression;

grpc/src/codec/compression.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
*
3+
* Copyright 2026 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
use bytes::Buf;
26+
use bytes::BufMut;
27+
28+
#[cfg(feature = "deflate")]
29+
mod deflate;
30+
#[cfg(feature = "gzip")]
31+
mod gzip;
32+
#[cfg(feature = "zstd")]
33+
mod zstd;
34+
35+
pub(crate) mod registry;
36+
37+
/// A trait for compressing outgoing gRPC payloads.
38+
pub trait Compressor: Send + Sync + 'static {
39+
/// The canonical gRPC content coding name (e.g., "gzip").
40+
fn name(&self) -> &'static str;
41+
42+
/// Compress data from `source` into `destination`.
43+
///
44+
/// # Errors
45+
///
46+
/// Returns an `io::Error` if compression fails. Implementations should gracefully
47+
/// handle constrained `destination` buffers by returning an error rather than panicking
48+
/// (e.g., by verifying `destination.remaining_mut()` is sufficient before writing).
49+
fn compress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut) -> Result<(), String>;
50+
}
51+
52+
/// A trait for decompressing incoming gRPC payloads.
53+
pub trait Decompressor: Send + Sync + 'static {
54+
/// The canonical gRPC content coding name (e.g., "gzip").
55+
fn name(&self) -> &'static str;
56+
57+
/// Decompress data from `source` into `destination`.
58+
///
59+
/// # Errors
60+
///
61+
/// Returns an `io::Error` if decompression fails. Implementations should gracefully
62+
/// handle constrained `destination` buffers by returning an error rather than panicking
63+
/// (e.g., by verifying `destination.remaining_mut()` is sufficient before writing).
64+
fn decompress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut) -> Result<(), String>;
65+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
*
3+
* Copyright 2026 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
use std::io::Write;
26+
use std::io::{self};
27+
28+
use bytes::Buf;
29+
use bytes::BufMut;
30+
use flate2::Compression as FlateCompression;
31+
use flate2::write::ZlibDecoder;
32+
use flate2::write::ZlibEncoder;
33+
34+
use crate::codec::compression::Compressor;
35+
use crate::codec::compression::Decompressor;
36+
37+
/// A deflate compression implementation.
38+
#[derive(Debug, Clone, Copy)]
39+
pub struct Deflate {
40+
level: FlateCompression,
41+
}
42+
43+
impl Deflate {
44+
/// Creates a new deflate compression implementation.
45+
pub fn new() -> Self {
46+
Self {
47+
level: FlateCompression::new(6),
48+
}
49+
}
50+
}
51+
52+
impl Default for Deflate {
53+
fn default() -> Self {
54+
Self::new()
55+
}
56+
}
57+
58+
impl Compressor for Deflate {
59+
fn name(&self) -> &'static str {
60+
"deflate"
61+
}
62+
63+
fn compress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut) -> Result<(), String> {
64+
let mut encoder = ZlibEncoder::new(destination.writer(), self.level);
65+
while source.has_remaining() {
66+
let chunk = source.chunk();
67+
encoder.write_all(chunk).map_err(|e| e.to_string())?;
68+
source.advance(chunk.len());
69+
}
70+
encoder.finish().map_err(|e| e.to_string())?;
71+
Ok(())
72+
}
73+
}
74+
75+
impl Decompressor for Deflate {
76+
fn name(&self) -> &'static str {
77+
"deflate"
78+
}
79+
80+
fn decompress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut) -> Result<(), String> {
81+
let mut decoder = ZlibDecoder::new(destination.writer());
82+
while source.has_remaining() {
83+
let chunk = source.chunk();
84+
decoder.write_all(chunk).map_err(|e| e.to_string())?;
85+
source.advance(chunk.len());
86+
}
87+
decoder.finish().map_err(|e| e.to_string())?;
88+
Ok(())
89+
}
90+
}
91+
92+
#[cfg(test)]
93+
mod tests {
94+
use bytes::Bytes;
95+
96+
use super::*;
97+
98+
#[test]
99+
fn deflate_compress_decompress() {
100+
let compressor = Deflate::new();
101+
let data = Bytes::from_static(b"hello world");
102+
let mut compressed = Vec::new();
103+
compressor
104+
.compress(&mut data.clone(), &mut compressed)
105+
.unwrap();
106+
107+
assert_ne!(compressed.as_slice(), data);
108+
let mut decompressed = Vec::new();
109+
compressor
110+
.decompress(&mut compressed.as_slice(), &mut decompressed)
111+
.unwrap();
112+
assert_eq!(data, decompressed.as_slice());
113+
}
114+
}

grpc/src/codec/compression/gzip.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
*
3+
* Copyright 2026 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
use std::io::Write;
26+
use std::io::{self};
27+
28+
use bytes::Buf;
29+
use bytes::BufMut;
30+
use flate2::Compression as FlateCompression;
31+
use flate2::write::GzDecoder;
32+
use flate2::write::GzEncoder;
33+
34+
use crate::codec::compression::Compressor;
35+
use crate::codec::compression::Decompressor;
36+
37+
/// A gzip compression implementation.
38+
#[derive(Debug, Clone, Copy)]
39+
pub struct Gzip {
40+
level: FlateCompression,
41+
}
42+
43+
impl Gzip {
44+
/// Creates a new gzip compression implementation.
45+
pub fn new() -> Self {
46+
Self {
47+
level: FlateCompression::new(6),
48+
}
49+
}
50+
}
51+
52+
impl Default for Gzip {
53+
fn default() -> Self {
54+
Self::new()
55+
}
56+
}
57+
58+
impl Compressor for Gzip {
59+
fn name(&self) -> &'static str {
60+
"gzip"
61+
}
62+
63+
fn compress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut) -> Result<(), String> {
64+
let mut encoder = GzEncoder::new(destination.writer(), self.level);
65+
while source.has_remaining() {
66+
let chunk = source.chunk();
67+
encoder.write_all(chunk).map_err(|e| e.to_string())?;
68+
source.advance(chunk.len());
69+
}
70+
encoder.finish().map_err(|e| e.to_string())?;
71+
Ok(())
72+
}
73+
}
74+
75+
impl Decompressor for Gzip {
76+
fn name(&self) -> &'static str {
77+
"gzip"
78+
}
79+
80+
fn decompress(&self, source: &mut dyn Buf, destination: &mut dyn BufMut) -> Result<(), String> {
81+
let mut decoder = GzDecoder::new(destination.writer());
82+
while source.has_remaining() {
83+
let chunk = source.chunk();
84+
decoder.write_all(chunk).map_err(|e| e.to_string())?;
85+
source.advance(chunk.len());
86+
}
87+
decoder.finish().map_err(|e| e.to_string())?;
88+
Ok(())
89+
}
90+
}
91+
92+
#[cfg(test)]
93+
mod tests {
94+
use bytes::Bytes;
95+
96+
use super::*;
97+
98+
#[test]
99+
fn gzip_compress_decompress() {
100+
let compressor = Gzip::new();
101+
let data = Bytes::from_static(b"hello world");
102+
let mut compressed = Vec::new();
103+
compressor
104+
.compress(&mut data.clone(), &mut compressed)
105+
.unwrap();
106+
107+
assert_ne!(compressed.as_slice(), data);
108+
let mut decompressed = Vec::new();
109+
compressor
110+
.decompress(&mut compressed.as_slice(), &mut decompressed)
111+
.unwrap();
112+
assert_eq!(data, decompressed.as_slice());
113+
}
114+
}

0 commit comments

Comments
 (0)