Skip to content

Commit dd72a80

Browse files
committed
fix: Reject frames on streams whose HEADERS haven't been sent
When the max number of concurrent streams is reached and a stream is queued, the HEADERS haven't been transmitted yet. From the peer's perspective the stream is idle. RST_STREAM, WINDOW_UPDATE, and HEADERS targeting this stream are connection errors per RFC 9113 section 5.1: https://www.rfc-editor.org/rfc/rfc9113.html#section-5.1-7.2.4 > Receiving any frame other than HEADERS or PRIORITY on a stream in [idle] state MUST be treated as a connection error of type PROTOCOL_ERROR. DATA is already caught, so no additional check is needed there (although I include it in my test anyway). The reason for rejecting this besides RFC conformance is that you can get into some weird states without this check.
1 parent 30998f2 commit dd72a80

2 files changed

Lines changed: 97 additions & 0 deletions

File tree

src/proto/streams/streams.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,11 @@ impl Inner {
478478

479479
let stream = self.store.resolve(key);
480480

481+
if stream.is_pending_open {
482+
proto_err!(conn: "recv_headers: received frame on idle stream {:?}", id);
483+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
484+
}
485+
481486
if stream.state.is_local_error() {
482487
// Locally reset streams must ignore frames "for some time".
483488
// This is because the remote may have sent trailers before
@@ -638,6 +643,11 @@ impl Inner {
638643
}
639644
};
640645

646+
if stream.is_pending_open {
647+
proto_err!(conn: "recv_reset: received frame on idle stream {:?}", id);
648+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
649+
}
650+
641651
let mut send_buffer = send_buffer.inner.lock().unwrap();
642652
let send_buffer = &mut *send_buffer;
643653

@@ -670,6 +680,11 @@ impl Inner {
670680
// The remote may send window updates for streams that the local now
671681
// considers closed. It's ok...
672682
if let Some(mut stream) = self.store.find_mut(&id) {
683+
if stream.is_pending_open {
684+
proto_err!(conn: "recv_window_update: received frame on idle stream {:?}", id);
685+
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
686+
}
687+
673688
let res = self
674689
.actions
675690
.send

tests/h2-tests/tests/client_request.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use h2_support::prelude::*;
55
use std::pin::Pin;
66
use std::task::Context;
77
use std::{io, panic};
8+
use tokio::sync::oneshot;
89

910
#[tokio::test]
1011
async fn handshake() {
@@ -2075,3 +2076,84 @@ impl MockH2 for mock_io::Builder {
20752076
.read(SETTINGS_ACK)
20762077
}
20772078
}
2079+
2080+
/// RFC 9113 S5.1: "Receiving any frame other than HEADERS or PRIORITY on a
2081+
/// stream in [idle] state MUST be treated as a connection error of type
2082+
/// PROTOCOL_ERROR."
2083+
#[tokio::test]
2084+
async fn frame_on_pending_open_stream_is_conn_error() {
2085+
h2_support::trace_init!();
2086+
2087+
for scenario in 0..5u8 {
2088+
let (io, mut srv) = mock::new();
2089+
2090+
let srv = async move {
2091+
let settings = srv
2092+
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
2093+
.await;
2094+
assert_default_settings!(settings);
2095+
2096+
// 3. Receive stream 1 HEADERS.
2097+
srv.recv_frame(
2098+
frames::headers(1)
2099+
.request("POST", "https://example.com/")
2100+
.eos(),
2101+
)
2102+
.await;
2103+
2104+
idle_ms(50).await;
2105+
2106+
// 4. Send a frame targeting stream 3, whose HEADERS haven't
2107+
// been transmitted since it's pending. This is illegal.
2108+
match scenario {
2109+
0 => {
2110+
srv.send_frame(frames::reset(3).reason(h2::Reason::NO_ERROR))
2111+
.await
2112+
}
2113+
1 => {
2114+
srv.send_frame(frames::reset(3).reason(h2::Reason::CANCEL))
2115+
.await
2116+
}
2117+
2 => srv.send_frame(frames::window_update(3, 1024)).await,
2118+
3 => srv.send_frame(frames::headers(3).response(200).eos()).await,
2119+
4 => srv.send_frame(frames::data(3, &b"hello"[..])).await,
2120+
_ => unreachable!(),
2121+
}
2122+
2123+
// 5. Client responds with GOAWAY(PROTOCOL_ERROR).
2124+
srv.recv_frame(frames::go_away(0).protocol_error()).await;
2125+
};
2126+
2127+
let client = async move {
2128+
let (mut client, mut conn) = client::Builder::new()
2129+
.initial_max_send_streams(1)
2130+
.handshake::<_, Bytes>(io)
2131+
.await
2132+
.unwrap();
2133+
2134+
// 1. Stream 1 fills the concurrent slot
2135+
let request = Request::builder()
2136+
.method(Method::POST)
2137+
.uri("https://example.com/")
2138+
.body(())
2139+
.unwrap();
2140+
let (_resp1, _) = client.send_request(request, true).unwrap();
2141+
client = conn.drive(client.ready()).await.unwrap();
2142+
2143+
// 2. Stream 3 is queued
2144+
let request = Request::builder()
2145+
.method(Method::POST)
2146+
.uri("https://example.com/")
2147+
.body(())
2148+
.unwrap();
2149+
let (_resp3, _) = client.send_request(request, true).unwrap();
2150+
2151+
// 6. Connection error propagates to poll_ready.
2152+
conn.drive(client.ready())
2153+
.await
2154+
.expect_err("connection error");
2155+
};
2156+
2157+
join(srv, client).await;
2158+
}
2159+
}

0 commit comments

Comments
 (0)