Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/cn/rdma.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register

RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。

RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通过设置rdma_use_polling可以开启轮询模式。轮询模式下可以设置轮询器数目(rdma_poller_num),以及轮询器是否主动放弃CPU(rdma_poller_yield)。轮询模式下还可以设置一个回调函数,在每次轮询时调用,可以配合io_uring/spdk等使用。在配合spdk等驱动使用时,由于spdk只支持轮询模式,并且只能在单线程中使用(即Run To Completion模式,执行一个任务的过程中不允许被调度到别的线程上),所以此时需要把rdma_edisp_unsched设置为true,使事件驱动器一直占用一个worker线程,不会被调度去执行别的任务。

# 参数

可配置参数说明:
Expand All @@ -68,3 +70,8 @@ RDMA是硬件相关的通信技术,有很多独特的概念,比如device、p
* rdma_memory_pool_max_regions: 最大的内存池块数,默认16
* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4
* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128
* rdma_use_polling: 是否使用RDMA的轮询模式,默认false
* rdma_poller_num: 轮询模式下的poller数目,默认1
* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false
* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false
* rdma_disable_bthread: 禁用bthread,默认是false
3 changes: 3 additions & 0 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
return -1;
}
rdma::GlobalRdmaInitializeOrDie();
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
return -1;
}
#else
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
return -1;
Expand Down
8 changes: 8 additions & 0 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ static void QueueMessage(InputMessageBase* to_run_msg,
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
tmp.keytable_pool = keytable_pool;
tmp.tag = bthread_self_tag();

#if BRPC_WITH_RDMA
if (rdma::FLAGS_rdma_disable_bthread) {
ProcessInputMessage(to_run_msg);
return;
}
#endif

if (!FLAGS_usercode_in_coroutine && bthread_start_background(
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
++*num_bthread_created;
Expand Down
236 changes: 197 additions & 39 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "brpc/rdma/rdma_helper.h"
#include "brpc/rdma/rdma_endpoint.h"

DECLARE_int32(task_group_ntags);

namespace brpc {
namespace rdma {
Expand Down Expand Up @@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode.");
DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode.");
DEFINE_bool(rdma_edisp_unsched, false, "Disable event dispatcher schedule");
DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA");

static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent

Expand Down Expand Up @@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
<< "Handshake ends (use tcp) on " << s->description();
}

ep->TryReadOnTcp();

return NULL;
Expand Down Expand Up @@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
return NULL;
}

res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
if (!res->comp_channel) {
PLOG(WARNING) << "Fail to create comp channel for CQ";
delete res;
return NULL;
}
if (!FLAGS_rdma_use_polling) {
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
if (!res->comp_channel) {
PLOG(WARNING) << "Fail to create comp channel for CQ";
delete res;
return NULL;
}

butil::make_close_on_exec(res->comp_channel->fd);
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
PLOG(WARNING) << "Fail to set comp channel nonblocking";
delete res;
return NULL;
}
butil::make_close_on_exec(res->comp_channel->fd);
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
PLOG(WARNING) << "Fail to set comp channel nonblocking";
delete res;
return NULL;
}

res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
NULL, res->comp_channel, GetRdmaCompVector());
if (!res->cq) {
PLOG(WARNING) << "Fail to create CQ";
delete res;
return NULL;
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
NULL, res->comp_channel, GetRdmaCompVector());
if (!res->cq) {
PLOG(WARNING) << "Fail to create CQ";
delete res;
return NULL;
}
} else {
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
NULL, NULL, 0);
if (!res->cq) {
PLOG(WARNING) << "Fail to create CQ";
delete res;
return NULL;
}
}

ibv_qp_init_attr attr;
Expand Down Expand Up @@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
return -1;
}

SocketOptions options;
options.user = this;
options.keytable_pool = _socket->_keytable_pool;
options.fd = _resource->comp_channel->fd;
options.on_edge_triggered_events = PollCq;
if (Socket::Create(options, &_cq_sid) < 0) {
PLOG(WARNING) << "Fail to create socket for cq";
return -1;
}
if (!FLAGS_rdma_use_polling) {
SocketOptions options;
options.user = this;
options.keytable_pool = _socket->_keytable_pool;
options.fd = _resource->comp_channel->fd;
options.on_edge_triggered_events = PollCq;
if (Socket::Create(options, &_cq_sid) < 0) {
PLOG(WARNING) << "Fail to create socket for cq";
return -1;
}

if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
PLOG(WARNING) << "Fail to arm CQ comp channel";
return -1;
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
PLOG(WARNING) << "Fail to arm CQ comp channel";
return -1;
}
} else {
SocketOptions options;
options.user = this;
options.keytable_pool = _socket->_keytable_pool;
if (Socket::Create(options, &_cq_sid) < 0) {
PLOG(WARNING) << "Fail to create socket for cq";
return -1;
}
PollerAddCqSid();
}

_sbuf.resize(_sq_size - RESERVED_WR_NUM);
Expand Down Expand Up @@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
if (!_resource) {
return;
}
if (FLAGS_rdma_use_polling) {
PollerRemoveCqSid();
}
bool move_to_rdma_resource_list = false;
if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
_rq_size <= FLAGS_rdma_prepared_qp_size &&
Expand All @@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
move_to_rdma_resource_list = true;
}
}
int fd = _resource->comp_channel->fd;
int fd = -1;
if (_resource->comp_channel) {
fd = _resource->comp_channel->fd;
}
if (!move_to_rdma_resource_list) {
if (_resource->qp) {
if (IbvDestroyQp(_resource->qp) < 0) {
Expand Down Expand Up @@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
}
CHECK(ep == s->_rdma_ep);

if (ep->GetAndAckEvents() < 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to get cq event: " << s->description();
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
s->description().c_str(), berror(saved_errno));
return;
if (!FLAGS_rdma_use_polling) {
if (ep->GetAndAckEvents() < 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to get cq event: " << s->description();
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
s->description().c_str(), berror(saved_errno));
return;
}
}

int progress = Socket::PROGRESS_INIT;
Expand All @@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
return;
}
if (cnt == 0) {
if (FLAGS_rdma_use_polling) {
return;
}
if (!notified) {
// Since RDMA only provides one shot event, we have to call the
// notify function every time. Because there is a possibility
Expand All @@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
}
if (ep->GetAndAckEvents() < 0) {
s->SetFailed(errno, "Fail to ack CQ event on %s",
s->description().c_str());
s->description().c_str());
return;
}
notified = false;
Expand Down Expand Up @@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() {
g_rdma_resource_list = res;
}

if (FLAGS_rdma_use_polling) {
_poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
}

return 0;
}

Expand All @@ -1486,6 +1527,123 @@ void RdmaEndpoint::GlobalRelease() {
delete res;
}
}
// release polling mode at exit or call RdmaEndpoint::PollingModeRelease
// explicitly
if (FLAGS_rdma_use_polling) {
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
PollingModeRelease(i);
}
}
}

// One PollerGroup per bthread tag; sized to FLAGS_task_group_ntags in
// GlobalInitialize() when FLAGS_rdma_use_polling is on (empty otherwise).
std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;

// Start the polling threads (pollers) for the given bthread tag.
// Each poller drains its op_queue to maintain the set of CQ socket ids it
// owns, then calls PollCq on every live socket in a busy loop.
// @param tag         bthread tag whose PollerGroup is started.
// @param callback    invoked once per polling iteration (may be empty);
//                    intended for cooperating drivers such as io_uring/spdk.
// @param init_fn     invoked once inside the poller thread before the loop.
// @param release_fn  invoked once inside the poller thread after the loop.
// @return 0 on success (or when polling mode is off / already started),
//         -1 if a poller thread could not be started.
int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag,
                                        std::function<void(void)> callback,
                                        std::function<void(void)> init_fn,
                                        std::function<void(void)> release_fn) {
    if (!FLAGS_rdma_use_polling) {
        return 0;
    }
    auto& group = _poller_groups[tag];
    auto& pollers = group.pollers;
    auto& running = group.running;
    bool expected = false;
    // Only the first caller for this tag starts the pollers; later calls
    // are no-ops.
    if (!running.compare_exchange_strong(expected, true)) {
        return 0;
    }
    // Heap-allocated argument bundle handed to each poller thread; the
    // thread takes ownership via unique_ptr.
    struct FnArgs {
        Poller* poller;
        std::atomic<bool>* running;
    };
    auto fn = [](void* p) -> void* {
        std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
        auto poller = args->poller;
        auto running = args->running;
        std::unordered_set<SocketId> cq_sids;
        CqSidOp op;

        if (poller->init_fn) {
            poller->init_fn();
        }

        while (running->load(std::memory_order_relaxed)) {
            // Apply pending add/remove requests before polling so that a
            // removed endpoint is never polled after DeallocateResources.
            while (poller->op_queue.Dequeue(op)) {
                if (op.type == CqSidOp::ADD) {
                    cq_sids.emplace(op.sid);
                } else if (op.type == CqSidOp::REMOVE) {
                    cq_sids.erase(op.sid);
                }
            }
            for (auto sid : cq_sids) {
                SocketUniquePtr s;
                if (Socket::Address(sid, &s) < 0) {
                    // Socket already gone; a REMOVE op will purge it later.
                    continue;
                }
                PollCq(s.get());
            }
            if (poller->callback) {
                poller->callback();
            }
            if (FLAGS_rdma_poller_yield) {
                bthread_yield();
            }
        }

        if (poller->release_fn) {
            poller->release_fn();
        }

        return nullptr;
    };
    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
        auto args = new FnArgs{&pollers[i], &running};
        // PTHREAD attr keeps the poller off the bthread scheduler entirely
        // when bthread usage is disabled (e.g. for spdk-style drivers).
        auto attr = FLAGS_rdma_disable_bthread ? BTHREAD_ATTR_PTHREAD
                                               : BTHREAD_ATTR_NORMAL;
        attr.tag = tag;
        pollers[i].callback = callback;
        pollers[i].init_fn = init_fn;
        pollers[i].release_fn = release_fn;
        auto rc = bthread_start_background(&pollers[i].tid, &attr, fn, args);
        if (rc != 0) {
            // The thread never started, so ownership of args was not
            // transferred; free it here to avoid a leak.
            delete args;
            // NOTE(review): pollers started in earlier iterations keep
            // running with `running` still true — caller is expected to
            // treat -1 as fatal. Confirm whether a rollback is desired.
            LOG(ERROR) << "Fail to start rdma polling bthread";
            return -1;
        }
    }
    return 0;
}

// Stop and join all pollers of the given bthread tag.
// Safe to call when polling mode is disabled (no-op in that case).
void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) {
    if (!FLAGS_rdma_use_polling) {
        return;
    }
    PollerGroup& pg = _poller_groups[tag];
    // Ask every poller of this tag to leave its polling loop...
    pg.running.store(false, std::memory_order_relaxed);
    // ...then wait until each one has actually terminated.
    for (int idx = 0; idx < FLAGS_rdma_poller_num; ++idx) {
        bthread_join(pg.pollers[idx].tid, nullptr);
    }
}

// Register this endpoint's CQ socket with a poller of the current bthread
// tag. CQ sockets are sharded across pollers by hashing the socket id, so
// the same endpoint always maps to the same poller (add/remove pair up).
void RdmaEndpoint::PollerAddCqSid() {
    // Nothing to register when the CQ socket was never created.
    if (_cq_sid == INVALID_SOCKET_ID) {
        return;
    }
    auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
    auto& poller = _poller_groups[bthread_self_tag()].pollers[index];
    poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD});
}

// Unregister this endpoint's CQ socket from its poller. Uses the same
// hash-based sharding as PollerAddCqSid so the REMOVE reaches the poller
// that holds the corresponding ADD.
void RdmaEndpoint::PollerRemoveCqSid() {
    // Nothing to unregister when the CQ socket was never created.
    if (_cq_sid == INVALID_SOCKET_ID) {
        return;
    }
    auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
    auto& poller = _poller_groups[bthread_self_tag()].pollers[index];
    poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE});
}

} // namespace rdma
Expand Down
Loading
Loading