Skip to content

Commit b4af9e3

Browse files
committed
feat(codec): Add ServerCompressionInterceptor for gRPC compression
⚠️ TRADEOFFS & LIMITATIONS ⚠️ Due to the current limitations of the `RecvStream` API (which lacks out-of-band compression signals), this implementation introduces a high degree of coupling between the HTTP transport, compression, and serialization layers. Consequently, this design necessitates type erasure(IncomingRawMessage) and incurs a forced `Box` allocation penalty to function within the existing stream boundaries. This change introduces the `ServerCompressionInterceptor`, which integrates compression and decompression capabilities into the server-side gRPC request/response lifecycle. Key additions: - **`ServerCompressionInterceptor`**: Implements the `Intercept` trait to wrap incoming requests. It parses the `grpc-encoding` and `grpc-accept-encoding` headers to determine the appropriate `Compressor` and `Decompressor` to use. - **Decompression Bomb Mitigation**: Protects against decompression bomb (zip bomb) attacks by strictly limiting writes during the decompression process. - **`CompressionResolver` Integration**: Uses a `CompressionResolver` registry to look up algorithms by name. - **Stream Wrapping**: Wraps the underlying `RecvStream` (to decompress incoming messages) and `SendStream` (to compress outgoing messages and inject the correct `grpc-encoding` header). - **Error Handling**: Returns standard gRPC status errors if an encoding is unsupported or if compression/decompression fails during stream processing. - **Unit Tests**: Includes a comprehensive suite of Tokio-based tests covering successful compression/decompression, unsupported encoding errors, registry lookup failures, and fallback mechanisms.
1 parent 2e65c81 commit b4af9e3

4 files changed

Lines changed: 1583 additions & 0 deletions

File tree

grpc/src/codec.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@
2323
*/
2424

2525
pub(crate) mod compression;
26+
pub(crate) mod message;

grpc/src/codec/message.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use std::any::TypeId;
2+
use std::collections::VecDeque;
3+
4+
use bytes::Buf;
5+
use bytes::Bytes;
6+
7+
use crate::core::MessageType;
8+
use crate::core::RecvMessage;
9+
use crate::core::SendMessage;
10+
11+
/// An immutable value-type struct representing an incoming raw gRPC message.
12+
pub(crate) struct IncomingRawMessage {
13+
buf: Box<dyn Buf + Send + Sync>,
14+
compressed: bool,
15+
}
16+
17+
impl IncomingRawMessage {
18+
/// Constructs a new `IncomingRawMessage` initialized with a cheap empty buffer.
19+
pub(crate) fn new() -> Self {
20+
Self {
21+
buf: Box::new(Bytes::new()),
22+
compressed: false,
23+
}
24+
}
25+
26+
/// Destructures the message by value into its raw payload buffer and compression flag.
27+
pub(crate) fn into_parts(self) -> (Box<dyn Buf + Send + Sync>, bool) {
28+
(self.buf, self.compressed)
29+
}
30+
31+
/// Safely sets the per-message compression flag.
32+
pub(crate) fn set_compressed(&mut self, compressed: bool) {
33+
self.compressed = compressed;
34+
}
35+
}
36+
37+
impl Default for IncomingRawMessage {
38+
fn default() -> Self {
39+
Self::new()
40+
}
41+
}
42+
43+
impl MessageType for IncomingRawMessage {
44+
type Target<'a> = IncomingRawMessage;
45+
}
46+
47+
impl RecvMessage for IncomingRawMessage {
48+
fn decode(&mut self, data: &mut dyn Buf) -> Result<(), String> {
49+
// Directly updates the immutable value-type container's inner buffer
50+
self.buf = Box::new(data.copy_to_bytes(data.remaining()));
51+
Ok(())
52+
}
53+
54+
unsafe fn _ptr_for(&mut self, id: TypeId) -> Option<*mut ()> {
55+
if id == TypeId::of::<IncomingRawMessage>() {
56+
Some(self as *mut IncomingRawMessage as *mut ())
57+
} else {
58+
None
59+
}
60+
}
61+
}
62+
63+
/// A custom `Buf` implementation that streams sequentially through a deque of `Bytes` chunks.
64+
struct ChunkedBuf {
65+
chunks: VecDeque<Bytes>,
66+
}
67+
68+
impl Buf for ChunkedBuf {
69+
fn remaining(&self) -> usize {
70+
self.chunks.iter().map(|b| b.len()).sum()
71+
}
72+
73+
fn chunk(&self) -> &[u8] {
74+
self.chunks.front().map(|b| b.chunk()).unwrap_or(&[])
75+
}
76+
77+
fn advance(&mut self, mut cnt: usize) {
78+
while cnt > 0 {
79+
if let Some(front) = self.chunks.front_mut() {
80+
let len = front.len();
81+
if cnt >= len {
82+
cnt -= len;
83+
self.chunks.pop_front();
84+
} else {
85+
front.advance(cnt);
86+
break;
87+
}
88+
} else {
89+
break;
90+
}
91+
}
92+
}
93+
}
94+
95+
/// A raw outgoing message usable for configuring SendOptions cleanly.
96+
/// Stores data as a hybrid enum to allow zero-copy outbound serialization
97+
/// without allocating a `VecDeque` for standard contiguous messages.
98+
pub(crate) enum RawMessage {
99+
Contiguous(Bytes),
100+
Chunks(VecDeque<Bytes>),
101+
}
102+
103+
impl RawMessage {
104+
pub(crate) fn from_buf(mut buf: impl Buf) -> Self {
105+
let remaining = buf.remaining();
106+
if buf.chunk().len() == remaining {
107+
RawMessage::Contiguous(buf.copy_to_bytes(remaining))
108+
} else {
109+
let mut chunks = VecDeque::new();
110+
while buf.has_remaining() {
111+
let chunk_len = buf.chunk().len();
112+
chunks.push_back(buf.copy_to_bytes(chunk_len));
113+
}
114+
RawMessage::Chunks(chunks)
115+
}
116+
}
117+
}
118+
119+
impl SendMessage for RawMessage {
120+
fn encode(&self) -> Result<Box<dyn Buf + Send + Sync>, String> {
121+
match self {
122+
RawMessage::Contiguous(bytes) => Ok(Box::new(bytes.clone())),
123+
RawMessage::Chunks(chunks) => {
124+
// `Bytes` clones are cheap $O(1)$ reference bumps, preserving idempotency safely.
125+
Ok(Box::new(ChunkedBuf {
126+
chunks: chunks.clone(),
127+
}))
128+
}
129+
}
130+
}
131+
}

grpc/src/server/interceptor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use crate::server::Handle;
2929
use crate::server::RecvStream;
3030
use crate::server::SendStream;
3131

32+
pub mod compression;
33+
3234
/// A trait which allows intercepting an incoming RPC call to a [`Handle`] implementation.
3335
#[trait_variant::make(Send)]
3436
pub trait Intercept: Sync + 'static {

0 commit comments

Comments
 (0)