Skip to content

Commit ab5dd6f

Browse files
committed
feat(client): retry http1 connection if closed by server
1 parent 99f7c97 commit ab5dd6f

7 files changed

Lines changed: 109 additions & 29 deletions

File tree

client/src/body.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ impl ResponseBody {
5656
}
5757
}
5858

59-
pub(crate) fn can_destroy_on_drop(&mut self) -> bool {
59+
pub(crate) fn can_destroy_on_drop(&self) -> bool {
6060
#[cfg(feature = "http1")]
61-
if let Self::H1(ref mut body) = *self {
62-
return body.conn_mut().is_destroy_on_drop();
61+
if let Self::H1(ref body) = *self {
62+
return body.conn().is_destroy_on_drop();
6363
}
6464

6565
false

client/src/response.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ impl<const PAYLOAD_LIMIT: usize> Response<PAYLOAD_LIMIT> {
177177
/// Public API for test purpose.
178178
///
179179
/// Used for testing server implementation to make sure it follows spec.
180-
pub fn can_close_connection(&mut self) -> bool {
181-
self.res.body_mut().can_destroy_on_drop()
180+
pub fn can_close_connection(&self) -> bool {
181+
self.res.body().can_destroy_on_drop()
182182
}
183183
}
184184

client/src/service.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ pub(crate) fn base_service() -> HttpService {
8787

8888
let ServiceRequest { req, client, timeout } = req;
8989

90-
let uri = Uri::try_parse(req.uri())?;
90+
let connect_uri = req.uri().clone();
91+
let uri = Uri::try_parse(&connect_uri)?;
9192

9293
// temporary version to record possible version downgrade/upgrade happens when making connections.
9394
// alpn protocol and alt-svc header are possible source of version change.
@@ -96,9 +97,9 @@ pub(crate) fn base_service() -> HttpService {
9697

9798
let mut connect = Connect::new(uri);
9899

99-
let _date = client.date_service.handle();
100-
101100
loop {
101+
let _date = client.date_service.handle();
102+
102103
match version {
103104
Version::HTTP_2 | Version::HTTP_3 => match client.shared_pool.acquire(&connect.uri).await {
104105
shared::AcquireOutput::Conn(mut _conn) => {
@@ -235,7 +236,17 @@ pub(crate) fn base_service() -> HttpService {
235236
}
236237
Ok(Err(e)) => {
237238
_conn.destroy_on_drop();
238-
Err(e.into())
239+
240+
match e {
241+
crate::h1::Error::Io(err) => {
242+
if err.kind() == std::io::ErrorKind::UnexpectedEof {
243+
continue;
244+
}
245+
246+
Err(crate::h1::Error::Io(err).into())
247+
}
248+
_ => Err(e.into()),
249+
}
239250
}
240251
Err(_) => {
241252
_conn.destroy_on_drop();

test/src/lib.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use std::{
22
error, fmt, fs,
33
future::Future,
44
io,
5-
net::SocketAddr,
6-
net::TcpListener,
5+
net::{SocketAddr, TcpListener, ToSocketAddrs},
76
pin::Pin,
87
task::{Context, Poll},
98
time::Duration,
@@ -36,9 +35,18 @@ where
3635
T::Response: ReadyService + Service<Req>,
3736
Req: TryFrom<NetStream> + 'static,
3837
{
39-
let lst = TcpListener::bind("127.0.0.1:0")?;
38+
test_server_with_addr(service, "127.0.0.1:0")
39+
}
4040

41-
let addr = lst.local_addr()?;
41+
pub fn test_server_with_addr<T, Req, A>(service: T, addr: A) -> Result<TestServerHandle, Error>
42+
where
43+
T: Service + Send + Sync + 'static,
44+
T::Response: ReadyService + Service<Req>,
45+
Req: TryFrom<NetStream> + 'static,
46+
A: ToSocketAddrs,
47+
{
48+
let lst = TcpListener::bind(addr)?;
49+
let local_addr = lst.local_addr()?;
4250

4351
let handle = Builder::new()
4452
.worker_threads(1)
@@ -47,7 +55,35 @@ where
4755
.listen::<_, _, _, Req>("test_server", lst, service)
4856
.build();
4957

50-
Ok(TestServerHandle { addr, handle })
58+
Ok(TestServerHandle {
59+
addr: local_addr,
60+
handle,
61+
})
62+
}
63+
64+
/// A specialized http/1 server on top of [test_server]
65+
pub fn test_h1_server_with_addr<T, B, E, A>(service: T, addr: A) -> Result<TestServerHandle, Error>
66+
where
67+
T: Service + Send + Sync + 'static,
68+
T::Response: ReadyService + Service<Request<RequestExt<h1::RequestBody>>, Response = HResponse<B>> + 'static,
69+
<T::Response as Service<Request<RequestExt<h1::RequestBody>>>>::Error: fmt::Debug,
70+
T::Error: error::Error + 'static,
71+
B: Stream<Item = Result<Bytes, E>> + 'static,
72+
E: fmt::Debug + 'static,
73+
A: ToSocketAddrs,
74+
{
75+
#[cfg(not(feature = "io-uring"))]
76+
{
77+
test_server_with_addr::<_, (TcpStream, SocketAddr), A>(service.enclosed(HttpServiceBuilder::h1()), addr)
78+
}
79+
80+
#[cfg(feature = "io-uring")]
81+
{
82+
test_server_with_addr::<_, (xitca_io::net::io_uring::TcpStream, SocketAddr), A>(
83+
service.enclosed(HttpServiceBuilder::h1().io_uring()),
84+
addr,
85+
)
86+
}
5187
}
5288

5389
/// A specialized http/1 server on top of [test_server]

test/tests/h1.rs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use xitca_http::{
1616
},
1717
};
1818
use xitca_service::fn_service;
19-
use xitca_test::{test_h1_server, Error};
19+
use xitca_test::{test_h1_server, test_h1_server_with_addr, Error};
2020

2121
#[tokio::test]
2222
async fn h1_get() -> Result<(), Error> {
@@ -27,7 +27,7 @@ async fn h1_get() -> Result<(), Error> {
2727
let c = Client::new();
2828

2929
for _ in 0..3 {
30-
let mut res = c.get(&server_url).version(Version::HTTP_11).send().await?;
30+
let res = c.get(&server_url).version(Version::HTTP_11).send().await?;
3131
assert_eq!(res.status().as_u16(), 200);
3232
assert!(!res.can_close_connection());
3333
let body = res.string().await?;
@@ -41,6 +41,39 @@ async fn h1_get() -> Result<(), Error> {
4141
Ok(())
4242
}
4343

44+
#[tokio::test]
45+
async fn h1_get_connection_closed_by_server() -> Result<(), Error> {
46+
let mut handle = test_h1_server(fn_service(handle))?;
47+
let ip_port = handle.ip_port_string();
48+
49+
let server_url = format!("http://{}/", ip_port);
50+
51+
let c = Client::builder().set_pool_capacity(1).finish();
52+
53+
let res = c.get(&server_url).version(Version::HTTP_11).send().await?;
54+
assert_eq!(res.status().as_u16(), 200);
55+
assert!(!res.can_close_connection());
56+
let body = res.string().await?;
57+
assert_eq!("GET Response", body);
58+
59+
handle.try_handle()?.stop(false);
60+
handle.await?;
61+
62+
let mut handle = test_h1_server_with_addr(fn_service(crate::handle), ip_port)?;
63+
let res_result = c.get(&server_url).version(Version::HTTP_11).send().await;
64+
65+
if let Err(e) = &res_result {
66+
println!("{:?}", e);
67+
}
68+
69+
assert!(res_result.is_ok());
70+
71+
handle.try_handle()?.stop(false);
72+
handle.await?;
73+
74+
Ok(())
75+
}
76+
4477
#[tokio::test]
4578
async fn h1_head() -> Result<(), Error> {
4679
let mut handle = test_h1_server(fn_service(handle))?;
@@ -50,7 +83,7 @@ async fn h1_head() -> Result<(), Error> {
5083
let c = Client::new();
5184

5285
for _ in 0..3 {
53-
let mut res = c.head(&server_url).version(Version::HTTP_11).send().await?;
86+
let res = c.head(&server_url).version(Version::HTTP_11).send().await?;
5487
assert_eq!(res.status().as_u16(), 200);
5588
assert!(!res.can_close_connection());
5689
let body = res.string().await?;
@@ -79,7 +112,7 @@ async fn h1_post() -> Result<(), Error> {
79112
}
80113
let body_len = body.len();
81114

82-
let mut res = c.post(&server_url).version(Version::HTTP_11).text(body).send().await?;
115+
let res = c.post(&server_url).version(Version::HTTP_11).text(body).send().await?;
83116
assert_eq!(res.status().as_u16(), 200);
84117
assert!(!res.can_close_connection());
85118
let body = res.limit::<{ 12 * 1024 }>().string().await?;
@@ -107,7 +140,7 @@ async fn h1_drop_body_read() -> Result<(), Error> {
107140
body.extend_from_slice(b"Hello,World!");
108141
}
109142

110-
let mut res = c.post(&server_url).version(Version::HTTP_11).text(body).send().await?;
143+
let res = c.post(&server_url).version(Version::HTTP_11).text(body).send().await?;
111144
assert_eq!(res.status().as_u16(), 200);
112145
assert!(res.can_close_connection());
113146
}
@@ -133,7 +166,7 @@ async fn h1_partial_body_read() -> Result<(), Error> {
133166
body.extend_from_slice(b"Hello,World!");
134167
}
135168

136-
let mut res = c.post(&server_url).version(Version::HTTP_11).text(body).send().await?;
169+
let res = c.post(&server_url).version(Version::HTTP_11).text(body).send().await?;
137170
assert_eq!(res.status().as_u16(), 200);
138171
assert!(res.can_close_connection());
139172
}
@@ -153,7 +186,7 @@ async fn h1_close_connection() -> Result<(), Error> {
153186

154187
let c = Client::new();
155188

156-
let mut res = c.get(&server_url).version(Version::HTTP_11).send().await?;
189+
let res = c.get(&server_url).version(Version::HTTP_11).send().await?;
157190
assert_eq!(res.status().as_u16(), 200);
158191
assert!(res.can_close_connection());
159192

@@ -190,7 +223,7 @@ async fn h1_request_too_large() -> Result<(), Error> {
190223
req.headers_mut()
191224
.insert("large-header", HeaderValue::try_from(body).unwrap());
192225

193-
let mut res = req.send().await?;
226+
let res = req.send().await?;
194227
assert_eq!(res.status().as_u16(), 431);
195228
assert!(res.can_close_connection());
196229

test/tests/h2.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async fn h2_get() -> Result<(), Error> {
2020
let c = Client::new();
2121

2222
for _ in 0..3 {
23-
let mut res = c.get(&server_url).version(Version::HTTP_2).send().await?;
23+
let res = c.get(&server_url).version(Version::HTTP_2).send().await?;
2424
assert_eq!(res.status().as_u16(), 200);
2525
assert!(!res.can_close_connection());
2626
let body = res.string().await?;
@@ -46,7 +46,7 @@ async fn h2_no_host_header() -> Result<(), Error> {
4646
let mut req = c.get(&server_url).version(Version::HTTP_2);
4747
req.headers_mut().insert(header::HOST, "localhost".parse().unwrap());
4848

49-
let mut res = req.send().await?;
49+
let res = req.send().await?;
5050
assert_eq!(res.status().as_u16(), 200);
5151
assert!(!res.can_close_connection());
5252
let body = res.string().await?;
@@ -73,7 +73,7 @@ async fn h2_post() -> Result<(), Error> {
7373
for _ in 0..1024 * 1024 {
7474
body.extend_from_slice(b"Hello,World!");
7575
}
76-
let mut res = c.post(&server_url).version(Version::HTTP_2).text(body).send().await?;
76+
let res = c.post(&server_url).version(Version::HTTP_2).text(body).send().await?;
7777
assert_eq!(res.status().as_u16(), 200);
7878
assert!(!res.can_close_connection());
7979
let _ = res.body().await;
@@ -142,7 +142,7 @@ async fn h2_keepalive() -> Result<(), Error> {
142142
.block_on(async move {
143143
let c = Client::new();
144144

145-
let mut res = c.get(&server_url).version(Version::HTTP_2).send().await?;
145+
let res = c.get(&server_url).version(Version::HTTP_2).send().await?;
146146
assert_eq!(res.status().as_u16(), 200);
147147
assert!(!res.can_close_connection());
148148
let body = res.string().await?;

test/tests/h3.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn h3_get() -> Result<(), Error> {
1717
let server_url = format!("https://localhost:{}/", handle.addr().port());
1818

1919
for _ in 0..3 {
20-
let mut res = c.get(&server_url).version(Version::HTTP_3).send().await?;
20+
let res = c.get(&server_url).version(Version::HTTP_3).send().await?;
2121
assert_eq!(res.status().as_u16(), 200);
2222
assert!(!res.can_close_connection());
2323
let body = res.string().await?;
@@ -43,7 +43,7 @@ async fn h3_no_host_header() -> Result<(), Error> {
4343
let mut req = c.get(&server_url).version(Version::HTTP_3);
4444
req.headers_mut().insert(header::HOST, "localhost".parse().unwrap());
4545

46-
let mut res = req.send().await?;
46+
let res = req.send().await?;
4747
assert_eq!(res.status().as_u16(), 200);
4848
assert!(!res.can_close_connection());
4949
let body = res.string().await?;
@@ -70,7 +70,7 @@ async fn h3_post() -> Result<(), Error> {
7070
for _ in 0..1024 * 1024 {
7171
body.extend_from_slice(b"Hello,World!");
7272
}
73-
let mut res = c.post(&server_url).version(Version::HTTP_3).text(body).send().await?;
73+
let res = c.post(&server_url).version(Version::HTTP_3).text(body).send().await?;
7474
assert_eq!(res.status().as_u16(), 200);
7575
assert!(!res.can_close_connection());
7676
}

0 commit comments

Comments
 (0)