Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,16 @@ impl Send {

stream.send_capacity_inc = false;

Poll::Ready(Some(Ok(self.capacity(stream))))
let capacity = self.capacity(stream);

// If capacity has been reduced to zero, for example due to a race
// with a SETTINGS frame, return Pending instead of Ready(Ok(0)).
if capacity == 0 {
stream.wait_send(cx);
return Poll::Pending;
}

Poll::Ready(Some(Ok(capacity)))
}

/// Current available stream send capacity
Expand Down
2 changes: 1 addition & 1 deletion src/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl<B: Buf> SendStream<B> {
/// increased by the connection. Note that `n` here represents the **total**
/// amount of assigned capacity at that point in time. It is also possible
/// that `n` is lower than the previous call if, since then, the caller has
/// sent data.
/// sent data. `n` will always be greater than zero.
pub fn poll_capacity(&mut self, cx: &mut Context) -> Poll<Option<Result<usize, crate::Error>>> {
self.inner
.poll_capacity(cx)
Expand Down
55 changes: 55 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2670,3 +2670,58 @@ async fn poll_capacity_woken_on_library_reset() {
join(srv, client).await;
}
}

/// A WINDOW_UPDATE followed by a SETTINGS decrease can cancel each other out, resulting
/// in zero capacity. `poll_capacity` must return `Pending` (not `Ready(Ok(0))`) in that case.
#[tokio::test]
async fn poll_capacity_window_update_settings_race() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let mut settings = frame::Settings::default();
settings.set_initial_window_size(Some(0));

let srv = async move {
let settings = srv.assert_client_handshake_with_settings(settings).await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
.await;
idle_ms(50).await;

// Give stream capacity then immediately take it back
srv.send_frame(frames::window_update(1, 1024)).await;
srv.send_frame(frames::settings().initial_window_size(0))
.await;
srv.recv_frame(frames::settings_ack()).await;

// Now give real, usable capacity
srv.send_frame(frames::window_update(0, 11)).await;
srv.send_frame(frames::window_update(1, 11)).await;
srv.recv_frame(frames::data(1, "hello world").eos()).await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://example.com/")
.body(())
.unwrap();

let (response, mut stream) = client.send_request(request, false).unwrap();
stream.reserve_capacity(11);

// `wait_for_capacity` panics if `poll_capacity` ever returns `Ok(0)`
let mut stream = h2.drive(util::wait_for_capacity(stream, 11)).await;
stream.send_data("hello world".into(), true).unwrap();

let response = h2.drive(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

h2.await.unwrap();
};

join(srv, h2).await;
}
Loading