Skip to content

Commit 0408e1d

Browse files
committed
fix: set_reset must notify send task (missed wakeup)
`set_reset` notified `recv_task` and `push_task`, but not `send_task`. This meant `poll_capacity` and `poll_reset` were never awoken for library resets. For example when a WINDOW_UPDATE overflows.
1 parent 1e68f99 commit 0408e1d

2 files changed

Lines changed: 108 additions & 1 deletion

File tree

src/proto/streams/stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,10 @@ impl Stream {
383383
}
384384

385385
/// Set the stream's state to `Closed` with the given reason and initiator.
386-
/// Notify the send and receive tasks, if they exist.
386+
/// Notify the send, receive, and push tasks, if they exist.
387387
pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388388
self.state.set_reset(self.id, reason, initiator);
389+
self.notify_send();
389390
self.notify_push();
390391
self.notify_recv();
391392
}

tests/h2-tests/tests/flow_control.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use futures::{StreamExt, TryStreamExt};
22
use h2_support::prelude::*;
33
use h2_support::util::yield_once;
4+
use tokio::sync::oneshot;
45

56
// In this case, the stream & connection both have capacity, but capacity is not
67
// explicitly requested.
@@ -2311,3 +2312,108 @@ async fn too_many_window_update_resets_causes_go_away() {
23112312

23122313
join(srv, client).await;
23132314
}
2315+
2316+
/// When the library sends RST_STREAM (e.g., due to a WINDOW_UPDATE
2317+
/// overflow), `poll_capacity` and `poll_reset` must be notified.
2318+
#[tokio::test]
2319+
async fn poll_capacity_woken_on_library_reset() {
2320+
h2_support::trace_init!();
2321+
2322+
for polling_capacity in [true, false] {
2323+
let (io, mut srv) = mock::new();
2324+
let (client_done_tx, client_done_rx) = oneshot::channel::<()>();
2325+
2326+
let srv = async move {
2327+
let settings = srv.assert_client_handshake().await;
2328+
assert_default_settings!(settings);
2329+
2330+
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
2331+
.await;
2332+
2333+
// 2. Receive the 65535-byte initial window (4 DATA frames at default MAX_FRAME_SIZE).
2334+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2335+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2336+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2337+
srv.recv_frame(frames::data(1, vec![0u8; 16_383])).await;
2338+
2339+
// 3. Grow stream window to 2^31-1, to set up for overflow later.
2340+
srv.send_frame(frames::window_update(0, 65535)).await;
2341+
srv.send_frame(frames::window_update(1, 2_147_483_647))
2342+
.await;
2343+
2344+
// 5. Receive the next 65535 bytes (connection-limited).
2345+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2346+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2347+
srv.recv_frame(frames::data(1, vec![0u8; 16_384])).await;
2348+
srv.recv_frame(frames::data(1, vec![0u8; 16_383])).await;
2349+
2350+
// 6. Overflow: stream window 2147418112 + 65536 = 2^31 > 2^31-1.
2351+
srv.send_frame(frames::window_update(1, 65536)).await;
2352+
2353+
// 8. Receive the RST_STREAM(FLOW_CONTROL_ERROR) sent by the library.
2354+
srv.recv_frame(frames::reset(1).flow_control()).await;
2355+
2356+
// Wait for the client to finish. Otherwise Recv::recv_eof hides
2357+
// the missing waker.
2358+
let _ = client_done_rx.await;
2359+
};
2360+
2361+
let client = async move {
2362+
let (mut client, conn) = client::handshake(io).await.unwrap();
2363+
tokio::spawn(async move {
2364+
// Separate task so the polled method won't resolve unless notify_send wakes it.
2365+
let _ = conn.await;
2366+
});
2367+
2368+
let request = Request::builder()
2369+
.method(Method::POST)
2370+
.uri("https://example.com/")
2371+
.body(())
2372+
.unwrap();
2373+
let (_resp, mut stream) = client.send_request(request, false).unwrap();
2374+
2375+
// 1. Exhaust the initial 65535-byte window.
2376+
stream.reserve_capacity(65535);
2377+
let cap = poll_fn(|cx| stream.poll_capacity(cx))
2378+
.await
2379+
.unwrap()
2380+
.unwrap();
2381+
assert_eq!(cap, 65535);
2382+
stream.send_data(vec![0u8; cap].into(), false).unwrap();
2383+
2384+
// 4. poll_capacity blocks until 3. replenishes windows, then send again.
2385+
stream.reserve_capacity(65535);
2386+
let cap = poll_fn(|cx| stream.poll_capacity(cx))
2387+
.await
2388+
.unwrap()
2389+
.unwrap();
2390+
assert_eq!(cap, 65535);
2391+
stream.send_data(vec![0u8; cap].into(), false).unwrap();
2392+
2393+
// 7. The polled method must be woken by the reset from 6.
2394+
if polling_capacity {
2395+
stream.reserve_capacity(65535);
2396+
let result = tokio::time::timeout(
2397+
Duration::from_secs(1),
2398+
poll_fn(|cx| stream.poll_capacity(cx)).wakened(),
2399+
)
2400+
.await
2401+
.expect("poll_capacity was not woken");
2402+
assert!(result.is_none());
2403+
} else {
2404+
let reason = tokio::time::timeout(
2405+
Duration::from_secs(1),
2406+
poll_fn(|cx| stream.poll_reset(cx)).wakened(),
2407+
)
2408+
.await
2409+
.expect("poll_reset was not woken")
2410+
.unwrap();
2411+
assert_eq!(reason, Reason::FLOW_CONTROL_ERROR);
2412+
}
2413+
2414+
let _ = client_done_tx.send(());
2415+
};
2416+
2417+
join(srv, client).await;
2418+
}
2419+
}

0 commit comments

Comments
 (0)