3131#include " brpc/rdma/rdma_helper.h"
3232#include " brpc/rdma/rdma_endpoint.h"
3333
34+ DECLARE_int32 (task_group_ntags);
3435
3536namespace brpc {
3637namespace rdma {
@@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859DEFINE_int32 (rdma_prepared_qp_cnt, 1024 , " Initial count of prepared QP." );
5960DEFINE_bool (rdma_trace_verbose, false , " Print log message verbosely" );
6061BRPC_VALIDATE_GFLAG (rdma_trace_verbose, brpc::PassValidate);
62+ DEFINE_bool (rdma_use_polling, false , " Use polling mode for RDMA." );
63+ DEFINE_int32 (rdma_poller_num, 1 , " Poller number in RDMA polling mode." );
64+ DEFINE_bool (rdma_poller_yield, false , " Yield thread in RDMA polling mode." );
65+ DEFINE_bool (rdma_enable_edisp_schedule, true ,
66+ " Enable event dispatcher schedule in RDMA" );
6167
6268static const size_t IOBUF_BLOCK_HEADER_LEN = 32 ; // implementation-dependent
6369
@@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699705 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700706 << " Handshake ends (use tcp) on " << s->description ();
701707 }
702-
703708 ep->TryReadOnTcp ();
704709
705710 return NULL ;
@@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411046 return NULL ;
10421047 }
10431048
1044- res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1045- if (!res->comp_channel ) {
1046- PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1047- delete res;
1048- return NULL ;
1049- }
1049+ if (!FLAGS_rdma_use_polling) {
1050+ res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1051+ if (!res->comp_channel ) {
1052+ PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1053+ delete res;
1054+ return NULL ;
1055+ }
10501056
1051- butil::make_close_on_exec (res->comp_channel ->fd );
1052- if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1053- PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1054- delete res;
1055- return NULL ;
1056- }
1057+ butil::make_close_on_exec (res->comp_channel ->fd );
1058+ if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1059+ PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1060+ delete res;
1061+ return NULL ;
1062+ }
10571063
1058- res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1059- NULL , res->comp_channel , GetRdmaCompVector ());
1060- if (!res->cq ) {
1061- PLOG (WARNING) << " Fail to create CQ" ;
1062- delete res;
1063- return NULL ;
1064+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1065+ NULL , res->comp_channel , GetRdmaCompVector ());
1066+ if (!res->cq ) {
1067+ PLOG (WARNING) << " Fail to create CQ" ;
1068+ delete res;
1069+ return NULL ;
1070+ }
1071+ } else {
1072+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1073+ NULL , NULL , 0 );
1074+ if (!res->cq ) {
1075+ PLOG (WARNING) << " Fail to create CQ" ;
1076+ delete res;
1077+ return NULL ;
1078+ }
10641079 }
10651080
10661081 ibv_qp_init_attr attr;
@@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
11171132 return -1 ;
11181133 }
11191134
1120- SocketOptions options;
1121- options.user = this ;
1122- options.keytable_pool = _socket->_keytable_pool ;
1123- options.fd = _resource->comp_channel ->fd ;
1124- options.on_edge_triggered_events = PollCq;
1125- if (Socket::Create (options, &_cq_sid) < 0 ) {
1126- PLOG (WARNING) << " Fail to create socket for cq" ;
1127- return -1 ;
1128- }
1135+ if (!FLAGS_rdma_use_polling) {
1136+ SocketOptions options;
1137+ options.user = this ;
1138+ options.keytable_pool = _socket->_keytable_pool ;
1139+ options.fd = _resource->comp_channel ->fd ;
1140+ options.on_edge_triggered_events = PollCq;
1141+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1142+ PLOG (WARNING) << " Fail to create socket for cq" ;
1143+ return -1 ;
1144+ }
11291145
1130- if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1131- PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1132- return -1 ;
1146+ if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1147+ PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1148+ return -1 ;
1149+ }
1150+ } else {
1151+ SocketOptions options;
1152+ options.user = this ;
1153+ options.keytable_pool = _socket->_keytable_pool ;
1154+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1155+ PLOG (WARNING) << " Fail to create socket for cq" ;
1156+ return -1 ;
1157+ }
1158+ PollerAddCqSid ();
11331159 }
11341160
11351161 _sbuf.resize (_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
12271253 if (!_resource) {
12281254 return ;
12291255 }
1256+ if (FLAGS_rdma_use_polling) {
1257+ PollerRemoveCqSid ();
1258+ }
12301259 bool move_to_rdma_resource_list = false ;
12311260 if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321261 _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
12371266 move_to_rdma_resource_list = true ;
12381267 }
12391268 }
1240- int fd = _resource->comp_channel ->fd ;
1269+ int fd = -1 ;
1270+ if (_resource->comp_channel ) {
1271+ fd = _resource->comp_channel ->fd ;
1272+ }
12411273 if (!move_to_rdma_resource_list) {
12421274 if (_resource->qp ) {
12431275 if (IbvDestroyQp (_resource->qp ) < 0 ) {
@@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271359 }
13281360 CHECK (ep == s->_rdma_ep );
13291361
1330- if (ep->GetAndAckEvents () < 0 ) {
1331- const int saved_errno = errno;
1332- PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1333- s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1334- s->description ().c_str (), berror (saved_errno));
1335- return ;
1362+ if (!FLAGS_rdma_use_polling) {
1363+ if (ep->GetAndAckEvents () < 0 ) {
1364+ const int saved_errno = errno;
1365+ PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1366+ s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1367+ s->description ().c_str (), berror (saved_errno));
1368+ return ;
1369+ }
13361370 }
13371371
13381372 int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491383 return ;
13501384 }
13511385 if (cnt == 0 ) {
1386+ if (FLAGS_rdma_use_polling) {
1387+ return ;
1388+ }
13521389 if (!notified) {
13531390 // Since RDMA only provides one shot event, we have to call the
13541391 // notify function every time. Because there is a possibility
@@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701407 }
13711408 if (ep->GetAndAckEvents () < 0 ) {
13721409 s->SetFailed (errno, " Fail to ack CQ event on %s" ,
1373- s->description ().c_str ());
1410+ s->description ().c_str ());
13741411 return ;
13751412 }
13761413 notified = false ;
@@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741511 g_rdma_resource_list = res;
14751512 }
14761513
1514+ if (FLAGS_rdma_use_polling) {
1515+ _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1516+ }
1517+
14771518 return 0 ;
14781519}
14791520
@@ -1486,6 +1527,120 @@ void RdmaEndpoint::GlobalRelease() {
14861527 delete res;
14871528 }
14881529 }
1530+ // release polling mode at exit or call RdmaEndpoint::PollingModeRelease
1531+ // explicitly
1532+ if (FLAGS_rdma_use_polling) {
1533+ for (int i = 0 ; i < FLAGS_task_group_ntags; ++i) {
1534+ PollingModeRelease (i);
1535+ }
1536+ }
1537+ }
1538+
1539+ std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1540+
1541+ void RdmaEndpoint::SetCallbackFn (std::function<void ()> cb, bthread_tag_t tag) {
1542+ auto & group = _poller_groups[tag];
1543+ auto & pollers = group.pollers ;
1544+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1545+ auto & poller = pollers[i];
1546+ std::unique_lock<bthread::Mutex> lk (poller.callback_mutex );
1547+ poller.callback = cb;
1548+ }
1549+ }
1550+
1551+ int RdmaEndpoint::PollingModeInitialize (bthread_tag_t tag) {
1552+ if (!FLAGS_rdma_use_polling) {
1553+ LOG (ERROR) << " RDMA polling mode is not enabled" ;
1554+ return -1 ;
1555+ }
1556+ auto & group = _poller_groups[tag];
1557+ auto & pollers = group.pollers ;
1558+ auto & running = group.running ;
1559+ bool expected = false ;
1560+ if (!running.compare_exchange_strong (expected, true )) {
1561+ return 0 ;
1562+ }
1563+ struct FnArgs {
1564+ Poller* poller;
1565+ std::atomic<bool >* running;
1566+ };
1567+ auto fn = [](void * p) -> void * {
1568+ std::unique_ptr<FnArgs> args (static_cast <FnArgs*>(p));
1569+ auto poller = args->poller ;
1570+ auto running = args->running ;
1571+ std::unordered_set<SocketId> cq_sids;
1572+ CqSidOp op;
1573+ while (running->load (std::memory_order_relaxed)) {
1574+ while (poller->op_queue .Dequeue (op)) {
1575+ if (op.type == CqSidOp::ADD) {
1576+ cq_sids.emplace (op.sid );
1577+ } else if (op.type == CqSidOp::REMOVE) {
1578+ cq_sids.erase (op.sid );
1579+ }
1580+ }
1581+ for (auto sid : cq_sids) {
1582+ SocketUniquePtr s;
1583+ if (Socket::Address (sid, &s) < 0 ) {
1584+ continue ;
1585+ }
1586+ PollCq (s.get ());
1587+ }
1588+ {
1589+ std::unique_lock<bthread::Mutex> lk (poller->callback_mutex );
1590+ if (poller->callback ) {
1591+ poller->callback ();
1592+ }
1593+ }
1594+ if (FLAGS_rdma_poller_yield) {
1595+ bthread_yield ();
1596+ }
1597+ }
1598+ return nullptr ;
1599+ };
1600+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1601+ auto args = new FnArgs{&pollers[i], &running};
1602+ bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
1603+ attr.tag = tag;
1604+ auto rc = bthread_start_background (&pollers[i].tid , &attr, fn, args);
1605+ if (rc != 0 ) {
1606+ LOG (ERROR) << " Fail to start rdma polling bthread" ;
1607+ return -1 ;
1608+ }
1609+ }
1610+ return 0 ;
1611+ }
1612+
1613+ void RdmaEndpoint::PollingModeRelease (bthread_tag_t tag) {
1614+ if (!FLAGS_rdma_use_polling) {
1615+ return ;
1616+ }
1617+ auto & group = _poller_groups[tag];
1618+ auto & pollers = group.pollers ;
1619+ auto & running = group.running ;
1620+ running.store (false , std::memory_order_relaxed);
1621+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1622+ bthread_join (pollers[i].tid , nullptr );
1623+ }
1624+ }
1625+
1626+ void RdmaEndpoint::PollerAddCqSid () {
1627+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1628+ auto & group = _poller_groups[bthread_self_tag ()];
1629+ auto & pollers = group.pollers ;
1630+ auto & poller = pollers[index];
1631+ if (_cq_sid != INVALID_SOCKET_ID) {
1632+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::ADD});
1633+ }
1634+ }
1635+
1636+ void RdmaEndpoint::PollerRemoveCqSid () {
1637+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1638+ auto & group = _poller_groups[bthread_self_tag ()];
1639+ auto & pollers = group.pollers ;
1640+ auto & poller = pollers[index];
1641+ if (_cq_sid != INVALID_SOCKET_ID) {
1642+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::REMOVE});
1643+ }
14891644}
14901645
14911646} // namespace rdma
0 commit comments