From 149d86b571c58efe0d6b6d4fd4f1e17277b961d3 Mon Sep 17 00:00:00 2001 From: Adam Buran Date: Sun, 26 Apr 2026 17:31:39 -0700 Subject: [PATCH 1/2] io_uring: add write_buffer_high_watermark for io_uring sockets Expose ``IoUringOptions.write_buffer_high_watermark`` to bound the size of ``IoUringServerSocket::write_buf_``. Without a cap, the upper layer always sees writes as fully accepted (because the io_uring socket is async and stages bytes internally), so connection-level back-pressure -- and overload protections that depend on it, e.g. HTTP flood protection -- never engages. When the new field is set to a non-zero value, ``IoUringServerSocket::write`` returns a short write once ``write_buf_`` reaches the configured size, and ``IoUringSocketHandleImpl::write`` reports the actual bytes moved (rather than the original buffer length). The upper layer's connection-level write buffer holds the remainder, where the existing watermark mechanism applies. After the in-flight write completes and the buffer drops below the threshold, an injected Write completion is delivered so the upper layer retries. The default of 0 disables the cap and preserves the previous (uncapped) behavior. Resolves the existing TODO in ``IoUringServerSocket`` referencing the ``IntegrationTest.TestFloodUpstreamErrors`` timeout. Risk Level: Low Testing: Added unit tests for the buffer- and slice-based ``write`` overloads, including the back-pressure release on completion. Docs Changes: New field documented inline in the proto. Release Notes: Added. Platform Specific Features: io_uring (Linux >=5.11) Signed-off-by: Adam Buran --- .../v3/default_socket_interface.proto | 14 +++ changelogs/current.yaml | 8 ++ .../common/io/io_uring_worker_factory_impl.cc | 12 +- .../common/io/io_uring_worker_factory_impl.h | 2 + source/common/io/io_uring_worker_impl.cc | 92 ++++++++++++--- source/common/io/io_uring_worker_impl.h | 30 +++-- .../network/io_uring_socket_handle_impl.cc | 5 +- .../common/network/socket_interface_impl.cc | 1 + .../io/io_uring_worker_factory_impl_test.cc | 2 +- .../io_uring_worker_impl_integration_test.cc | 2 +- test/common/io/io_uring_worker_impl_test.cc | 108 +++++++++++++++++- ...ing_socket_handle_impl_integration_test.cc | 2 +- 12 files changed, 244 insertions(+), 34 deletions(-) diff --git a/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto b/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto index 69b413eba12a6..7144f74d4ddeb 100644 --- a/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto +++ b/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto @@ -43,4 +43,18 @@ message IoUringOptions { // asynchronously. If the remote stops reading, the io_uring write operation may never complete. // The operation is canceled and the socket is closed after the timeout. The default is 1000. google.protobuf.UInt32Value write_timeout_ms = 4; + // High watermark in bytes for the io_uring socket's internal write buffer. Because io_uring + // writes are asynchronous, data the upper layer hands to ``writev``/``write`` is staged in an + // internal per-socket buffer until the kernel reports the completion. Without a cap, that buffer + // can grow without bound — the upper layer always sees the write as fully accepted, so + // connection-level flow control (and overload protections that depend on it, such as HTTP flood + // protection) never engages. + // + // When this field is set to a non-zero value, the io_uring socket refuses to stage more bytes + // once the internal write buffer reaches the configured size, returning a short write to the + // upper layer. The data the upper layer was unable to hand off then accumulates in its own + // write buffer (e.g. ``ConnectionImpl::write_buffer_``), which has its own watermarks, so the + // existing connection-level back-pressure mechanism kicks in. The default is 0, which disables + // the cap and preserves the previous (uncapped) behavior. + google.protobuf.UInt32Value write_buffer_high_watermark = 5; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 27855c0fd9d53..c47012192d108 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -18,6 +18,14 @@ removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` new_features: +- area: io_uring + change: | + Added :ref:`write_buffer_high_watermark + ` + to bound the io_uring socket's internal write buffer. When set, the socket returns a short + write to the upper layer once the configured size is reached, which lets connection-level + back-pressure (and overload protections like HTTP flood protection that depend on it) engage. + The default of 0 disables the cap and preserves the previous (uncapped) behavior. - area: dynamic_modules change: | Added ``envoy_dynamic_module_callback_is_validation_mode`` ABI callback that allows dynamic diff --git a/source/common/io/io_uring_worker_factory_impl.cc b/source/common/io/io_uring_worker_factory_impl.cc index b667032d32f2b..a0d3053710f25 100644 --- a/source/common/io/io_uring_worker_factory_impl.cc +++ b/source/common/io/io_uring_worker_factory_impl.cc @@ -9,9 +9,11 @@ IoUringWorkerFactoryImpl::IoUringWorkerFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling, uint32_t read_buffer_size, uint32_t write_timeout_ms, + uint32_t write_buffer_high_watermark, ThreadLocal::SlotAllocator& tls) : io_uring_size_(io_uring_size), use_submission_queue_polling_(use_submission_queue_polling), - read_buffer_size_(read_buffer_size), write_timeout_ms_(write_timeout_ms), tls_(tls) {} + read_buffer_size_(read_buffer_size), write_timeout_ms_(write_timeout_ms), + write_buffer_high_watermark_(write_buffer_high_watermark), tls_(tls) {} OptRef IoUringWorkerFactoryImpl::getIoUringWorker() { auto ret = tls_.get(); @@ -24,10 +26,12 @@ OptRef IoUringWorkerFactoryImpl::getIoUringWorker() { void IoUringWorkerFactoryImpl::onWorkerThreadInitialized() { tls_.set([io_uring_size = io_uring_size_, use_submission_queue_polling = use_submission_queue_polling_, - read_buffer_size = read_buffer_size_, - write_timeout_ms = write_timeout_ms_](Event::Dispatcher& dispatcher) { + read_buffer_size = read_buffer_size_, write_timeout_ms = write_timeout_ms_, + write_buffer_high_watermark = + write_buffer_high_watermark_](Event::Dispatcher& dispatcher) { return std::make_shared(io_uring_size, use_submission_queue_polling, - read_buffer_size, write_timeout_ms, dispatcher); + read_buffer_size, write_timeout_ms, + write_buffer_high_watermark, dispatcher); }); } diff --git a/source/common/io/io_uring_worker_factory_impl.h b/source/common/io/io_uring_worker_factory_impl.h index 03f5f66443542..954878eda4852 100644 --- a/source/common/io/io_uring_worker_factory_impl.h +++ b/source/common/io/io_uring_worker_factory_impl.h @@ -10,6 +10,7 @@ class IoUringWorkerFactoryImpl : public IoUringWorkerFactory { public: IoUringWorkerFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling, uint32_t read_buffer_size, uint32_t write_timeout_ms, + uint32_t write_buffer_high_watermark, ThreadLocal::SlotAllocator& tls); OptRef getIoUringWorker() override; @@ -22,6 +23,7 @@ class IoUringWorkerFactoryImpl : public IoUringWorkerFactory { const bool use_submission_queue_polling_; const uint32_t read_buffer_size_; const uint32_t write_timeout_ms_; + const uint32_t write_buffer_high_watermark_; ThreadLocal::TypedSlot tls_; }; diff --git a/source/common/io/io_uring_worker_impl.cc b/source/common/io/io_uring_worker_impl.cc index 80622775f65dd..7bd6114b07f32 100644 --- a/source/common/io/io_uring_worker_impl.cc +++ b/source/common/io/io_uring_worker_impl.cc @@ -1,5 +1,7 @@ #include "source/common/io/io_uring_worker_impl.h" +#include + namespace Envoy { namespace Io { @@ -60,14 +62,19 @@ void IoUringSocketEntry::onRemoteClose() { IoUringWorkerImpl::IoUringWorkerImpl(uint32_t io_uring_size, bool use_submission_queue_polling, uint32_t read_buffer_size, uint32_t write_timeout_ms, + uint32_t write_buffer_high_watermark, Event::Dispatcher& dispatcher) : IoUringWorkerImpl(std::make_unique(io_uring_size, use_submission_queue_polling), - read_buffer_size, write_timeout_ms, dispatcher) {} + read_buffer_size, write_timeout_ms, write_buffer_high_watermark, + dispatcher) {} IoUringWorkerImpl::IoUringWorkerImpl(IoUringPtr&& io_uring, uint32_t read_buffer_size, - uint32_t write_timeout_ms, Event::Dispatcher& dispatcher) + uint32_t write_timeout_ms, + uint32_t write_buffer_high_watermark, + Event::Dispatcher& dispatcher) : io_uring_(std::move(io_uring)), read_buffer_size_(read_buffer_size), - write_timeout_ms_(write_timeout_ms), dispatcher_(dispatcher) { + write_timeout_ms_(write_timeout_ms), + write_buffer_high_watermark_(write_buffer_high_watermark), dispatcher_(dispatcher) { const os_fd_t event_fd = io_uring_->registerEventfd(); // We only care about the read event of Eventfd, since we only receive the // event here. @@ -104,7 +111,8 @@ IoUringSocket& IoUringWorkerImpl::addServerSocket(os_fd_t fd, Event::FileReadyCb bool enable_close_event) { ENVOY_LOG(trace, "add server socket, fd = {}", fd); std::unique_ptr socket = std::make_unique( - fd, *this, std::move(cb), write_timeout_ms_, enable_close_event); + fd, *this, std::move(cb), write_timeout_ms_, enable_close_event, + write_buffer_high_watermark_); socket->enableRead(); return addSocket(std::move(socket)); } @@ -113,7 +121,8 @@ IoUringSocket& IoUringWorkerImpl::addServerSocket(os_fd_t fd, Buffer::Instance& Event::FileReadyCb cb, bool enable_close_event) { ENVOY_LOG(trace, "add server socket through existing socket, fd = {}", fd); std::unique_ptr socket = std::make_unique( - fd, read_buf, *this, std::move(cb), write_timeout_ms_, enable_close_event); + fd, read_buf, *this, std::move(cb), write_timeout_ms_, enable_close_event, + write_buffer_high_watermark_); socket->enableRead(); return addSocket(std::move(socket)); } @@ -123,7 +132,8 @@ IoUringSocket& IoUringWorkerImpl::addClientSocket(os_fd_t fd, Event::FileReadyCb ENVOY_LOG(trace, "add client socket, fd = {}", fd); // The client socket should not be read enabled until it is connected. std::unique_ptr socket = std::make_unique( - fd, *this, std::move(cb), write_timeout_ms_, enable_close_event); + fd, *this, std::move(cb), write_timeout_ms_, enable_close_event, + write_buffer_high_watermark_); return addSocket(std::move(socket)); } @@ -308,15 +318,19 @@ void IoUringWorkerImpl::submit() { IoUringServerSocket::IoUringServerSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, uint32_t write_timeout_ms, - bool enable_close_event) + bool enable_close_event, + uint32_t write_buffer_high_watermark) : IoUringSocketEntry(fd, parent, std::move(cb), enable_close_event), - write_timeout_ms_(write_timeout_ms) {} + write_timeout_ms_(write_timeout_ms), + write_buffer_high_watermark_(write_buffer_high_watermark) {} IoUringServerSocket::IoUringServerSocket(os_fd_t fd, Buffer::Instance& read_buf, IoUringWorkerImpl& parent, Event::FileReadyCb cb, - uint32_t write_timeout_ms, bool enable_close_event) + uint32_t write_timeout_ms, bool enable_close_event, + uint32_t write_buffer_high_watermark) : IoUringSocketEntry(fd, parent, std::move(cb), enable_close_event), - write_timeout_ms_(write_timeout_ms) { + write_timeout_ms_(write_timeout_ms), + write_buffer_high_watermark_(write_buffer_high_watermark) { read_buf_.move(read_buf); } @@ -378,10 +392,26 @@ void IoUringServerSocket::write(Buffer::Instance& data) { ENVOY_LOG(trace, "write, buffer size = {}, fd = {}", data.length(), fd_); ASSERT(!shutdown_.has_value()); + uint64_t bytes_to_move = data.length(); + if (write_buffer_high_watermark_ > 0) { + const uint64_t current_size = write_buf_.length(); + if (current_size >= write_buffer_high_watermark_) { + // The internal buffer is at or above the high watermark. Refuse to stage more bytes so + // the upper layer's connection-level back-pressure engages. + back_pressure_pending_ = true; + return; + } + const uint64_t headroom = write_buffer_high_watermark_ - current_size; + if (bytes_to_move > headroom) { + bytes_to_move = headroom; + back_pressure_pending_ = true; + } + } + // We need to reset the drain trackers, since the write and close is async in // the iouring. When the write is actually finished the above layer may already // release the drain trackers. - write_buf_.move(data, data.length(), true); + write_buf_.move(data, bytes_to_move, true); submitWriteOrShutdownRequest(); } @@ -390,13 +420,36 @@ uint64_t IoUringServerSocket::write(const Buffer::RawSlice* slices, uint64_t num ENVOY_LOG(trace, "write, num_slices = {}, fd = {}", num_slice, fd_); ASSERT(!shutdown_.has_value()); + uint64_t headroom = std::numeric_limits::max(); + if (write_buffer_high_watermark_ > 0) { + const uint64_t current_size = write_buf_.length(); + if (current_size >= write_buffer_high_watermark_) { + back_pressure_pending_ = true; + return 0; + } + headroom = write_buffer_high_watermark_ - current_size; + } + uint64_t bytes_written = 0; for (uint64_t i = 0; i < num_slice; i++) { + if (slices[i].len_ == 0) { + continue; + } + if (slices[i].len_ > headroom) { + write_buf_.add(slices[i].mem_, headroom); + bytes_written += headroom; + headroom = 0; + back_pressure_pending_ = true; + break; + } write_buf_.add(slices[i].mem_, slices[i].len_); bytes_written += slices[i].len_; + headroom -= slices[i].len_; } - submitWriteOrShutdownRequest(); + if (bytes_written > 0) { + submitWriteOrShutdownRequest(); + } return bytes_written; } @@ -593,6 +646,15 @@ void IoUringServerSocket::onWrite(Request* req, int32_t result, bool injected) { } } + // If a previous ``write()`` was short because the high watermark was hit, give the upper layer + // a chance to retry now that ``write_buf_`` has drained below the watermark. + if (back_pressure_pending_ && write_buffer_high_watermark_ > 0 && + write_buf_.length() < write_buffer_high_watermark_ && !shutdown_.has_value() && + status_ != Closed) { + back_pressure_pending_ = false; + injectCompletion(Request::RequestType::Write); + } + submitWriteOrShutdownRequest(); } @@ -644,8 +706,10 @@ void IoUringServerSocket::submitWriteOrShutdownRequest() { IoUringClientSocket::IoUringClientSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, uint32_t write_timeout_ms, - bool enable_close_event) - : IoUringServerSocket(fd, parent, cb, write_timeout_ms, enable_close_event) {} + bool enable_close_event, + uint32_t write_buffer_high_watermark) + : IoUringServerSocket(fd, parent, cb, write_timeout_ms, enable_close_event, + write_buffer_high_watermark) {} void IoUringClientSocket::connect(const Network::Address::InstanceConstSharedPtr& address) { // Reuse read request since there is no read on connecting and connect is cancellable. diff --git a/source/common/io/io_uring_worker_impl.h b/source/common/io/io_uring_worker_impl.h index b2ebd1d380497..9c12f2873afde 100644 --- a/source/common/io/io_uring_worker_impl.h +++ b/source/common/io/io_uring_worker_impl.h @@ -32,9 +32,9 @@ class IoUringWorkerImpl : public IoUringWorker, private Logger::Loggable read_error_; - // TODO (soulxu): We need water mark for write buffer. - // The upper layer will think the buffer released when the data copy into this write buffer. - // This leads to the `IntegrationTest.TestFloodUpstreamErrors` timeout, since the http layer - // always think the response is write successful, so flood protection is never kicked. - // // For write. iouring socket will write sequentially in the order of write_buf_ and shutdown_ // Unless the write_buf_ is empty, the shutdown operation will not be performed. + // + // When ``write_buffer_high_watermark_`` is non-zero, ``write()`` returns a short write once + // ``write_buf_`` reaches the configured size, so connection-level back-pressure (and overload + // protections that depend on it, like HTTP flood protection) engages. With the default of 0 the + // cap is disabled and the upper layer always sees its writes as fully accepted. + const uint32_t write_buffer_high_watermark_; + // True when a previous ``write()`` was unable to fully accept the offered bytes because the + // internal write buffer hit the high watermark. When set, an injected Write completion is + // delivered to the upper layer after the buffer drains, so the upper layer retries. + bool back_pressure_pending_{false}; Buffer::OwnedImpl write_buf_; // shutdown_ has 3 states. A absl::nullopt indicates the socket has not been shutdown, a false // value represents the socket wants to be shutdown but the shutdown has not been performed or @@ -254,7 +263,8 @@ class IoUringServerSocket : public IoUringSocketEntry { class IoUringClientSocket : public IoUringServerSocket { public: IoUringClientSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, - uint32_t write_timeout_ms, bool enable_close_event); + uint32_t write_timeout_ms, bool enable_close_event, + uint32_t write_buffer_high_watermark = 0); void connect(const Network::Address::InstanceConstSharedPtr& address) override; void onConnect(Request* req, int32_t result, bool injected) override; diff --git a/source/common/network/io_uring_socket_handle_impl.cc b/source/common/network/io_uring_socket_handle_impl.cc index 7d10acd098df9..63dcc2d0be197 100644 --- a/source/common/network/io_uring_socket_handle_impl.cc +++ b/source/common/network/io_uring_socket_handle_impl.cc @@ -122,9 +122,10 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::write(Buffer::Instance& buffer) return std::move(*write_result); } - uint64_t buffer_size = buffer.length(); + const uint64_t before = buffer.length(); io_uring_socket_->write(buffer); - return {buffer_size, IoSocketError::none()}; + const uint64_t after = buffer.length(); + return {before - after, IoSocketError::none()}; } Api::IoCallUint64Result IoUringSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int, diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index dc134bde4af47..faeb73d8f1f56 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -179,6 +179,7 @@ Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension( options.enable_submission_queue_polling(), PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, read_buffer_size, 8192), PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, write_timeout_ms, 1000), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, write_buffer_high_watermark, 0), context.threadLocal()); io_uring_worker_factory_ = io_uring_worker_factory; diff --git a/test/common/io/io_uring_worker_factory_impl_test.cc b/test/common/io/io_uring_worker_factory_impl_test.cc index f6eebbc664449..a61795a8b83bc 100644 --- a/test/common/io/io_uring_worker_factory_impl_test.cc +++ b/test/common/io/io_uring_worker_factory_impl_test.cc @@ -29,7 +29,7 @@ class IoUringWorkerFactoryImplTest : public ::testing::Test { }; TEST_F(IoUringWorkerFactoryImplTest, Basic) { - IoUringWorkerFactoryImpl factory(2, false, 8192, 1000, context_.threadLocal()); + IoUringWorkerFactoryImpl factory(2, false, 8192, 1000, 0, context_.threadLocal()); EXPECT_TRUE(factory.currentThreadRegistered()); auto dispatcher = api_->allocateDispatcher("test_thread"); factory.onWorkerThreadInitialized(); diff --git a/test/common/io/io_uring_worker_impl_integration_test.cc b/test/common/io/io_uring_worker_impl_integration_test.cc index 73cd2e263bd62..c6d1814d12aa9 100644 --- a/test/common/io/io_uring_worker_impl_integration_test.cc +++ b/test/common/io/io_uring_worker_impl_integration_test.cc @@ -89,7 +89,7 @@ class IoUringSocketTestImpl : public IoUringSocketEntry { class IoUringWorkerTestImpl : public IoUringWorkerImpl { public: IoUringWorkerTestImpl(IoUringPtr io_uring_instance, Event::Dispatcher& dispatcher) - : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, dispatcher) {} + : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, 0, dispatcher) {} IoUringSocket& addTestSocket(os_fd_t fd) { return addSocket(std::make_unique(fd, *this)); diff --git a/test/common/io/io_uring_worker_impl_test.cc b/test/common/io/io_uring_worker_impl_test.cc index 37c8ac72c464a..519103eb1c3fd 100644 --- a/test/common/io/io_uring_worker_impl_test.cc +++ b/test/common/io/io_uring_worker_impl_test.cc @@ -34,7 +34,7 @@ class IoUringSocketTestImpl : public IoUringSocketEntry { class IoUringWorkerTestImpl : public IoUringWorkerImpl { public: IoUringWorkerTestImpl(IoUringPtr io_uring_instance, Event::Dispatcher& dispatcher) - : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, dispatcher) {} + : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, 0, dispatcher) {} IoUringSocket& addTestSocket(os_fd_t fd) { return addSocket(std::make_unique(fd, *this)); @@ -693,6 +693,112 @@ TEST(IoUringWorkerImplTest, NoEnableReadOnConnectError) { delete static_cast(connect_req); } +// When ``write_buffer_high_watermark`` is set, ``IoUringServerSocket::write`` must refuse to stage +// more bytes than the cap so that the upper layer's connection-level back-pressure can engage. +// Once the in-flight write completes and the buffer drops below the watermark, an injected Write +// completion is delivered so the upper layer retries. +TEST(IoUringWorkerImplTest, WriteBufferHighWatermark) { + Event::MockDispatcher dispatcher; + IoUringPtr io_uring_instance = std::make_unique(); + MockIoUring& mock_io_uring = *dynamic_cast(io_uring_instance.get()); + + EXPECT_CALL(mock_io_uring, registerEventfd()); + EXPECT_CALL(dispatcher, + createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) + .WillOnce(ReturnNew>()); + IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher); + + // Watermark = 100 bytes. + IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false, 100); + + // First write of 80 bytes fits under the watermark and is fully accepted. + Request* write_req = nullptr; + EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) + .WillOnce(DoAll(SaveArg<4>(&write_req), Return(IoUringResult::Ok))) + .RetiresOnSaturation(); + EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation(); + Buffer::OwnedImpl buf1; + buf1.add(std::string(80, 'a')); + socket.write(buf1); + EXPECT_EQ(0, buf1.length()); + + // Second write of 50 bytes can only stage 20 (100 - 80). The remaining 30 bytes stay in + // ``buf2``. No new ``prepareWritev`` is expected since a write request is already in flight. + Buffer::OwnedImpl buf2; + buf2.add(std::string(50, 'b')); + socket.write(buf2); + EXPECT_EQ(30, buf2.length()); + + // Third write while at the high watermark is rejected entirely. + Buffer::OwnedImpl buf3; + buf3.add(std::string(10, 'c')); + socket.write(buf3); + EXPECT_EQ(10, buf3.length()); + + // Complete the in-flight write (the kernel saw 80 bytes worth of iovecs). The drain leaves + // 20 bytes in ``write_buf_``, which is below the watermark, so an injected Write completion is + // queued for the upper layer to retry. The remaining 20 bytes are then submitted as a new + // ``prepareWritev``. + Request* write_req2 = nullptr; + EXPECT_CALL(mock_io_uring, injectCompletion(_, _, -EAGAIN)) + .WillOnce(Invoke([](os_fd_t, Request* req, int32_t) { delete req; })); + EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) + .WillOnce(DoAll(SaveArg<4>(&write_req2), Return(IoUringResult::Ok))) + .RetiresOnSaturation(); + EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation(); + socket.onWrite(write_req, 80, false); + delete write_req; + delete write_req2; + + EXPECT_CALL(dispatcher, clearDeferredDeleteList()); +} + +// The slice-based ``write`` overload also caps at the high watermark and reports the actual +// number of bytes accepted. +TEST(IoUringWorkerImplTest, WriteSliceBufferHighWatermark) { + Event::MockDispatcher dispatcher; + IoUringPtr io_uring_instance = std::make_unique(); + MockIoUring& mock_io_uring = *dynamic_cast(io_uring_instance.get()); + + EXPECT_CALL(mock_io_uring, registerEventfd()); + EXPECT_CALL(dispatcher, + createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) + .WillOnce(ReturnNew>()); + IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher); + IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false, 50); + + Request* write_req = nullptr; + EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) + .WillOnce(DoAll(SaveArg<4>(&write_req), Return(IoUringResult::Ok))) + .RetiresOnSaturation(); + EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation(); + + std::string a(40, 'a'); + std::string b(20, 'b'); + Buffer::RawSlice slices[2]; + slices[0].mem_ = a.data(); + slices[0].len_ = a.size(); + slices[1].mem_ = b.data(); + slices[1].len_ = b.size(); + EXPECT_EQ(50, socket.write(slices, 2)); + + // Subsequent writes return 0 until the buffer drains. + std::string c(10, 'c'); + Buffer::RawSlice slice2; + slice2.mem_ = c.data(); + slice2.len_ = c.size(); + EXPECT_EQ(0, socket.write(&slice2, 1)); + + // Drain the in-flight write fully (50 bytes accepted by the kernel). The buffer drops to 0, + // back-pressure clears, and a Write completion is injected for the upper layer. + EXPECT_CALL(mock_io_uring, injectCompletion(_, _, -EAGAIN)) + .WillOnce(Invoke([](os_fd_t, Request* req, int32_t) { delete req; })); + socket.onWrite(write_req, 50, false); + delete write_req; + + EXPECT_CALL(dispatcher, clearDeferredDeleteList()); +} + } // namespace } // namespace Io } // namespace Envoy diff --git a/test/common/network/io_uring_socket_handle_impl_integration_test.cc b/test/common/network/io_uring_socket_handle_impl_integration_test.cc index 46878065d6119..e0342900e8469 100644 --- a/test/common/network/io_uring_socket_handle_impl_integration_test.cc +++ b/test/common/network/io_uring_socket_handle_impl_integration_test.cc @@ -45,7 +45,7 @@ class IoUringSocketHandleImplIntegrationTest : public testing::Test { } io_uring_worker_factory_ = - std::make_unique(10, false, 8192, 1000, instance_); + std::make_unique(10, false, 8192, 1000, 0, instance_); io_uring_worker_factory_->onWorkerThreadInitialized(); // Create the thread after the io_uring worker has been initialized, otherwise the dispatcher From e7dfc11e52035e516744920fdac85ad45ab8b26a Mon Sep 17 00:00:00 2001 From: Adam Buran Date: Thu, 14 May 2026 22:23:59 -0700 Subject: [PATCH 2/2] io_uring: reuse connection buffer limits for write staging Signed-off-by: Adam Buran --- .../v3/default_socket_interface.proto | 14 ------- changelogs/current.yaml | 14 +++---- envoy/common/io/io_uring.h | 6 +++ envoy/network/io_handle.h | 7 ++++ .../common/io/io_uring_worker_factory_impl.cc | 12 ++---- .../common/io/io_uring_worker_factory_impl.h | 2 - source/common/io/io_uring_worker_impl.cc | 38 ++++++------------- source/common/io/io_uring_worker_impl.h | 21 +++++----- source/common/network/connection_impl.cc | 1 + .../network/io_uring_socket_handle_impl.cc | 11 ++++++ .../network/io_uring_socket_handle_impl.h | 2 + .../common/network/socket_interface_impl.cc | 1 - .../io/io_uring_worker_factory_impl_test.cc | 2 +- .../io_uring_worker_impl_integration_test.cc | 2 +- test/common/io/io_uring_worker_impl_test.cc | 16 ++++---- .../io_uring_socket_handle_impl_test.cc | 16 ++++++++ test/mocks/io/mocks.h | 1 + 17 files changed, 87 insertions(+), 79 deletions(-) diff --git a/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto b/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto index 7144f74d4ddeb..69b413eba12a6 100644 --- a/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto +++ b/api/envoy/extensions/network/socket_interface/v3/default_socket_interface.proto @@ -43,18 +43,4 @@ message IoUringOptions { // asynchronously. If the remote stops reading, the io_uring write operation may never complete. // The operation is canceled and the socket is closed after the timeout. The default is 1000. google.protobuf.UInt32Value write_timeout_ms = 4; - // High watermark in bytes for the io_uring socket's internal write buffer. Because io_uring - // writes are asynchronous, data the upper layer hands to ``writev``/``write`` is staged in an - // internal per-socket buffer until the kernel reports the completion. Without a cap, that buffer - // can grow without bound — the upper layer always sees the write as fully accepted, so - // connection-level flow control (and overload protections that depend on it, such as HTTP flood - // protection) never engages. - // - // When this field is set to a non-zero value, the io_uring socket refuses to stage more bytes - // once the internal write buffer reaches the configured size, returning a short write to the - // upper layer. The data the upper layer was unable to hand off then accumulates in its own - // write buffer (e.g. ``ConnectionImpl::write_buffer_``), which has its own watermarks, so the - // existing connection-level back-pressure mechanism kicks in. The default is 0, which disables - // the cap and preserves the previous (uncapped) behavior. - google.protobuf.UInt32Value write_buffer_high_watermark = 5; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 930789c5373cd..be46330451e2c 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -59,6 +59,12 @@ bug_fixes: definition requires. Both factories now serialize the ``Struct`` to a JSON string and pass the string to the dynamic module side as the configuration, matching the behavior already in place for every other dynamic module extension factory. +- area: io_uring + change: | + Fixed io_uring sockets to bound their internal asynchronous write buffer with the existing + connection write buffer limit. This lets Envoy's normal connection-level back-pressure and + overload protections engage when io_uring writes are staged faster than the kernel completes + them. - area: oauth2 change: | Fixed a crash in the OAuth2 filter where AES-CBC decryption of token cookies could spuriously @@ -72,14 +78,6 @@ removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` new_features: -- area: io_uring - change: | - Added :ref:`write_buffer_high_watermark - ` - to bound the io_uring socket's internal write buffer. When set, the socket returns a short - write to the upper layer once the configured size is reached, which lets connection-level - back-pressure (and overload protections like HTTP flood protection that depend on it) engage. - The default of 0 disables the cap and preserves the previous (uncapped) behavior. - area: stat_sinks change: | Added :ref:`max_data_points_per_request diff --git a/envoy/common/io/io_uring.h b/envoy/common/io/io_uring.h index 335b8d4c6b064..7c38acc505708 100644 --- a/envoy/common/io/io_uring.h +++ b/envoy/common/io/io_uring.h @@ -277,6 +277,12 @@ class IoUringSocket { */ virtual uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) PURE; + /** + * Set the high watermark for the socket's internal write buffer. + * @param high_watermark supplies the limit in bytes, or 0 to disable the limit. + */ + virtual void setWriteBufferHighWatermark(uint32_t high_watermark) PURE; + /** * Shutdown the socket. * @param how is SHUT_RD, SHUT_WR and SHUT_RDWR. diff --git a/envoy/network/io_handle.h b/envoy/network/io_handle.h index cd2c2f1e5c069..14e77956d38d6 100644 --- a/envoy/network/io_handle.h +++ b/envoy/network/io_handle.h @@ -114,6 +114,13 @@ class IoHandle { */ virtual Api::IoCallUint64Result write(Buffer::Instance& buffer) PURE; + /** + * Set the write buffer limit used by IoHandle implementations that stage asynchronous writes + * internally. Implementations that do not stage write data may ignore this. + * @param limit supplies the connection write buffer limit in bytes, or 0 to disable the limit. + */ + virtual void setWriteBufferLimits(uint32_t limit) { UNREFERENCED_PARAMETER(limit); } + /** * Send a message to the address. * @param slices points to the location of data to be sent. diff --git a/source/common/io/io_uring_worker_factory_impl.cc b/source/common/io/io_uring_worker_factory_impl.cc index a0d3053710f25..b667032d32f2b 100644 --- a/source/common/io/io_uring_worker_factory_impl.cc +++ b/source/common/io/io_uring_worker_factory_impl.cc @@ -9,11 +9,9 @@ IoUringWorkerFactoryImpl::IoUringWorkerFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling, uint32_t read_buffer_size, uint32_t write_timeout_ms, - uint32_t write_buffer_high_watermark, ThreadLocal::SlotAllocator& tls) : io_uring_size_(io_uring_size), use_submission_queue_polling_(use_submission_queue_polling), - read_buffer_size_(read_buffer_size), write_timeout_ms_(write_timeout_ms), - write_buffer_high_watermark_(write_buffer_high_watermark), tls_(tls) {} + read_buffer_size_(read_buffer_size), write_timeout_ms_(write_timeout_ms), tls_(tls) {} OptRef IoUringWorkerFactoryImpl::getIoUringWorker() { auto ret = tls_.get(); @@ -26,12 +24,10 @@ OptRef IoUringWorkerFactoryImpl::getIoUringWorker() { void IoUringWorkerFactoryImpl::onWorkerThreadInitialized() { tls_.set([io_uring_size = io_uring_size_, use_submission_queue_polling = use_submission_queue_polling_, - read_buffer_size = read_buffer_size_, write_timeout_ms = write_timeout_ms_, - write_buffer_high_watermark = - write_buffer_high_watermark_](Event::Dispatcher& dispatcher) { + read_buffer_size = read_buffer_size_, + write_timeout_ms = write_timeout_ms_](Event::Dispatcher& dispatcher) { return std::make_shared(io_uring_size, use_submission_queue_polling, - read_buffer_size, write_timeout_ms, - write_buffer_high_watermark, dispatcher); + read_buffer_size, write_timeout_ms, dispatcher); }); } diff --git a/source/common/io/io_uring_worker_factory_impl.h b/source/common/io/io_uring_worker_factory_impl.h index 954878eda4852..03f5f66443542 100644 --- a/source/common/io/io_uring_worker_factory_impl.h +++ b/source/common/io/io_uring_worker_factory_impl.h @@ -10,7 +10,6 @@ class IoUringWorkerFactoryImpl : public IoUringWorkerFactory { public: IoUringWorkerFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling, uint32_t read_buffer_size, uint32_t write_timeout_ms, - uint32_t write_buffer_high_watermark, ThreadLocal::SlotAllocator& tls); OptRef getIoUringWorker() override; @@ -23,7 +22,6 @@ class IoUringWorkerFactoryImpl : public IoUringWorkerFactory { const bool use_submission_queue_polling_; const uint32_t read_buffer_size_; const uint32_t write_timeout_ms_; - const uint32_t write_buffer_high_watermark_; ThreadLocal::TypedSlot tls_; }; diff --git a/source/common/io/io_uring_worker_impl.cc b/source/common/io/io_uring_worker_impl.cc index 7bd6114b07f32..3301d46d79f1c 100644 --- a/source/common/io/io_uring_worker_impl.cc +++ b/source/common/io/io_uring_worker_impl.cc @@ -62,19 +62,14 @@ void IoUringSocketEntry::onRemoteClose() { IoUringWorkerImpl::IoUringWorkerImpl(uint32_t io_uring_size, bool use_submission_queue_polling, uint32_t read_buffer_size, uint32_t write_timeout_ms, - uint32_t write_buffer_high_watermark, Event::Dispatcher& dispatcher) : IoUringWorkerImpl(std::make_unique(io_uring_size, use_submission_queue_polling), - read_buffer_size, write_timeout_ms, write_buffer_high_watermark, - dispatcher) {} + read_buffer_size, write_timeout_ms, dispatcher) {} IoUringWorkerImpl::IoUringWorkerImpl(IoUringPtr&& io_uring, uint32_t read_buffer_size, - uint32_t write_timeout_ms, - uint32_t write_buffer_high_watermark, - Event::Dispatcher& dispatcher) + uint32_t write_timeout_ms, Event::Dispatcher& dispatcher) : io_uring_(std::move(io_uring)), read_buffer_size_(read_buffer_size), - write_timeout_ms_(write_timeout_ms), - write_buffer_high_watermark_(write_buffer_high_watermark), dispatcher_(dispatcher) { + write_timeout_ms_(write_timeout_ms), dispatcher_(dispatcher) { const os_fd_t event_fd = io_uring_->registerEventfd(); // We only care about the read event of Eventfd, since we only receive the // event here. @@ -111,8 +106,7 @@ IoUringSocket& IoUringWorkerImpl::addServerSocket(os_fd_t fd, Event::FileReadyCb bool enable_close_event) { ENVOY_LOG(trace, "add server socket, fd = {}", fd); std::unique_ptr socket = std::make_unique( - fd, *this, std::move(cb), write_timeout_ms_, enable_close_event, - write_buffer_high_watermark_); + fd, *this, std::move(cb), write_timeout_ms_, enable_close_event); socket->enableRead(); return addSocket(std::move(socket)); } @@ -121,8 +115,7 @@ IoUringSocket& IoUringWorkerImpl::addServerSocket(os_fd_t fd, Buffer::Instance& Event::FileReadyCb cb, bool enable_close_event) { ENVOY_LOG(trace, "add server socket through existing socket, fd = {}", fd); std::unique_ptr socket = std::make_unique( - fd, read_buf, *this, std::move(cb), write_timeout_ms_, enable_close_event, - write_buffer_high_watermark_); + fd, read_buf, *this, std::move(cb), write_timeout_ms_, enable_close_event); socket->enableRead(); return addSocket(std::move(socket)); } @@ -132,8 +125,7 @@ IoUringSocket& IoUringWorkerImpl::addClientSocket(os_fd_t fd, Event::FileReadyCb ENVOY_LOG(trace, "add client socket, fd = {}", fd); // The client socket should not be read enabled until it is connected. std::unique_ptr socket = std::make_unique( - fd, *this, std::move(cb), write_timeout_ms_, enable_close_event, - write_buffer_high_watermark_); + fd, *this, std::move(cb), write_timeout_ms_, enable_close_event); return addSocket(std::move(socket)); } @@ -318,19 +310,15 @@ void IoUringWorkerImpl::submit() { IoUringServerSocket::IoUringServerSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, uint32_t write_timeout_ms, - bool enable_close_event, - uint32_t write_buffer_high_watermark) + bool enable_close_event) : IoUringSocketEntry(fd, parent, std::move(cb), enable_close_event), - write_timeout_ms_(write_timeout_ms), - write_buffer_high_watermark_(write_buffer_high_watermark) {} + write_timeout_ms_(write_timeout_ms) {} IoUringServerSocket::IoUringServerSocket(os_fd_t fd, Buffer::Instance& read_buf, IoUringWorkerImpl& parent, Event::FileReadyCb cb, - uint32_t write_timeout_ms, bool enable_close_event, - uint32_t write_buffer_high_watermark) + uint32_t write_timeout_ms, bool enable_close_event) : IoUringSocketEntry(fd, parent, std::move(cb), enable_close_event), - write_timeout_ms_(write_timeout_ms), - write_buffer_high_watermark_(write_buffer_high_watermark) { + write_timeout_ms_(write_timeout_ms) { read_buf_.move(read_buf); } @@ -706,10 +694,8 @@ void IoUringServerSocket::submitWriteOrShutdownRequest() { IoUringClientSocket::IoUringClientSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, uint32_t write_timeout_ms, - bool enable_close_event, - uint32_t write_buffer_high_watermark) - : IoUringServerSocket(fd, parent, cb, write_timeout_ms, enable_close_event, - write_buffer_high_watermark) {} + bool enable_close_event) + : IoUringServerSocket(fd, parent, cb, write_timeout_ms, enable_close_event) {} void IoUringClientSocket::connect(const Network::Address::InstanceConstSharedPtr& address) { // Reuse read request since there is no read on connecting and connect is cancellable. diff --git a/source/common/io/io_uring_worker_impl.h b/source/common/io/io_uring_worker_impl.h index 9c12f2873afde..634780ec12e26 100644 --- a/source/common/io/io_uring_worker_impl.h +++ b/source/common/io/io_uring_worker_impl.h @@ -32,9 +32,9 @@ class IoUringWorkerImpl : public IoUringWorker, private Logger::Loggable(Request::RequestType::Accept))) { @@ -185,11 +184,9 @@ class IoUringSocketEntry : public IoUringSocket, class IoUringServerSocket : public IoUringSocketEntry { public: IoUringServerSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, - uint32_t write_timeout_ms, bool enable_close_event, - uint32_t write_buffer_high_watermark = 0); + uint32_t write_timeout_ms, bool enable_close_event); IoUringServerSocket(os_fd_t fd, Buffer::Instance& read_buf, IoUringWorkerImpl& parent, - Event::FileReadyCb cb, uint32_t write_timeout_ms, bool enable_close_event, - uint32_t write_buffer_high_watermark = 0); + Event::FileReadyCb cb, uint32_t write_timeout_ms, bool enable_close_event); ~IoUringServerSocket() override; // IoUringSocket @@ -198,6 +195,9 @@ class IoUringServerSocket : public IoUringSocketEntry { void disableRead() override; void write(Buffer::Instance& data) override; uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) override; + void setWriteBufferHighWatermark(uint32_t high_watermark) override { + write_buffer_high_watermark_ = high_watermark; + } void shutdown(int how) override; void onClose(Request* req, int32_t result, bool injected) override; void onRead(Request* req, int32_t result, bool injected) override; @@ -227,7 +227,7 @@ class IoUringServerSocket : public IoUringSocketEntry { // ``write_buf_`` reaches the configured size, so connection-level back-pressure (and overload // protections that depend on it, like HTTP flood protection) engages. With the default of 0 the // cap is disabled and the upper layer always sees its writes as fully accepted. - const uint32_t write_buffer_high_watermark_; + uint32_t write_buffer_high_watermark_{0}; // True when a previous ``write()`` was unable to fully accept the offered bytes because the // internal write buffer hit the high watermark. When set, an injected Write completion is // delivered to the upper layer after the buffer drains, so the upper layer retries. @@ -263,8 +263,7 @@ class IoUringServerSocket : public IoUringSocketEntry { class IoUringClientSocket : public IoUringServerSocket { public: IoUringClientSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb, - uint32_t write_timeout_ms, bool enable_close_event, - uint32_t write_buffer_high_watermark = 0); + uint32_t write_timeout_ms, bool enable_close_event); void connect(const Network::Address::InstanceConstSharedPtr& address) override; void onConnect(Request* req, int32_t result, bool injected) override; diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 820b8ebe3fd03..407d62b79e3ff 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -642,6 +642,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through void ConnectionImpl::setBufferLimits(uint32_t limit) { read_buffer_limit_ = limit; + ioHandle().setWriteBufferLimits(limit); // Due to the fact that writes to the connection and flushing data from the connection are done // asynchronously, we have the option of either setting the watermarks aggressively, and regularly diff --git a/source/common/network/io_uring_socket_handle_impl.cc b/source/common/network/io_uring_socket_handle_impl.cc index 63dcc2d0be197..2237eff580847 100644 --- a/source/common/network/io_uring_socket_handle_impl.cc +++ b/source/common/network/io_uring_socket_handle_impl.cc @@ -128,6 +128,13 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::write(Buffer::Instance& buffer) return {before - after, IoSocketError::none()}; } +void IoUringSocketHandleImpl::setWriteBufferLimits(uint32_t limit) { + write_buffer_high_watermark_ = limit; + if (io_uring_socket_.has_value()) { + io_uring_socket_->setWriteBufferHighWatermark(limit); + } +} + Api::IoCallUint64Result IoUringSocketHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int, const Address::Ip*, const Address::Instance&) { @@ -246,6 +253,7 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, io_uring_socket_->setFileReadyCb(std::move(cb)); io_uring_socket_->enableRead(); io_uring_socket_->enableCloseEvent(events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); } else { ENVOY_LOG(trace, "initialize file event from another thread, fd = {}, type = {}", fd_, ioUringSocketTypeStr()); @@ -272,6 +280,7 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, // Move the temporary buf to the newly created one. io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addServerSocket( fd, buf, std::move(cb), events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); } return; } @@ -283,12 +292,14 @@ void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, case IoUringSocketType::Server: io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addServerSocket( fd_, std::move(cb), events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); break; case IoUringSocketType::Unknown: case IoUringSocketType::Client: io_uring_socket_type_ = IoUringSocketType::Client; io_uring_socket_ = io_uring_worker_factory_.getIoUringWorker()->addClientSocket( fd_, std::move(cb), events & Event::FileReadyType::Closed); + io_uring_socket_->setWriteBufferHighWatermark(write_buffer_high_watermark_); break; } } diff --git a/source/common/network/io_uring_socket_handle_impl.h b/source/common/network/io_uring_socket_handle_impl.h index 91c083a529d27..9ac22788a1c06 100644 --- a/source/common/network/io_uring_socket_handle_impl.h +++ b/source/common/network/io_uring_socket_handle_impl.h @@ -42,6 +42,7 @@ class IoUringSocketHandleImpl : public IoSocketHandleBaseImpl { absl::optional max_length_opt) override; Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; Api::IoCallUint64Result write(Buffer::Instance& buffer) override; + void setWriteBufferLimits(uint32_t limit) override; Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, const Address::Ip* self_ip, const Address::Instance& peer_address) override; @@ -84,6 +85,7 @@ class IoUringSocketHandleImpl : public IoSocketHandleBaseImpl { Io::IoUringWorkerFactory& io_uring_worker_factory_; IoUringSocketType io_uring_socket_type_; OptRef io_uring_socket_{absl::nullopt}; + uint32_t write_buffer_high_watermark_{0}; Event::FileEventPtr file_event_{nullptr}; diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index faeb73d8f1f56..dc134bde4af47 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -179,7 +179,6 @@ Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension( options.enable_submission_queue_polling(), PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, read_buffer_size, 8192), PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, write_timeout_ms, 1000), - PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, write_buffer_high_watermark, 0), context.threadLocal()); io_uring_worker_factory_ = io_uring_worker_factory; diff --git a/test/common/io/io_uring_worker_factory_impl_test.cc b/test/common/io/io_uring_worker_factory_impl_test.cc index a61795a8b83bc..f6eebbc664449 100644 --- a/test/common/io/io_uring_worker_factory_impl_test.cc +++ b/test/common/io/io_uring_worker_factory_impl_test.cc @@ -29,7 +29,7 @@ class IoUringWorkerFactoryImplTest : public ::testing::Test { }; TEST_F(IoUringWorkerFactoryImplTest, Basic) { - IoUringWorkerFactoryImpl factory(2, false, 8192, 1000, 0, context_.threadLocal()); + IoUringWorkerFactoryImpl factory(2, false, 8192, 1000, context_.threadLocal()); EXPECT_TRUE(factory.currentThreadRegistered()); auto dispatcher = api_->allocateDispatcher("test_thread"); factory.onWorkerThreadInitialized(); diff --git a/test/common/io/io_uring_worker_impl_integration_test.cc b/test/common/io/io_uring_worker_impl_integration_test.cc index c6d1814d12aa9..73cd2e263bd62 100644 --- a/test/common/io/io_uring_worker_impl_integration_test.cc +++ b/test/common/io/io_uring_worker_impl_integration_test.cc @@ -89,7 +89,7 @@ class IoUringSocketTestImpl : public IoUringSocketEntry { class IoUringWorkerTestImpl : public IoUringWorkerImpl { public: IoUringWorkerTestImpl(IoUringPtr io_uring_instance, Event::Dispatcher& dispatcher) - : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, 0, dispatcher) {} + : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, dispatcher) {} IoUringSocket& addTestSocket(os_fd_t fd) { return addSocket(std::make_unique(fd, *this)); diff --git a/test/common/io/io_uring_worker_impl_test.cc b/test/common/io/io_uring_worker_impl_test.cc index 519103eb1c3fd..a65437f712f03 100644 --- a/test/common/io/io_uring_worker_impl_test.cc +++ b/test/common/io/io_uring_worker_impl_test.cc @@ -34,7 +34,7 @@ class IoUringSocketTestImpl : public IoUringSocketEntry { class IoUringWorkerTestImpl : public IoUringWorkerImpl { public: IoUringWorkerTestImpl(IoUringPtr io_uring_instance, Event::Dispatcher& dispatcher) - : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, 0, dispatcher) {} + : IoUringWorkerImpl(std::move(io_uring_instance), 8192, 1000, dispatcher) {} IoUringSocket& addTestSocket(os_fd_t fd) { return addSocket(std::make_unique(fd, *this)); @@ -693,10 +693,10 @@ TEST(IoUringWorkerImplTest, NoEnableReadOnConnectError) { delete static_cast(connect_req); } -// When ``write_buffer_high_watermark`` is set, ``IoUringServerSocket::write`` must refuse to stage -// more bytes than the cap so that the upper layer's connection-level back-pressure can engage. -// Once the in-flight write completes and the buffer drops below the watermark, an injected Write -// completion is delivered so the upper layer retries. +// When the connection write buffer limit is applied, ``IoUringServerSocket::write`` must refuse to +// stage more bytes than the cap so that the upper layer's connection-level back-pressure can +// engage. Once the in-flight write completes and the buffer drops below the watermark, an injected +// Write completion is delivered so the upper layer retries. TEST(IoUringWorkerImplTest, WriteBufferHighWatermark) { Event::MockDispatcher dispatcher; IoUringPtr io_uring_instance = std::make_unique(); @@ -709,7 +709,8 @@ TEST(IoUringWorkerImplTest, WriteBufferHighWatermark) { IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher); // Watermark = 100 bytes. - IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false, 100); + IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false); + socket.setWriteBufferHighWatermark(100); // First write of 80 bytes fits under the watermark and is fully accepted. Request* write_req = nullptr; @@ -765,7 +766,8 @@ TEST(IoUringWorkerImplTest, WriteSliceBufferHighWatermark) { createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read)) .WillOnce(ReturnNew>()); IoUringWorkerTestImpl worker(std::move(io_uring_instance), dispatcher); - IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false, 50); + IoUringServerSocket socket(0, worker, [](uint32_t) { return absl::OkStatus(); }, 0, false); + socket.setWriteBufferHighWatermark(50); Request* write_req = nullptr; EXPECT_CALL(mock_io_uring, prepareWritev(_, _, _, _, _)) diff --git a/test/common/network/io_uring_socket_handle_impl_test.cc b/test/common/network/io_uring_socket_handle_impl_test.cc index adcdb1d739e00..486b41cab274e 100644 --- a/test/common/network/io_uring_socket_handle_impl_test.cc +++ b/test/common/network/io_uring_socket_handle_impl_test.cc @@ -41,6 +41,22 @@ TEST_F(IoUringSocketHandleTest, CreateClientSocket) { EXPECT_EQ(IoUringSocketType::Client, impl.ioUringSocketType()); } +TEST_F(IoUringSocketHandleTest, AppliesWriteBufferLimit) { + IoUringSocketHandleTestImpl impl(factory_, false); + impl.setWriteBufferLimits(123); + + EXPECT_CALL(worker_, addClientSocket(_, _, _)).WillOnce(testing::ReturnRef(socket_)); + EXPECT_CALL(factory_, getIoUringWorker()) + .WillOnce(testing::Return(OptRef(worker_))); + EXPECT_CALL(socket_, setWriteBufferHighWatermark(123)); + impl.initializeFileEvent( + dispatcher_, [](uint32_t) { return absl::OkStatus(); }, Event::PlatformDefaultTriggerType, + Event::FileReadyType::Read); + + EXPECT_CALL(socket_, setWriteBufferHighWatermark(456)); + impl.setWriteBufferLimits(456); +} + TEST_F(IoUringSocketHandleTest, ReadError) { IoUringSocketHandleTestImpl impl(factory_, false); EXPECT_CALL(worker_, addClientSocket(_, _, _)).WillOnce(testing::ReturnRef(socket_)); diff --git a/test/mocks/io/mocks.h b/test/mocks/io/mocks.h index 4f7536485135c..00f8483a5245b 100644 --- a/test/mocks/io/mocks.h +++ b/test/mocks/io/mocks.h @@ -44,6 +44,7 @@ class MockIoUringSocket : public IoUringSocket { MOCK_METHOD(void, connect, (const Network::Address::InstanceConstSharedPtr& address)); MOCK_METHOD(void, write, (Buffer::Instance & data)); MOCK_METHOD(uint64_t, write, (const Buffer::RawSlice* slices, uint64_t num_slice)); + MOCK_METHOD(void, setWriteBufferHighWatermark, (uint32_t high_watermark)); MOCK_METHOD(void, onAccept, (Request * req, int32_t result, bool injected)); MOCK_METHOD(void, onConnect, (Request * req, int32_t result, bool injected)); MOCK_METHOD(void, onRead, (Request * req, int32_t result, bool injected));