Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ pub struct Builder {
/// Maximum amount of bytes to "buffer" for writing per stream.
max_send_buffer_size: usize,

/// Maximum number of bytes to read at a time (for the entire connection).
recv_buffer_size: usize,

/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

Expand Down Expand Up @@ -655,6 +658,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
recv_buffer_size: proto::DEFAULT_RECV_BUFFER_SIZE,
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
Expand Down Expand Up @@ -1074,6 +1078,19 @@ impl Builder {
self
}

/// Sets the read buffer size for the entire connection.
/// Determines the maximum number of bytes that can be read at a time.
/// The default is currently 8KB, but may change.
///
/// # Panics
///
/// This function panics if `n` is larger than `i32::MAX`.
pub fn recv_buffer_size(&mut self, n: usize) -> &mut Self {
assert!(n <= i32::MAX as usize);
self.recv_buffer_size = n;
self
}

/// Enables or disables server push promises.
///
/// This value is included in the initial SETTINGS handshake.
Expand Down Expand Up @@ -1309,7 +1326,7 @@ where
bind_connection(&mut io).await?;

// Create the codec
let mut codec = Codec::new(io);
let mut codec = Codec::with_recv_buffer_size(io, builder.recv_buffer_size);

if let Some(max) = builder.settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
Expand Down
82 changes: 78 additions & 4 deletions src/codec/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ use futures_core::Stream;
use bytes::{Buf, BytesMut};

use std::io;

use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::io::AsyncRead;
use tokio_util::codec::FramedRead as InnerFramedRead;
use tokio_util::codec::{LengthDelimitedCodec, LengthDelimitedCodecError};
use tokio_util::codec::{Decoder, LengthDelimitedCodec, LengthDelimitedCodecError};

// 16 MB "sane default" taken from golang http2
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 << 20;

#[derive(Debug)]
pub struct FramedRead<T> {
inner: InnerFramedRead<T, LengthDelimitedCodec>,
inner: InnerFramedRead<T, BufferManager<LengthDelimitedCodec>>,

// hpack decoder state
hpack: hpack::Decoder,
Expand All @@ -35,6 +36,19 @@ pub struct FramedRead<T> {
partial: Option<Partial>,
}

/// Enables more efficient frame decoder buffering
#[derive(Debug)]
struct BufferManager<T> {
inner: T,

/// Secondary buffer which gets used when the primary buffer's capacity falls below min_buf_capacity.
///
/// This buffer has a higher likelihood of being able to reclaim its original buffer space.
alt_buf: BytesMut,

min_buf_capacity: usize,
}

/// Partially loaded headers frame
#[derive(Debug)]
struct Partial {
Expand All @@ -58,8 +72,9 @@ impl<T> FramedRead<T> {
let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
let max_continuation_frames =
calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length());
let min_buf_capacity = inner.read_buffer().capacity() / 2;
FramedRead {
inner,
inner: inner.map_decoder(|d| BufferManager::with_min_buf_capacity(d, min_buf_capacity)),
hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
max_header_list_size,
max_continuation_frames,
Expand Down Expand Up @@ -419,6 +434,65 @@ fn map_err(err: io::Error) -> Error {
err.into()
}

// ===== impl BufferManager =====

impl<T> BufferManager<T> {
pub fn with_min_buf_capacity(inner: T, min_buf_capacity: usize) -> Self {
BufferManager {
inner,
alt_buf: BytesMut::new(),
min_buf_capacity,
}
}
}

impl<T> Deref for BufferManager<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> DerefMut for BufferManager<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<T: Decoder> Decoder for BufferManager<T> {
type Item = T::Item;
type Error = T::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let r = self.inner.decode(src);
// If we can't decode any more frames at the moment,
// and buffer capacity has fallen below the desired minimum.
if matches!(r, Ok(None)) && src.capacity() < self.min_buf_capacity {
// Empty the secondary buffer (it hasn't been used in a while anyways).
self.alt_buf.clear();

// Ensure that the secondary buffer has at least 2 times the minimum desired capacity.
// It's much more likely for the secondary buffer to no longer be referenced externally at this point,
// increasing the likelihood of this reservation simply reclaiming its original buffer space.
self.alt_buf.reserve(self.min_buf_capacity * 2);

// Copy the primary buffer to the secondary buffer.
// The primary buffer is highly likely to be empty or almost empty within this block.
self.alt_buf.extend_from_slice(src);

tracing::trace!(
capacity_before = %src.capacity(),
capacity_after = %self.alt_buf.capacity(),
len = %src.len(),
"replacing read buffer",
);

std::mem::swap(src, &mut self.alt_buf);
}
r
}
}

// ===== impl Continuable =====

impl Continuable {
Expand Down
44 changes: 36 additions & 8 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;

use crate::frame::{self, Data, Frame};
use crate::proto::Error;
use crate::proto::{self, Error};

use bytes::Buf;
use futures_core::Stream;
Expand All @@ -17,6 +17,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::length_delimited;
use tokio_util::codec::FramedRead as InnerFramedRead;

use std::io;

Expand All @@ -33,21 +34,48 @@ where
/// Returns a new `Codec` with the default max frame size
#[inline]
pub fn new(io: T) -> Self {
Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize)
Self::with_recv_buffer_size(io, proto::DEFAULT_RECV_BUFFER_SIZE)
}

/// Returns a new `Codec` with the given read buffer size
pub fn with_recv_buffer_size(io: T, recv_buffer_size: usize) -> Self {
Self::with_max_recv_frame_size_and_recv_buffer_size(
io,
frame::DEFAULT_MAX_FRAME_SIZE as usize,
recv_buffer_size,
)
}

/// Returns a new `Codec` with the given maximum frame size
#[allow(dead_code)]
pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self {
Self::with_max_recv_frame_size_and_recv_buffer_size(
io,
max_frame_size,
proto::DEFAULT_RECV_BUFFER_SIZE,
)
}

/// Returns a new `Codec` with the given maximum frame size and read buffer size
pub fn with_max_recv_frame_size_and_recv_buffer_size(
io: T,
max_frame_size: usize,
recv_buffer_size: usize,
) -> Self {
// Wrap with writer
let framed_write = FramedWrite::new(io);

// Delimit the frames
let delimited = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(framed_write);
let delimited = InnerFramedRead::with_capacity(
framed_write,
length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_codec(),
recv_buffer_size,
);

let mut inner = FramedRead::new(delimited);

Expand Down
1 change: 1 addition & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ pub const DEFAULT_RESET_STREAM_MAX: usize = 50;
// reasonable guess of the average here.
pub const DEFAULT_RESET_STREAM_SECS: u64 = 1;
pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400;
pub const DEFAULT_RECV_BUFFER_SIZE: usize = 1024 * 8;
19 changes: 18 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ pub struct Builder {
/// Maximum amount of bytes to "buffer" for writing per stream.
max_send_buffer_size: usize,

/// Maximum number of bytes to read at a time (for the entire connection).
recv_buffer_size: usize,

/// Maximum number of locally reset streams due to protocol error across
/// the lifetime of the connection.
///
Expand Down Expand Up @@ -381,7 +384,7 @@ where
let entered = span.enter();

// Create the codec.
let mut codec = Codec::new(io);
let mut codec = Codec::with_recv_buffer_size(io, builder.recv_buffer_size);

if let Some(max) = builder.settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
Expand Down Expand Up @@ -655,6 +658,7 @@ impl Builder {
settings: Settings::default(),
initial_target_connection_window_size: None,
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
recv_buffer_size: proto::DEFAULT_RECV_BUFFER_SIZE,

local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
}
Expand Down Expand Up @@ -985,6 +989,19 @@ impl Builder {
self
}

/// Sets the read buffer size for the entire connection.
/// Determines the maximum number of bytes that can be read at a time.
/// The default is currently 8KB, but may change.
///
/// # Panics
///
/// This function panics if `n` is larger than `i32::MAX`.
pub fn recv_buffer_size(&mut self, n: usize) -> &mut Self {
assert!(n <= i32::MAX as usize);
self.recv_buffer_size = n;
self
}

/// Sets the maximum number of concurrent locally reset streams.
///
/// When a stream is explicitly reset by either calling
Expand Down