diff --git a/changelogs/current.yaml b/changelogs/current.yaml index e1e4e7ccb6f79..574996bd889cd 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -84,6 +84,12 @@ bug_fixes: definition requires. Both factories now serialize the ``Struct`` to a JSON string and pass the string to the dynamic module side as the configuration, matching the behavior already in place for every other dynamic module extension factory. +- area: io_uring + change: | + Fixed io_uring sockets to bound their internal asynchronous write buffer with the existing + connection write buffer limit. This lets Envoy's normal connection-level back-pressure and + overload protections engage when io_uring writes are staged faster than the kernel completes + them. - area: oauth2 change: | Fixed a crash in the OAuth2 filter where AES-CBC decryption of token cookies could spuriously diff --git a/envoy/common/io/io_uring.h b/envoy/common/io/io_uring.h index 335b8d4c6b064..7c38acc505708 100644 --- a/envoy/common/io/io_uring.h +++ b/envoy/common/io/io_uring.h @@ -277,6 +277,12 @@ class IoUringSocket { */ virtual uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) PURE; + /** + * Set the high watermark for the socket's internal write buffer. + * @param high_watermark supplies the limit in bytes, or 0 to disable the limit. + */ + virtual void setWriteBufferHighWatermark(uint32_t high_watermark) PURE; + /** * Shutdown the socket. * @param how is SHUT_RD, SHUT_WR and SHUT_RDWR. diff --git a/envoy/network/io_handle.h b/envoy/network/io_handle.h index cd2c2f1e5c069..14e77956d38d6 100644 --- a/envoy/network/io_handle.h +++ b/envoy/network/io_handle.h @@ -114,6 +114,13 @@ class IoHandle { */ virtual Api::IoCallUint64Result write(Buffer::Instance& buffer) PURE; + /** + * Set the write buffer limit used by IoHandle implementations that stage asynchronous writes + * internally. Implementations that do not stage write data may ignore this. 
+ * @param limit supplies the connection write buffer limit in bytes, or 0 to disable the limit. + */ + virtual void setWriteBufferLimits(uint32_t limit) { UNREFERENCED_PARAMETER(limit); } + /** * Send a message to the address. * @param slices points to the location of data to be sent. diff --git a/source/common/io/io_uring_worker_impl.cc b/source/common/io/io_uring_worker_impl.cc index 80622775f65dd..3301d46d79f1c 100644 --- a/source/common/io/io_uring_worker_impl.cc +++ b/source/common/io/io_uring_worker_impl.cc @@ -1,5 +1,7 @@ #include "source/common/io/io_uring_worker_impl.h" +#include + namespace Envoy { namespace Io { @@ -378,10 +380,26 @@ void IoUringServerSocket::write(Buffer::Instance& data) { ENVOY_LOG(trace, "write, buffer size = {}, fd = {}", data.length(), fd_); ASSERT(!shutdown_.has_value()); + uint64_t bytes_to_move = data.length(); + if (write_buffer_high_watermark_ > 0) { + const uint64_t current_size = write_buf_.length(); + if (current_size >= write_buffer_high_watermark_) { + // The internal buffer is at or above the high watermark. Refuse to stage more bytes so + // the upper layer's connection-level back-pressure engages. + back_pressure_pending_ = true; + return; + } + const uint64_t headroom = write_buffer_high_watermark_ - current_size; + if (bytes_to_move > headroom) { + bytes_to_move = headroom; + back_pressure_pending_ = true; + } + } + // We need to reset the drain trackers, since the write and close is async in // the iouring. When the write is actually finished the above layer may already // release the drain trackers. 
- write_buf_.move(data, data.length(), true); + write_buf_.move(data, bytes_to_move, true); submitWriteOrShutdownRequest(); } @@ -390,13 +408,36 @@ uint64_t IoUringServerSocket::write(const Buffer::RawSlice* slices, uint64_t num ENVOY_LOG(trace, "write, num_slices = {}, fd = {}", num_slice, fd_); ASSERT(!shutdown_.has_value()); + uint64_t headroom = std::numeric_limits::max(); + if (write_buffer_high_watermark_ > 0) { + const uint64_t current_size = write_buf_.length(); + if (current_size >= write_buffer_high_watermark_) { + back_pressure_pending_ = true; + return 0; + } + headroom = write_buffer_high_watermark_ - current_size; + } + uint64_t bytes_written = 0; for (uint64_t i = 0; i < num_slice; i++) { + if (slices[i].len_ == 0) { + continue; + } + if (slices[i].len_ > headroom) { + write_buf_.add(slices[i].mem_, headroom); + bytes_written += headroom; + headroom = 0; + back_pressure_pending_ = true; + break; + } write_buf_.add(slices[i].mem_, slices[i].len_); bytes_written += slices[i].len_; + headroom -= slices[i].len_; } - submitWriteOrShutdownRequest(); + if (bytes_written > 0) { + submitWriteOrShutdownRequest(); + } return bytes_written; } @@ -593,6 +634,15 @@ void IoUringServerSocket::onWrite(Request* req, int32_t result, bool injected) { } } + // If a previous ``write()`` was short because the high watermark was hit, give the upper layer + // a chance to retry now that ``write_buf_`` has drained below the watermark. 
+ // A watermark of 0 (limit disabled) counts as drained, so a pending retry is never stranded. + if (back_pressure_pending_ && !shutdown_.has_value() && status_ != Closed && + (write_buffer_high_watermark_ == 0 || write_buf_.length() < write_buffer_high_watermark_)) { + back_pressure_pending_ = false; + injectCompletion(Request::RequestType::Write); + } + submitWriteOrShutdownRequest(); } diff --git a/source/common/io/io_uring_worker_impl.h b/source/common/io/io_uring_worker_impl.h index 26a50fc8402fe..844a62f54a696 100644 --- a/source/common/io/io_uring_worker_impl.h +++ b/source/common/io/io_uring_worker_impl.h @@ -106,6 +106,7 @@ class IoUringSocketEntry : public IoUringSocket, void disableRead() override { status_ = ReadDisabled; } void enableCloseEvent(bool enable) override { enable_close_event_ = enable; } void connect(const Network::Address::InstanceConstSharedPtr&) override { PANIC("not implement"); } + void setWriteBufferHighWatermark(uint32_t) override {} void onAccept(Request*, int32_t, bool injected) override { if (injected && (injected_completions_ & static_cast(Request::RequestType::Accept))) { @@ -194,6 +195,9 @@ class IoUringServerSocket : public IoUringSocketEntry { void disableRead() override; void write(Buffer::Instance& data) override; uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) override; + void setWriteBufferHighWatermark(uint32_t high_watermark) override { + write_buffer_high_watermark_ = high_watermark; + } void shutdown(int how) override; void onClose(Request* req, int32_t result, bool injected) override; void onRead(Request* req, int32_t result, bool injected) override; @@ -216,13 +220,18 @@ class IoUringServerSocket : public IoUringSocketEntry { Buffer::OwnedImpl read_buf_; absl::optional read_error_; - // TODO (soulxu): We need water mark for write buffer. - // The upper layer will think the buffer released when the data copy into this write buffer. 
- // This leads to the `IntegrationTest.TestFloodUpstreamErrors` timeout, since the http layer - // always think the response is write successful, so flood protection is never kicked. - // // For write. iouring socket will write sequentially in the order of write_buf_ and shutdown_ // Unless the write_buf_ is empty, the shutdown operation will not be performed. + // + // When ``write_buffer_high_watermark_`` is non-zero, ``write()`` returns a short write once + // ``write_buf_`` reaches the configured size, so connection-level back-pressure (and overload + // protections that depend on it, like HTTP flood protection) engages. With the default of 0 the + // cap is disabled and the upper layer always sees its writes as fully accepted. + uint32_t write_buffer_high_watermark_{0}; + // True when a previous ``write()`` was unable to fully accept the offered bytes because the + // internal write buffer hit the high watermark. When set, an injected Write completion is + // delivered to the upper layer after the buffer drains, so the upper layer retries. + bool back_pressure_pending_{false}; Buffer::OwnedImpl write_buf_; // shutdown_ has 3 states. 
A absl::nullopt indicates the socket has not been shutdown, a false // value represents the socket wants to be shutdown but the shutdown has not been performed or diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index b507d2b5fb109..e5caca6a377f1 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -640,6 +640,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through void ConnectionImpl::setBufferLimits(uint32_t limit) { read_buffer_limit_ = limit; + ioHandle().setWriteBufferLimits(limit); // Due to the fact that writes to the connection and flushing data from the connection are done // asynchronously, we have the option of either setting the watermarks aggressively, and regularly diff --git a/source/common/network/io_uring_socket_handle_impl.cc b/source/common/network/io_uring_socket_handle_impl.cc index 7d10acd098df9..2237eff580847 100644 --- a/source/common/network/io_uring_socket_handle_impl.cc +++ b/source/common/network/io_uring_socket_handle_impl.cc @@ -122,9 +122,22 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::write(Buffer::Instance& buffer) return std::move(*write_result); } - uint64_t buffer_size = buffer.length(); + const uint64_t before = buffer.length(); io_uring_socket_->write(buffer); - return {buffer_size, IoSocketError::none()}; + const uint64_t accepted = before - buffer.length(); + if (accepted == 0 && before > 0) { + // Nothing was staged: the internal write buffer refused the data. Report EAGAIN rather than a + // zero-byte success so callers can tell back-pressure apart from forward progress. + return {0, IoSocketError::getIoSocketEagainError()}; + } + return {accepted, IoSocketError::none()}; +} + +void IoUringSocketHandleImpl::setWriteBufferLimits(uint32_t limit) { + write_buffer_high_watermark_ = limit; + if (io_uring_socket_.has_value()) { + io_uring_socket_->setWriteBufferHighWatermark(limit); + } } Api::IoCallUint64Result IoUringSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int, @@ -245,6 +258,7 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, io_uring_socket_->setFileReadyCb(std::move(cb)); io_uring_socket_->enableRead(); 
io_uring_socket_->enableCloseEvent(events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); } else { ENVOY_LOG(trace, "initialize file event from another thread, fd = {}, type = {}", fd_, ioUringSocketTypeStr()); @@ -271,6 +285,7 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, // Move the temporary buf to the newly created one. io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addServerSocket( fd, buf, std::move(cb), events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); } return; } @@ -282,12 +297,14 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, case IoUringSocketType::Server: io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addServerSocket( fd_, std::move(cb), events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); break; case IoUringSocketType::Unknown: case IoUringSocketType::Client: io_uring_socket_type_ = IoUringSocketType::Client; io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addClientSocket( fd_, std::move(cb), events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); break; } } diff --git a/source/common/network/io_uring_socket_handle_impl.h b/source/common/network/io_uring_socket_handle_impl.h index 91c083a529d27..9ac22788a1c06 100644 --- a/source/common/network/io_uring_socket_handle_impl.h +++ b/source/common/network/io_uring_socket_handle_impl.h @@ -42,6 +42,7 @@ class IoUringSocketHandleImpl : public IoSocketHandleBaseImpl { absl::optional max_length_opt) override; Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; Api::IoCallUint64Result write(Buffer::Instance& buffer) override; + void setWriteBufferLimits(uint32_t limit) override; 
Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, const Address::Ip* self_ip, const Address::Instance& peer_address) override; @@ -84,6 +85,7 @@ class IoUringSocketHandleImpl : public IoSocketHandleBaseImpl { Io::IoUringWorkerFactory& io_uring_worker_factory_; IoUringSocketType io_uring_socket_type_; OptRef io_uring_socket_{absl::nullopt}; + uint32_t write_buffer_high_watermark_{0}; Event::FileEventPtr file_event_{nullptr}; diff --git a/test/common/io/io_uring_worker_impl_test.cc b/test/common/io/io_uring_worker_impl_test.cc index 6c10e8d70f535..99db473ea9b20 100644 --- a/test/common/io/io_uring_worker_impl_test.cc +++ b/test/common/io/io_uring_worker_impl_test.cc @@ -693,6 +693,114 @@ TEST(IoUringWorkerImplTest, NoEnableReadOnConnectError) { delete static_cast(connect_req); } +// When the connection write buffer limit is applied, ``IoUringServerSocket::write`` must refuse to +// stage more bytes than the cap so that the upper layer's connection-level back-pressure can +// engage. Once the in-flight write completes and the buffer drops below the watermark, an injected +// Write completion is delivered so the upper layer retries. +TEST(IoUringWorkerImplTest, WriteBufferHighWatermark) { + Event::MockDispatcher dispatcher; + IoUringPtr io_uring_instance = std::make_unique(); + MockIoUring& mock_io_uring = *dynamic_cast(io_uring_instance.get()); + + EXPECT_CALL(mock_io_uring, registerEventfd()); + EXPECT_CALL(dispatcher, + createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) + .WillOnce(ReturnNew>()); + IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher); + + // Watermark = 100 bytes. + IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false); + socket.setWriteBufferHighWatermark(100); + + // First write of 80 bytes fits under the watermark and is fully accepted. 
+ Request* write_req = nullptr; + EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) + .WillOnce(DoAll(SaveArg<4>(&write_req), Return(IoUringResult::Ok))) + .RetiresOnSaturation(); + EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation(); + Buffer::OwnedImpl buf1; + buf1.add(std::string(80, 'a')); + socket.write(buf1); + EXPECT_EQ(0, buf1.length()); + + // Second write of 50 bytes can only stage 20 (100 - 80). The remaining 30 bytes stay in + // ``buf2``. No new ``prepareWritev`` is expected since a write request is already in flight. + Buffer::OwnedImpl buf2; + buf2.add(std::string(50, 'b')); + socket.write(buf2); + EXPECT_EQ(30, buf2.length()); + + // Third write while at the high watermark is rejected entirely. + Buffer::OwnedImpl buf3; + buf3.add(std::string(10, 'c')); + socket.write(buf3); + EXPECT_EQ(10, buf3.length()); + + // Complete the in-flight write (the kernel saw 80 bytes worth of iovecs). The drain leaves + // 20 bytes in ``write_buf_``, which is below the watermark, so an injected Write completion is + // queued for the upper layer to retry. The remaining 20 bytes are then submitted as a new + // ``prepareWritev``. + Request* write_req2 = nullptr; + EXPECT_CALL(mock_io_uring, injectCompletion(_, _, -EAGAIN)) + .WillOnce(Invoke([](os_fd_t, Request* req, int32_t) { delete req; })); + EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) + .WillOnce(DoAll(SaveArg<4>(&write_req2), Return(IoUringResult::Ok))) + .RetiresOnSaturation(); + EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation(); + socket.onWrite(write_req, 80, false); + delete write_req; + delete write_req2; + + EXPECT_CALL(dispatcher, clearDeferredDeleteList()); +} + +// The slice-based ``write`` overload also caps at the high watermark and reports the actual +// number of bytes accepted. 
+TEST(IoUringWorkerImplTest, WriteSliceBufferHighWatermark) { + Event::MockDispatcher dispatcher; + IoUringPtr io_uring_instance = std::make_unique(); + MockIoUring& mock_io_uring = *dynamic_cast(io_uring_instance.get()); + + EXPECT_CALL(mock_io_uring, registerEventfd()); + EXPECT_CALL(dispatcher, + createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) + .WillOnce(ReturnNew>()); + IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher); + IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false); + socket.setWriteBufferHighWatermark(50); + + Request* write_req = nullptr; + EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) + .WillOnce(DoAll(SaveArg<4>(&write_req), Return(IoUringResult::Ok))) + .RetiresOnSaturation(); + EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation(); + + std::string a(40, 'a'); + std::string b(20, 'b'); + Buffer::RawSlice slices[2]; + slices[0].mem_ = a.data(); + slices[0].len_ = a.size(); + slices[1].mem_ = b.data(); + slices[1].len_ = b.size(); + EXPECT_EQ(50, socket.write(slices, 2)); + + // Subsequent writes return 0 until the buffer drains. + std::string c(10, 'c'); + Buffer::RawSlice slice2; + slice2.mem_ = c.data(); + slice2.len_ = c.size(); + EXPECT_EQ(0, socket.write(&slice2, 1)); + + // Drain the in-flight write fully (50 bytes accepted by the kernel). The buffer drops to 0, + // back-pressure clears, and a Write completion is injected for the upper layer. 
+ EXPECT_CALL(mock_io_uring, injectCompletion(_, _, -EAGAIN)) + .WillOnce(Invoke([](os_fd_t, Request* req, int32_t) { delete req; })); + socket.onWrite(write_req, 50, false); + delete write_req; + + EXPECT_CALL(dispatcher, clearDeferredDeleteList()); +} + class ReproSocket : public IoUringSocketEntry { public: ReproSocket(os_fd_t fd, IoUringWorkerImpl& parent) diff --git a/test/common/network/io_uring_socket_handle_impl_test.cc b/test/common/network/io_uring_socket_handle_impl_test.cc index adcdb1d739e00..486b41cab274e 100644 --- a/test/common/network/io_uring_socket_handle_impl_test.cc +++ b/test/common/network/io_uring_socket_handle_impl_test.cc @@ -41,6 +41,22 @@ TEST_F(IoUringSocketHandleTest, CreateClientSocket) { EXPECT_EQ(IoUringSocketType::Client, impl.ioUringSocketType()); } +TEST_F(IoUringSocketHandleTest, AppliesWriteBufferLimit) { + IoUringSocketHandleTestImpl impl(factory_, false); + impl.setWriteBufferLimits(123); + + EXPECT_CALL(worker_, addClientSocket(_, _, _)).WillOnce(testing::ReturnRef(socket_)); + EXPECT_CALL(factory_, getIoUringWorker()) + .WillOnce(testing::Return(OptRef(worker_))); + EXPECT_CALL(socket_, setWriteBufferHighWatermark(123)); + impl.initializeFileEvent( + dispatcher_, [](uint32_t) { return absl::OkStatus(); }, Event::PlatformDefaultTriggerType, + Event::FileReadyType::Read); + + EXPECT_CALL(socket_, setWriteBufferHighWatermark(456)); + impl.setWriteBufferLimits(456); +} + TEST_F(IoUringSocketHandleTest, ReadError) { IoUringSocketHandleTestImpl impl(factory_, false); EXPECT_CALL(worker_, addClientSocket(_, _, _)).WillOnce(testing::ReturnRef(socket_)); diff --git a/test/mocks/io/mocks.h b/test/mocks/io/mocks.h index 4f7536485135c..00f8483a5245b 100644 --- a/test/mocks/io/mocks.h +++ b/test/mocks/io/mocks.h @@ -44,6 +44,7 @@ class MockIoUringSocket : public IoUringSocket { MOCK_METHOD(void, connect, (const Network::Address::InstanceConstSharedPtr& address)); MOCK_METHOD(void, write, (Buffer::Instance & data)); MOCK_METHOD(uint64_t, 
write, (const Buffer::RawSlice* slices, uint64_t num_slice)); + MOCK_METHOD(void, setWriteBufferHighWatermark, (uint32_t high_watermark)); MOCK_METHOD(void, onAccept, (Request * req, int32_t result, bool injected)); MOCK_METHOD(void, onConnect, (Request * req, int32_t result, bool injected)); MOCK_METHOD(void, onRead, (Request * req, int32_t result, bool injected));