@@ -58,6 +58,9 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5858DEFINE_int32 (rdma_prepared_qp_cnt, 1024 , " Initial count of prepared QP." );
5959DEFINE_bool (rdma_trace_verbose, false , " Print log message verbosely" );
6060BRPC_VALIDATE_GFLAG (rdma_trace_verbose, brpc::PassValidate);
61+ DEFINE_bool (rdma_use_polling, false , " Use polling mode for RDMA." );
62+ DEFINE_int32 (rdma_poller_num, 1 , " Poller number in RDMA polling mode." );
63+ DEFINE_bool (rdma_poller_yield, false , " Yield thread in RDMA polling mode." );
6164
6265static const size_t IOBUF_BLOCK_HEADER_LEN = 32 ; // implementation-dependent
6366
@@ -699,7 +702,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699702 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700703 << " Handshake ends (use tcp) on " << s->description ();
701704 }
702-
703705 ep->TryReadOnTcp ();
704706
705707 return NULL ;
@@ -1041,26 +1043,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411043 return NULL ;
10421044 }
10431045
1044- res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1045- if (!res->comp_channel ) {
1046- PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1047- delete res;
1048- return NULL ;
1049- }
1046+ if (!FLAGS_rdma_use_polling) {
1047+ res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1048+ if (!res->comp_channel ) {
1049+ PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1050+ delete res;
1051+ return NULL ;
1052+ }
10501053
1051- butil::make_close_on_exec (res->comp_channel ->fd );
1052- if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1053- PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1054- delete res;
1055- return NULL ;
1056- }
1054+ butil::make_close_on_exec (res->comp_channel ->fd );
1055+ if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1056+ PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1057+ delete res;
1058+ return NULL ;
1059+ }
10571060
1058- res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1059- NULL , res->comp_channel , GetRdmaCompVector ());
1060- if (!res->cq ) {
1061- PLOG (WARNING) << " Fail to create CQ" ;
1062- delete res;
1063- return NULL ;
1061+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1062+ NULL , res->comp_channel , GetRdmaCompVector ());
1063+ if (!res->cq ) {
1064+ PLOG (WARNING) << " Fail to create CQ" ;
1065+ delete res;
1066+ return NULL ;
1067+ }
1068+ } else {
1069+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1070+ NULL , NULL , 0 );
1071+ if (!res->cq ) {
1072+ PLOG (WARNING) << " Fail to create CQ" ;
1073+ delete res;
1074+ return NULL ;
1075+ }
10641076 }
10651077
10661078 ibv_qp_init_attr attr;
@@ -1117,19 +1129,30 @@ int RdmaEndpoint::AllocateResources() {
11171129 return -1 ;
11181130 }
11191131
1120- SocketOptions options;
1121- options.user = this ;
1122- options.keytable_pool = _socket->_keytable_pool ;
1123- options.fd = _resource->comp_channel ->fd ;
1124- options.on_edge_triggered_events = PollCq;
1125- if (Socket::Create (options, &_cq_sid) < 0 ) {
1126- PLOG (WARNING) << " Fail to create socket for cq" ;
1127- return -1 ;
1128- }
1132+ if (!FLAGS_rdma_use_polling) {
1133+ SocketOptions options;
1134+ options.user = this ;
1135+ options.keytable_pool = _socket->_keytable_pool ;
1136+ options.fd = _resource->comp_channel ->fd ;
1137+ options.on_edge_triggered_events = PollCq;
1138+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1139+ PLOG (WARNING) << " Fail to create socket for cq" ;
1140+ return -1 ;
1141+ }
11291142
1130- if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1131- PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1132- return -1 ;
1143+ if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1144+ PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1145+ return -1 ;
1146+ }
1147+ } else {
1148+ SocketOptions options;
1149+ options.user = this ;
1150+ options.keytable_pool = _socket->_keytable_pool ;
1151+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1152+ PLOG (WARNING) << " Fail to create socket for cq" ;
1153+ return -1 ;
1154+ }
1155+ PollerAddCqSocket ();
11331156 }
11341157
11351158 _sbuf.resize (_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1250,9 @@ void RdmaEndpoint::DeallocateResources() {
12271250 if (!_resource) {
12281251 return ;
12291252 }
1253+ if (FLAGS_rdma_use_polling) {
1254+ PollerRemoveCqSocket ();
1255+ }
12301256 bool move_to_rdma_resource_list = false ;
12311257 if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321258 _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1263,10 @@ void RdmaEndpoint::DeallocateResources() {
12371263 move_to_rdma_resource_list = true ;
12381264 }
12391265 }
1240- int fd = _resource->comp_channel ->fd ;
1266+ int fd = -1 ;
1267+ if (_resource->comp_channel ) {
1268+ fd = _resource->comp_channel ->fd ;
1269+ }
12411270 if (!move_to_rdma_resource_list) {
12421271 if (_resource->qp ) {
12431272 if (IbvDestroyQp (_resource->qp ) < 0 ) {
@@ -1327,12 +1356,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271356 }
13281357 CHECK (ep == s->_rdma_ep );
13291358
1330- if (ep->GetAndAckEvents () < 0 ) {
1331- const int saved_errno = errno;
1332- PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1333- s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1334- s->description ().c_str (), berror (saved_errno));
1335- return ;
1359+ if (!FLAGS_rdma_use_polling) {
1360+ if (ep->GetAndAckEvents () < 0 ) {
1361+ const int saved_errno = errno;
1362+ PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1363+ s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1364+ s->description ().c_str (), berror (saved_errno));
1365+ return ;
1366+ }
13361367 }
13371368
13381369 int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1380,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491380 return ;
13501381 }
13511382 if (cnt == 0 ) {
1383+ if (FLAGS_rdma_use_polling) {
1384+ return ;
1385+ }
13521386 if (!notified) {
13531387 // Since RDMA only provides one shot event, we have to call the
13541388 // notify function every time. Because there is a possibility
@@ -1370,7 +1404,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701404 }
13711405 if (ep->GetAndAckEvents () < 0 ) {
13721406 s->SetFailed (errno, " Fail to ack CQ event on %s" ,
1373- s->description ().c_str ());
1407+ s->description ().c_str ());
13741408 return ;
13751409 }
13761410 notified = false ;
@@ -1474,6 +1508,13 @@ int RdmaEndpoint::GlobalInitialize() {
14741508 g_rdma_resource_list = res;
14751509 }
14761510
1511+ if (FLAGS_rdma_use_polling) {
1512+ auto rc = PollingModeInitialize ();
1513+ if (rc != 0 ) {
1514+ return -1 ;
1515+ }
1516+ }
1517+
14771518 return 0 ;
14781519}
14791520
@@ -1486,6 +1527,84 @@ void RdmaEndpoint::GlobalRelease() {
14861527 delete res;
14871528 }
14881529 }
1530+
1531+ if (FLAGS_rdma_use_polling) {
1532+ PollingModeRelease ();
1533+ }
1534+ }
1535+
1536+ std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
1537+ std::atomic<bool > RdmaEndpoint::_running (false );
1538+ std::function<void ()> RdmaEndpoint::_callback(nullptr );
1539+ butil::Mutex RdmaEndpoint::_cb_mutex;
1540+
1541+ void RdmaEndpoint::SetCallbackFn (std::function<void ()> cb) { _callback = cb; }
1542+
1543+ int RdmaEndpoint::PollingModeInitialize () {
1544+ auto fn = [](void * args) -> void * {
1545+ auto poller = static_cast <Poller*>(args);
1546+ while (_running.load (butil::memory_order_relaxed)) {
1547+ std::list<Socket*> sockets;
1548+ {
1549+ std::unique_lock<butil::Mutex> lk (poller->mutex );
1550+ sockets = poller->sockets ; // copy all sockets is not good
1551+ }
1552+ for (auto m : sockets) {
1553+ PollCq (m);
1554+ }
1555+ {
1556+ std::unique_lock<butil::Mutex> lk (_cb_mutex);
1557+ if (_callback) {
1558+ _callback ();
1559+ }
1560+ }
1561+ if (FLAGS_rdma_poller_yield) {
1562+ bthread_yield ();
1563+ }
1564+ }
1565+ return nullptr ;
1566+ };
1567+ _pollers = std::move (std::vector<Poller>(FLAGS_rdma_poller_num));
1568+ _running.store (true , butil::memory_order_relaxed);
1569+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1570+ auto rc = bthread_start_background (
1571+ &_pollers[i].tid , &BTHREAD_ATTR_NORMAL, fn, &_pollers[i]);
1572+ if (rc != 0 ) {
1573+ LOG (ERROR) << " Fail to start rdma polling bthread" ;
1574+ return -1 ;
1575+ }
1576+ }
1577+ return 0 ;
1578+ }
1579+
1580+ void RdmaEndpoint::PollingModeRelease () {
1581+ _running.store (false , butil::memory_order_relaxed);
1582+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1583+ bthread_join (_pollers[i].tid , nullptr );
1584+ }
1585+ }
1586+
1587+ // Add socket to poller
1588+ void RdmaEndpoint::PollerAddCqSocket () {
1589+ SocketUniquePtr s;
1590+ if (Socket::Address (_cq_sid, &s) != 0 ) {
1591+ LOG (ERROR) << " Fail to address socket id: " << _cq_sid;
1592+ return ;
1593+ }
1594+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1595+ auto & poller = _pollers[index];
1596+ std::unique_lock<butil::Mutex> lk (poller.mutex );
1597+ _cq_iter = poller.sockets .insert (poller.sockets .end (), s.get ());
1598+ }
1599+
1600+ // Remove socket from poller
1601+ void RdmaEndpoint::PollerRemoveCqSocket () {
1602+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1603+ auto & poller = _pollers[index];
1604+ std::unique_lock<butil::Mutex> lk (poller.mutex );
1605+ if (_cq_iter != poller.sockets .end ()) {
1606+ poller.sockets .erase (_cq_iter);
1607+ }
14891608}
14901609
14911610} // namespace rdma
0 commit comments