Skip to content

Commit a55efae

Browse files
authored
Add Server side interceptor API and interceptor chaining (#2634)
1 parent 0695dd1 commit a55efae

2 files changed

Lines changed: 266 additions & 0 deletions

File tree

grpc/src/server/interceptor.rs

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
*
3+
* Copyright 2026 gRPC authors.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to
7+
* deal in the Software without restriction, including without limitation the
8+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9+
* sell copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21+
* IN THE SOFTWARE.
22+
*
23+
*/
24+
25+
use crate::client::CallOptions;
26+
use crate::core::RequestHeaders;
27+
use crate::core::Trailers;
28+
use crate::server::Handle;
29+
use crate::server::RecvStream;
30+
use crate::server::SendStream;
31+
32+
/// A trait which allows intercepting an incoming RPC call to a [`Handle`] implementation.
33+
#[trait_variant::make(Send)]
34+
pub trait Intercept: Sync + 'static {
35+
/// Intercepts an incoming call.
36+
///
37+
/// Implementations can wrap `tx` and `rx` before passing them to `next`.
38+
async fn intercept(
39+
&self,
40+
headers: RequestHeaders,
41+
options: CallOptions,
42+
tx: &mut impl SendStream,
43+
rx: impl RecvStream + 'static,
44+
next: &impl Handle,
45+
) -> Trailers;
46+
}
47+
48+
/// Wraps a [`Handle`] and an [`Intercept`] and implements [`Handle`] for the combination.
49+
pub struct Intercepted<H, I> {
50+
handle: H,
51+
intercept: I,
52+
}
53+
54+
impl<H, I> Intercepted<H, I> {
55+
/// Creates a new `Intercepted` wrapper combining a handle and an interceptor.
56+
pub fn new(handle: H, intercept: I) -> Self {
57+
Self { handle, intercept }
58+
}
59+
}
60+
61+
impl<H, I> Handle for Intercepted<H, I>
62+
where
63+
H: Handle + 'static,
64+
I: Intercept + 'static,
65+
{
66+
async fn handle(
67+
&self,
68+
headers: RequestHeaders,
69+
options: CallOptions,
70+
tx: &mut impl SendStream,
71+
rx: impl RecvStream + 'static,
72+
) -> Trailers {
73+
self.intercept
74+
.intercept(headers, options, tx, rx, &self.handle)
75+
.await
76+
}
77+
}
78+
79+
/// Implements methods for combining [`Handle`] implementations with [`Intercept`] interceptors.
80+
pub trait HandleExt: Handle + Sized {
81+
/// Wraps this [`Handle`] with the given [`Intercept`] interceptor.
82+
fn with_interceptor<I>(self, interceptor: I) -> Intercepted<Self, I>
83+
where
84+
I: Intercept,
85+
{
86+
Intercepted::new(self, interceptor)
87+
}
88+
}
89+
90+
impl<T: Handle + Sized> HandleExt for T {}
91+
92+
#[cfg(test)]
93+
mod test {
94+
use std::sync::Arc;
95+
96+
use tokio::sync::Mutex;
97+
98+
use super::*;
99+
use crate::client::CallOptions;
100+
use crate::core::RecvMessage;
101+
use crate::core::RequestHeaders;
102+
use crate::core::ServerResponseStreamItem;
103+
use crate::server::SendOptions;
104+
105+
struct MockSendStream;
106+
impl SendStream for MockSendStream {
107+
async fn send<'a>(
108+
&mut self,
109+
_item: ServerResponseStreamItem<'a>,
110+
_options: SendOptions,
111+
) -> Result<(), ()> {
112+
Ok(())
113+
}
114+
}
115+
116+
struct MockRecvStream;
117+
impl RecvStream for MockRecvStream {
118+
async fn next(&mut self, _msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
119+
None
120+
}
121+
}
122+
123+
struct MockHandler {
124+
called: Arc<Mutex<bool>>,
125+
}
126+
127+
impl Handle for MockHandler {
128+
async fn handle(
129+
&self,
130+
_headers: RequestHeaders,
131+
_options: CallOptions,
132+
_tx: &mut impl SendStream,
133+
_rx: impl RecvStream + 'static,
134+
) -> Trailers {
135+
let mut called = self.called.lock().await;
136+
*called = true;
137+
Trailers::new(Ok(()))
138+
}
139+
}
140+
141+
struct MockInterceptor {
142+
called: Arc<Mutex<bool>>,
143+
}
144+
145+
impl Intercept for MockInterceptor {
146+
async fn intercept(
147+
&self,
148+
headers: RequestHeaders,
149+
options: CallOptions,
150+
tx: &mut impl SendStream,
151+
rx: impl RecvStream + 'static,
152+
next: &impl Handle,
153+
) -> Trailers {
154+
let mut called = self.called.lock().await;
155+
*called = true;
156+
drop(called);
157+
next.handle(headers, options, tx, rx).await
158+
}
159+
}
160+
161+
#[tokio::test]
162+
async fn test_simple_interceptor() {
163+
let handler_called = Arc::new(Mutex::new(false));
164+
let interceptor_called = Arc::new(Mutex::new(false));
165+
166+
let handler = MockHandler {
167+
called: handler_called.clone(),
168+
};
169+
let interceptor = MockInterceptor {
170+
called: interceptor_called.clone(),
171+
};
172+
173+
let chain = handler.with_interceptor(interceptor);
174+
175+
let mut tx = MockSendStream;
176+
let rx = MockRecvStream;
177+
178+
chain
179+
.handle(
180+
RequestHeaders::default(),
181+
CallOptions::default(),
182+
&mut tx,
183+
rx,
184+
)
185+
.await;
186+
187+
assert!(*interceptor_called.lock().await);
188+
assert!(*handler_called.lock().await);
189+
}
190+
191+
#[tokio::test]
192+
async fn test_interceptor_chaining_order() {
193+
let order = Arc::new(Mutex::new(Vec::new()));
194+
195+
struct OrderInterceptor {
196+
id: usize,
197+
order: Arc<Mutex<Vec<usize>>>,
198+
}
199+
200+
impl Intercept for OrderInterceptor {
201+
async fn intercept(
202+
&self,
203+
headers: RequestHeaders,
204+
options: CallOptions,
205+
tx: &mut impl SendStream,
206+
rx: impl RecvStream + 'static,
207+
next: &impl Handle,
208+
) -> Trailers {
209+
let mut order = self.order.lock().await;
210+
order.push(self.id);
211+
drop(order);
212+
next.handle(headers, options, tx, rx).await
213+
}
214+
}
215+
216+
struct OrderHandler {
217+
order: Arc<Mutex<Vec<usize>>>,
218+
}
219+
220+
impl Handle for OrderHandler {
221+
async fn handle(
222+
&self,
223+
_h: RequestHeaders,
224+
_o: CallOptions,
225+
_tx: &mut impl SendStream,
226+
_rx: impl RecvStream + 'static,
227+
) -> Trailers {
228+
let mut order = self.order.lock().await;
229+
order.push(0); // 0 represents the handler
230+
Trailers::new(Ok(()))
231+
}
232+
}
233+
234+
let handler = OrderHandler {
235+
order: order.clone(),
236+
};
237+
let int1 = OrderInterceptor {
238+
id: 1,
239+
order: order.clone(),
240+
};
241+
let int2 = OrderInterceptor {
242+
id: 2,
243+
order: order.clone(),
244+
};
245+
246+
// This should run int1 first, then int2, then handler.
247+
let chain = handler.with_interceptor(int2).with_interceptor(int1);
248+
249+
let mut tx = MockSendStream;
250+
let rx = MockRecvStream;
251+
252+
chain
253+
.handle(
254+
RequestHeaders::default(),
255+
CallOptions::default(),
256+
&mut tx,
257+
rx,
258+
)
259+
.await;
260+
261+
let final_order = order.lock().await;
262+
assert_eq!(*final_order, vec![1, 2, 0]);
263+
}
264+
}

grpc/src/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use crate::core::ServerResponseStreamItem;
3232
use crate::core::Trailers;
3333
use tokio::sync::oneshot;
3434

35+
pub(crate) mod interceptor;
36+
3537
pub struct Server {
3638
handler: Option<Arc<dyn DynHandle>>,
3739
}

0 commit comments

Comments
 (0)