From 8bf295bf7be66d651eda735b413e41664b329689 Mon Sep 17 00:00:00 2001 From: Adam Buran Date: Sat, 16 May 2026 03:56:20 +0000 Subject: [PATCH] io_uring: bound injected completions per dispatcher tick The injected-completion drain loop in IoUringImpl::forEveryCompletion was unbounded. A steady stream of injected completions, or completion callbacks that inject more completions, could stall the dispatcher thread indefinitely and starve other event sources sharing the same loop. The TODO at io_uring_impl.cc:78 called this out explicitly. Cap the per-tick drain at a configurable bound (default 1024). When the cap is hit with completions still queued, write to the registered eventfd to re-arm the file event so processing resumes on the next dispatcher tick. The eventfd is non-blocking and is fully drained at the top of the method, so the self-poke is safe. Add a unit test that injects more completions than the cap, asserts the drain is split across multiple ticks, and verifies all completions still fire (proving the eventfd re-arm path works without an explicit activate()). Per ravenblackx review: tightened EXPECT_GE to EXPECT_EQ since the eventfd is only written by IoUring's self-poke, so the callback fires exactly 3 times for 3+3+1. Also unified event_callback_invocations to int32_t for consistency with the other counters. Signed-off-by: Adam Buran --- changelogs/current.yaml | 6 ++++ source/common/io/io_uring_impl.cc | 35 +++++++++++++++----- source/common/io/io_uring_impl.h | 13 ++++++-- test/common/io/io_uring_impl_test.cc | 49 ++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index e1e4e7ccb6f79..fe58c55c5d3a2 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -84,6 +84,12 @@ bug_fixes: definition requires. Both factories now serialize the ``Struct`` to a JSON string and pass the string to the dynamic module side as the configuration, matching the behavior already in place for every other dynamic module extension factory. +- area: io_uring + change: | + Bounded the number of injected io_uring completions processed per dispatcher tick to keep a + steady stream of injected completions (or completion callbacks that inject more completions) + from starving other work on the dispatcher thread. Any completions left over after the cap is + reached stay queued and are processed on subsequent ticks via an eventfd self-poke. - area: oauth2 change: | Fixed a crash in the OAuth2 filter where AES-CBC decryption of token cookies could spuriously diff --git a/source/common/io/io_uring_impl.cc b/source/common/io/io_uring_impl.cc index 411955c756ae9..513732eb560d4 100644 --- a/source/common/io/io_uring_impl.cc +++ b/source/common/io/io_uring_impl.cc @@ -6,7 +6,7 @@ namespace Envoy { namespace Io { bool isIoUringSupported() { - struct io_uring_params p {}; + struct io_uring_params p{}; struct io_uring ring; bool is_supported = io_uring_queue_init_params(2, &ring, &p) == 0; @@ -17,9 +17,13 @@ bool isIoUringSupported() { return is_supported; } -IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling) - : cqes_(io_uring_size, nullptr) { - struct io_uring_params p {}; +IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling, + uint32_t max_injected_completions_per_event) + : cqes_(io_uring_size, nullptr), + max_injected_completions_per_event_(max_injected_completions_per_event) { + RELEASE_ASSERT(max_injected_completions_per_event_ > 0, + "max_injected_completions_per_event must be > 0"); + struct io_uring_params p{}; if (use_submission_queue_polling) { p.flags |= IORING_SETUP_SQPOLL; } @@ -75,13 +79,28 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) { io_uring_cq_advance(&ring_, count); ENVOY_LOG(trace, "the num of injected completion is {}", injected_completions_.size()); - // TODO(soulxu): Add bound here to avoid too many completion to stuck the thread too - // long. - // Iterate the injected completion. - while (!injected_completions_.empty()) { + // Drain at most `max_injected_completions_per_event_` injected completions per event-loop tick + // so a steady stream of injections (or completion callbacks that inject more completions) can + // never starve other work on the dispatcher thread. Any completions left over are processed on + // the next tick after the eventfd is re-armed below. + uint32_t processed = 0; + while (!injected_completions_.empty() && processed < max_injected_completions_per_event_) { auto completion = injected_completions_.front(); injected_completions_.pop_front(); completion_cb(completion.user_data_, completion.result_, true); + ++processed; + } + + // If we hit the cap with work still queued, write to the eventfd so this callback fires again + // on the next dispatcher tick. The eventfd is non-blocking and we drain it at the top of this + // method, so this is just a self-poke. + if (!injected_completions_.empty()) { + ENVOY_LOG(trace, "injected completion cap reached, {} remaining; re-arming eventfd", + injected_completions_.size()); + const eventfd_t v = 1; + int ret = eventfd_write(event_fd_, v); + RELEASE_ASSERT(ret == 0, + fmt::format("failed to re-arm io_uring eventfd: {}", errorDetails(errno))); } } diff --git a/source/common/io/io_uring_impl.h b/source/common/io/io_uring_impl.h index 196ba0d942a0e..3ed70729a1472 100644 --- a/source/common/io/io_uring_impl.h +++ b/source/common/io/io_uring_impl.h @@ -25,7 +25,15 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject, protected Logger::Loggable { public: - IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling); + // Cap on the number of injected completions processed per event-loop tick. + // Without this cap, a steady stream of injected completions whose callbacks inject more + // completions can stall the dispatcher thread indefinitely. When the cap is reached, the + // remaining injected completions stay queued and the eventfd is re-armed so processing + // resumes on the next tick. + static constexpr uint32_t DefaultMaxInjectedCompletionsPerEvent = 1024; + + IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling, + uint32_t max_injected_completions_per_event = DefaultMaxInjectedCompletionsPerEvent); ~IoUringImpl() override; os_fd_t registerEventfd() override; @@ -48,10 +56,11 @@ class IoUringImpl : public IoUring, void removeInjectedCompletion(os_fd_t fd) override; private: - struct io_uring ring_ {}; + struct io_uring ring_{}; std::vector cqes_; os_fd_t event_fd_{INVALID_SOCKET}; std::list injected_completions_; + const uint32_t max_injected_completions_per_event_; }; } // namespace Io diff --git a/test/common/io/io_uring_impl_test.cc b/test/common/io/io_uring_impl_test.cc index 64fe888494071..ae6642d4a54e8 100644 --- a/test/common/io/io_uring_impl_test.cc +++ b/test/common/io/io_uring_impl_test.cc @@ -276,6 +276,55 @@ TEST_F(IoUringImplTest, NestRemoveInjectCompletion) { waitForCondition(*dispatcher, [&completions_nr]() { return completions_nr == 2; }); } +TEST_F(IoUringImplTest, BoundedInjectedCompletionsPerEvent) { + // Recreate io_uring_ with a small per-tick cap so we can observe the bound. + io_uring_ = std::make_unique(2, false, /*max_injected_completions_per_event=*/3); + + auto dispatcher = api_->allocateDispatcher("test_thread"); + + os_fd_t event_fd = io_uring_->registerEventfd(); + const Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType; + + int32_t total_completions = 0; + int32_t event_callback_invocations = 0; + std::vector per_invocation_counts; + + auto file_event = dispatcher->createFileEvent( + event_fd, + [this, &total_completions, &event_callback_invocations, &per_invocation_counts](uint32_t) { + int32_t before = total_completions; + io_uring_->forEveryCompletion([&total_completions](Request*, int32_t, bool injected) { + EXPECT_TRUE(injected); + total_completions++; + }); + per_invocation_counts.push_back(total_completions - before); + event_callback_invocations++; + return absl::OkStatus(); + }, + trigger, Event::FileReadyType::Read); + + // Inject 7 completions: with cap=3 we expect 3 + 3 + 1 across three event ticks. + std::array data{1, 2, 3, 4, 5, 6, 7}; + std::vector> requests; + for (int& d : data) { + requests.push_back(std::make_unique(d)); + io_uring_->injectCompletion(/*fd=*/42, requests.back().get(), -1); + } + + file_event->activate(Event::FileReadyType::Read); + waitForCondition(*dispatcher, [&total_completions]() { return total_completions == 7; }); + + // Each tick after the first must have been driven by the eventfd self-poke since we never + // called activate() again — proving the re-arm path works. With cap=3 and 7 injected + // completions the drain must split exactly 3 + 3 + 1 across three ticks, and the eventfd + // is only written by IoUring itself, so the callback is invoked exactly 3 times. + EXPECT_EQ(7, total_completions); + EXPECT_EQ(3, event_callback_invocations); + EXPECT_EQ(3, per_invocation_counts[0]); + EXPECT_EQ(3, per_invocation_counts[1]); + EXPECT_EQ(1, per_invocation_counts[2]); +} + TEST_F(IoUringImplTest, RegisterEventfd) { EXPECT_FALSE(io_uring_->isEventfdRegistered()); io_uring_->registerEventfd();