Skip to content

Commit b16cbf8

Browse files
committed
fix: set_reset must notify send task (missed wakeup)
`set_reset` notified `recv_task` and `push_task`, but not `send_task`. This meant `poll_capacity` and `poll_reset` were never awoken for library resets. For example when a WINDOW_UPDATE overflows.
1 parent 1e68f99 commit b16cbf8

2 files changed

Lines changed: 109 additions & 1 deletion

File tree

src/proto/streams/stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,10 @@ impl Stream {
383383
}
384384

385385
/// Set the stream's state to `Closed` with the given reason and initiator.
386-
/// Notify the send and receive tasks, if they exist.
386+
/// Notify the send, receive, and push tasks, if they exist.
387387
pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388388
self.state.set_reset(self.id, reason, initiator);
389+
self.notify_send();
389390
self.notify_push();
390391
self.notify_recv();
391392
}

tests/h2-tests/tests/flow_control.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use futures::{StreamExt, TryStreamExt};
22
use h2_support::prelude::*;
33
use h2_support::util::yield_once;
4+
use tokio::sync::oneshot;
45

56
// In this case, the stream & connection both have capacity, but capacity is not
67
// explicitly requested.
@@ -2311,3 +2312,109 @@ async fn too_many_window_update_resets_causes_go_away() {
23112312

23122313
join(srv, client).await;
23132314
}
2315+
2316+
/// When the library sends RST_STREAM (e.g., due to a WINDOW_UPDATE
2317+
/// overflow), `poll_capacity` and `poll_reset` must be notified.
2318+
/// Regression test for https://github.com/hyperium/h2/pull/897
2319+
#[tokio::test]
2320+
async fn poll_capacity_woken_on_library_reset() {
2321+
h2_support::trace_init!();
2322+
2323+
for polling_capacity in [true, false] {
2324+
let (io, mut srv) = mock::new();
2325+
let (client_done_tx, client_done_rx) = oneshot::channel::<()>();
2326+
2327+
let srv = async move {
2328+
let settings = srv.assert_client_handshake().await;
2329+
assert_default_settings!(settings);
2330+
2331+
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
2332+
.await;
2333+
2334+
// 2. Receive the 65535-byte initial window (4 DATA frames at default MAX_FRAME_SIZE).
2335+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2336+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2337+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2338+
srv.recv_frame(frames::data(1, vec![0u8; 16_383])).await;
2339+
2340+
// 3. Grow stream window to 2^31-1, to set up for overflow later.
2341+
srv.send_frame(frames::window_update(0, 65535)).await;
2342+
srv.send_frame(frames::window_update(1, 2_147_483_647))
2343+
.await;
2344+
2345+
// 5. Receive the next 65535 bytes (connection-limited).
2346+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2347+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2348+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2349+
srv.recv_frame(frames::data(1, vec![0u8; 16_383])).await;
2350+
2351+
// 6. Overflow: stream window 2147418112 + 65536 = 2^31 > 2^31-1.
2352+
srv.send_frame(frames::window_update(1, 65536)).await;
2353+
2354+
// 8. Receive the RST_STREAM(FLOW_CONTROL_ERROR) sent by the library.
2355+
srv.recv_frame(frames::reset(1).flow_control()).await;
2356+
2357+
// Wait for the client to finish. Otherwise Recv::recv_eof hides
2358+
// the missing waker.
2359+
let _ = client_done_rx.await;
2360+
};
2361+
2362+
let client = async move {
2363+
let (mut client, conn) = client::handshake(io).await.unwrap();
2364+
tokio::spawn(async move {
2365+
// Separate task so the polled method won't resolve unless notify_send wakes it.
2366+
let _ = conn.await;
2367+
});
2368+
2369+
let request = Request::builder()
2370+
.method(Method::POST)
2371+
.uri("https://example.com/")
2372+
.body(())
2373+
.unwrap();
2374+
let (_resp, mut stream) = client.send_request(request, false).unwrap();
2375+
2376+
// 1. Exhaust the initial 65535-byte window.
2377+
stream.reserve_capacity(65535);
2378+
let cap = poll_fn(|cx| stream.poll_capacity(cx))
2379+
.await
2380+
.unwrap()
2381+
.unwrap();
2382+
assert_eq!(cap, 65535);
2383+
stream.send_data(vec![0u8; cap].into(), false).unwrap();
2384+
2385+
// 4. poll_capacity blocks until 3. replenishes windows, then send again.
2386+
stream.reserve_capacity(65535);
2387+
let cap = poll_fn(|cx| stream.poll_capacity(cx))
2388+
.await
2389+
.unwrap()
2390+
.unwrap();
2391+
assert_eq!(cap, 65535);
2392+
stream.send_data(vec![0u8; cap].into(), false).unwrap();
2393+
2394+
// 7. The polled method must be woken by the reset from 6.
2395+
if polling_capacity {
2396+
stream.reserve_capacity(65535);
2397+
let result = tokio::time::timeout(
2398+
Duration::from_secs(1),
2399+
poll_fn(|cx| stream.poll_capacity(cx)).wakened(),
2400+
)
2401+
.await
2402+
.expect("poll_capacity was not woken");
2403+
assert!(result.is_none());
2404+
} else {
2405+
let reason = tokio::time::timeout(
2406+
Duration::from_secs(1),
2407+
poll_fn(|cx| stream.poll_reset(cx)).wakened(),
2408+
)
2409+
.await
2410+
.expect("poll_reset was not woken")
2411+
.unwrap();
2412+
assert_eq!(reason, Reason::FLOW_CONTROL_ERROR);
2413+
}
2414+
2415+
let _ = client_done_tx.send(());
2416+
};
2417+
2418+
join(srv, client).await;
2419+
}
2420+
}

0 commit comments

Comments
 (0)