Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ bug_fixes:
definition requires. Both factories now serialize the ``Struct`` to a JSON string and pass the
string to the dynamic module side as the configuration, matching the behavior already in place
for every other dynamic module extension factory.
- area: io_uring
change: |
Fixed io_uring sockets to bound their internal asynchronous write buffer with the existing
connection write buffer limit. This lets Envoy's normal connection-level back-pressure and
overload protections engage when io_uring writes are staged faster than the kernel completes
them.
- area: oauth2
change: |
Fixed a crash in the OAuth2 filter where AES-CBC decryption of token cookies could spuriously
Expand Down
6 changes: 6 additions & 0 deletions envoy/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ class IoUringSocket {
*/
virtual uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) PURE;

/**
* Set the high watermark for the socket's internal write buffer. Socket types that do not
* stage write data may implement this as a no-op.
* @param high_watermark supplies the limit in bytes, or 0 to disable the limit.
*/
virtual void setWriteBufferHighWatermark(uint32_t high_watermark) PURE;

/**
* Shutdown the socket.
* @param how is SHUT_RD, SHUT_WR and SHUT_RDWR.
Expand Down
7 changes: 7 additions & 0 deletions envoy/network/io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ class IoHandle {
*/
virtual Api::IoCallUint64Result write(Buffer::Instance& buffer) PURE;

/**
* Set the write buffer limit used by IoHandle implementations that stage asynchronous writes
* internally. Implementations that do not stage write data may ignore this; the default
* implementation does nothing.
* @param limit supplies the connection write buffer limit in bytes, or 0 to disable the limit.
*/
virtual void setWriteBufferLimits(uint32_t limit) { UNREFERENCED_PARAMETER(limit); }

/**
* Send a message to the address.
* @param slices points to the location of data to be sent.
Expand Down
54 changes: 52 additions & 2 deletions source/common/io/io_uring_worker_impl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/common/io/io_uring_worker_impl.h"

#include <limits>

namespace Envoy {
namespace Io {

Expand Down Expand Up @@ -378,10 +380,26 @@ void IoUringServerSocket::write(Buffer::Instance& data) {
ENVOY_LOG(trace, "write, buffer size = {}, fd = {}", data.length(), fd_);
ASSERT(!shutdown_.has_value());

uint64_t bytes_to_move = data.length();
if (write_buffer_high_watermark_ > 0) {
const uint64_t current_size = write_buf_.length();
if (current_size >= write_buffer_high_watermark_) {
// The internal buffer is at or above the high watermark. Refuse to stage more bytes so
// the upper layer's connection-level back-pressure engages.
back_pressure_pending_ = true;
return;
}
const uint64_t headroom = write_buffer_high_watermark_ - current_size;
if (bytes_to_move > headroom) {
bytes_to_move = headroom;
back_pressure_pending_ = true;
}
}

// We need to reset the drain trackers, since the write and close is async in
// the iouring. When the write is actually finished the above layer may already
// release the drain trackers.
write_buf_.move(data, data.length(), true);
write_buf_.move(data, bytes_to_move, true);

submitWriteOrShutdownRequest();
}
Expand All @@ -390,13 +408,36 @@ uint64_t IoUringServerSocket::write(const Buffer::RawSlice* slices, uint64_t num
ENVOY_LOG(trace, "write, num_slices = {}, fd = {}", num_slice, fd_);
ASSERT(!shutdown_.has_value());

uint64_t headroom = std::numeric_limits<uint64_t>::max();
if (write_buffer_high_watermark_ > 0) {
const uint64_t current_size = write_buf_.length();
if (current_size >= write_buffer_high_watermark_) {
back_pressure_pending_ = true;
return 0;
}
headroom = write_buffer_high_watermark_ - current_size;
}

uint64_t bytes_written = 0;
for (uint64_t i = 0; i < num_slice; i++) {
if (slices[i].len_ == 0) {
continue;
}
if (slices[i].len_ > headroom) {
write_buf_.add(slices[i].mem_, headroom);
bytes_written += headroom;
headroom = 0;
back_pressure_pending_ = true;
break;
}
write_buf_.add(slices[i].mem_, slices[i].len_);
bytes_written += slices[i].len_;
headroom -= slices[i].len_;
}

submitWriteOrShutdownRequest();
if (bytes_written > 0) {
submitWriteOrShutdownRequest();
}
return bytes_written;
}

Expand Down Expand Up @@ -593,6 +634,15 @@ void IoUringServerSocket::onWrite(Request* req, int32_t result, bool injected) {
}
}

// If a previous ``write()`` was short because the high watermark was hit, give the upper layer
// a chance to retry now that ``write_buf_`` has drained below the watermark.
if (back_pressure_pending_ && write_buffer_high_watermark_ > 0 &&
write_buf_.length() < write_buffer_high_watermark_ && !shutdown_.has_value() &&
status_ != Closed) {
back_pressure_pending_ = false;
injectCompletion(Request::RequestType::Write);
}

submitWriteOrShutdownRequest();
}

Expand Down
19 changes: 14 additions & 5 deletions source/common/io/io_uring_worker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class IoUringSocketEntry : public IoUringSocket,
void disableRead() override { status_ = ReadDisabled; }
void enableCloseEvent(bool enable) override { enable_close_event_ = enable; }
void connect(const Network::Address::InstanceConstSharedPtr&) override { PANIC("not implement"); }
void setWriteBufferHighWatermark(uint32_t) override {}

void onAccept(Request*, int32_t, bool injected) override {
if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Accept))) {
Expand Down Expand Up @@ -194,6 +195,9 @@ class IoUringServerSocket : public IoUringSocketEntry {
void disableRead() override;
void write(Buffer::Instance& data) override;
uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) override;
// Records the cap applied to ``write_buf_``; a value of 0 leaves the buffer unbounded. The
// stored value is consulted by both ``write()`` overloads and by ``onWrite()``.
void setWriteBufferHighWatermark(uint32_t high_watermark) override {
write_buffer_high_watermark_ = high_watermark;
}
void shutdown(int how) override;
void onClose(Request* req, int32_t result, bool injected) override;
void onRead(Request* req, int32_t result, bool injected) override;
Expand All @@ -216,13 +220,18 @@ class IoUringServerSocket : public IoUringSocketEntry {
Buffer::OwnedImpl read_buf_;
absl::optional<int32_t> read_error_;

// TODO (soulxu): We need water mark for write buffer.
// The upper layer will think the buffer released when the data copy into this write buffer.
// This leads to the `IntegrationTest.TestFloodUpstreamErrors` timeout, since the http layer
// always think the response is write successful, so flood protection is never kicked.
//
// For write. iouring socket will write sequentially in the order of write_buf_ and shutdown_
// Unless the write_buf_ is empty, the shutdown operation will not be performed.
//
// When ``write_buffer_high_watermark_`` is non-zero, ``write()`` stops accepting data once
// ``write_buf_`` reaches the configured size and leaves the remainder with the caller, so
// connection-level back-pressure (and overload protections that depend on it, like HTTP flood
// protection) engages. With the default of 0 the cap is disabled and the upper layer always
// sees its writes as fully accepted.
uint32_t write_buffer_high_watermark_{0};
// True when a previous ``write()`` was unable to fully accept the offered bytes because the
// internal write buffer hit the high watermark. When set, an injected Write completion is
// delivered to the upper layer after the buffer drains, so the upper layer retries.
bool back_pressure_pending_{false};
Buffer::OwnedImpl write_buf_;
// shutdown_ has 3 states. A absl::nullopt indicates the socket has not been shutdown, a false
// value represents the socket wants to be shutdown but the shutdown has not been performed or
Expand Down
1 change: 1 addition & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through

void ConnectionImpl::setBufferLimits(uint32_t limit) {
read_buffer_limit_ = limit;
ioHandle().setWriteBufferLimits(limit);

// Due to the fact that writes to the connection and flushing data from the connection are done
// asynchronously, we have the option of either setting the watermarks aggressively, and regularly
Expand Down
16 changes: 14 additions & 2 deletions source/common/network/io_uring_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,17 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::write(Buffer::Instance& buffer)
return std::move(*write_result);
}

uint64_t buffer_size = buffer.length();
const uint64_t before = buffer.length();
io_uring_socket_->write(buffer);
return {buffer_size, IoSocketError::none()};
const uint64_t after = buffer.length();
return {before - after, IoSocketError::none()};
}

void IoUringSocketHandleImpl::setWriteBufferLimits(uint32_t limit) {
  // Always remember the limit: the io_uring socket may not exist yet, and
  // initializeFileEvent() applies the remembered value to the socket it creates.
  write_buffer_high_watermark_ = limit;

  if (!io_uring_socket_.has_value()) {
    return;
  }

  // A socket is already attached, so push the new limit to it right away.
  io_uring_socket_->setWriteBufferHighWatermark(limit);
}

Api::IoCallUint64Result IoUringSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
Expand Down Expand Up @@ -245,6 +253,7 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher,
io_uring_socket_->setFileReadyCb(std::move(cb));
io_uring_socket_->enableRead();
io_uring_socket_->enableCloseEvent(events & Event::FileReadyType::Closed);
io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_);
} else {
ENVOY_LOG(trace, "initialize file event from another thread, fd = {}, type = {}", fd_,
ioUringSocketTypeStr());
Expand All @@ -271,6 +280,7 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher,
// Move the temporary buf to the newly created one.
io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addServerSocket(
fd, buf, std::move(cb), events & Event::FileReadyType::Closed);
io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_);
}
return;
}
Expand All @@ -282,12 +292,14 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher,
case IoUringSocketType::Server:
io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addServerSocket(
fd_, std::move(cb), events & Event::FileReadyType::Closed);
io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_);
break;
case IoUringSocketType::Unknown:
case IoUringSocketType::Client:
io_uring_socket_type_ = IoUringSocketType::Client;
io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addClientSocket(
fd_, std::move(cb), events & Event::FileReadyType::Closed);
io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_);
break;
}
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/network/io_uring_socket_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class IoUringSocketHandleImpl : public IoSocketHandleBaseImpl {
absl::optional<uint64_t> max_length_opt) override;
Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override;
Api::IoCallUint64Result write(Buffer::Instance& buffer) override;
void setWriteBufferLimits(uint32_t limit) override;
Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags,
const Address::Ip* self_ip,
const Address::Instance& peer_address) override;
Expand Down Expand Up @@ -84,6 +85,7 @@ class IoUringSocketHandleImpl : public IoSocketHandleBaseImpl {
Io::IoUringWorkerFactory& io_uring_worker_factory_;
IoUringSocketType io_uring_socket_type_;
OptRef<Io::IoUringSocket> io_uring_socket_{absl::nullopt};
uint32_t write_buffer_high_watermark_{0};

Event::FileEventPtr file_event_{nullptr};

Expand Down
108 changes: 108 additions & 0 deletions test/common/io/io_uring_worker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,114 @@ TEST(IoUringWorkerImplTest, NoEnableReadOnConnectError) {
delete static_cast<Request*>(connect_req);
}

// When the connection write buffer limit is applied, ``IoUringServerSocket::write`` must refuse to
// stage more bytes than the cap so that the upper layer's connection-level back-pressure can
// engage. Once the in-flight write completes and the buffer drops below the watermark, an injected
// Write completion is delivered so the upper layer retries.
//
// Scenario: watermark 100; writes of 80, 50 and 10 bytes are offered while the first write request
// is still in flight, then that request completes with 80 bytes written.
TEST(IoUringWorkerImplTest, WriteBufferHighWatermark) {
Event::MockDispatcher dispatcher;
IoUringPtr io_uring_instance = std::make_unique<MockIoUring>();
MockIoUring& mock_io_uring = *dynamic_cast<MockIoUring*>(io_uring_instance.get());

// Expectations consumed while the worker is constructed.
EXPECT_CALL(mock_io_uring, registerEventfd());
EXPECT_CALL(dispatcher,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
.WillOnce(ReturnNew<NiceMock<Event::MockFileEvent>>());
IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher);

// Watermark = 100 bytes.
IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false);
socket.setWriteBufferHighWatermark(100);

// First write of 80 bytes fits under the watermark and is fully accepted.
// The request handed to ``prepareWritev`` is captured so the test can complete it later.
Request* write_req = nullptr;
EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _))
.WillOnce(DoAll(SaveArg<4>(&write_req), Return<IoUringResult>(IoUringResult::Ok)))
.RetiresOnSaturation();
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();
Buffer::OwnedImpl buf1;
buf1.add(std::string(80, 'a'));
socket.write(buf1);
EXPECT_EQ(0, buf1.length());

// Second write of 50 bytes can only stage 20 (100 - 80). The remaining 30 bytes stay in
// ``buf2``. No new ``prepareWritev`` is expected since a write request is already in flight.
Buffer::OwnedImpl buf2;
buf2.add(std::string(50, 'b'));
socket.write(buf2);
EXPECT_EQ(30, buf2.length());

// Third write while at the high watermark is rejected entirely.
Buffer::OwnedImpl buf3;
buf3.add(std::string(10, 'c'));
socket.write(buf3);
EXPECT_EQ(10, buf3.length());

// Complete the in-flight write (the kernel saw 80 bytes worth of iovecs). The drain leaves
// 20 bytes in ``write_buf_``, which is below the watermark, so an injected Write completion is
// queued for the upper layer to retry. The remaining 20 bytes are then submitted as a new
// ``prepareWritev``.
Request* write_req2 = nullptr;
// The mock frees the injected request itself, since nothing in this test delivers it.
EXPECT_CALL(mock_io_uring, injectCompletion(_, _, -EAGAIN))
.WillOnce(Invoke([](os_fd_t, Request* req, int32_t) { delete req; }));
EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _))
.WillOnce(DoAll(SaveArg<4>(&write_req2), Return<IoUringResult>(IoUringResult::Ok)))
.RetiresOnSaturation();
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();
socket.onWrite(write_req, 80, false);
// The test releases both captured write requests; the second one is never completed here.
delete write_req;
delete write_req2;

// NOTE(review): presumably satisfied when the worker is torn down at end of scope - confirm.
EXPECT_CALL(dispatcher, clearDeferredDeleteList());
}

// The slice-based ``write`` overload also caps at the high watermark and reports the actual
// number of bytes accepted.
//
// Scenario: watermark 50; slices of 40 and 20 bytes are offered in one call, then a further
// 10-byte slice, and finally the in-flight write completes with 50 bytes written.
TEST(IoUringWorkerImplTest, WriteSliceBufferHighWatermark) {
Event::MockDispatcher dispatcher;
IoUringPtr io_uring_instance = std::make_unique<MockIoUring>();
MockIoUring& mock_io_uring = *dynamic_cast<MockIoUring*>(io_uring_instance.get());

// Expectations consumed while the worker is constructed.
EXPECT_CALL(mock_io_uring, registerEventfd());
EXPECT_CALL(dispatcher,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
.WillOnce(ReturnNew<NiceMock<Event::MockFileEvent>>());
IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher);
IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false);
socket.setWriteBufferHighWatermark(50);

// Exactly one write request is expected; it is captured so the test can complete it later.
Request* write_req = nullptr;
EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _))
.WillOnce(DoAll(SaveArg<4>(&write_req), Return<IoUringResult>(IoUringResult::Ok)))
.RetiresOnSaturation();
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();

// The first slice (40 bytes) fits entirely; only 10 of the second slice's 20 bytes fit, so the
// call reports 40 + 10 = 50 bytes accepted.
std::string a(40, 'a');
std::string b(20, 'b');
Buffer::RawSlice slices[2];
slices[0].mem_ = a.data();
slices[0].len_ = a.size();
slices[1].mem_ = b.data();
slices[1].len_ = b.size();
EXPECT_EQ(50, socket.write(slices, 2));

// Subsequent writes return 0 until the buffer drains.
std::string c(10, 'c');
Buffer::RawSlice slice2;
slice2.mem_ = c.data();
slice2.len_ = c.size();
EXPECT_EQ(0, socket.write(&slice2, 1));

// Drain the in-flight write fully (50 bytes accepted by the kernel). The buffer drops to 0,
// back-pressure clears, and a Write completion is injected for the upper layer.
// The mock frees the injected request itself, since nothing in this test delivers it.
EXPECT_CALL(mock_io_uring, injectCompletion(_, _, -EAGAIN))
.WillOnce(Invoke([](os_fd_t, Request* req, int32_t) { delete req; }));
socket.onWrite(write_req, 50, false);
delete write_req;

// NOTE(review): presumably satisfied when the worker is torn down at end of scope - confirm.
EXPECT_CALL(dispatcher, clearDeferredDeleteList());
}

class ReproSocket : public IoUringSocketEntry {
public:
ReproSocket(os_fd_t fd, IoUringWorkerImpl& parent)
Expand Down
16 changes: 16 additions & 0 deletions test/common/network/io_uring_socket_handle_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ TEST_F(IoUringSocketHandleTest, CreateClientSocket) {
EXPECT_EQ(IoUringSocketType::Client, impl.ioUringSocketType());
}

// Verifies both paths by which a write buffer limit reaches the io_uring socket: a limit set
// before the socket exists is applied when ``initializeFileEvent`` creates it, and a limit set
// afterwards is forwarded immediately.
TEST_F(IoUringSocketHandleTest, AppliesWriteBufferLimit) {
IoUringSocketHandleTestImpl impl(factory_, false);
// No io_uring socket has been created yet, so no call on ``socket_`` is expected here.
impl.setWriteBufferLimits(123);

EXPECT_CALL(worker_, addClientSocket(_, _, _)).WillOnce(testing::ReturnRef(socket_));
EXPECT_CALL(factory_, getIoUringWorker())
.WillOnce(testing::Return(OptRef<Io::IoUringWorker>(worker_)));
// The limit set earlier must be handed to the newly created socket.
EXPECT_CALL(socket_, setWriteBufferHighWatermark(123));
impl.initializeFileEvent(
dispatcher_, [](uint32_t) { return absl::OkStatus(); }, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read);

// With the socket in place, a new limit is passed straight through.
EXPECT_CALL(socket_, setWriteBufferHighWatermark(456));
impl.setWriteBufferLimits(456);
}

TEST_F(IoUringSocketHandleTest, ReadError) {
IoUringSocketHandleTestImpl impl(factory_, false);
EXPECT_CALL(worker_, addClientSocket(_, _, _)).WillOnce(testing::ReturnRef(socket_));
Expand Down
1 change: 1 addition & 0 deletions test/mocks/io/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MockIoUringSocket : public IoUringSocket {
MOCK_METHOD(void, connect, (const Network::Address::InstanceConstSharedPtr& address));
MOCK_METHOD(void, write, (Buffer::Instance & data));
MOCK_METHOD(uint64_t, write, (const Buffer::RawSlice* slices, uint64_t num_slice));
MOCK_METHOD(void, setWriteBufferHighWatermark, (uint32_t high_watermark));
MOCK_METHOD(void, onAccept, (Request * req, int32_t result, bool injected));
MOCK_METHOD(void, onConnect, (Request * req, int32_t result, bool injected));
MOCK_METHOD(void, onRead, (Request * req, int32_t result, bool injected));
Expand Down