diff --git a/changelogs/current.yaml b/changelogs/current.yaml index e1e4e7ccb6f79..fe58c55c5d3a2 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -84,6 +84,12 @@ bug_fixes: definition requires. Both factories now serialize the ``Struct`` to a JSON string and pass the string to the dynamic module side as the configuration, matching the behavior already in place for every other dynamic module extension factory. +- area: io_uring + change: | + Bounded the number of injected io_uring completions processed per dispatcher tick to keep a + steady stream of injected completions (or completion callbacks that inject more completions) + from starving other work on the dispatcher thread. Any completions left over after the cap is + reached stay queued and are processed on subsequent ticks via an eventfd self-poke. - area: oauth2 change: | Fixed a crash in the OAuth2 filter where AES-CBC decryption of token cookies could spuriously diff --git a/source/common/io/io_uring_impl.cc b/source/common/io/io_uring_impl.cc index 411955c756ae9..513732eb560d4 100644 --- a/source/common/io/io_uring_impl.cc +++ b/source/common/io/io_uring_impl.cc @@ -6,7 +6,7 @@ namespace Envoy { namespace Io { bool isIoUringSupported() { - struct io_uring_params p {}; + struct io_uring_params p{}; struct io_uring ring; bool is_supported = io_uring_queue_init_params(2, &ring, &p) == 0; @@ -17,9 +17,13 @@ bool isIoUringSupported() { return is_supported; } -IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling) - : cqes_(io_uring_size, nullptr) { - struct io_uring_params p {}; +IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling, + uint32_t max_injected_completions_per_event) + : cqes_(io_uring_size, nullptr), + max_injected_completions_per_event_(max_injected_completions_per_event) { + RELEASE_ASSERT(max_injected_completions_per_event_ > 0, + "max_injected_completions_per_event must be > 0"); + struct io_uring_params p{}; if 
(use_submission_queue_polling) { p.flags |= IORING_SETUP_SQPOLL; } @@ -75,13 +79,28 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) { io_uring_cq_advance(&ring_, count); ENVOY_LOG(trace, "the num of injected completion is {}", injected_completions_.size()); - // TODO(soulxu): Add bound here to avoid too many completion to stuck the thread too - // long. - // Iterate the injected completion. - while (!injected_completions_.empty()) { + // Drain at most `max_injected_completions_per_event_` injected completions per event-loop tick + // so a steady stream of injections (or completion callbacks that inject more completions) can + // never starve other work on the dispatcher thread. Any completions left over are processed on + // the next tick after the eventfd is re-armed below. + uint32_t processed = 0; + while (!injected_completions_.empty() && processed < max_injected_completions_per_event_) { auto completion = injected_completions_.front(); injected_completions_.pop_front(); completion_cb(completion.user_data_, completion.result_, true); + ++processed; + } + + // If we hit the cap with work still queued, write to the eventfd so this callback fires again + // on the next dispatcher tick. The eventfd is non-blocking and we drain it at the top of this + // method, so this is just a self-poke. 
+ if (!injected_completions_.empty()) { + ENVOY_LOG(trace, "injected completion cap reached, {} remaining; re-arming eventfd", + injected_completions_.size()); + const eventfd_t v = 1; + int ret = eventfd_write(event_fd_, v); + RELEASE_ASSERT(ret == 0, + fmt::format("failed to re-arm io_uring eventfd: {}", errorDetails(errno))); } } diff --git a/source/common/io/io_uring_impl.h b/source/common/io/io_uring_impl.h index 196ba0d942a0e..3ed70729a1472 100644 --- a/source/common/io/io_uring_impl.h +++ b/source/common/io/io_uring_impl.h @@ -25,7 +25,15 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject, protected Logger::Loggable<Logger::Id::io> { public: - IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling); + // Cap on the number of injected completions processed per event-loop tick. + // Without this cap, a steady stream of injected completions whose callbacks inject more + // completions can stall the dispatcher thread indefinitely. When the cap is reached, the + // remaining injected completions stay queued and the eventfd is re-armed so processing + // resumes on the next tick. 
+ static constexpr uint32_t DefaultMaxInjectedCompletionsPerEvent = 1024; + + IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling, + uint32_t max_injected_completions_per_event = DefaultMaxInjectedCompletionsPerEvent); ~IoUringImpl() override; os_fd_t registerEventfd() override; @@ -48,10 +56,11 @@ class IoUringImpl : public IoUring, void removeInjectedCompletion(os_fd_t fd) override; private: - struct io_uring ring_ {}; + struct io_uring ring_{}; std::vector<struct io_uring_cqe*> cqes_; os_fd_t event_fd_{INVALID_SOCKET}; std::list<InjectedCompletion> injected_completions_; + const uint32_t max_injected_completions_per_event_; }; } // namespace Io diff --git a/test/common/io/io_uring_impl_test.cc b/test/common/io/io_uring_impl_test.cc index 64fe888494071..ae6642d4a54e8 100644 --- a/test/common/io/io_uring_impl_test.cc +++ b/test/common/io/io_uring_impl_test.cc @@ -276,6 +276,55 @@ TEST_F(IoUringImplTest, NestRemoveInjectCompletion) { waitForCondition(*dispatcher, [&completions_nr]() { return completions_nr == 2; }); } +TEST_F(IoUringImplTest, BoundedInjectedCompletionsPerEvent) { + // Recreate io_uring_ with a small per-tick cap so we can observe the bound. 
+ io_uring_ = std::make_unique<IoUringImpl>(2, false, /*max_injected_completions_per_event=*/3); + + auto dispatcher = api_->allocateDispatcher("test_thread"); + + os_fd_t event_fd = io_uring_->registerEventfd(); + const Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType; + + int32_t total_completions = 0; + int32_t event_callback_invocations = 0; + std::vector<int32_t> per_invocation_counts; + + auto file_event = dispatcher->createFileEvent( + event_fd, + [this, &total_completions, &event_callback_invocations, &per_invocation_counts](uint32_t) { + int32_t before = total_completions; + io_uring_->forEveryCompletion([&total_completions](Request*, int32_t, bool injected) { + EXPECT_TRUE(injected); + total_completions++; + }); + per_invocation_counts.push_back(total_completions - before); + event_callback_invocations++; + return absl::OkStatus(); + }, + trigger, Event::FileReadyType::Read); + + // Inject 7 completions: with cap=3 we expect 3 + 3 + 1 across three event ticks. + std::array<int, 7> data{1, 2, 3, 4, 5, 6, 7}; + std::vector<std::unique_ptr<Request>> requests; + for (int& d : data) { + requests.push_back(std::make_unique<Request>(d)); + io_uring_->injectCompletion(/*fd=*/42, requests.back().get(), -1); + } + + file_event->activate(Event::FileReadyType::Read); + waitForCondition(*dispatcher, [&total_completions]() { return total_completions == 7; }); + + // Each tick after the first must have been driven by the eventfd self-poke since we never + // called activate() again — proving the re-arm path works. With cap=3 and 7 injected + // completions the drain must split exactly 3 + 3 + 1 across three ticks, and the eventfd + // is only written by IoUring itself, so the callback is invoked exactly 3 times. 
+ EXPECT_EQ(7, total_completions); + EXPECT_EQ(3, event_callback_invocations); + EXPECT_EQ(3, per_invocation_counts[0]); + EXPECT_EQ(3, per_invocation_counts[1]); + EXPECT_EQ(1, per_invocation_counts[2]); +} + TEST_F(IoUringImplTest, RegisterEventfd) { EXPECT_FALSE(io_uring_->isEventfdRegistered()); io_uring_->registerEventfd();