Skip to content

Commit 6745229

Browse files
committed
feat(client): throw specififc error if h1 stream not in correct state, add middleware to retry sending request for this error
1 parent cec4ef4 commit 6745229

7 files changed

Lines changed: 173 additions & 9 deletions

File tree

client/src/h1/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@ use std::{error, io};
22

33
use xitca_http::h1::proto::error::ProtoError;
44

5+
#[derive(Debug)]
6+
pub enum UnexpectedStateError {
7+
RemainingData,
8+
ConnectionClosed,
9+
Io(io::Error),
10+
}
11+
512
#[derive(Debug)]
613
pub enum Error {
714
Std(Box<dyn error::Error + Send + Sync>),
815
Io(io::Error),
916
Proto(ProtoError),
17+
UnexpectedState(UnexpectedStateError),
1018
}
1119

1220
impl From<Box<dyn error::Error + Send + Sync>> for Error {
@@ -26,3 +34,9 @@ impl From<ProtoError> for Error {
2634
Self::Proto(e)
2735
}
2836
}
37+
38+
impl From<UnexpectedStateError> for Error {
39+
fn from(e: UnexpectedStateError) -> Self {
40+
Self::UnexpectedState(e)
41+
}
42+
}

client/src/h1/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ mod error;
33
pub(crate) mod body;
44
pub(crate) mod proto;
55

6-
pub use self::error::Error;
6+
pub use self::error::{Error, UnexpectedStateError};

client/src/h1/proto/dispatcher.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
body::BodyError,
1111
bytes::{Bytes, BytesMut},
1212
date::DateTimeHandle,
13-
h1::Error,
13+
h1::{error::UnexpectedStateError, Error},
1414
http::{
1515
header::{HeaderValue, EXPECT, HOST},
1616
Method, Request, Response, StatusCode,
@@ -29,6 +29,21 @@ where
2929
B: Stream<Item = Result<Bytes, E>> + Unpin,
3030
BodyError: From<E>,
3131
{
32+
match stream.read(&mut [0; 1]) {
33+
// if the stream is ready to read, it's not in correct state, which means there is either data to read (which means previous connection was not handled correctly) or the connection is closed).
34+
Ok(n) => {
35+
if n > 0 {
36+
return Err(Error::from(UnexpectedStateError::RemainingData));
37+
}
38+
39+
return Err(Error::from(UnexpectedStateError::ConnectionClosed));
40+
}
41+
// if the stream is not ready to read, it's in correct state.
42+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => (),
43+
// other errors are considered as not in correct state, we should close the connection here
44+
Err(io) => return Err(Error::from(io)),
45+
}
46+
3247
let mut buf = BytesMut::new();
3348

3449
if !req.headers().contains_key(HOST) {

client/src/middleware/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! middleware offer extended functionality to http client.
22
33
mod redirect;
4+
mod retry_closed_connection;
45

56
#[cfg(feature = "compress")]
67
mod decompress;
@@ -9,3 +10,4 @@ mod decompress;
910
pub use decompress::Decompress;
1011

1112
pub use redirect::FollowRedirect;
13+
pub use retry_closed_connection::RetryClosedConnection;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use crate::{
2+
error::Error,
3+
response::Response,
4+
service::{Service, ServiceRequest},
5+
};
6+
7+
/// middleware for retrying closed connection
8+
pub struct RetryClosedConnection<S, const MAX_COUNT: usize = 3> {
9+
service: S,
10+
}
11+
12+
impl<S> RetryClosedConnection<S> {
13+
/// construct retry closed connection middleware for client.
14+
///
15+
/// # Examples:
16+
/// ```rust
17+
/// # use xitca_client::{ClientBuilder, middleware::RetryClosedConnection};
18+
/// let builder = ClientBuilder::new()
19+
/// .middleware(RetryClosedConnection::new);
20+
/// ```
21+
pub fn new(service: S) -> Self {
22+
Self { service }
23+
}
24+
}
25+
26+
impl<S, const MAX: usize> RetryClosedConnection<S, MAX> {
27+
/// set max retry count for request. when max value is reached the request will return the most recent errror.
28+
pub fn max<const MAX2: usize>(self) -> RetryClosedConnection<S, MAX2> {
29+
RetryClosedConnection { service: self.service }
30+
}
31+
}
32+
33+
impl<'r, 'c, S, const MAX: usize> Service<ServiceRequest<'r, 'c>> for RetryClosedConnection<S, MAX>
34+
where
35+
S: for<'r2, 'c2> Service<ServiceRequest<'r2, 'c2>, Response = Response, Error = Error> + Send + Sync,
36+
{
37+
type Response = Response;
38+
type Error = Error;
39+
40+
async fn call(&self, req: ServiceRequest<'r, 'c>) -> Result<Self::Response, Self::Error> {
41+
let ServiceRequest { req, client, timeout } = req;
42+
let mut count = 0;
43+
44+
loop {
45+
let res = self.service.call(ServiceRequest { req, client, timeout }).await;
46+
47+
if count == MAX {
48+
return res;
49+
}
50+
51+
match res {
52+
#[cfg(feature = "http1")]
53+
Err(Error::H1(crate::h1::Error::UnexpectedState(
54+
crate::h1::UnexpectedStateError::ConnectionClosed,
55+
))) => (),
56+
res => return res,
57+
}
58+
59+
count += 1;
60+
}
61+
}
62+
}

test/src/lib.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use std::{
22
error, fmt, fs,
33
future::Future,
44
io,
5-
net::SocketAddr,
6-
net::TcpListener,
5+
net::{SocketAddr, TcpListener, ToSocketAddrs},
76
pin::Pin,
87
task::{Context, Poll},
98
time::Duration,
@@ -36,9 +35,18 @@ where
3635
T::Response: ReadyService + Service<Req>,
3736
Req: TryFrom<NetStream> + 'static,
3837
{
39-
let lst = TcpListener::bind("127.0.0.1:0")?;
38+
test_server_with_addr(service, "127.0.0.1:0")
39+
}
4040

41-
let addr = lst.local_addr()?;
41+
pub fn test_server_with_addr<T, Req, A>(service: T, addr: A) -> Result<TestServerHandle, Error>
42+
where
43+
T: Service + Send + Sync + 'static,
44+
T::Response: ReadyService + Service<Req>,
45+
Req: TryFrom<NetStream> + 'static,
46+
A: ToSocketAddrs,
47+
{
48+
let lst = TcpListener::bind(addr)?;
49+
let local_addr = lst.local_addr()?;
4250

4351
let handle = Builder::new()
4452
.worker_threads(1)
@@ -47,7 +55,35 @@ where
4755
.listen::<_, _, _, Req>("test_server", lst, service)
4856
.build();
4957

50-
Ok(TestServerHandle { addr, handle })
58+
Ok(TestServerHandle {
59+
addr: local_addr,
60+
handle,
61+
})
62+
}
63+
64+
/// A specialized http/1 server on top of [test_server]
65+
pub fn test_h1_server_with_addr<T, B, E, A>(service: T, addr: A) -> Result<TestServerHandle, Error>
66+
where
67+
T: Service + Send + Sync + 'static,
68+
T::Response: ReadyService + Service<Request<RequestExt<h1::RequestBody>>, Response = HResponse<B>> + 'static,
69+
<T::Response as Service<Request<RequestExt<h1::RequestBody>>>>::Error: fmt::Debug,
70+
T::Error: error::Error + 'static,
71+
B: Stream<Item = Result<Bytes, E>> + 'static,
72+
E: fmt::Debug + 'static,
73+
A: ToSocketAddrs,
74+
{
75+
#[cfg(not(feature = "io-uring"))]
76+
{
77+
test_server_with_addr::<_, (TcpStream, SocketAddr), A>(service.enclosed(HttpServiceBuilder::h1()), addr)
78+
}
79+
80+
#[cfg(feature = "io-uring")]
81+
{
82+
test_server_with_addr::<_, (xitca_io::net::io_uring::TcpStream, SocketAddr), A>(
83+
service.enclosed(HttpServiceBuilder::h1().io_uring()),
84+
addr,
85+
)
86+
}
5187
}
5288

5389
/// A specialized http/1 server on top of [test_server]

test/tests/h1.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55
time::Duration,
66
};
77

8-
use xitca_client::Client;
8+
use xitca_client::{middleware::RetryClosedConnection, Client};
99
use xitca_http::{
1010
body::{BoxBody, ResponseBody},
1111
bytes::{Bytes, BytesMut},
@@ -16,7 +16,7 @@ use xitca_http::{
1616
},
1717
};
1818
use xitca_service::fn_service;
19-
use xitca_test::{test_h1_server, Error};
19+
use xitca_test::{test_h1_server, test_h1_server_with_addr, Error};
2020

2121
#[tokio::test]
2222
async fn h1_get() -> Result<(), Error> {
@@ -270,6 +270,41 @@ async fn h1_keepalive() -> Result<(), Error> {
270270
Ok(())
271271
}
272272

273+
#[tokio::test]
274+
async fn h1_get_connection_closed_by_server() -> Result<(), Error> {
275+
let mut handle = test_h1_server(fn_service(handle))?;
276+
let ip_port = handle.ip_port_string();
277+
278+
let server_url = format!("http://{}/", ip_port);
279+
280+
let c = Client::builder()
281+
.middleware(RetryClosedConnection::new)
282+
.set_pool_capacity(1)
283+
.finish();
284+
285+
let mut res = c.get(&server_url).version(Version::HTTP_11).send().await?;
286+
assert_eq!(res.status().as_u16(), 200);
287+
assert!(!res.can_close_connection());
288+
let body = res.string().await?;
289+
assert_eq!("GET Response", body);
290+
291+
handle.try_handle()?.stop(false);
292+
handle.await?;
293+
294+
let mut handle = test_h1_server_with_addr(fn_service(crate::handle), ip_port)?;
295+
let mut res = c.get(&server_url).version(Version::HTTP_11).send().await?;
296+
297+
assert_eq!(res.status().as_u16(), 200);
298+
assert!(!res.can_close_connection());
299+
let body = res.string().await?;
300+
assert_eq!("GET Response", body);
301+
302+
handle.try_handle()?.stop(false);
303+
handle.await?;
304+
305+
Ok(())
306+
}
307+
273308
async fn handle(req: Request<RequestExt<h1::RequestBody>>) -> Result<Response<ResponseBody>, Error> {
274309
match (req.method(), req.uri().path()) {
275310
(&Method::GET, "/") | (&Method::HEAD, "/") => Ok(Response::new(Bytes::from("GET Response").into())),

0 commit comments

Comments
 (0)