From ca23a77651f58e20f418583437596b3cbf166d56 Mon Sep 17 00:00:00 2001 From: Arni Dagur Date: Tue, 28 Apr 2026 07:24:01 +0100 Subject: [PATCH] fix: Flow control capacity leak with padded frames --- src/proto/streams/streams.rs | 4 +- tests/h2-tests/tests/flow_control.rs | 142 +++++++++++++++++++++++++++ 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index fce6bfd1..c65d4997 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -561,7 +561,7 @@ impl Inner { if self.actions.may_have_forgotten_stream(peer, id) { tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,); - let sz = frame.payload().len(); + let sz = frame.flow_controlled_len(); // This should have been enforced at the codec::FramedRead layer, so // this is just a sanity check. assert!(sz <= super::MAX_WINDOW_SIZE as usize); @@ -581,7 +581,7 @@ impl Inner { let send_buffer = &mut *send_buffer; self.counts.transition(stream, |counts, stream| { - let sz = frame.payload().len(); + let sz = frame.flow_controlled_len(); let res = actions.recv.recv_data(frame, stream); // Any stream error after receiving a DATA frame means diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 57453e78..80e7c4aa 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -446,6 +446,148 @@ async fn stream_error_release_connection_capacity() { join(srv, client).await; } +// Regression test for TODO +#[tokio::test] +async fn padded_data_stream_error_releases_connection_capacity() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + // Padded EOS frame: 1 byte pad_len + 8 bytes data + 1 byte padding. + // flow_controlled_len = 10, payload (data only) = 8. + let mut padded_eos = vec![0u8; 10]; + padded_eos[0] = 1; + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + // Wrong content-length triggers a stream error on the padded EOS frame. + srv.send_frame( + frames::headers(1) + .response(200) + .field("content-length", &*(16_384 * 3).to_string()), + ) + .await; + srv.send_frame(frames::data(1, vec![0; 16_384])).await; + srv.send_frame(frames::data(1, vec![0; 16_384])).await; + srv.send_frame(frames::data(1, &padded_eos[..]).padded().eos()) + .await; + srv.recv_frame(frames::reset(1).protocol_error()).await; + // Released capacity must include the padded frame's full + // flow_controlled_len (10), not just its payload (8). + srv.recv_frame(frames::window_update(0, 16_384 * 2 + 10)) + .await; + }; + + let client = async move { + let (mut client, mut conn) = client::handshake(io).await.unwrap(); + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = async { + let resp = client + .send_request(request, true) + .unwrap() + .0 + .await + .expect("response"); + assert_eq!(resp.status(), StatusCode::OK); + let mut body = resp.into_parts().1; + let mut cap = body.flow_control().clone(); + let to_release = 16_384 * 2; + let mut should_recv_bytes = to_release; + let mut should_recv_frames = 2usize; + + let err = body + .try_for_each(|bytes| async move { + should_recv_bytes -= bytes.len(); + should_recv_frames -= 1; + if should_recv_bytes == 0 { + assert_eq!(should_recv_frames, 0); + } + Ok(()) + }) + .await + .expect_err("body"); + assert_eq!( + err.to_string(), + "stream error detected: unspecific protocol error detected" + ); + cap.release_capacity(to_release).expect("release_capacity"); + }; + conn.drive(req).await; + conn.await.expect("client"); + }; + + join(srv, client).await; +} + +// Regression test for TODO +#[tokio::test] +async fn padded_data_on_forgotten_stream_releases_connection_capacity() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + // Padded frame: 1 byte pad_len + 16378 bytes data + 5 bytes padding. + // flow_controlled_len = 16384, payload (data only) = 16378. + let mut padded = vec![0u8; 16_384]; + padded[0] = 5; + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.send_frame(frames::data(1, vec![0; 16_384])).await; + srv.recv_frame(frames::reset(1).cancel()).await; + // Wait for the reset to expire so the stream is forgotten. + idle_ms(50).await; + srv.ping_pong([1; 8]).await; + // Stream 1 has been evicted. Send a padded DATA frame for it. + srv.send_frame(frames::data(1, &padded[..]).padded().eos()) + .await; + // Released capacity must cover both frames using their full + // flow_controlled_len. There used to be a bug where the padded + // frame would release only 16378 (payload) instead of 16384. + srv.recv_frame(frames::window_update(0, 16_384 * 2)).await; + srv.recv_frame(frames::reset(1).stream_closed()).await; + }; + + let client = async move { + let (mut client, conn) = client::Builder::new() + .reset_stream_duration(Duration::from_millis(10)) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let req = async { + let resp = client.get("https://example.com/").await.expect("response"); + assert_eq!(resp.status(), StatusCode::OK); + // This drop sends RST_STREAM(CANCEL) + drop(resp); + }; + + let mut conn = Box::pin(async move { conn.await.expect("client") }); + conn.drive(req).await; + conn.await; + drop(client); + }; + + join(srv, client).await; +} + #[tokio::test] async fn stream_close_by_data_frame_releases_capacity() { h2_support::trace_init!();