Skip to content

Commit e5ad96b

Browse files
authored
fix(http2): reading trailers shouldn't propagate NO_ERROR from early response (#3998)
This is the trailers variant of the fix for reading the body in #3275, so that it is possible to both attempt to read the body and the trailers when the server has sent a `RST_STREAM` with `NO_ERROR` after its response to indicate that the client should stop attempting to send it the body. I have added a trailers-only variant of `http2_responds_before_consuming_request_body` that fails without the fix, and also updated `http2_responds_before_consuming_request_body` to verify that it can check whether there are any trailers.
1 parent 743a3ba commit e5ad96b

2 files changed

Lines changed: 107 additions & 17 deletions

File tree

src/body/incoming.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,14 @@ impl Body for Incoming {
248248
return Poll::Ready(Some(Ok(Frame::data(bytes))));
249249
}
250250
Some(Err(e)) => {
251-
return match e.reason() {
252-
// These reasons should cause the body reading to stop, but not fail it.
253-
// The same logic as for `Read for H2Upgraded` is applied here.
254-
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
255-
Poll::Ready(None)
256-
}
257-
_ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
258-
};
251+
if let Some(h2::Reason::NO_ERROR) = e.reason() {
252+
// As mentioned in RFC 7540 Section 8.1, a RST_STREAM with NO_ERROR
253+
// indicates an early response, and should cause the body reading
254+
// to stop, but not fail it:
255+
return Poll::Ready(None);
256+
} else {
257+
return Poll::Ready(Some(Err(crate::Error::new_body(e))));
258+
}
259259
}
260260
None => {
261261
*data_done = true;
@@ -270,7 +270,16 @@ impl Body for Incoming {
270270
ping.record_non_data();
271271
Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272272
}
273-
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
273+
Err(e) => {
274+
if let Some(h2::Reason::NO_ERROR) = e.reason() {
275+
// Same as above, a RST_STREAM with NO_ERROR indicates an early
276+
// response, and should cause reading the trailers to stop, but
277+
// not fail it:
278+
Poll::Ready(None)
279+
} else {
280+
Poll::Ready(Some(Err(crate::Error::new_h2(e))))
281+
}
282+
}
274283
}
275284
}
276285

tests/client.rs

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2626,7 +2626,7 @@ mod conn {
26262626
}
26272627

26282628
#[tokio::test]
2629-
async fn http2_responds_before_consuming_request_body() {
2629+
async fn http2_responds_before_consuming_request_body_no_trailers() {
26302630
// Test that a early-response from server works correctly (request body wasn't fully consumed).
26312631
// https://github.com/hyperium/hyper/issues/2872
26322632
use hyper::service::service_fn;
@@ -2670,15 +2670,96 @@ mod conn {
26702670
let resp = client.send_request(req).await.expect("send_request");
26712671
assert!(resp.status().is_success());
26722672

2673-
let mut body = String::new();
2674-
concat(resp.into_body())
2673+
let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap();
2674+
assert_eq!(body.as_ref(), b"No bread for you!");
2675+
assert!(trailers.is_none());
2676+
}
2677+
2678+
#[tokio::test]
2679+
async fn http2_responds_before_consuming_request_body_with_trailers() {
2680+
// Test that a early-response from server works correctly (request body wasn't fully consumed).
2681+
// https://github.com/hyperium/hyper/issues/2872
2682+
use hyper::body::{Body, Frame, SizeHint};
2683+
use hyper::header::{HeaderMap, HeaderValue};
2684+
use hyper::service::service_fn;
2685+
2686+
let _ = pretty_env_logger::try_init();
2687+
2688+
let (listener, addr) = setup_tk_test_server().await;
2689+
2690+
/// An `HttpBody` implementation whose `is_end_stream()` will
2691+
/// return `true` after sending trailers.
2692+
pub struct TrailersBody(Option<HeaderMap>);
2693+
2694+
impl Body for TrailersBody {
2695+
type Data = bytes::Bytes;
2696+
type Error = hyper::Error;
2697+
2698+
fn poll_frame(
2699+
mut self: Pin<&mut Self>,
2700+
_cx: &mut Context<'_>,
2701+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
2702+
if let Some(trailers) = self.0.take() {
2703+
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
2704+
} else {
2705+
Poll::Ready(None)
2706+
}
2707+
}
2708+
2709+
fn is_end_stream(&self) -> bool {
2710+
self.0.is_none()
2711+
}
2712+
2713+
fn size_hint(&self) -> SizeHint {
2714+
SizeHint::with_exact(0)
2715+
}
2716+
}
2717+
2718+
// Spawn an HTTP2 server that responds before reading the whole request body.
2719+
// It's normal case to decline the request due to headers or size of the body.
2720+
tokio::spawn(async move {
2721+
let sock = TokioIo::new(listener.accept().await.unwrap().0);
2722+
hyper::server::conn::http2::Builder::new(TokioExecutor)
2723+
.timer(TokioTimer)
2724+
.serve_connection(
2725+
sock,
2726+
service_fn(|_req| async move {
2727+
let mut trailers = HeaderMap::new();
2728+
trailers.insert("grpc", HeaderValue::from_static("0"));
2729+
let body = TrailersBody(Some(trailers));
2730+
Ok::<_, hyper::Error>(http::Response::new(body))
2731+
}),
2732+
)
2733+
.await
2734+
.expect("serve_connection");
2735+
});
2736+
2737+
let io = tcp_connect(&addr).await.expect("tcp connect");
2738+
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
2739+
.timer(TokioTimer)
2740+
.handshake(io)
26752741
.await
2676-
.unwrap()
2677-
.reader()
2678-
.read_to_string(&mut body)
2679-
.unwrap();
2742+
.expect("http handshake");
2743+
2744+
tokio::spawn(async move {
2745+
conn.await.expect("client conn shouldn't error");
2746+
});
2747+
2748+
// Use a channel to keep request stream open
2749+
let (_tx, recv) = mpsc::channel::<Result<Frame<Bytes>, Box<dyn Error + Send + Sync>>>(0);
2750+
let req = Request::post("/a").body(StreamBody::new(recv)).unwrap();
2751+
let resp = client.send_request(req).await.expect("send_request");
2752+
assert!(resp.status().is_success());
2753+
2754+
let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap();
2755+
2756+
// No body:
2757+
assert!(body.is_empty());
26802758

2681-
assert_eq!(&body, "No bread for you!");
2759+
// Have our `grpc` trailer:
2760+
let trailers = trailers.expect("response has trailers");
2761+
assert_eq!(trailers.len(), 1);
2762+
assert_eq!(trailers.get("grpc").unwrap(), "0");
26822763
}
26832764

26842765
#[tokio::test]

0 commit comments

Comments
 (0)