@@ -58,6 +58,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5858DEFINE_int32 (rdma_prepared_qp_cnt, 1024 , " Initial count of prepared QP." );
5959DEFINE_bool (rdma_trace_verbose, false , " Print log message verbosely" );
6060BRPC_VALIDATE_GFLAG (rdma_trace_verbose, brpc::PassValidate);
// Polling-mode switches.
// rdma_use_polling: when true, the CQ is created without a completion channel
// and the CQ socket is registered with a poller bthread instead of relying on
// edge-triggered fd events.
61+ DEFINE_bool (rdma_use_polling, false , " Use polling mode for RDMA." );
// rdma_poller_num: number of poller bthreads started by PollingModeInitialize();
// also used as the modulus when assigning a CQ socket to a poller.
// NOTE(review): no validator is visible here; a value <= 0 looks unsafe - confirm.
62+ DEFINE_int32 (rdma_poller_num, 1 , " Poller number in RDMA polling mode." );
// rdma_poller_yield: when true, each poller calls bthread_yield() after every
// pass over its sockets.
63+ DEFINE_bool (rdma_poller_yield, false , " Yield thread in RDMA polling mode." );
// rdma_enable_edisp_schedule: not referenced in the visible part of this file.
// NOTE(review): presumably consumed by the event dispatcher - confirm.
64+ DEFINE_bool (rdma_enable_edisp_schedule, true ,
65+ " Enable event dispatcher schedule in RDMA" );
6166
6267static const size_t IOBUF_BLOCK_HEADER_LEN = 32 ; // implementation-dependent
6368
@@ -699,7 +704,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699704 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700705 << " Handshake ends (use tcp) on " << s->description ();
701706 }
702-
703707 ep->TryReadOnTcp ();
704708
705709 return NULL ;
@@ -1041,26 +1045,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411045 return NULL ;
10421046 }
10431047
1044- res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1045- if (!res->comp_channel ) {
1046- PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1047- delete res;
1048- return NULL ;
1049- }
1048+ if (!FLAGS_rdma_use_polling) {
1049+ res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1050+ if (!res->comp_channel ) {
1051+ PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1052+ delete res;
1053+ return NULL ;
1054+ }
10501055
1051- butil::make_close_on_exec (res->comp_channel ->fd );
1052- if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1053- PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1054- delete res;
1055- return NULL ;
1056- }
1056+ butil::make_close_on_exec (res->comp_channel ->fd );
1057+ if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1058+ PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1059+ delete res;
1060+ return NULL ;
1061+ }
10571062
1058- res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1059- NULL , res->comp_channel , GetRdmaCompVector ());
1060- if (!res->cq ) {
1061- PLOG (WARNING) << " Fail to create CQ" ;
1062- delete res;
1063- return NULL ;
1063+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1064+ NULL , res->comp_channel , GetRdmaCompVector ());
1065+ if (!res->cq ) {
1066+ PLOG (WARNING) << " Fail to create CQ" ;
1067+ delete res;
1068+ return NULL ;
1069+ }
1070+ } else {
1071+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1072+ NULL , NULL , 0 );
1073+ if (!res->cq ) {
1074+ PLOG (WARNING) << " Fail to create CQ" ;
1075+ delete res;
1076+ return NULL ;
1077+ }
10641078 }
10651079
10661080 ibv_qp_init_attr attr;
@@ -1117,19 +1131,30 @@ int RdmaEndpoint::AllocateResources() {
11171131 return -1 ;
11181132 }
11191133
1120- SocketOptions options;
1121- options.user = this ;
1122- options.keytable_pool = _socket->_keytable_pool ;
1123- options.fd = _resource->comp_channel ->fd ;
1124- options.on_edge_triggered_events = PollCq;
1125- if (Socket::Create (options, &_cq_sid) < 0 ) {
1126- PLOG (WARNING) << " Fail to create socket for cq" ;
1127- return -1 ;
1128- }
1134+ if (!FLAGS_rdma_use_polling) {
1135+ SocketOptions options;
1136+ options.user = this ;
1137+ options.keytable_pool = _socket->_keytable_pool ;
1138+ options.fd = _resource->comp_channel ->fd ;
1139+ options.on_edge_triggered_events = PollCq;
1140+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1141+ PLOG (WARNING) << " Fail to create socket for cq" ;
1142+ return -1 ;
1143+ }
11291144
1130- if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1131- PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1132- return -1 ;
1145+ if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1146+ PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1147+ return -1 ;
1148+ }
1149+ } else {
1150+ SocketOptions options;
1151+ options.user = this ;
1152+ options.keytable_pool = _socket->_keytable_pool ;
1153+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1154+ PLOG (WARNING) << " Fail to create socket for cq" ;
1155+ return -1 ;
1156+ }
1157+ PollerAddCqSocket ();
11331158 }
11341159
11351160 _sbuf.resize (_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1252,9 @@ void RdmaEndpoint::DeallocateResources() {
12271252 if (!_resource) {
12281253 return ;
12291254 }
1255+ if (FLAGS_rdma_use_polling) {
1256+ PollerRemoveCqSocket ();
1257+ }
12301258 bool move_to_rdma_resource_list = false ;
12311259 if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321260 _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1265,10 @@ void RdmaEndpoint::DeallocateResources() {
12371265 move_to_rdma_resource_list = true ;
12381266 }
12391267 }
1240- int fd = _resource->comp_channel ->fd ;
1268+ int fd = -1 ;
1269+ if (_resource->comp_channel ) {
1270+ fd = _resource->comp_channel ->fd ;
1271+ }
12411272 if (!move_to_rdma_resource_list) {
12421273 if (_resource->qp ) {
12431274 if (IbvDestroyQp (_resource->qp ) < 0 ) {
@@ -1327,12 +1358,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271358 }
13281359 CHECK (ep == s->_rdma_ep );
13291360
1330- if (ep->GetAndAckEvents () < 0 ) {
1331- const int saved_errno = errno;
1332- PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1333- s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1334- s->description ().c_str (), berror (saved_errno));
1335- return ;
1361+ if (!FLAGS_rdma_use_polling) {
1362+ if (ep->GetAndAckEvents () < 0 ) {
1363+ const int saved_errno = errno;
1364+ PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1365+ s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1366+ s->description ().c_str (), berror (saved_errno));
1367+ return ;
1368+ }
13361369 }
13371370
13381371 int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1382,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491382 return ;
13501383 }
13511384 if (cnt == 0 ) {
1385+ if (FLAGS_rdma_use_polling) {
1386+ return ;
1387+ }
13521388 if (!notified) {
13531389 // Since RDMA only provides one shot event, we have to call the
13541390 // notify function every time. Because there is a possibility
@@ -1370,7 +1406,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701406 }
13711407 if (ep->GetAndAckEvents () < 0 ) {
13721408 s->SetFailed (errno, " Fail to ack CQ event on %s" ,
1373- s->description ().c_str ());
1409+ s->description ().c_str ());
13741410 return ;
13751411 }
13761412 notified = false ;
@@ -1474,6 +1510,13 @@ int RdmaEndpoint::GlobalInitialize() {
14741510 g_rdma_resource_list = res;
14751511 }
14761512
1513+ if (FLAGS_rdma_use_polling) {
1514+ auto rc = PollingModeInitialize ();
1515+ if (rc != 0 ) {
1516+ return -1 ;
1517+ }
1518+ }
1519+
14771520 return 0 ;
14781521}
14791522
@@ -1486,6 +1529,88 @@ void RdmaEndpoint::GlobalRelease() {
14861529 delete res;
14871530 }
14881531 }
1532+
1533+ if (FLAGS_rdma_use_polling) {
1534+ PollingModeRelease ();
1535+ }
1536+ }
1537+
// Static poller table shared by every RdmaEndpoint; it is (re)assigned in
// PollingModeInitialize() and indexed by the hash of a CQ socket id.
1538+ std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
// Loop flag read by each poller bthread: set to true in
// PollingModeInitialize() and to false in PollingModeRelease().
1539+ std::atomic<bool > RdmaEndpoint::_running (false );
1540+
1541+ void RdmaEndpoint::SetCallbackFn (std::function<void ()> cb) {
1542+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1543+ auto & poller = _pollers[i];
1544+ std::unique_lock<bthread::Mutex> lk (poller.callback_mutex );
1545+ poller.callback = cb;
1546+ }
1547+ }
1548+
1549+ int RdmaEndpoint::PollingModeInitialize () {
1550+ auto fn = [](void * args) -> void * {
1551+ auto poller = static_cast <Poller*>(args);
1552+ while (_running.load (butil::memory_order_relaxed)) {
1553+ std::list<Socket*> sockets;
1554+ {
1555+ std::unique_lock<bthread::Mutex> lk (poller->sockets_mutex );
1556+ sockets = poller->sockets ; // copy all sockets is not good
1557+ }
1558+ for (auto m : sockets) {
1559+ PollCq (m);
1560+ }
1561+ {
1562+ std::unique_lock<bthread::Mutex> lk (poller->callback_mutex );
1563+ if (poller->callback ) {
1564+ poller->callback ();
1565+ }
1566+ }
1567+ if (FLAGS_rdma_poller_yield) {
1568+ bthread_yield ();
1569+ }
1570+ }
1571+ return nullptr ;
1572+ };
1573+ _pollers = std::vector<Poller>(FLAGS_rdma_poller_num);
1574+ _running.store (true , butil::memory_order_relaxed);
1575+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1576+ auto rc = bthread_start_background (
1577+ &_pollers[i].tid , &BTHREAD_ATTR_NORMAL, fn, &_pollers[i]);
1578+ if (rc != 0 ) {
1579+ LOG (ERROR) << " Fail to start rdma polling bthread" ;
1580+ return -1 ;
1581+ }
1582+ }
1583+ return 0 ;
1584+ }
1585+
1586+ void RdmaEndpoint::PollingModeRelease () {
1587+ _running.store (false , butil::memory_order_relaxed);
1588+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1589+ bthread_join (_pollers[i].tid , nullptr );
1590+ }
1591+ }
1592+
1593+ // Add socket to poller
1594+ void RdmaEndpoint::PollerAddCqSocket () {
1595+ SocketUniquePtr s;
1596+ if (Socket::Address (_cq_sid, &s) != 0 ) {
1597+ LOG (ERROR) << " Fail to address socket id: " << _cq_sid;
1598+ return ;
1599+ }
1600+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1601+ auto & poller = _pollers[index];
1602+ std::unique_lock<bthread::Mutex> lk (poller.sockets_mutex );
1603+ _cq_iter = poller.sockets .insert (poller.sockets .end (), s.get ());
1604+ }
1605+
1606+ // Remove socket from poller
1607+ void RdmaEndpoint::PollerRemoveCqSocket () {
1608+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1609+ auto & poller = _pollers[index];
1610+ std::unique_lock<bthread::Mutex> lk (poller.sockets_mutex );
1611+ if (_cq_iter != poller.sockets .end ()) {
1612+ poller.sockets .erase (_cq_iter);
1613+ }
14891614}
14901615
14911616} // namespace rdma
0 commit comments