Skip to content

Commit 30998f2

Browse files
authored
fix: set_reset must notify send task (missed wakeup) (#897)
`set_reset` notified `recv_task` and `push_task`, but not `send_task`. This meant `poll_capacity` and `poll_reset` were never awoken for library resets. For example when a WINDOW_UPDATE overflows.
1 parent d9689ea commit 30998f2

2 files changed

Lines changed: 109 additions & 1 deletion

File tree

src/proto/streams/stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,10 @@ impl Stream {
383383
}
384384

385385
/// Set the stream's state to `Closed` with the given reason and initiator.
386-
/// Notify the send and receive tasks, if they exist.
386+
/// Notify the send, receive, and push tasks, if they exist.
387387
pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388388
self.state.set_reset(self.id, reason, initiator);
389+
self.notify_send();
389390
self.notify_push();
390391
self.notify_recv();
391392
}

tests/h2-tests/tests/flow_control.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use futures::{StreamExt, TryStreamExt};
22
use h2_support::prelude::*;
33
use h2_support::util::yield_once;
4+
use tokio::sync::oneshot;
45

56
// In this case, the stream & connection both have capacity, but capacity is not
67
// explicitly requested.
@@ -2563,3 +2564,109 @@ async fn goaway_ignores_data_but_returns_connection_capacity() {
25632564
join(client, srv).await;
25642565
}
25652566
}
2567+
2568+
/// When the library sends RST_STREAM (e.g., due to a WINDOW_UPDATE
2569+
/// overflow), `poll_capacity` and `poll_reset` must be notified.
2570+
/// Regression test for https://github.com/hyperium/h2/pull/897
2571+
#[tokio::test]
2572+
async fn poll_capacity_woken_on_library_reset() {
2573+
h2_support::trace_init!();
2574+
2575+
for polling_capacity in [true, false] {
2576+
let (io, mut srv) = mock::new();
2577+
let (client_done_tx, client_done_rx) = oneshot::channel::<()>();
2578+
2579+
let srv = async move {
2580+
let settings = srv.assert_client_handshake().await;
2581+
assert_default_settings!(settings);
2582+
2583+
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
2584+
.await;
2585+
2586+
// 2. Receive the 65535-byte initial window (4 DATA frames at default MAX_FRAME_SIZE).
2587+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2588+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2589+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2590+
srv.recv_frame(frames::data(1, vec![0u8; 16_383])).await;
2591+
2592+
// 3. Grow stream window to 2^31-1, to set up for overflow later.
2593+
srv.send_frame(frames::window_update(0, 65535)).await;
2594+
srv.send_frame(frames::window_update(1, 2_147_483_647))
2595+
.await;
2596+
2597+
// 5. Receive the next 65535 bytes (connection-limited).
2598+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2599+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2600+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2601+
srv.recv_frame(frames::data(1, vec![0u8; 16_383])).await;
2602+
2603+
// 6. Overflow: stream window 2147418112 + 65536 = 2^31 > 2^31-1.
2604+
srv.send_frame(frames::window_update(1, 65536)).await;
2605+
2606+
// 8. Receive the RST_STREAM(FLOW_CONTROL_ERROR) sent by the library.
2607+
srv.recv_frame(frames::reset(1).flow_control()).await;
2608+
2609+
// Wait for the client to finish. Otherwise Recv::recv_eof hides
2610+
// the missing waker.
2611+
let _ = client_done_rx.await;
2612+
};
2613+
2614+
let client = async move {
2615+
let (mut client, conn) = client::handshake(io).await.unwrap();
2616+
tokio::spawn(async move {
2617+
// Separate task so the polled method won't resolve unless notify_send wakes it.
2618+
let _ = conn.await;
2619+
});
2620+
2621+
let request = Request::builder()
2622+
.method(Method::POST)
2623+
.uri("https://example.com/")
2624+
.body(())
2625+
.unwrap();
2626+
let (_resp, mut stream) = client.send_request(request, false).unwrap();
2627+
2628+
// 1. Exhaust the initial 65535-byte window.
2629+
stream.reserve_capacity(65535);
2630+
let cap = poll_fn(|cx| stream.poll_capacity(cx))
2631+
.await
2632+
.unwrap()
2633+
.unwrap();
2634+
assert_eq!(cap, 65535);
2635+
stream.send_data(vec![0u8; cap].into(), false).unwrap();
2636+
2637+
// 4. poll_capacity blocks until 3. replenishes windows, then send again.
2638+
stream.reserve_capacity(65535);
2639+
let cap = poll_fn(|cx| stream.poll_capacity(cx))
2640+
.await
2641+
.unwrap()
2642+
.unwrap();
2643+
assert_eq!(cap, 65535);
2644+
stream.send_data(vec![0u8; cap].into(), false).unwrap();
2645+
2646+
// 7. The polled method must be woken by the reset from 6.
2647+
if polling_capacity {
2648+
stream.reserve_capacity(65535);
2649+
let result = tokio::time::timeout(
2650+
Duration::from_secs(1),
2651+
poll_fn(|cx| stream.poll_capacity(cx)).wakened(),
2652+
)
2653+
.await
2654+
.expect("poll_capacity was not woken");
2655+
assert!(result.is_none());
2656+
} else {
2657+
let reason = tokio::time::timeout(
2658+
Duration::from_secs(1),
2659+
poll_fn(|cx| stream.poll_reset(cx)).wakened(),
2660+
)
2661+
.await
2662+
.expect("poll_reset was not woken")
2663+
.unwrap();
2664+
assert_eq!(reason, Reason::FLOW_CONTROL_ERROR);
2665+
}
2666+
2667+
let _ = client_done_tx.send(());
2668+
};
2669+
2670+
join(srv, client).await;
2671+
}
2672+
}

0 commit comments

Comments
 (0)