Skip to content

Commit 98b1479

Browse files
author
chengmiaomiao.123
committed
use flush to send
1 parent 6045f13 commit 98b1479

13 files changed

Lines changed: 356 additions & 124 deletions

File tree

Cargo.lock

Lines changed: 28 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

volo-thrift/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ tokio = { workspace = true, features = [
4444
"parking_lot",
4545
] }
4646
tracing.workspace = true
47+
tokio-condvar = "0.1.0"
4748

4849
[features]
4950
default = []

volo-thrift/src/client/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -444,9 +444,10 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
444444
}
445445

446446
#[derive(Clone)]
447-
pub struct MessageService<Resp, MkT, MkC>
447+
pub struct MessageService<Req, Resp, MkT, MkC>
448448
where
449-
Resp: EntryMessage + Send + 'static,
449+
Req: EntryMessage + Send + 'static + Sync,
450+
Resp: EntryMessage + Send + 'static + Sync,
450451
MkT: MakeTransport,
451452
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
452453
{
@@ -455,13 +456,13 @@ where
455456
#[cfg(feature = "multiplex")]
456457
inner: motore::utils::Either<
457458
pingpong::Client<Resp, MkT, MkC>,
458-
crate::transport::multiplex::Client<Resp, MkT, MkC>,
459+
crate::transport::multiplex::Client<Req, Resp, MkT, MkC>,
459460
>,
460461
}
461462

462-
impl<Req, Resp, MkT, MkC> Service<ClientContext, Req> for MessageService<Resp, MkT, MkC>
463+
impl<Req, Resp, MkT, MkC> Service<ClientContext, Req> for MessageService<Req, Resp, MkT, MkC>
463464
where
464-
Req: EntryMessage + 'static + Send,
465+
Req: Send + 'static + EntryMessage + Sync,
465466
Resp: Send + 'static + EntryMessage + Sync,
466467
MkT: MakeTransport,
467468
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
@@ -506,8 +507,8 @@ where
506507
+ Clone
507508
+ Sync,
508509
Req: EntryMessage + Send + 'static + Sync + Clone,
509-
Resp: EntryMessage + Send + 'static,
510-
IL: Layer<MessageService<Resp, MkT, MkC>>,
510+
Resp: EntryMessage + Send + 'static + Sync,
511+
IL: Layer<MessageService<Req, Resp, MkT, MkC>>,
511512
IL::Service:
512513
Service<ClientContext, Req, Response = Option<Resp>> + Sync + Clone + Send + 'static,
513514
<IL::Service as Service<ClientContext, Req>>::Error: Send + Into<Error>,

volo-thrift/src/codec/default/mod.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,7 @@ pub struct DefaultEncoder<E, W> {
116116
impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
117117
for DefaultEncoder<E, W>
118118
{
119-
#[inline]
120-
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
119+
async fn send<Req: Send + EntryMessage, Cx: ThriftContext>(
121120
&mut self,
122121
cx: &mut Cx,
123122
msg: ThriftMessage<Req>,
@@ -177,6 +176,54 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
177176
}
178177
// write_result
179178
}
179+
180+
#[inline]
181+
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
182+
&mut self,
183+
cx: &mut Cx,
184+
msg: ThriftMessage<Req>,
185+
) -> Result<(), crate::Error> {
186+
cx.stats_mut().record_encode_start_at();
187+
188+
// first, we need to get the size of the message
189+
let (real_size, malloc_size) = self.encoder.size(cx, &msg)?;
190+
trace!(
191+
"[VOLO] codec encode message real size: {}, malloc size: {}",
192+
real_size,
193+
malloc_size
194+
);
195+
cx.stats_mut().set_write_size(real_size);
196+
197+
// then we reserve the size of the message in the linked bytes
198+
self.linked_bytes.reserve(malloc_size);
199+
// after that, we encode the message into the linked bytes
200+
self.encoder
201+
.encode(cx, &mut self.linked_bytes, msg)
202+
.map_err(|e| {
203+
// record the error time
204+
cx.stats_mut().record_encode_end_at();
205+
e
206+
})?;
207+
208+
cx.stats_mut().record_encode_end_at();
209+
Ok(())
210+
}
211+
212+
async fn flush(&mut self) -> Result<(), crate::Error> {
213+
self.linked_bytes
214+
.write_all_vectored(&mut self.writer)
215+
.await
216+
.map_err(TransportError::from)?;
217+
218+
match self.writer.flush().await.map_err(TransportError::from) {
219+
Ok(()) => Ok(()),
220+
Err(e) => Err(e.into()),
221+
}
222+
}
223+
224+
async fn reset(&mut self) {
225+
self.linked_bytes.reset();
226+
}
180227
}
181228

182229
pub struct DefaultDecoder<D, R> {

volo-thrift/src/codec/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,19 @@ pub trait Decoder: Send + 'static {
2525
///
2626
/// Note: [`Encoder`] should be designed to be ready for reuse.
2727
pub trait Encoder: Send + 'static {
28+
fn reset(&mut self) -> impl Future<Output = ()> + Send;
29+
fn send<Req: Send + EntryMessage, Cx: ThriftContext>(
30+
&mut self,
31+
cx: &mut Cx,
32+
msg: ThriftMessage<Req>,
33+
) -> impl Future<Output = Result<(), crate::Error>> + Send;
2834
fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
2935
&mut self,
3036
cx: &mut Cx,
3137
msg: ThriftMessage<Req>,
3238
) -> impl Future<Output = Result<(), crate::Error>> + Send;
39+
40+
fn flush(&mut self) -> impl Future<Output = Result<(), crate::Error>> + Send;
3341
}
3442

3543
/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a

volo-thrift/src/transport/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
pub(crate) mod incoming;
2-
#[cfg(feature = "multiplex")]
32
pub mod multiplex;
43
pub mod pingpong;
54
pub mod pool;

volo-thrift/src/transport/multiplex/client.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ use crate::{
1515
EntryMessage, Error, ThriftMessage,
1616
};
1717

18-
pub struct MakeClientTransport<MkT, MkC, Resp>
18+
pub struct MakeClientTransport<MkT, MkC, Req, Resp>
1919
where
2020
MkT: MakeTransport,
2121
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>,
2222
{
2323
make_transport: MkT,
2424
make_codec: MkC,
25-
_phantom: PhantomData<fn() -> Resp>,
25+
_phantom: PhantomData<(fn() -> Resp, fn() -> Req)>,
2626
}
2727

28-
impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Resp> Clone
29-
for MakeClientTransport<MkT, MkC, Resp>
28+
impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Req, Resp> Clone
29+
for MakeClientTransport<MkT, MkC, Req, Resp>
3030
{
3131
fn clone(&self) -> Self {
3232
Self {
@@ -37,7 +37,7 @@ impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Resp> Cl
3737
}
3838
}
3939

40-
impl<MkT, MkC, Resp> MakeClientTransport<MkT, MkC, Resp>
40+
impl<MkT, MkC, Req, Resp> MakeClientTransport<MkT, MkC, Req, Resp>
4141
where
4242
MkT: MakeTransport,
4343
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>,
@@ -52,13 +52,14 @@ where
5252
}
5353
}
5454

55-
impl<MkT, MkC, Resp> UnaryService<Address> for MakeClientTransport<MkT, MkC, Resp>
55+
impl<MkT, MkC, Req, Resp> UnaryService<Address> for MakeClientTransport<MkT, MkC, Req, Resp>
5656
where
5757
MkT: MakeTransport,
5858
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
59-
Resp: EntryMessage + Send + 'static,
59+
Resp: EntryMessage + Send + 'static + Sync,
60+
Req: EntryMessage + Send + 'static + Sync,
6061
{
61-
type Response = ThriftTransport<MkC::Encoder, Resp>;
62+
type Response = ThriftTransport<MkC::Encoder, Req, Resp>;
6263
type Error = io::Error;
6364

6465
async fn call(&self, target: Address) -> Result<Self::Response, Self::Error> {
@@ -73,22 +74,24 @@ where
7374
}
7475
}
7576

76-
pub struct Client<Resp, MkT, MkC>
77+
pub struct Client<Req, Resp, MkT, MkC>
7778
where
7879
MkT: MakeTransport,
7980
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
80-
Resp: EntryMessage + Send + 'static,
81+
Resp: EntryMessage + Send + 'static + Sync,
82+
Req: EntryMessage + Send + 'static + Sync,
8183
{
8284
#[allow(clippy::type_complexity)]
83-
make_transport: PooledMakeTransport<MakeClientTransport<MkT, MkC, Resp>, Address>,
85+
make_transport: PooledMakeTransport<MakeClientTransport<MkT, MkC, Req, Resp>, Address>,
8486
_marker: PhantomData<Resp>,
8587
}
8688

87-
impl<Resp, MkT, MkC> Clone for Client<Resp, MkT, MkC>
89+
impl<Req, Resp, MkT, MkC> Clone for Client<Req, Resp, MkT, MkC>
8890
where
8991
MkT: MakeTransport,
9092
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
91-
Resp: EntryMessage + Send + 'static,
93+
Resp: EntryMessage + Send + 'static + Sync,
94+
Req: EntryMessage + Send + 'static + Sync,
9295
{
9396
fn clone(&self) -> Self {
9497
Self {
@@ -98,11 +101,12 @@ where
98101
}
99102
}
100103

101-
impl<Resp, MkT, MkC> Client<Resp, MkT, MkC>
104+
impl<Req, Resp, MkT, MkC> Client<Req, Resp, MkT, MkC>
102105
where
103106
MkT: MakeTransport,
104107
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
105-
Resp: EntryMessage + Send + 'static,
108+
Resp: EntryMessage + Send + 'static + Sync,
109+
Req: EntryMessage + Send + 'static + Sync,
106110
{
107111
pub fn new(make_transport: MkT, pool_cfg: Option<Config>, make_codec: MkC) -> Self {
108112
let make_transport = MakeClientTransport::new(make_transport, make_codec);
@@ -114,9 +118,9 @@ where
114118
}
115119
}
116120

117-
impl<Req, Resp, MkT, MkC> Service<ClientContext, ThriftMessage<Req>> for Client<Resp, MkT, MkC>
121+
impl<Req, Resp, MkT, MkC> Service<ClientContext, ThriftMessage<Req>> for Client<Req, Resp, MkT, MkC>
118122
where
119-
Req: Send + 'static + EntryMessage,
123+
Req: Send + 'static + EntryMessage + Sync,
120124
Resp: EntryMessage + Send + 'static + Sync,
121125
MkT: MakeTransport,
122126
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod client;
22
mod server;
33
mod thrift_transport;
4+
pub mod utils;
45

56
pub use client::Client;
67
pub use server::serve;

0 commit comments

Comments
 (0)