Skip to content

Commit 97c9b05

Browse files
committed
fix: Reject frames on streams whose HEADERS haven't been sent
When the max number of concurrent streams is reached and a stream is queued, the HEADERS haven't been transmitted yet. From the peer's perspective the stream is idle. RST_STREAM, WINDOW_UPDATE, and HEADERS targeting this stream are connection errors per RFC 9113 section 5.1: https://www.rfc-editor.org/rfc/rfc9113.html#section-5.1-7.2.4 > Receiving any frame other than HEADERS or PRIORITY on a stream in [idle] state MUST be treated as a connection error of type PROTOCOL_ERROR. DATA is already caught, so no additional check is needed there (although I include it in my test anyway). The reason for rejecting this besides RFC conformance is that you can get into some weird states without this check.
1 parent 30998f2 commit 97c9b05

2 files changed

Lines changed: 88 additions & 0 deletions

File tree

src/proto/streams/streams.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,11 @@ impl Inner {
478478

479479
let stream = self.store.resolve(key);
480480

481+
if stream.is_pending_open {
482+
proto_err!(conn: "recv_headers: received frame on idle stream {:?}", id);
483+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
484+
}
485+
481486
if stream.state.is_local_error() {
482487
// Locally reset streams must ignore frames "for some time".
483488
// This is because the remote may have sent trailers before
@@ -638,6 +643,11 @@ impl Inner {
638643
}
639644
};
640645

646+
if stream.is_pending_open {
647+
proto_err!(conn: "recv_reset: received frame on idle stream {:?}", id);
648+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
649+
}
650+
641651
let mut send_buffer = send_buffer.inner.lock().unwrap();
642652
let send_buffer = &mut *send_buffer;
643653

@@ -670,6 +680,11 @@ impl Inner {
670680
// The remote may send window updates for streams that the local now
671681
// considers closed. It's ok...
672682
if let Some(mut stream) = self.store.find_mut(&id) {
683+
if stream.is_pending_open {
684+
proto_err!(conn: "recv_window_update: received frame on idle stream {:?}", id);
685+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
686+
}
687+
673688
let res = self
674689
.actions
675690
.send

tests/h2-tests/tests/client_request.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use h2_support::prelude::*;
55
use std::pin::Pin;
66
use std::task::Context;
77
use std::{io, panic};
8+
use tokio::sync::oneshot;
89

910
#[tokio::test]
1011
async fn handshake() {
@@ -2075,3 +2076,75 @@ impl MockH2 for mock_io::Builder {
20752076
.read(SETTINGS_ACK)
20762077
}
20772078
}
2079+
2080+
2081+
/// RFC 9113 S5.1: "Receiving any frame other than HEADERS or PRIORITY on a
2082+
/// stream in [idle] state MUST be treated as a connection error of type
2083+
/// PROTOCOL_ERROR."
2084+
#[tokio::test]
2085+
async fn frame_on_pending_open_stream_is_conn_error() {
2086+
h2_support::trace_init!();
2087+
2088+
for scenario in 0..5u8 {
2089+
let (io, mut srv) = mock::new();
2090+
2091+
let srv = async move {
2092+
let settings = srv
2093+
.assert_client_handshake_with_settings(
2094+
frames::settings().max_concurrent_streams(1),
2095+
)
2096+
.await;
2097+
assert_default_settings!(settings);
2098+
2099+
// 3. Receive stream 1 HEADERS.
2100+
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/").eos())
2101+
.await;
2102+
2103+
idle_ms(50).await;
2104+
2105+
// 4. Send a frame targeting stream 3, whose HEADERS haven't
2106+
// been transmitted since it's pending. This is illegal.
2107+
match scenario {
2108+
0 => srv.send_frame(frames::reset(3).reason(h2::Reason::NO_ERROR)).await,
2109+
1 => srv.send_frame(frames::reset(3).reason(h2::Reason::CANCEL)).await,
2110+
2 => srv.send_frame(frames::window_update(3, 1024)).await,
2111+
3 => srv.send_frame(frames::headers(3).response(200).eos()).await,
2112+
4 => srv.send_frame(frames::data(3, &b"hello"[..])).await,
2113+
_ => unreachable!(),
2114+
}
2115+
2116+
// 5. Client responds with GOAWAY(PROTOCOL_ERROR).
2117+
srv.recv_frame(frames::go_away(0).protocol_error()).await;
2118+
};
2119+
2120+
let client = async move {
2121+
let (mut client, mut conn) = client::Builder::new()
2122+
.initial_max_send_streams(1)
2123+
.handshake::<_, Bytes>(io)
2124+
.await
2125+
.unwrap();
2126+
2127+
// 1. Stream 1 fills the concurrent slot
2128+
let request = Request::builder()
2129+
.method(Method::POST)
2130+
.uri("https://example.com/")
2131+
.body(())
2132+
.unwrap();
2133+
let (_resp1, _) = client.send_request(request, true).unwrap();
2134+
client = conn.drive(client.ready()).await.unwrap();
2135+
2136+
// 2. Stream 3 is queued
2137+
let request = Request::builder()
2138+
.method(Method::POST)
2139+
.uri("https://example.com/")
2140+
.body(())
2141+
.unwrap();
2142+
let (_resp3, _) = client.send_request(request, true).unwrap();
2143+
2144+
// 6. Connection error propagates to poll_ready.
2145+
conn.drive(client.ready()).await.expect_err("connection error");
2146+
};
2147+
2148+
join(srv, client).await;
2149+
}
2150+
}

0 commit comments

Comments
 (0)