diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f402f49f..33bcfd67 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -478,6 +478,11 @@ impl Inner { let stream = self.store.resolve(key); + if stream.is_pending_open { + proto_err!(conn: "recv_headers: received frame on idle stream {:?}", id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + if stream.state.is_local_error() { // Locally reset streams must ignore frames "for some time". // This is because the remote may have sent trailers before @@ -638,6 +643,11 @@ impl Inner { } }; + if stream.is_pending_open { + proto_err!(conn: "recv_reset: received frame on idle stream {:?}", id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + let mut send_buffer = send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; @@ -670,6 +680,11 @@ impl Inner { // The remote may send window updates for streams that the local now // considers closed. It's ok... if let Some(mut stream) = self.store.find_mut(&id) { + if stream.is_pending_open { + proto_err!(conn: "recv_window_update: received frame on idle stream {:?}", id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + let res = self .actions .send diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 7ad5ee85..66059969 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -2075,3 +2075,84 @@ impl MockH2 for mock_io::Builder { .read(SETTINGS_ACK) } } + +/// RFC 9113 S5.1: "Receiving any frame other than HEADERS or PRIORITY on a +/// stream in [idle] state MUST be treated as a connection error of type +/// PROTOCOL_ERROR." +#[tokio::test] +async fn frame_on_pending_open_stream_is_conn_error() { + h2_support::trace_init!(); + + for scenario in 0..5u8 { + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1)) + .await; + assert_default_settings!(settings); + + // 3. Receive stream 1 HEADERS. + srv.recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + .eos(), + ) + .await; + + idle_ms(50).await; + + // 4. Send a frame targeting stream 3, whose HEADERS haven't + // been transmitted since it's pending. This is illegal. + match scenario { + 0 => { + srv.send_frame(frames::reset(3).reason(h2::Reason::NO_ERROR)) + .await + } + 1 => { + srv.send_frame(frames::reset(3).reason(h2::Reason::CANCEL)) + .await + } + 2 => srv.send_frame(frames::window_update(3, 1024)).await, + 3 => srv.send_frame(frames::headers(3).response(200).eos()).await, + 4 => srv.send_frame(frames::data(3, &b"hello"[..])).await, + _ => unreachable!(), + } + + // 5. Client responds with GOAWAY(PROTOCOL_ERROR). + srv.recv_frame(frames::go_away(0).protocol_error()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .initial_max_send_streams(1) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + + // 1. Stream 1 fills the concurrent slot + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + let (_resp1, _) = client.send_request(request, true).unwrap(); + client = conn.drive(client.ready()).await.unwrap(); + + // 2. Stream 3 is queued + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + let (_resp3, _) = client.send_request(request, true).unwrap(); + + // 6. Connection error propagates to poll_ready. + conn.drive(client.ready()) + .await + .expect_err("connection error"); + }; + + join(srv, client).await; + } +}