
Commit 85cab8a

rdma support polling mode
1 parent b4ce61f commit 85cab8a

5 files changed: 205 additions & 39 deletions


docs/cn/rdma.md

Lines changed: 5 additions & 0 deletions
@@ -47,6 +47,8 @@ RDMA requires that the memory used for sending and receiving data be registered (memory register
 
 RDMA is a hardware-dependent communication technology with many concepts of its own, such as device, port, GID, LID, and MaxSge. These parameters are read from the corresponding NIC at initialization time and reasonable defaults are chosen (see src/brpc/rdma/rdma_helper.cpp). When the defaults are not what the user wants, they can be overridden via flags.
 
+RDMA supports two modes, event-driven and polling, and defaults to event-driven. Polling mode is enabled by setting rdma_use_polling. In polling mode you can also configure the number of pollers (rdma_poller_num) and whether the pollers voluntarily yield the CPU (rdma_poller_yield). Polling mode additionally allows a callback to be registered that is invoked on every polling iteration, which can be used together with io_uring/spdk.
+
 # Parameters
 
 Configurable parameters:
@@ -68,3 +70,6 @@ RDMA is a hardware-dependent communication technology with many concepts of its own, such as device, p
 * rdma_memory_pool_max_regions: maximum number of memory pool regions, default 16
 * rdma_memory_pool_buckets: number of buckets used by the memory pool to reduce contention, default 4
 * rdma_memory_pool_tls_cache_num: number of thread-local cached blocks in the memory pool, default 128
+* rdma_use_polling: whether to use RDMA polling mode, default false
+* rdma_poller_num: number of pollers in polling mode, default 1
+* rdma_poller_yield: whether pollers voluntarily yield the CPU in polling mode, default false
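
Putting the flags above together, here is a minimal sketch of how an application might enable polling mode and register a per-iteration callback. It assumes the in-tree header brpc/rdma/rdma_endpoint.h is usable from application code, that RdmaEndpoint::SetCallbackFn (added by this commit) is accessible there, and that the gflags namespace is google:: (it may be gflags:: depending on the build); treat it as an illustration, not a documented API.

    #include <gflags/gflags.h>
    #include <brpc/server.h>
    #include "brpc/rdma/rdma_endpoint.h"

    int main(int argc, char* argv[]) {
        // Run with: ./server --rdma_use_polling=true --rdma_poller_num=2 --rdma_poller_yield=false
        google::ParseCommandLineFlags(&argc, &argv, true);

        // Optional: every poller invokes this once per polling iteration
        // (under a shared mutex), e.g. to drive an io_uring/spdk completion loop.
        brpc::rdma::RdmaEndpoint::SetCallbackFn([]() {
            // drain other event sources here; keep it short and non-blocking
        });

        brpc::Server server;
        // ... add services, enable RDMA in the server/channel options, then Start() ...
        return 0;
    }

Since every poller runs the callback on each iteration while holding a shared mutex, the callback should be cheap and must never block.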

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 158 additions & 39 deletions
@@ -58,6 +58,9 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
 DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
 DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
 BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
+DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode.");
+DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode.");

 static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent

@@ -699,7 +702,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
         LOG_IF(INFO, FLAGS_rdma_trace_verbose)
             << "Handshake ends (use tcp) on " << s->description();
     }
-
     ep->TryReadOnTcp();

     return NULL;
@@ -1041,26 +1043,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
         return NULL;
     }

-    res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
-    if (!res->comp_channel) {
-        PLOG(WARNING) << "Fail to create comp channel for CQ";
-        delete res;
-        return NULL;
-    }
+    if (!FLAGS_rdma_use_polling) {
+        res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
+        if (!res->comp_channel) {
+            PLOG(WARNING) << "Fail to create comp channel for CQ";
+            delete res;
+            return NULL;
+        }

-    butil::make_close_on_exec(res->comp_channel->fd);
-    if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
-        PLOG(WARNING) << "Fail to set comp channel nonblocking";
-        delete res;
-        return NULL;
-    }
+        butil::make_close_on_exec(res->comp_channel->fd);
+        if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
+            PLOG(WARNING) << "Fail to set comp channel nonblocking";
+            delete res;
+            return NULL;
+        }

-    res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
-            NULL, res->comp_channel, GetRdmaCompVector());
-    if (!res->cq) {
-        PLOG(WARNING) << "Fail to create CQ";
-        delete res;
-        return NULL;
+        res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
+                NULL, res->comp_channel, GetRdmaCompVector());
+        if (!res->cq) {
+            PLOG(WARNING) << "Fail to create CQ";
+            delete res;
+            return NULL;
+        }
+    } else {
+        res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
+                NULL, NULL, 0);
+        if (!res->cq) {
+            PLOG(WARNING) << "Fail to create CQ";
+            delete res;
+            return NULL;
+        }
     }

     ibv_qp_init_attr attr;
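
The branch above encodes the main structural difference between the two modes at the verbs level: in event-driven mode the CQ is bound to a completion channel whose fd brpc hands to its event dispatcher, while in polling mode no channel is created and completions are reaped directly with ibv_poll_cq. A self-contained sketch of both shapes using the raw verbs API (brpc's IbvCreateCompChannel/IbvCreateCq helpers wrap these calls); ctx and cqe stand in for the device context and CQ depth used above:

    #include <infiniband/verbs.h>

    static ibv_cq* create_cq_sketch(ibv_context* ctx, int cqe, bool use_polling) {
        if (!use_polling) {
            // Event-driven: completions are announced on the channel's fd,
            // which can be added to an epoll/event loop.
            ibv_comp_channel* ch = ibv_create_comp_channel(ctx);
            if (ch == NULL) return NULL;
            ibv_cq* cq = ibv_create_cq(ctx, cqe, /*cq_context=*/NULL, ch, /*comp_vector=*/0);
            if (cq != NULL) {
                // Notifications are one-shot and must be re-armed after each event.
                ibv_req_notify_cq(cq, /*solicited_only=*/0);
            }
            return cq;
        }
        // Polling: no channel, no notifications; callers repeatedly invoke
        // ibv_poll_cq(cq, n, wc), which returns immediately (possibly with 0 entries).
        return ibv_create_cq(ctx, cqe, /*cq_context=*/NULL, /*channel=*/NULL, /*comp_vector=*/0);
    }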
@@ -1117,19 +1129,30 @@ int RdmaEndpoint::AllocateResources() {
         return -1;
     }

-    SocketOptions options;
-    options.user = this;
-    options.keytable_pool = _socket->_keytable_pool;
-    options.fd = _resource->comp_channel->fd;
-    options.on_edge_triggered_events = PollCq;
-    if (Socket::Create(options, &_cq_sid) < 0) {
-        PLOG(WARNING) << "Fail to create socket for cq";
-        return -1;
-    }
+    if (!FLAGS_rdma_use_polling) {
+        SocketOptions options;
+        options.user = this;
+        options.keytable_pool = _socket->_keytable_pool;
+        options.fd = _resource->comp_channel->fd;
+        options.on_edge_triggered_events = PollCq;
+        if (Socket::Create(options, &_cq_sid) < 0) {
+            PLOG(WARNING) << "Fail to create socket for cq";
+            return -1;
+        }

-    if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
-        PLOG(WARNING) << "Fail to arm CQ comp channel";
-        return -1;
+        if (ibv_req_notify_cq(_resource->cq, 1) < 0) {
+            PLOG(WARNING) << "Fail to arm CQ comp channel";
+            return -1;
+        }
+    } else {
+        SocketOptions options;
+        options.user = this;
+        options.keytable_pool = _socket->_keytable_pool;
+        if (Socket::Create(options, &_cq_sid) < 0) {
+            PLOG(WARNING) << "Fail to create socket for cq";
+            return -1;
+        }
+        PollerAddCqSocket();
     }

     _sbuf.resize(_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1250,9 @@ void RdmaEndpoint::DeallocateResources() {
     if (!_resource) {
         return;
     }
+    if (FLAGS_rdma_use_polling) {
+        PollerRemoveCqSocket();
+    }
     bool move_to_rdma_resource_list = false;
     if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
         _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1263,10 @@ void RdmaEndpoint::DeallocateResources() {
             move_to_rdma_resource_list = true;
         }
     }
-    int fd = _resource->comp_channel->fd;
+    int fd = -1;
+    if (_resource->comp_channel) {
+        fd = _resource->comp_channel->fd;
+    }
     if (!move_to_rdma_resource_list) {
         if (_resource->qp) {
             if (IbvDestroyQp(_resource->qp) < 0) {
@@ -1327,12 +1356,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
     }
     CHECK(ep == s->_rdma_ep);

-    if (ep->GetAndAckEvents() < 0) {
-        const int saved_errno = errno;
-        PLOG(ERROR) << "Fail to get cq event: " << s->description();
-        s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
-                s->description().c_str(), berror(saved_errno));
-        return;
+    if (!FLAGS_rdma_use_polling) {
+        if (ep->GetAndAckEvents() < 0) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to get cq event: " << s->description();
+            s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
+                    s->description().c_str(), berror(saved_errno));
+            return;
+        }
     }

     int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1380,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
             return;
         }
         if (cnt == 0) {
+            if (FLAGS_rdma_use_polling) {
+                return;
+            }
             if (!notified) {
                 // Since RDMA only provides one shot event, we have to call the
                 // notify function every time. Because there is a possibility
@@ -1370,7 +1404,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
             }
             if (ep->GetAndAckEvents() < 0) {
                 s->SetFailed(errno, "Fail to ack CQ event on %s",
-                        s->description().c_str());
+                             s->description().c_str());
                 return;
             }
             notified = false;
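
The PollCq changes above switch off the event bookkeeping that only exists because of the completion channel: in polling mode there is no event to acknowledge before polling, and a drained CQ simply returns so the poller can try again on its next iteration, instead of re-arming notifications. For reference, the get/ack/re-arm cycle that the event-driven path (GetAndAckEvents plus the notify logic) presumably follows is the standard verbs pattern, sketched here with the raw API and error handling elided:

    #include <infiniband/verbs.h>

    // One round of event-driven CQ handling on a completion channel.
    static void handle_cq_event_sketch(ibv_comp_channel* channel) {
        ibv_cq* ev_cq = NULL;
        void* ev_ctx = NULL;
        if (ibv_get_cq_event(channel, &ev_cq, &ev_ctx) < 0) {  // consume the event on the fd
            return;
        }
        ibv_ack_cq_events(ev_cq, 1);   // every received event must eventually be acked
        ibv_req_notify_cq(ev_cq, 0);   // re-arm: notifications are one-shot
        ibv_wc wc[32];
        int n;
        while ((n = ibv_poll_cq(ev_cq, 32, wc)) > 0) {
            // process n completions in wc[0..n-1]
        }
    }

None of this is required when the CQ has no completion channel, which is why the polling path can return immediately on a zero-completion poll.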
@@ -1474,6 +1508,13 @@ int RdmaEndpoint::GlobalInitialize() {
         g_rdma_resource_list = res;
     }

+    if (FLAGS_rdma_use_polling) {
+        auto rc = PollingModeInitialize();
+        if (rc != 0) {
+            return -1;
+        }
+    }
+
     return 0;
 }

@@ -1486,6 +1527,84 @@ void RdmaEndpoint::GlobalRelease() {
             delete res;
         }
     }
+
+    if (FLAGS_rdma_use_polling) {
+        PollingModeRelease();
+    }
+}
+
+std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
+std::atomic<bool> RdmaEndpoint::_running(false);
+std::function<void()> RdmaEndpoint::_callback(nullptr);
+butil::Mutex RdmaEndpoint::_cb_mutex;
+
+void RdmaEndpoint::SetCallbackFn(std::function<void()> cb) { _callback = cb; }
+
+int RdmaEndpoint::PollingModeInitialize() {
+    auto fn = [](void* args) -> void* {
+        auto poller = static_cast<Poller*>(args);
+        while (_running.load(butil::memory_order_relaxed)) {
+            std::list<Socket*> sockets;
+            {
+                std::unique_lock<butil::Mutex> lk(poller->mutex);
+                sockets = poller->sockets;  // copy all sockets is not good
+            }
+            for (auto m : sockets) {
+                PollCq(m);
+            }
+            {
+                std::unique_lock<butil::Mutex> lk(_cb_mutex);
+                if (_callback) {
+                    _callback();
+                }
+            }
+            if (FLAGS_rdma_poller_yield) {
+                bthread_yield();
+            }
+        }
+        return nullptr;
+    };
+    _pollers = std::vector<Poller>(FLAGS_rdma_poller_num);
+    _running.store(true, butil::memory_order_relaxed);
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        auto rc = bthread_start_background(
+                &_pollers[i].tid, &BTHREAD_ATTR_NORMAL, fn, &_pollers[i]);
+        if (rc != 0) {
+            LOG(ERROR) << "Fail to start rdma polling bthread";
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void RdmaEndpoint::PollingModeRelease() {
+    _running.store(false, butil::memory_order_relaxed);
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        bthread_join(_pollers[i].tid, nullptr);
+    }
+}
+
+// Add socket to poller
+void RdmaEndpoint::PollerAddCqSocket() {
+    SocketUniquePtr s;
+    if (Socket::Address(_cq_sid, &s) != 0) {
+        LOG(ERROR) << "Fail to address socket id: " << _cq_sid;
+        return;
+    }
+    auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
+    auto& poller = _pollers[index];
+    std::unique_lock<butil::Mutex> lk(poller.mutex);
+    _cq_iter = poller.sockets.insert(poller.sockets.end(), s.get());
+}
+
+// Remove socket from poller
+void RdmaEndpoint::PollerRemoveCqSocket() {
+    auto index = butil::fmix32(_cq_sid) % FLAGS_rdma_poller_num;
+    auto& poller = _pollers[index];
+    std::unique_lock<butil::Mutex> lk(poller.mutex);
+    if (_cq_iter != poller.sockets.end()) {
+        poller.sockets.erase(_cq_iter);
+    }
 }

 } // namespace rdma
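
The poller loop above is a plain busy loop: each poller snapshots its socket list under a mutex, calls PollCq on every registered CQ socket, runs the optional callback under _cb_mutex, and yields only when rdma_poller_yield is set. The callback is the hook for sharing these polling threads with another engine such as io_uring or spdk. A hypothetical piece of glue, assuming liburing is available and that SetCallbackFn is reachable from application code (g_ring and DrainUringOnce are invented names for illustration):

    #include <liburing.h>
    #include "brpc/rdma/rdma_endpoint.h"

    static io_uring g_ring;  // assumed initialized elsewhere via io_uring_queue_init()

    // Non-blocking drain of whatever completions are already available.
    static void DrainUringOnce() {
        io_uring_cqe* cqe = NULL;
        while (io_uring_peek_cqe(&g_ring, &cqe) == 0 && cqe != NULL) {
            // ... dispatch on cqe->user_data / cqe->res ...
            io_uring_cqe_seen(&g_ring, cqe);
            cqe = NULL;
        }
    }

    // Register once at startup, before the server starts:
    //   brpc::rdma::RdmaEndpoint::SetCallbackFn(DrainUringOnce);

Because every poller takes _cb_mutex before invoking the callback, the callback is serialized across pollers and should stay short; blocking inside it stalls RDMA completion processing on the polling threads.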

src/brpc/rdma/rdma_endpoint.h

Lines changed: 33 additions & 0 deletions
@@ -23,6 +23,8 @@
 #include <iostream>
 #include <string>
 #include <vector>
+#include <list>
+#include <functional>
 #include <infiniband/verbs.h>
 #include "butil/atomicops.h"
 #include "butil/iobuf.h"
@@ -34,6 +36,8 @@ namespace brpc {
 class Socket;
 namespace rdma {

+DECLARE_bool(rdma_use_polling);
+
 class RdmaConnect : public AppConnect {
 public:
     void StartConnect(const Socket* socket,
@@ -90,6 +94,9 @@ friend class brpc::Socket;
     // Callback when there is new epollin event on TCP fd
     static void OnNewDataFromTcp(Socket* m);

+    // Set callback function
+    static void SetCallbackFn(std::function<void()> cb);
+
 private:
     enum State {
         UNINIT = 0x0,
@@ -191,6 +198,17 @@ friend class brpc::Socket;
     // Try to read data on TCP fd in _socket
     inline void TryReadOnTcp();

+    // Initialize polling mode
+    static int PollingModeInitialize();
+
+    static void PollingModeRelease();
+
+    // Add cq socket to poller
+    void PollerAddCqSocket();
+
+    // Remove cq socket from poller
+    void PollerRemoveCqSocket();
+
     // Not owner
     Socket* _socket;

@@ -206,6 +224,9 @@ friend class brpc::Socket;
     // the SocketId which wrap the comp channel of CQ
     SocketId _cq_sid;

+    // the Socket pointer's iterator in list
+    std::list<Socket*>::iterator _cq_iter;
+
     // Capacity of local Send Queue and local Recv Queue
     uint16_t _sq_size;
     uint16_t _rq_size;
@@ -245,6 +266,18 @@ friend class brpc::Socket;
     butil::atomic<int> *_read_butex;

     DISALLOW_COPY_AND_ASSIGN(RdmaEndpoint);
+
+    struct Poller {
+        bthread_t tid;
+        // TODO(yangliming): lockless queue is better
+        std::list<Socket*> sockets;
+        butil::Mutex mutex;
+    };
+    static std::vector<Poller> _pollers;
+    static std::atomic<bool> _running;
+    // Callback used for io_uring/spdk async engine
+    static std::function<void()> _callback;
+    static butil::Mutex _cb_mutex;
 };

 } // namespace rdma

src/brpc/socket.cpp

Lines changed: 8 additions & 0 deletions
@@ -2273,6 +2273,14 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
     attr.tag = bthread_self_tag();
     if (FLAGS_usercode_in_coroutine) {
         ProcessEvent(p);
+#if BRPC_WITH_RDMA
+    } else if (rdma::FLAGS_rdma_use_polling && p->_rdma_state == RDMA_ON) {
+        auto rc = bthread_start_background(&tid, &attr, ProcessEvent, p);
+        if (rc != 0) {
+            LOG(FATAL) << "Fail to start ProcessEvent";
+            ProcessEvent(p);
+        }
+#endif
     } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
         LOG(FATAL) << "Fail to start ProcessEvent";
         ProcessEvent(p);

src/brpc/span.cpp

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@


 #include <netinet/in.h>
+#include <functional>
 #include <gflags/gflags.h>
 #include <leveldb/db.h>
 #include <leveldb/comparator.h>
