Skip to content

Commit cbf674a

Browse files
committed
rdma support polling mode
1 parent 5c8bd06 commit cbf674a

9 files changed

Lines changed: 291 additions & 47 deletions

File tree

docs/cn/rdma.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register
4747

4848
RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。
4949

50+
RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通过设置rdma_use_polling可以开启轮询模式。轮询模式下还可以设置轮询器数目(rdma_poller_num),以及轮询器是否主动放弃CPU(rdma_poller_yield)。轮询模式下还可以设置一个回调函数,在每次轮询时被调用,可以配合io_uring/spdk等使用。在配合spdk等驱动使用时,由于spdk只支持轮询模式,且要求在单线程中使用(即Run To Completion模式:任务执行过程中不允许被调度到其他线程),因此需要通过rdma_enable_edisp_schedule控制事件驱动器的调度行为,使其一直占用一个worker线程,期间不调度其他任务。
51+
5052
# 参数
5153

5254
可配置参数说明:
@@ -68,3 +70,7 @@ RDMA是硬件相关的通信技术,有很多独特的概念,比如device、p
6870
* rdma_memory_pool_max_regions: 最大的内存池块数,默认16
6971
* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4
7072
* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128
73+
* rdma_use_polling: 是否使用RDMA的轮询模式,默认false
74+
* rdma_poller_num: 轮询模式下的poller数目,默认1
75+
* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false
76+
* rdma_enable_edisp_schedule: 让事件驱动器可以被调度,默认是true

src/brpc/channel.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
180180
return -1;
181181
}
182182
rdma::GlobalRdmaInitializeOrDie();
183+
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
184+
return -1;
185+
}
183186
#else
184187
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
185188
return -1;

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 193 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "brpc/rdma/rdma_helper.h"
3232
#include "brpc/rdma/rdma_endpoint.h"
3333

34+
DECLARE_int32(task_group_ntags);
3435

3536
namespace brpc {
3637
namespace rdma {
@@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859
DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
5960
DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
6061
BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
62+
DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
63+
DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode.");
64+
DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode.");
65+
DEFINE_bool(rdma_enable_edisp_schedule, true,
66+
"Enable event dispatcher schedule in RDMA");
6167

6268
static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
6369

@@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699705
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
700706
<< "Handshake ends (use tcp) on " << s->description();
701707
}
702-
703708
ep->TryReadOnTcp();
704709

705710
return NULL;
@@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411046
return NULL;
10421047
}
10431048

1044-
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
1045-
if (!res->comp_channel) {
1046-
PLOG(WARNING) << "Fail to create comp channel for CQ";
1047-
delete res;
1048-
return NULL;
1049-
}
1049+
if (!FLAGS_rdma_use_polling) {
1050+
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
1051+
if (!res->comp_channel) {
1052+
PLOG(WARNING) << "Fail to create comp channel for CQ";
1053+
delete res;
1054+
return NULL;
1055+
}
10501056

1051-
butil::make_close_on_exec(res->comp_channel->fd);
1052-
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
1053-
PLOG(WARNING) << "Fail to set comp channel nonblocking";
1054-
delete res;
1055-
return NULL;
1056-
}
1057+
butil::make_close_on_exec(res->comp_channel->fd);
1058+
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
1059+
PLOG(WARNING) << "Fail to set comp channel nonblocking";
1060+
delete res;
1061+
return NULL;
1062+
}
10571063

1058-
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1059-
NULL, res->comp_channel, GetRdmaCompVector());
1060-
if (!res->cq) {
1061-
PLOG(WARNING) << "Fail to create CQ";
1062-
delete res;
1063-
return NULL;
1064+
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1065+
NULL, res->comp_channel, GetRdmaCompVector());
1066+
if (!res->cq) {
1067+
PLOG(WARNING) << "Fail to create CQ";
1068+
delete res;
1069+
return NULL;
1070+
}
1071+
} else {
1072+
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1073+
NULL, NULL, 0);
1074+
if (!res->cq) {
1075+
PLOG(WARNING) << "Fail to create CQ";
1076+
delete res;
1077+
return NULL;
1078+
}
10641079
}
10651080

10661081
ibv_qp_init_attr attr;
@@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
11171132
return -1;
11181133
}
11191134

1120-
SocketOptions options;
1121-
options.user = this;
1122-
options.keytable_pool = _socket->_keytable_pool;
1123-
options.fd = _resource->comp_channel->fd;
1124-
options.on_edge_triggered_events = PollCq;
1125-
if (Socket::Create(options, &_cq_sid) < 0) {
1126-
PLOG(WARNING) << "Fail to create socket for cq";
1127-
return -1;
1128-
}
1135+
if (!FLAGS_rdma_use_polling) {
1136+
SocketOptions options;
1137+
options.user = this;
1138+
options.keytable_pool = _socket->_keytable_pool;
1139+
options.fd = _resource->comp_channel->fd;
1140+
options.on_edge_triggered_events = PollCq;
1141+
if (Socket::Create(options, &_cq_sid) < 0) {
1142+
PLOG(WARNING) << "Fail to create socket for cq";
1143+
return -1;
1144+
}
11291145

1130-
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
1131-
PLOG(WARNING) << "Fail to arm CQ comp channel";
1132-
return -1;
1146+
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
1147+
PLOG(WARNING) << "Fail to arm CQ comp channel";
1148+
return -1;
1149+
}
1150+
} else {
1151+
SocketOptions options;
1152+
options.user = this;
1153+
options.keytable_pool = _socket->_keytable_pool;
1154+
if (Socket::Create(options, &_cq_sid) < 0) {
1155+
PLOG(WARNING) << "Fail to create socket for cq";
1156+
return -1;
1157+
}
1158+
PollerAddCqSid();
11331159
}
11341160

11351161
_sbuf.resize(_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
12271253
if (!_resource) {
12281254
return;
12291255
}
1256+
if (FLAGS_rdma_use_polling) {
1257+
PollerRemoveCqSid();
1258+
}
12301259
bool move_to_rdma_resource_list = false;
12311260
if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321261
_rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
12371266
move_to_rdma_resource_list = true;
12381267
}
12391268
}
1240-
int fd = _resource->comp_channel->fd;
1269+
int fd = -1;
1270+
if (_resource->comp_channel) {
1271+
fd = _resource->comp_channel->fd;
1272+
}
12411273
if (!move_to_rdma_resource_list) {
12421274
if (_resource->qp) {
12431275
if (IbvDestroyQp(_resource->qp) < 0) {
@@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271359
}
13281360
CHECK(ep == s->_rdma_ep);
13291361

1330-
if (ep->GetAndAckEvents() < 0) {
1331-
const int saved_errno = errno;
1332-
PLOG(ERROR) << "Fail to get cq event: " << s->description();
1333-
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
1334-
s->description().c_str(), berror(saved_errno));
1335-
return;
1362+
if (!FLAGS_rdma_use_polling) {
1363+
if (ep->GetAndAckEvents() < 0) {
1364+
const int saved_errno = errno;
1365+
PLOG(ERROR) << "Fail to get cq event: " << s->description();
1366+
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
1367+
s->description().c_str(), berror(saved_errno));
1368+
return;
1369+
}
13361370
}
13371371

13381372
int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491383
return;
13501384
}
13511385
if (cnt == 0) {
1386+
if (FLAGS_rdma_use_polling) {
1387+
return;
1388+
}
13521389
if (!notified) {
13531390
// Since RDMA only provides one shot event, we have to call the
13541391
// notify function every time. Because there is a possibility
@@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701407
}
13711408
if (ep->GetAndAckEvents() < 0) {
13721409
s->SetFailed(errno, "Fail to ack CQ event on %s",
1373-
s->description().c_str());
1410+
s->description().c_str());
13741411
return;
13751412
}
13761413
notified = false;
@@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741511
g_rdma_resource_list = res;
14751512
}
14761513

1514+
if (FLAGS_rdma_use_polling) {
1515+
_poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1516+
}
1517+
14771518
return 0;
14781519
}
14791520

@@ -1486,6 +1527,119 @@ void RdmaEndpoint::GlobalRelease() {
14861527
delete res;
14871528
}
14881529
}
1530+
// release polling mode at exit or call RdmaEndpoint::PollingModeRelease
1531+
// explicitly
1532+
if (FLAGS_rdma_use_polling) {
1533+
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
1534+
PollingModeRelease(i);
1535+
}
1536+
}
1537+
}
1538+
1539+
std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1540+
1541+
void RdmaEndpoint::SetCallbackFn(std::function<void()> cb, bthread_tag_t tag) {
1542+
auto& group = _poller_groups[tag];
1543+
auto& pollers = group.pollers;
1544+
for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
1545+
auto& poller = pollers[i];
1546+
std::unique_lock<bthread::Mutex> lk(poller.callback_mutex);
1547+
poller.callback = cb;
1548+
}
1549+
}
1550+
1551+
// Start the polling bthreads for the poller group of `tag`.
//
// Returns 0 on success (or when polling mode is disabled, or when the group
// is already running) and -1 if any poller bthread fails to start. On
// failure, the pollers started so far are stopped and joined, so the group
// is left fully inactive and a later call may retry the initialization.
int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag) {
    if (!FLAGS_rdma_use_polling) {
        return 0;
    }
    auto& group = _poller_groups[tag];
    auto& pollers = group.pollers;
    auto& running = group.running;
    bool expected = false;
    // Only the first caller for this tag performs the initialization.
    if (!running.compare_exchange_strong(expected, true)) {
        return 0;
    }
    // Heap-allocated so it outlives this frame; the poller fn takes ownership.
    struct FnArgs {
        Poller* poller;
        std::atomic<bool>* running;
    };
    auto fn = [](void* p) -> void* {
        std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
        auto poller = args->poller;
        auto running = args->running;
        std::unordered_set<SocketId> cq_sids;
        CqSidOp op;
        while (running->load(std::memory_order_relaxed)) {
            // Apply pending add/remove requests before polling.
            while (poller->op_queue.Dequeue(op)) {
                if (op.type == CqSidOp::ADD) {
                    cq_sids.emplace(op.sid);
                } else if (op.type == CqSidOp::REMOVE) {
                    cq_sids.erase(op.sid);
                }
            }
            // Poll the CQ of every endpoint assigned to this poller.
            for (auto sid : cq_sids) {
                SocketUniquePtr s;
                if (Socket::Address(sid, &s) < 0) {
                    // Socket already gone; skip until a REMOVE arrives.
                    continue;
                }
                PollCq(s.get());
            }
            {
                // User callback (e.g. io_uring/spdk integration), if any.
                std::unique_lock<bthread::Mutex> lk(poller->callback_mutex);
                if (poller->callback) {
                    poller->callback();
                }
            }
            if (FLAGS_rdma_poller_yield) {
                bthread_yield();
            }
        }
        return nullptr;
    };
    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
        auto args = new FnArgs{&pollers[i], &running};
        bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
        attr.tag = tag;
        auto rc = bthread_start_background(&pollers[i].tid, &attr, fn, args);
        if (rc != 0) {
            LOG(ERROR) << "Fail to start rdma polling bthread";
            // The bthread was not started, so fn will never free args.
            delete args;
            // Roll back: stop and join the pollers already started so the
            // group does not stay half-initialized with running == true.
            running.store(false, std::memory_order_relaxed);
            for (int j = 0; j < i; ++j) {
                bthread_join(pollers[j].tid, nullptr);
            }
            return -1;
        }
    }
    return 0;
}
1611+
1612+
// Stop and join the polling bthreads of the poller group of `tag`.
//
// Safe to call multiple times: GlobalRelease() invokes this for every tag
// at exit even if the user already called it explicitly. Only the call
// that flips `running` from true to false performs the join, so a second
// call (or a call on a group that was never started) does not join stale
// or uninitialized bthread ids.
void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) {
    if (!FLAGS_rdma_use_polling) {
        return;
    }
    auto& group = _poller_groups[tag];
    // exchange() makes the release idempotent.
    if (!group.running.exchange(false, std::memory_order_relaxed)) {
        return;
    }
    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
        bthread_join(group.pollers[i].tid, nullptr);
    }
}
1624+
1625+
void RdmaEndpoint::PollerAddCqSid() {
1626+
auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
1627+
auto& group = _poller_groups[bthread_self_tag()];
1628+
auto& pollers = group.pollers;
1629+
auto& poller = pollers[index];
1630+
if (_cq_sid != INVALID_SOCKET_ID) {
1631+
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD});
1632+
}
1633+
}
1634+
1635+
void RdmaEndpoint::PollerRemoveCqSid() {
1636+
auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
1637+
auto& group = _poller_groups[bthread_self_tag()];
1638+
auto& pollers = group.pollers;
1639+
auto& poller = pollers[index];
1640+
if (_cq_sid != INVALID_SOCKET_ID) {
1641+
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE});
1642+
}
14891643
}
14901644

14911645
} // namespace rdma

0 commit comments

Comments
 (0)