diff --git a/docs/cn/rdma.md b/docs/cn/rdma.md index 86c22e1e84..29b0b6fb43 100644 --- a/docs/cn/rdma.md +++ b/docs/cn/rdma.md @@ -47,6 +47,8 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。 +RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通过设置rdma_use_polling可以开启轮询模式。轮询模式下还可以设置轮询器数目(rdma_poller_num),以及是否主动放弃CPU(rdma_poller_yield)。轮询模式下还可以设置一个回调函数,在每次轮询时调用,可以配合io_uring/spdk等使用。在配合使用spdk等驱动的时候,因为spdk只支持轮询模式,并且只能在单线程使用(或者叫Run To Completion模式上使用)执行一个任务过程中不允许被调度到别的线程上,所以这时候需要设置(rdma_edisp_unsched)为true,使事件驱动程序一直占用一个worker线程,不能调度别的任务。 + # 参数 可配置参数说明: @@ -68,3 +70,8 @@ RDMA是硬件相关的通信技术,有很多独特的概念,比如device、p * rdma_memory_pool_max_regions: 最大的内存池块数,默认16 * rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4 * rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128 +* rdma_use_polling: 是否使用RDMA的轮询模式,默认false +* rdma_poller_num: 轮询模式下的poller数目,默认1 +* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false +* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false +* rdma_disable_bthread: 禁用bthread,默认是false diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index ab07ad3bff..69c9e8c341 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -180,6 +180,9 @@ int Channel::InitChannelOptions(const ChannelOptions* options) { return -1; } rdma::GlobalRdmaInitializeOrDie(); + if (!rdma::InitPollingModeWithTag(bthread_self_tag())) { + return -1; + } #else LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma"; return -1; diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index afb8fb3f5e..b1e8d1e008 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -207,6 +207,14 @@ static void QueueMessage(InputMessageBase* to_run_msg, BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; tmp.keytable_pool = keytable_pool; tmp.tag = bthread_self_tag(); + +#if BRPC_WITH_RDMA + if (rdma::FLAGS_rdma_disable_bthread) { + ProcessInputMessage(to_run_msg); + return; + } +#endif + if (!FLAGS_usercode_in_coroutine && bthread_start_background( &th, &tmp, ProcessInputMessage, to_run_msg) == 0) { ++*num_bthread_created; diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 2750b756d3..0ef2cef1db 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -31,6 +31,7 @@ #include "brpc/rdma/rdma_helper.h" #include "brpc/rdma/rdma_endpoint.h" +DECLARE_int32(task_group_ntags); namespace brpc { namespace rdma { @@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP."); DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP."); DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely"); BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate); +DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA."); +DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode."); +DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode."); +DEFINE_bool(rdma_edisp_unsched, false, "Disable event dispatcher schedule"); +DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA"); static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent @@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { LOG_IF(INFO, FLAGS_rdma_trace_verbose) << "Handshake ends (use tcp) on " << s->description(); } - ep->TryReadOnTcp(); return NULL; @@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) { return NULL; } - res->comp_channel = IbvCreateCompChannel(GetRdmaContext()); - if (!res->comp_channel) { - PLOG(WARNING) << "Fail to create comp channel for CQ"; - delete res; - return NULL; - } + if (!FLAGS_rdma_use_polling) { + res->comp_channel = IbvCreateCompChannel(GetRdmaContext()); + if (!res->comp_channel) { + PLOG(WARNING) << "Fail to create comp channel for CQ"; + delete res; + return NULL; + } - butil::make_close_on_exec(res->comp_channel->fd); - if (butil::make_non_blocking(res->comp_channel->fd) < 0) { - PLOG(WARNING) << "Fail to set comp channel nonblocking"; - delete res; - return NULL; - } + butil::make_close_on_exec(res->comp_channel->fd); + if (butil::make_non_blocking(res->comp_channel->fd) < 0) { + PLOG(WARNING) << "Fail to set comp channel nonblocking"; + delete res; + return NULL; + } - res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, - NULL, res->comp_channel, GetRdmaCompVector()); - if (!res->cq) { - PLOG(WARNING) << "Fail to create CQ"; - delete res; - return NULL; + res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, + NULL, res->comp_channel, GetRdmaCompVector()); + if (!res->cq) { + PLOG(WARNING) << "Fail to create CQ"; + delete res; + return NULL; + } + } else { + res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, + NULL, NULL, 0); + if (!res->cq) { + PLOG(WARNING) << "Fail to create CQ"; + delete res; + return NULL; + } } ibv_qp_init_attr attr; @@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() { return -1; } - SocketOptions options; - options.user = this; - options.keytable_pool = _socket->_keytable_pool; - options.fd = _resource->comp_channel->fd; - options.on_edge_triggered_events = PollCq; - if (Socket::Create(options, &_cq_sid) < 0) { - PLOG(WARNING) << "Fail to create socket for cq"; - return -1; - } + if (!FLAGS_rdma_use_polling) { + SocketOptions options; + options.user = this; + options.keytable_pool = _socket->_keytable_pool; + options.fd = _resource->comp_channel->fd; + options.on_edge_triggered_events = PollCq; + if (Socket::Create(options, &_cq_sid) < 0) { + PLOG(WARNING) << "Fail to create socket for cq"; + return -1; + } - if (ibv_req_notify_cq(_resource->cq, 1) < 0) { - PLOG(WARNING) << "Fail to arm CQ comp channel"; - return -1; + if (ibv_req_notify_cq(_resource->cq, 1) < 0) { + PLOG(WARNING) << "Fail to arm CQ comp channel"; + return -1; + } + } else { + SocketOptions options; + options.user = this; + options.keytable_pool = _socket->_keytable_pool; + if (Socket::Create(options, &_cq_sid) < 0) { + PLOG(WARNING) << "Fail to create socket for cq"; + return -1; + } + PollerAddCqSid(); } _sbuf.resize(_sq_size - RESERVED_WR_NUM); @@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() { if (!_resource) { return; } + if (FLAGS_rdma_use_polling) { + PollerRemoveCqSid(); + } bool move_to_rdma_resource_list = false; if (_sq_size <= FLAGS_rdma_prepared_qp_size && _rq_size <= FLAGS_rdma_prepared_qp_size && @@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() { move_to_rdma_resource_list = true; } } - int fd = _resource->comp_channel->fd; + int fd = -1; + if (_resource->comp_channel) { + fd = _resource->comp_channel->fd; + } if (!move_to_rdma_resource_list) { if (_resource->qp) { if (IbvDestroyQp(_resource->qp) < 0) { @@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) { } CHECK(ep == s->_rdma_ep); - if (ep->GetAndAckEvents() < 0) { - const int saved_errno = errno; - PLOG(ERROR) << "Fail to get cq event: " << s->description(); - s->SetFailed(saved_errno, "Fail to get cq event from %s: %s", - s->description().c_str(), berror(saved_errno)); - return; + if (!FLAGS_rdma_use_polling) { + if (ep->GetAndAckEvents() < 0) { + const int saved_errno = errno; + PLOG(ERROR) << "Fail to get cq event: " << s->description(); + s->SetFailed(saved_errno, "Fail to get cq event from %s: %s", + s->description().c_str(), berror(saved_errno)); + return; + } } int progress = Socket::PROGRESS_INIT; @@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) { return; } if (cnt == 0) { + if (FLAGS_rdma_use_polling) { + return; + } if (!notified) { // Since RDMA only provides one shot event, we have to call the // notify function every time. Because there is a possibility @@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) { } if (ep->GetAndAckEvents() < 0) { s->SetFailed(errno, "Fail to ack CQ event on %s", - s->description().c_str()); + s->description().c_str()); return; } notified = false; @@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() { g_rdma_resource_list = res; } + if (FLAGS_rdma_use_polling) { + _poller_groups = std::vector(FLAGS_task_group_ntags); + } + return 0; } @@ -1486,6 +1527,123 @@ void RdmaEndpoint::GlobalRelease() { delete res; } } + // release polling mode at exit or call RdmaEndpoint::PollingModeRelease + // explicitly + if (FLAGS_rdma_use_polling) { + for (int i = 0; i < FLAGS_task_group_ntags; ++i) { + PollingModeRelease(i); + } + } +} + +std::vector RdmaEndpoint::_poller_groups; + +int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag, + std::function callback, + std::function init_fn, + std::function release_fn) { + if (!FLAGS_rdma_use_polling) { + return 0; + } + auto& group = _poller_groups[tag]; + auto& pollers = group.pollers; + auto& running = group.running; + bool expected = false; + if (!running.compare_exchange_strong(expected, true)) { + return 0; + } + struct FnArgs { + Poller* poller; + std::atomic* running; + }; + auto fn = [](void* p) -> void* { + std::unique_ptr args(static_cast(p)); + auto poller = args->poller; + auto running = args->running; + std::unordered_set cq_sids; + CqSidOp op; + + if (poller->init_fn) { + poller->init_fn(); + } + + while (running->load(std::memory_order_relaxed)) { + while (poller->op_queue.Dequeue(op)) { + if (op.type == CqSidOp::ADD) { + cq_sids.emplace(op.sid); + } else if (op.type == CqSidOp::REMOVE) { + cq_sids.erase(op.sid); + } + } + for (auto sid : cq_sids) { + SocketUniquePtr s; + if (Socket::Address(sid, &s) < 0) { + continue; + } + PollCq(s.get()); + } + if (poller->callback) { + poller->callback(); + } + if (FLAGS_rdma_poller_yield) { + bthread_yield(); + } + } + + if (poller->release_fn) { + poller->release_fn(); + } + + return nullptr; + }; + for (int i = 0; i < FLAGS_rdma_poller_num; ++i) { + auto args = new FnArgs{&pollers[i], &running}; + auto attr = FLAGS_rdma_disable_bthread ? BTHREAD_ATTR_PTHREAD + : BTHREAD_ATTR_NORMAL; + attr.tag = tag; + pollers[i].callback = callback; + pollers[i].init_fn = init_fn; + pollers[i].release_fn = release_fn; + auto rc = bthread_start_background(&pollers[i].tid, &attr, fn, args); + if (rc != 0) { + LOG(ERROR) << "Fail to start rdma polling bthread"; + return -1; + } + } + return 0; +} + +void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) { + if (!FLAGS_rdma_use_polling) { + return; + } + auto& group = _poller_groups[tag]; + auto& pollers = group.pollers; + auto& running = group.running; + running.store(false, std::memory_order_relaxed); + for (int i = 0; i < FLAGS_rdma_poller_num; ++i) { + bthread_join(pollers[i].tid, nullptr); + } +} + +void RdmaEndpoint::PollerAddCqSid() { + auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num; + auto& group = _poller_groups[bthread_self_tag()]; + auto& pollers = group.pollers; + auto& poller = pollers[index]; + if (_cq_sid != INVALID_SOCKET_ID) { + poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD}); + } +} + +void RdmaEndpoint::PollerRemoveCqSid() { + auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num; + auto& group = _poller_groups[bthread_self_tag()]; + auto& pollers = group.pollers; + auto& poller = pollers[index]; + if (_cq_sid != INVALID_SOCKET_ID) { + poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE}); + } } } // namespace rdma diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index b4a748f9e2..75730fddf2 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -23,10 +23,12 @@ #include #include #include +#include #include #include "butil/atomicops.h" #include "butil/iobuf.h" #include "butil/macros.h" +#include "butil/containers/mpsc_queue.h" #include "brpc/socket.h" @@ -34,6 +36,10 @@ namespace brpc { class Socket; namespace rdma { +DECLARE_int32(rdma_poller_num); +DECLARE_bool(rdma_edisp_unsched); +DECLARE_bool(rdma_disable_bthread); + class RdmaConnect : public AppConnect { public: void StartConnect(const Socket* socket, @@ -90,6 +96,14 @@ friend class brpc::Socket; // Callback when there is new epollin event on TCP fd static void OnNewDataFromTcp(Socket* m); + // Initialize polling mode + static int PollingModeInitialize(bthread_tag_t tag, + std::function callback, + std::function init_fn, + std::function release_fn); + + static void PollingModeRelease(bthread_tag_t tag); + private: enum State { UNINIT = 0x0, @@ -191,6 +205,12 @@ friend class brpc::Socket; // Try to read data on TCP fd in _socket inline void TryReadOnTcp(); + // Add cq socket id to poller + void PollerAddCqSid(); + + // Remove cq socket id to poller + void PollerRemoveCqSid(); + // Not owner Socket* _socket; @@ -245,6 +265,33 @@ friend class brpc::Socket; butil::atomic *_read_butex; DISALLOW_COPY_AND_ASSIGN(RdmaEndpoint); + + // Cq socket id operation type + struct CqSidOp { + enum OpType { + ADD, + REMOVE, + }; + SocketId sid; + OpType type; + }; + // Poller instance + struct BAIDU_CACHELINE_ALIGNMENT Poller { + bthread_t tid{INVALID_BTHREAD}; + butil::MPSCQueue> op_queue; + // Callback used for io_uring/spdk etc + std::function callback; + // Init and Destory function + std::function init_fn; + std::function release_fn; + }; + // Poller group + struct BAIDU_CACHELINE_ALIGNMENT PollerGroup { + PollerGroup() : pollers(FLAGS_rdma_poller_num), running(false) {} + std::vector pollers; + std::atomic running; + }; + static std::vector _poller_groups; }; } // namespace rdma diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index a64eba0aa7..003fd23241 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -686,6 +686,21 @@ bool SupportedByRdma(std::string protocol) { return false; } +bool InitPollingModeWithTag(bthread_tag_t tag, + std::function callback, + std::function init_fn, + std::function release_fn) { + if (RdmaEndpoint::PollingModeInitialize(tag, callback, init_fn, + release_fn) == 0) { + return true; + } + return false; +} + +void ReleasePollingModeWithTag(bthread_tag_t tag) { + RdmaEndpoint::PollingModeRelease(tag); +} + } // namespace rdma } // namespace brpc diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 5d176b4e7c..a10c501aaf 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -22,6 +22,8 @@ #include #include +#include +#include "bthread/types.h" namespace brpc { @@ -31,6 +33,14 @@ namespace rdma { // Exit if failed void GlobalRdmaInitializeOrDie(); +// Initialize RDMA polling mode with tag +bool InitPollingModeWithTag(bthread_tag_t tag, + std::function callback = nullptr, + std::function init_fn = nullptr, + std::function release_fn = nullptr); + +void ReleasePollingModeWithTag(bthread_tag_t tag); + // Register the given memory // Return the memory lkey for the given memory, Return 0 when fails // To use the memory in IOBuf, append_user_data_with_meta must be called diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index aa55c858aa..17cde630b7 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -434,7 +434,6 @@ Server::~Server() { Join(); ClearServices(); FreeSSLContexts(); - delete _session_local_data_pool; _session_local_data_pool = NULL; @@ -867,25 +866,29 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } + if (_options.bthread_tag < BTHREAD_TAG_DEFAULT || + _options.bthread_tag >= FLAGS_task_group_ntags) { + LOG(ERROR) << "Fail to set tag " << _options.bthread_tag + << ", tag range is [" << BTHREAD_TAG_DEFAULT << ":" + << FLAGS_task_group_ntags << ")"; + return -1; + } + if (_options.use_rdma) { #if BRPC_WITH_RDMA if (!OptionsAvailableOverRdma(&_options)) { return -1; } rdma::GlobalRdmaInitializeOrDie(); + if (!rdma::InitPollingModeWithTag(_options.bthread_tag)) { + return -1; + } #else LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma"; return -1; #endif } - if (_options.bthread_tag < BTHREAD_TAG_DEFAULT || - _options.bthread_tag >= FLAGS_task_group_ntags) { - LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is [" - << BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")"; - return -1; - } - if (_options.http_master_service) { // Check requirements for http_master_service: // has "default_method" & request/response have no fields diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index e7cc336a5d..760b83d8a0 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -2275,6 +2275,14 @@ int Socket::OnInputEvent(void* user_data, uint32_t events, attr.tag = bthread_self_tag(); if (FLAGS_usercode_in_coroutine) { ProcessEvent(p); +#if BRPC_WITH_RDMA + } else if (rdma::FLAGS_rdma_edisp_unsched == false) { + auto rc = bthread_start_background(&tid, &attr, ProcessEvent, p); + if (rc != 0) { + LOG(FATAL) << "Fail to start ProcessEvent"; + ProcessEvent(p); + } +#endif } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) { LOG(FATAL) << "Fail to start ProcessEvent"; ProcessEvent(p); diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp index 7a0218ddc1..cee13576ba 100644 --- a/src/brpc/span.cpp +++ b/src/brpc/span.cpp @@ -17,6 +17,7 @@ #include +#include #include #include #include diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 59ad61a21b..208eedd2a9 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -31,6 +31,7 @@ #include // errno #include // CHAR_BIT #include // std::invalid_argument +#include // gflags #include "butil/build_config.h" // ARCH_CPU_X86_64 #include "butil/atomicops.h" // butil::atomic #include "butil/thread_local.h" // thread_atexit @@ -43,6 +44,8 @@ namespace butil { namespace iobuf { +DEFINE_int32(iobuf_aligned_buf_block_size, 0, "iobuf aligned buf block size"); + typedef ssize_t (*iov_function)(int fd, const struct iovec *vector, int count, off_t offset); @@ -1817,8 +1820,11 @@ IOBuf::Area IOReserveAlignedBuf::reserve(size_t count) { count = (count + _alignment - 1) & ~(_alignment - 1); size_t total_nc = 0; while (total_nc < count) { - const auto block_size = + auto block_size = std::max(_alignment, 4096UL) * 2 + sizeof(IOBuf::Block); + if (iobuf::FLAGS_iobuf_aligned_buf_block_size != 0) { + block_size = iobuf::FLAGS_iobuf_aligned_buf_block_size; + } auto b = iobuf::create_block_aligned(block_size, _alignment); if (BAIDU_UNLIKELY(!b)) { LOG(ERROR) << "Create block failed";