Skip to content

Commit 250c4ad

Browse files
committed
rdma support polling mode
1 parent 6ef4158 commit 250c4ad

10 files changed

Lines changed: 297 additions & 48 deletions

File tree

docs/cn/rdma.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register
4747

4848
RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。
4949

50+
RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通过设置rdma_use_polling可以开启轮询模式。轮询模式下还可以设置轮询器数目(rdma_poller_num),以及是否主动放弃CPU(rdma_poller_yield)。轮询模式下还可以设置一个回调函数,在每次轮询时调用,可以配合io_uring/spdk等使用。在配合spdk等驱动使用时,由于spdk只支持轮询模式,并且采用Run To Completion模式(即执行一个任务的过程中不允许被调度到别的线程上),所以此时需要将rdma_edisp_unsched设置为true,使事件驱动器一直占用一个worker线程,不调度别的任务。
51+
5052
# 参数
5153

5254
可配置参数说明:
@@ -68,3 +70,7 @@ RDMA是硬件相关的通信技术,有很多独特的概念,比如device、p
6870
* rdma_memory_pool_max_regions: 最大的内存池块数,默认16
6971
* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4
7072
* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128
73+
* rdma_use_polling: 是否使用RDMA的轮询模式,默认false
74+
* rdma_poller_num: 轮询模式下的poller数目,默认1
75+
* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false
76+
* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false

src/brpc/channel.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
180180
return -1;
181181
}
182182
rdma::GlobalRdmaInitializeOrDie();
183+
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
184+
return -1;
185+
}
183186
#else
184187
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
185188
return -1;

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 192 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "brpc/rdma/rdma_helper.h"
3232
#include "brpc/rdma/rdma_endpoint.h"
3333

34+
DECLARE_int32(task_group_ntags);
3435

3536
namespace brpc {
3637
namespace rdma {
@@ -58,6 +59,10 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859
DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
5960
DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
6061
BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
62+
DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
63+
DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode.");
64+
DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode.");
65+
DEFINE_bool(rdma_edisp_unsched, false, "Disable event dispatcher schedule");
6166

6267
static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
6368

@@ -699,7 +704,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699704
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
700705
<< "Handshake ends (use tcp) on " << s->description();
701706
}
702-
703707
ep->TryReadOnTcp();
704708

705709
return NULL;
@@ -1041,26 +1045,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411045
return NULL;
10421046
}
10431047

1044-
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
1045-
if (!res->comp_channel) {
1046-
PLOG(WARNING) << "Fail to create comp channel for CQ";
1047-
delete res;
1048-
return NULL;
1049-
}
1048+
if (!FLAGS_rdma_use_polling) {
1049+
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
1050+
if (!res->comp_channel) {
1051+
PLOG(WARNING) << "Fail to create comp channel for CQ";
1052+
delete res;
1053+
return NULL;
1054+
}
10501055

1051-
butil::make_close_on_exec(res->comp_channel->fd);
1052-
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
1053-
PLOG(WARNING) << "Fail to set comp channel nonblocking";
1054-
delete res;
1055-
return NULL;
1056-
}
1056+
butil::make_close_on_exec(res->comp_channel->fd);
1057+
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
1058+
PLOG(WARNING) << "Fail to set comp channel nonblocking";
1059+
delete res;
1060+
return NULL;
1061+
}
10571062

1058-
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1059-
NULL, res->comp_channel, GetRdmaCompVector());
1060-
if (!res->cq) {
1061-
PLOG(WARNING) << "Fail to create CQ";
1062-
delete res;
1063-
return NULL;
1063+
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1064+
NULL, res->comp_channel, GetRdmaCompVector());
1065+
if (!res->cq) {
1066+
PLOG(WARNING) << "Fail to create CQ";
1067+
delete res;
1068+
return NULL;
1069+
}
1070+
} else {
1071+
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1072+
NULL, NULL, 0);
1073+
if (!res->cq) {
1074+
PLOG(WARNING) << "Fail to create CQ";
1075+
delete res;
1076+
return NULL;
1077+
}
10641078
}
10651079

10661080
ibv_qp_init_attr attr;
@@ -1117,19 +1131,30 @@ int RdmaEndpoint::AllocateResources() {
11171131
return -1;
11181132
}
11191133

1120-
SocketOptions options;
1121-
options.user = this;
1122-
options.keytable_pool = _socket->_keytable_pool;
1123-
options.fd = _resource->comp_channel->fd;
1124-
options.on_edge_triggered_events = PollCq;
1125-
if (Socket::Create(options, &_cq_sid) < 0) {
1126-
PLOG(WARNING) << "Fail to create socket for cq";
1127-
return -1;
1128-
}
1134+
if (!FLAGS_rdma_use_polling) {
1135+
SocketOptions options;
1136+
options.user = this;
1137+
options.keytable_pool = _socket->_keytable_pool;
1138+
options.fd = _resource->comp_channel->fd;
1139+
options.on_edge_triggered_events = PollCq;
1140+
if (Socket::Create(options, &_cq_sid) < 0) {
1141+
PLOG(WARNING) << "Fail to create socket for cq";
1142+
return -1;
1143+
}
11291144

1130-
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
1131-
PLOG(WARNING) << "Fail to arm CQ comp channel";
1132-
return -1;
1145+
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
1146+
PLOG(WARNING) << "Fail to arm CQ comp channel";
1147+
return -1;
1148+
}
1149+
} else {
1150+
SocketOptions options;
1151+
options.user = this;
1152+
options.keytable_pool = _socket->_keytable_pool;
1153+
if (Socket::Create(options, &_cq_sid) < 0) {
1154+
PLOG(WARNING) << "Fail to create socket for cq";
1155+
return -1;
1156+
}
1157+
PollerAddCqSid();
11331158
}
11341159

11351160
_sbuf.resize(_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1252,9 @@ void RdmaEndpoint::DeallocateResources() {
12271252
if (!_resource) {
12281253
return;
12291254
}
1255+
if (FLAGS_rdma_use_polling) {
1256+
PollerRemoveCqSid();
1257+
}
12301258
bool move_to_rdma_resource_list = false;
12311259
if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321260
_rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1265,10 @@ void RdmaEndpoint::DeallocateResources() {
12371265
move_to_rdma_resource_list = true;
12381266
}
12391267
}
1240-
int fd = _resource->comp_channel->fd;
1268+
int fd = -1;
1269+
if (_resource->comp_channel) {
1270+
fd = _resource->comp_channel->fd;
1271+
}
12411272
if (!move_to_rdma_resource_list) {
12421273
if (_resource->qp) {
12431274
if (IbvDestroyQp(_resource->qp) < 0) {
@@ -1327,12 +1358,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271358
}
13281359
CHECK(ep == s->_rdma_ep);
13291360

1330-
if (ep->GetAndAckEvents() < 0) {
1331-
const int saved_errno = errno;
1332-
PLOG(ERROR) << "Fail to get cq event: " << s->description();
1333-
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
1334-
s->description().c_str(), berror(saved_errno));
1335-
return;
1361+
if (!FLAGS_rdma_use_polling) {
1362+
if (ep->GetAndAckEvents() < 0) {
1363+
const int saved_errno = errno;
1364+
PLOG(ERROR) << "Fail to get cq event: " << s->description();
1365+
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
1366+
s->description().c_str(), berror(saved_errno));
1367+
return;
1368+
}
13361369
}
13371370

13381371
int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1382,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491382
return;
13501383
}
13511384
if (cnt == 0) {
1385+
if (FLAGS_rdma_use_polling) {
1386+
return;
1387+
}
13521388
if (!notified) {
13531389
// Since RDMA only provides one shot event, we have to call the
13541390
// notify function every time. Because there is a possibility
@@ -1370,7 +1406,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701406
}
13711407
if (ep->GetAndAckEvents() < 0) {
13721408
s->SetFailed(errno, "Fail to ack CQ event on %s",
1373-
s->description().c_str());
1409+
s->description().c_str());
13741410
return;
13751411
}
13761412
notified = false;
@@ -1474,6 +1510,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741510
g_rdma_resource_list = res;
14751511
}
14761512

1513+
if (FLAGS_rdma_use_polling) {
1514+
_poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1515+
}
1516+
14771517
return 0;
14781518
}
14791519

@@ -1486,6 +1526,119 @@ void RdmaEndpoint::GlobalRelease() {
14861526
delete res;
14871527
}
14881528
}
1529+
// release polling mode at exit or call RdmaEndpoint::PollingModeRelease
1530+
// explicitly
1531+
if (FLAGS_rdma_use_polling) {
1532+
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
1533+
PollingModeRelease(i);
1534+
}
1535+
}
1536+
}
1537+
1538+
std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1539+
1540+
void RdmaEndpoint::SetCallbackFn(std::function<void()> cb, bthread_tag_t tag) {
1541+
auto& group = _poller_groups[tag];
1542+
auto& pollers = group.pollers;
1543+
for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
1544+
auto& poller = pollers[i];
1545+
std::unique_lock<bthread::Mutex> lk(poller.callback_mutex);
1546+
poller.callback = cb;
1547+
}
1548+
}
1549+
1550+
int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag) {
1551+
if (!FLAGS_rdma_use_polling) {
1552+
return 0;
1553+
}
1554+
auto& group = _poller_groups[tag];
1555+
auto& pollers = group.pollers;
1556+
auto& running = group.running;
1557+
bool expected = false;
1558+
if (!running.compare_exchange_strong(expected, true)) {
1559+
return 0;
1560+
}
1561+
struct FnArgs {
1562+
Poller* poller;
1563+
std::atomic<bool>* running;
1564+
};
1565+
auto fn = [](void* p) -> void* {
1566+
std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
1567+
auto poller = args->poller;
1568+
auto running = args->running;
1569+
std::unordered_set<SocketId> cq_sids;
1570+
CqSidOp op;
1571+
while (running->load(std::memory_order_relaxed)) {
1572+
while (poller->op_queue.Dequeue(op)) {
1573+
if (op.type == CqSidOp::ADD) {
1574+
cq_sids.emplace(op.sid);
1575+
} else if (op.type == CqSidOp::REMOVE) {
1576+
cq_sids.erase(op.sid);
1577+
}
1578+
}
1579+
for (auto sid : cq_sids) {
1580+
SocketUniquePtr s;
1581+
if (Socket::Address(sid, &s) < 0) {
1582+
continue;
1583+
}
1584+
PollCq(s.get());
1585+
}
1586+
{
1587+
std::unique_lock<bthread::Mutex> lk(poller->callback_mutex);
1588+
if (poller->callback) {
1589+
poller->callback();
1590+
}
1591+
}
1592+
if (FLAGS_rdma_poller_yield) {
1593+
bthread_yield();
1594+
}
1595+
}
1596+
return nullptr;
1597+
};
1598+
for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
1599+
auto args = new FnArgs{&pollers[i], &running};
1600+
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
1601+
attr.tag = tag;
1602+
auto rc = bthread_start_background(&pollers[i].tid, &attr, fn, args);
1603+
if (rc != 0) {
1604+
LOG(ERROR) << "Fail to start rdma polling bthread";
1605+
return -1;
1606+
}
1607+
}
1608+
return 0;
1609+
}
1610+
1611+
void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) {
1612+
if (!FLAGS_rdma_use_polling) {
1613+
return;
1614+
}
1615+
auto& group = _poller_groups[tag];
1616+
auto& pollers = group.pollers;
1617+
auto& running = group.running;
1618+
running.store(false, std::memory_order_relaxed);
1619+
for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
1620+
bthread_join(pollers[i].tid, nullptr);
1621+
}
1622+
}
1623+
1624+
void RdmaEndpoint::PollerAddCqSid() {
1625+
auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
1626+
auto& group = _poller_groups[bthread_self_tag()];
1627+
auto& pollers = group.pollers;
1628+
auto& poller = pollers[index];
1629+
if (_cq_sid != INVALID_SOCKET_ID) {
1630+
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD});
1631+
}
1632+
}
1633+
1634+
void RdmaEndpoint::PollerRemoveCqSid() {
1635+
auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
1636+
auto& group = _poller_groups[bthread_self_tag()];
1637+
auto& pollers = group.pollers;
1638+
auto& poller = pollers[index];
1639+
if (_cq_sid != INVALID_SOCKET_ID) {
1640+
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE});
1641+
}
14891642
}
14901643

14911644
} // namespace rdma

0 commit comments

Comments
 (0)