Skip to content

Commit f98c021

Browse files
committed
fix: Reject frames on streams whose HEADERS haven't been sent
When the max number of concurrent streams is reached and a stream is queued, the HEADERS haven't been transmitted yet. From the peer's perspective the stream is idle. RST_STREAM, WINDOW_UPDATE, and HEADERS targeting this stream are connection errors per RFC 9113 section 5.1: https://www.rfc-editor.org/rfc/rfc9113.html#section-5.1-7.2.4 > Receiving any frame other than HEADERS or PRIORITY on a stream in [idle] state MUST be treated as a connection error of type PROTOCOL_ERROR. DATA is already caught, so no additional check is needed there (although I include it in my test anyway). The reason for rejecting this besides RFC conformance is that you can get into some weird states without this check.
1 parent 30998f2 commit f98c021

2 files changed

Lines changed: 96 additions & 0 deletions

File tree

src/proto/streams/streams.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,11 @@ impl Inner {
478478

479479
let stream = self.store.resolve(key);
480480

481+
if stream.is_pending_open {
482+
proto_err!(conn: "recv_headers: received frame on idle stream {:?}", id);
483+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
484+
}
485+
481486
if stream.state.is_local_error() {
482487
// Locally reset streams must ignore frames "for some time".
483488
// This is because the remote may have sent trailers before
@@ -638,6 +643,11 @@ impl Inner {
638643
}
639644
};
640645

646+
if stream.is_pending_open {
647+
proto_err!(conn: "recv_reset: received frame on idle stream {:?}", id);
648+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
649+
}
650+
641651
let mut send_buffer = send_buffer.inner.lock().unwrap();
642652
let send_buffer = &mut *send_buffer;
643653

@@ -670,6 +680,11 @@ impl Inner {
670680
// The remote may send window updates for streams that the local now
671681
// considers closed. It's ok...
672682
if let Some(mut stream) = self.store.find_mut(&id) {
683+
if stream.is_pending_open {
684+
proto_err!(conn: "recv_window_update: received frame on idle stream {:?}", id);
685+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
686+
}
687+
673688
let res = self
674689
.actions
675690
.send

tests/h2-tests/tests/client_request.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2075,3 +2075,84 @@ impl MockH2 for mock_io::Builder {
20752075
.read(SETTINGS_ACK)
20762076
}
20772077
}
2078+
2079+
/// RFC 9113 S5.1: "Receiving any frame other than HEADERS or PRIORITY on a
2080+
/// stream in [idle] state MUST be treated as a connection error of type
2081+
/// PROTOCOL_ERROR."
2082+
#[tokio::test]
2083+
async fn frame_on_pending_open_stream_is_conn_error() {
2084+
h2_support::trace_init!();
2085+
2086+
for scenario in 0..5u8 {
2087+
let (io, mut srv) = mock::new();
2088+
2089+
let srv = async move {
2090+
let settings = srv
2091+
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
2092+
.await;
2093+
assert_default_settings!(settings);
2094+
2095+
// 3. Receive stream 1 HEADERS.
2096+
srv.recv_frame(
2097+
frames::headers(1)
2098+
.request("POST", "https://example.com/")
2099+
.eos(),
2100+
)
2101+
.await;
2102+
2103+
idle_ms(50).await;
2104+
2105+
// 4. Send a frame targeting stream 3, whose HEADERS haven't
2106+
// been transmitted since it's pending. This is illegal.
2107+
match scenario {
2108+
0 => {
2109+
srv.send_frame(frames::reset(3).reason(h2::Reason::NO_ERROR))
2110+
.await
2111+
}
2112+
1 => {
2113+
srv.send_frame(frames::reset(3).reason(h2::Reason::CANCEL))
2114+
.await
2115+
}
2116+
2 => srv.send_frame(frames::window_update(3, 1024)).await,
2117+
3 => srv.send_frame(frames::headers(3).response(200).eos()).await,
2118+
4 => srv.send_frame(frames::data(3, &b"hello"[..])).await,
2119+
_ => unreachable!(),
2120+
}
2121+
2122+
// 5. Client responds with GOAWAY(PROTOCOL_ERROR).
2123+
srv.recv_frame(frames::go_away(0).protocol_error()).await;
2124+
};
2125+
2126+
let client = async move {
2127+
let (mut client, mut conn) = client::Builder::new()
2128+
.initial_max_send_streams(1)
2129+
.handshake::<_, Bytes>(io)
2130+
.await
2131+
.unwrap();
2132+
2133+
// 1. Stream 1 fills the concurrent slot
2134+
let request = Request::builder()
2135+
.method(Method::POST)
2136+
.uri("https://example.com/")
2137+
.body(())
2138+
.unwrap();
2139+
let (_resp1, _) = client.send_request(request, true).unwrap();
2140+
client = conn.drive(client.ready()).await.unwrap();
2141+
2142+
// 2. Stream 3 is queued
2143+
let request = Request::builder()
2144+
.method(Method::POST)
2145+
.uri("https://example.com/")
2146+
.body(())
2147+
.unwrap();
2148+
let (_resp3, _) = client.send_request(request, true).unwrap();
2149+
2150+
// 6. Connection error propagates to poll_ready.
2151+
conn.drive(client.ready())
2152+
.await
2153+
.expect_err("connection error");
2154+
};
2155+
2156+
join(srv, client).await;
2157+
}
2158+
}

0 commit comments

Comments
 (0)