Skip to content

Commit 47f487a

Browse files
committed
fix: Avoid panic or capacity leak when a stream is cancelled after reserve_capacity
1 parent dbc204e commit 47f487a

2 files changed

Lines changed: 78 additions & 8 deletions

File tree

src/proto/streams/prioritize.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -442,14 +442,10 @@ impl Prioritize {
442442
return;
443443
}
444444

445-
// If the stream has requested capacity, then it must be in the
446-
// streaming state (more data could be sent) or there is buffered data
447-
// waiting to be sent.
448-
debug_assert!(
449-
stream.state.is_send_streaming() || stream.buffered_send_data > 0,
450-
"state={:?}",
451-
stream.state
452-
);
445+
// The stream may have been reset before capacity was assigned.
446+
if !stream.state.is_send_streaming() && stream.buffered_send_data == 0 {
447+
return;
448+
}
453449

454450
// The amount of currently available capacity on the connection
455451
let conn_available = self.flow.available().as_size();

tests/h2-tests/tests/flow_control.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,6 +1587,80 @@ async fn reset_stream_waiting_for_capacity() {
15871587
join(srv, client).await;
15881588
}
15891589

1590+
// Regression test for TODO (fill in PR link)
1591+
#[tokio::test]
1592+
async fn reserve_capacity_then_cancel_does_not_leak() {
1593+
for explicit_reset in [true, false] {
1594+
let (io, mut srv) = mock::new();
1595+
1596+
let srv = async move {
1597+
let _ = srv.assert_client_handshake().await;
1598+
srv.recv_frame(frames::headers(1).request("POST", "https://example.com/"))
1599+
.await;
1600+
if !explicit_reset {
1601+
srv.send_frame(frames::headers(1).response(200)).await;
1602+
}
1603+
let mut data_bytes = 0;
1604+
loop {
1605+
let frame = srv.next().await.unwrap().unwrap();
1606+
match frame {
1607+
h2::frame::Frame::Reset(_) | h2::frame::Frame::Headers(_) => {}
1608+
h2::frame::Frame::Data(d) => {
1609+
data_bytes += d.payload().len();
1610+
if d.is_end_stream() {
1611+
break;
1612+
}
1613+
}
1614+
other => panic!("unexpected: {:?}", other),
1615+
}
1616+
}
1617+
assert_eq!(data_bytes, 65535);
1618+
srv.send_frame(frames::headers(3).response(200).eos()).await;
1619+
};
1620+
1621+
let client = async move {
1622+
let (mut client, mut conn) = client::handshake(io).await.expect("handshake");
1623+
let request = Request::builder()
1624+
.method(Method::POST)
1625+
.uri("https://example.com/")
1626+
.body(())
1627+
.unwrap();
1628+
let (response, mut stream) = client.send_request(request, false).unwrap();
1629+
stream.reserve_capacity(10);
1630+
1631+
if explicit_reset {
1632+
stream.send_reset(Reason::CANCEL);
1633+
drop(stream);
1634+
let err = response.await.unwrap_err();
1635+
assert_eq!(err.reason(), Some(Reason::CANCEL));
1636+
} else {
1637+
let resp = conn.drive(response).await.unwrap();
1638+
assert_eq!(resp.status(), StatusCode::OK);
1639+
drop(stream);
1640+
drop(resp);
1641+
}
1642+
1643+
// Open a second stream and send a full window of data. If capacity
1644+
// leaked, this would stall.
1645+
let request2 = Request::builder()
1646+
.method(Method::POST)
1647+
.uri("https://example.com/")
1648+
.body(())
1649+
.unwrap();
1650+
let (response2, mut stream2) = client.send_request(request2, false).unwrap();
1651+
stream2.send_data(vec![0; 65535].into(), true).unwrap();
1652+
join(async move { conn.await.expect("conn") }, async move {
1653+
let resp = response2.await.expect("resp");
1654+
assert_eq!(resp.status(), StatusCode::OK);
1655+
drop(client);
1656+
})
1657+
.await;
1658+
};
1659+
1660+
join(srv, client).await;
1661+
}
1662+
}
1663+
15901664
#[tokio::test]
15911665
async fn data_padding() {
15921666
h2_support::trace_init!();

0 commit comments

Comments
 (0)