Skip to content

Commit 0184757

Browse files
committed
rdma support polling mode
1 parent df0fbdc commit 0184757

9 files changed

Lines changed: 286 additions & 47 deletions

File tree

docs/cn/rdma.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register
4747

4848
RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。
4949

50+
RDMA支持事件驱动和轮询两种模式,默认为事件驱动模式,设置rdma_use_polling可以开启轮询模式。轮询模式下可以设置轮询器数目(rdma_poller_num),以及轮询器是否主动让出CPU(rdma_poller_yield)。轮询模式下还可以注册一个回调函数,在每次轮询时被调用,可以配合io_uring/spdk等使用。由于spdk只支持轮询模式,且采用Run To Completion模型(一个任务在执行过程中不允许被调度到别的线程上),因此在与spdk等驱动配合使用时,需要将rdma_enable_edisp_schedule设置为false,使事件驱动程序一直占用一个worker线程,不调度别的任务。
51+
5052
# 参数
5153

5254
可配置参数说明:
@@ -68,3 +70,7 @@ RDMA是硬件相关的通信技术,有很多独特的概念,比如device、p
6870
* rdma_memory_pool_max_regions: 最大的内存池块数,默认16
6971
* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4
7072
* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128
73+
* rdma_use_polling: 是否使用RDMA的轮询模式,默认false
74+
* rdma_poller_num: 轮询模式下的poller数目,默认1
75+
* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false
76+
* rdma_enable_edisp_schedule: 让事件驱动器可以被调度,默认是true

src/brpc/channel.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ Channel::~Channel() {
147147
const ChannelSignature sig = ComputeChannelSignature(_options);
148148
SocketMapRemove(SocketMapKey(_server_address, sig));
149149
}
150+
if (_options.use_rdma) {
151+
#if BRPC_WITH_RDMA
152+
rdma::ReleasePollingModeWithTag(bthread_self_tag());
153+
#endif
154+
}
150155
}
151156

152157
#if BRPC_WITH_RDMA
@@ -180,6 +185,9 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
180185
return -1;
181186
}
182187
rdma::GlobalRdmaInitializeOrDie();
188+
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
189+
return -1;
190+
}
183191
#else
184192
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
185193
return -1;

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 177 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "brpc/rdma/rdma_helper.h"
3232
#include "brpc/rdma/rdma_endpoint.h"
3333

34+
DECLARE_int32(task_group_ntags);
3435

3536
namespace brpc {
3637
namespace rdma {
@@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859
DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
5960
DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
6061
BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
62+
DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
63+
DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode.");
64+
DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode.");
65+
DEFINE_bool(rdma_enable_edisp_schedule, true,
66+
"Enable event dispatcher schedule in RDMA");
6167

6268
static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
6369

@@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699705
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
700706
<< "Handshake ends (use tcp) on " << s->description();
701707
}
702-
703708
ep->TryReadOnTcp();
704709

705710
return NULL;
@@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411046
return NULL;
10421047
}
10431048

1044-
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
1045-
if (!res->comp_channel) {
1046-
PLOG(WARNING) << "Fail to create comp channel for CQ";
1047-
delete res;
1048-
return NULL;
1049-
}
1049+
if (!FLAGS_rdma_use_polling) {
1050+
res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
1051+
if (!res->comp_channel) {
1052+
PLOG(WARNING) << "Fail to create comp channel for CQ";
1053+
delete res;
1054+
return NULL;
1055+
}
10501056

1051-
butil::make_close_on_exec(res->comp_channel->fd);
1052-
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
1053-
PLOG(WARNING) << "Fail to set comp channel nonblocking";
1054-
delete res;
1055-
return NULL;
1056-
}
1057+
butil::make_close_on_exec(res->comp_channel->fd);
1058+
if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
1059+
PLOG(WARNING) << "Fail to set comp channel nonblocking";
1060+
delete res;
1061+
return NULL;
1062+
}
10571063

1058-
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1059-
NULL, res->comp_channel, GetRdmaCompVector());
1060-
if (!res->cq) {
1061-
PLOG(WARNING) << "Fail to create CQ";
1062-
delete res;
1063-
return NULL;
1064+
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1065+
NULL, res->comp_channel, GetRdmaCompVector());
1066+
if (!res->cq) {
1067+
PLOG(WARNING) << "Fail to create CQ";
1068+
delete res;
1069+
return NULL;
1070+
}
1071+
} else {
1072+
res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
1073+
NULL, NULL, 0);
1074+
if (!res->cq) {
1075+
PLOG(WARNING) << "Fail to create CQ";
1076+
delete res;
1077+
return NULL;
1078+
}
10641079
}
10651080

10661081
ibv_qp_init_attr attr;
@@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
11171132
return -1;
11181133
}
11191134

1120-
SocketOptions options;
1121-
options.user = this;
1122-
options.keytable_pool = _socket->_keytable_pool;
1123-
options.fd = _resource->comp_channel->fd;
1124-
options.on_edge_triggered_events = PollCq;
1125-
if (Socket::Create(options, &_cq_sid) < 0) {
1126-
PLOG(WARNING) << "Fail to create socket for cq";
1127-
return -1;
1128-
}
1135+
if (!FLAGS_rdma_use_polling) {
1136+
SocketOptions options;
1137+
options.user = this;
1138+
options.keytable_pool = _socket->_keytable_pool;
1139+
options.fd = _resource->comp_channel->fd;
1140+
options.on_edge_triggered_events = PollCq;
1141+
if (Socket::Create(options, &_cq_sid) < 0) {
1142+
PLOG(WARNING) << "Fail to create socket for cq";
1143+
return -1;
1144+
}
11291145

1130-
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
1131-
PLOG(WARNING) << "Fail to arm CQ comp channel";
1132-
return -1;
1146+
if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
1147+
PLOG(WARNING) << "Fail to arm CQ comp channel";
1148+
return -1;
1149+
}
1150+
} else {
1151+
SocketOptions options;
1152+
options.user = this;
1153+
options.keytable_pool = _socket->_keytable_pool;
1154+
if (Socket::Create(options, &_cq_sid) < 0) {
1155+
PLOG(WARNING) << "Fail to create socket for cq";
1156+
return -1;
1157+
}
1158+
PollerAddCqSid();
11331159
}
11341160

11351161
_sbuf.resize(_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
12271253
if (!_resource) {
12281254
return;
12291255
}
1256+
if (FLAGS_rdma_use_polling) {
1257+
PollerRemoveCqSid();
1258+
}
12301259
bool move_to_rdma_resource_list = false;
12311260
if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321261
_rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
12371266
move_to_rdma_resource_list = true;
12381267
}
12391268
}
1240-
int fd = _resource->comp_channel->fd;
1269+
int fd = -1;
1270+
if (_resource->comp_channel) {
1271+
fd = _resource->comp_channel->fd;
1272+
}
12411273
if (!move_to_rdma_resource_list) {
12421274
if (_resource->qp) {
12431275
if (IbvDestroyQp(_resource->qp) < 0) {
@@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271359
}
13281360
CHECK(ep == s->_rdma_ep);
13291361

1330-
if (ep->GetAndAckEvents() < 0) {
1331-
const int saved_errno = errno;
1332-
PLOG(ERROR) << "Fail to get cq event: " << s->description();
1333-
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
1334-
s->description().c_str(), berror(saved_errno));
1335-
return;
1362+
if (!FLAGS_rdma_use_polling) {
1363+
if (ep->GetAndAckEvents() < 0) {
1364+
const int saved_errno = errno;
1365+
PLOG(ERROR) << "Fail to get cq event: " << s->description();
1366+
s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
1367+
s->description().c_str(), berror(saved_errno));
1368+
return;
1369+
}
13361370
}
13371371

13381372
int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491383
return;
13501384
}
13511385
if (cnt == 0) {
1386+
if (FLAGS_rdma_use_polling) {
1387+
return;
1388+
}
13521389
if (!notified) {
13531390
// Since RDMA only provides one shot event, we have to call the
13541391
// notify function every time. Because there is a possibility
@@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701407
}
13711408
if (ep->GetAndAckEvents() < 0) {
13721409
s->SetFailed(errno, "Fail to ack CQ event on %s",
1373-
s->description().c_str());
1410+
s->description().c_str());
13741411
return;
13751412
}
13761413
notified = false;
@@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741511
g_rdma_resource_list = res;
14751512
}
14761513

1514+
if (FLAGS_rdma_use_polling) {
1515+
_poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1516+
}
1517+
14771518
return 0;
14781519
}
14791520

@@ -1488,6 +1529,103 @@ void RdmaEndpoint::GlobalRelease() {
14881529
}
14891530
}
14901531

1532+
std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1533+
1534+
void RdmaEndpoint::SetCallbackFn(std::function<void()> cb, bthread_tag_t tag) {
1535+
auto& group = _poller_groups[tag];
1536+
auto& pollers = group.pollers;
1537+
for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
1538+
auto& poller = pollers[i];
1539+
std::unique_lock<bthread::Mutex> lk(poller.cb_mutex);
1540+
poller.callback = cb;
1541+
}
1542+
}
1543+
1544+
// Start FLAGS_rdma_poller_num polling bthreads for the poller group bound
// to `tag`. Each poller first drains its op queue to maintain the set of
// CQ socket ids it owns, polls every owned CQ, then runs the optional user
// callback, optionally yielding the CPU (FLAGS_rdma_poller_yield).
// Returns 0 on success; -1 if any poller bthread fails to start, in which
// case pollers already started are stopped and joined before returning.
int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag) {
    auto& group = _poller_groups[tag];
    auto& pollers = group.pollers;
    auto& running = group.running;
    struct FnArgs {
        Poller* poller;
        std::atomic<bool>* running;
    };
    auto fn = [](void* p) -> void* {
        std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
        auto poller = args->poller;
        auto running = args->running;
        // CQ socket ids currently owned by this poller.
        std::unordered_set<SocketId> cq_sids;
        CqSidOp op;
        while (running->load(butil::memory_order_relaxed)) {
            // Apply pending ADD/REMOVE requests before polling.
            while (poller->op_queue.Dequeue(op)) {
                if (op.type == CqSidOp::ADD) {
                    cq_sids.emplace(op.sid);
                } else if (op.type == CqSidOp::REMOVE) {
                    cq_sids.erase(op.sid);
                }
            }
            for (auto sid : cq_sids) {
                SocketUniquePtr s;
                if (Socket::Address(sid, &s) < 0) {
                    // Socket already gone; a REMOVE op will follow.
                    continue;
                }
                PollCq(s.get());
            }
            {
                std::unique_lock<bthread::Mutex> lk(poller->cb_mutex);
                if (poller->callback) {
                    poller->callback();
                }
            }
            if (FLAGS_rdma_poller_yield) {
                bthread_yield();
            }
        }
        return nullptr;
    };
    // Publish running=true BEFORE spawning the pollers. The original order
    // (store after the loop) raced with poller startup: a poller scheduled
    // ahead of the store could observe `false` and exit immediately.
    running.store(true, butil::memory_order_relaxed);
    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
        auto args = new FnArgs{&pollers[i], &running};
        bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
        attr.tag = tag;
        auto rc = bthread_start_background(&pollers[i].tid, &attr, fn, args);
        if (rc != 0) {
            LOG(ERROR) << "Fail to start rdma polling bthread";
            delete args;  // fn never ran, so it cannot free args itself.
            // Roll back: stop and join the pollers that did start.
            running.store(false, butil::memory_order_relaxed);
            for (int j = 0; j < i; ++j) {
                bthread_join(pollers[j].tid, nullptr);
            }
            return -1;
        }
    }
    return 0;
}
}
1598+
1599+
// Stop and join every polling bthread of the poller group bound to `tag`.
void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) {
    PollerGroup& group = _poller_groups[tag];
    // Clearing the flag makes each poller leave its polling loop.
    group.running.store(false, butil::memory_order_relaxed);
    for (int idx = 0; idx < FLAGS_rdma_poller_num; ++idx) {
        bthread_join(group.pollers[idx].tid, nullptr);
    }
}
1608+
1609+
void RdmaEndpoint::PollerAddCqSid() {
1610+
auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
1611+
auto& group = _poller_groups[bthread_self_tag()];
1612+
auto& pollers = group.pollers;
1613+
auto& poller = pollers[index];
1614+
if (_cq_sid != INVALID_SOCKET_ID) {
1615+
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD});
1616+
}
1617+
}
1618+
1619+
void RdmaEndpoint::PollerRemoveCqSid() {
1620+
auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
1621+
auto& group = _poller_groups[bthread_self_tag()];
1622+
auto& pollers = group.pollers;
1623+
auto& poller = pollers[index];
1624+
if (_cq_sid != INVALID_SOCKET_ID) {
1625+
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE});
1626+
}
1627+
}
1628+
14911629
} // namespace rdma
14921630
} // namespace brpc
14931631

0 commit comments

Comments
 (0)