3131#include " brpc/rdma/rdma_helper.h"
3232#include " brpc/rdma/rdma_endpoint.h"
3333
34+ DECLARE_int32 (task_group_ntags);
3435
3536namespace brpc {
3637namespace rdma {
@@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859DEFINE_int32 (rdma_prepared_qp_cnt, 1024 , " Initial count of prepared QP." );
5960DEFINE_bool (rdma_trace_verbose, false , " Print log message verbosely" );
6061BRPC_VALIDATE_GFLAG (rdma_trace_verbose, brpc::PassValidate);
62+ DEFINE_bool (rdma_use_polling, false , " Use polling mode for RDMA." );
63+ DEFINE_int32 (rdma_poller_num, 1 , " Poller number in RDMA polling mode." );
64+ DEFINE_bool (rdma_poller_yield, false , " Yield thread in RDMA polling mode." );
65+ DEFINE_bool (rdma_enable_edisp_schedule, true ,
66+ " Enable event dispatcher schedule in RDMA" );
6167
6268static const size_t IOBUF_BLOCK_HEADER_LEN = 32 ; // implementation-dependent
6369
@@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699705 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700706 << " Handshake ends (use tcp) on " << s->description ();
701707 }
702-
703708 ep->TryReadOnTcp ();
704709
705710 return NULL ;
@@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411046 return NULL ;
10421047 }
10431048
1044- res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1045- if (!res->comp_channel ) {
1046- PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1047- delete res;
1048- return NULL ;
1049- }
1049+ if (!FLAGS_rdma_use_polling) {
1050+ res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1051+ if (!res->comp_channel ) {
1052+ PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1053+ delete res;
1054+ return NULL ;
1055+ }
10501056
1051- butil::make_close_on_exec (res->comp_channel ->fd );
1052- if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1053- PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1054- delete res;
1055- return NULL ;
1056- }
1057+ butil::make_close_on_exec (res->comp_channel ->fd );
1058+ if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1059+ PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1060+ delete res;
1061+ return NULL ;
1062+ }
10571063
1058- res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1059- NULL , res->comp_channel , GetRdmaCompVector ());
1060- if (!res->cq ) {
1061- PLOG (WARNING) << " Fail to create CQ" ;
1062- delete res;
1063- return NULL ;
1064+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1065+ NULL , res->comp_channel , GetRdmaCompVector ());
1066+ if (!res->cq ) {
1067+ PLOG (WARNING) << " Fail to create CQ" ;
1068+ delete res;
1069+ return NULL ;
1070+ }
1071+ } else {
1072+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1073+ NULL , NULL , 0 );
1074+ if (!res->cq ) {
1075+ PLOG (WARNING) << " Fail to create CQ" ;
1076+ delete res;
1077+ return NULL ;
1078+ }
10641079 }
10651080
10661081 ibv_qp_init_attr attr;
@@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
11171132 return -1 ;
11181133 }
11191134
1120- SocketOptions options;
1121- options.user = this ;
1122- options.keytable_pool = _socket->_keytable_pool ;
1123- options.fd = _resource->comp_channel ->fd ;
1124- options.on_edge_triggered_events = PollCq;
1125- if (Socket::Create (options, &_cq_sid) < 0 ) {
1126- PLOG (WARNING) << " Fail to create socket for cq" ;
1127- return -1 ;
1128- }
1135+ if (!FLAGS_rdma_use_polling) {
1136+ SocketOptions options;
1137+ options.user = this ;
1138+ options.keytable_pool = _socket->_keytable_pool ;
1139+ options.fd = _resource->comp_channel ->fd ;
1140+ options.on_edge_triggered_events = PollCq;
1141+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1142+ PLOG (WARNING) << " Fail to create socket for cq" ;
1143+ return -1 ;
1144+ }
11291145
1130- if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1131- PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1132- return -1 ;
1146+ if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1147+ PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1148+ return -1 ;
1149+ }
1150+ } else {
1151+ SocketOptions options;
1152+ options.user = this ;
1153+ options.keytable_pool = _socket->_keytable_pool ;
1154+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1155+ PLOG (WARNING) << " Fail to create socket for cq" ;
1156+ return -1 ;
1157+ }
1158+ PollerAddCqSid ();
11331159 }
11341160
11351161 _sbuf.resize (_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
12271253 if (!_resource) {
12281254 return ;
12291255 }
1256+ if (FLAGS_rdma_use_polling) {
1257+ PollerRemoveCqSid ();
1258+ }
12301259 bool move_to_rdma_resource_list = false ;
12311260 if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321261 _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
12371266 move_to_rdma_resource_list = true ;
12381267 }
12391268 }
1240- int fd = _resource->comp_channel ->fd ;
1269+ int fd = -1 ;
1270+ if (_resource->comp_channel ) {
1271+ fd = _resource->comp_channel ->fd ;
1272+ }
12411273 if (!move_to_rdma_resource_list) {
12421274 if (_resource->qp ) {
12431275 if (IbvDestroyQp (_resource->qp ) < 0 ) {
@@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271359 }
13281360 CHECK (ep == s->_rdma_ep );
13291361
1330- if (ep->GetAndAckEvents () < 0 ) {
1331- const int saved_errno = errno;
1332- PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1333- s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1334- s->description ().c_str (), berror (saved_errno));
1335- return ;
1362+ if (!FLAGS_rdma_use_polling) {
1363+ if (ep->GetAndAckEvents () < 0 ) {
1364+ const int saved_errno = errno;
1365+ PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1366+ s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1367+ s->description ().c_str (), berror (saved_errno));
1368+ return ;
1369+ }
13361370 }
13371371
13381372 int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491383 return ;
13501384 }
13511385 if (cnt == 0 ) {
1386+ if (FLAGS_rdma_use_polling) {
1387+ return ;
1388+ }
13521389 if (!notified) {
13531390 // Since RDMA only provides one shot event, we have to call the
13541391 // notify function every time. Because there is a possibility
@@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701407 }
13711408 if (ep->GetAndAckEvents () < 0 ) {
13721409 s->SetFailed (errno, " Fail to ack CQ event on %s" ,
1373- s->description ().c_str ());
1410+ s->description ().c_str ());
13741411 return ;
13751412 }
13761413 notified = false ;
@@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741511 g_rdma_resource_list = res;
14751512 }
14761513
1514+ if (FLAGS_rdma_use_polling) {
1515+ _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1516+ }
1517+
14771518 return 0 ;
14781519}
14791520
@@ -1488,6 +1529,103 @@ void RdmaEndpoint::GlobalRelease() {
14881529 }
14891530}
14901531
1532+ std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1533+
1534+ void RdmaEndpoint::SetCallbackFn (std::function<void ()> cb, bthread_tag_t tag) {
1535+ auto & group = _poller_groups[tag];
1536+ auto & pollers = group.pollers ;
1537+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1538+ auto & poller = pollers[i];
1539+ std::unique_lock<bthread::Mutex> lk (poller.cb_mutex );
1540+ poller.callback = cb;
1541+ }
1542+ }
1543+
1544+ int RdmaEndpoint::PollingModeInitialize (bthread_tag_t tag) {
1545+ auto & group = _poller_groups[tag];
1546+ auto & pollers = group.pollers ;
1547+ auto & running = group.running ;
1548+ struct FnArgs {
1549+ Poller* poller;
1550+ std::atomic<bool >* running;
1551+ };
1552+ auto fn = [](void * p) -> void * {
1553+ std::unique_ptr<FnArgs> args (static_cast <FnArgs*>(p));
1554+ auto poller = args->poller ;
1555+ auto running = args->running ;
1556+ std::unordered_set<SocketId> cq_sids;
1557+ CqSidOp op;
1558+ while (running->load (butil::memory_order_relaxed)) {
1559+ while (poller->op_queue .Dequeue (op)) {
1560+ if (op.type == CqSidOp::ADD) {
1561+ cq_sids.emplace (op.sid );
1562+ } else if (op.type == CqSidOp::REMOVE) {
1563+ cq_sids.erase (op.sid );
1564+ }
1565+ }
1566+ for (auto sid : cq_sids) {
1567+ SocketUniquePtr s;
1568+ if (Socket::Address (sid, &s) < 0 ) {
1569+ continue ;
1570+ }
1571+ PollCq (s.get ());
1572+ }
1573+ {
1574+ std::unique_lock<bthread::Mutex> lk (poller->cb_mutex );
1575+ if (poller->callback ) {
1576+ poller->callback ();
1577+ }
1578+ }
1579+ if (FLAGS_rdma_poller_yield) {
1580+ bthread_yield ();
1581+ }
1582+ }
1583+ return nullptr ;
1584+ };
1585+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1586+ auto args = new FnArgs{&pollers[i], &running};
1587+ bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
1588+ attr.tag = tag;
1589+ auto rc = bthread_start_background (&pollers[i].tid , &attr, fn, args);
1590+ if (rc != 0 ) {
1591+ LOG (ERROR) << " Fail to start rdma polling bthread" ;
1592+ return -1 ;
1593+ }
1594+ }
1595+ running.store (true , butil::memory_order_relaxed);
1596+ return 0 ;
1597+ }
1598+
1599+ void RdmaEndpoint::PollingModeRelease (bthread_tag_t tag) {
1600+ auto & group = _poller_groups[tag];
1601+ auto & pollers = group.pollers ;
1602+ auto & running = group.running ;
1603+ running.store (false , butil::memory_order_relaxed);
1604+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1605+ bthread_join (pollers[i].tid , nullptr );
1606+ }
1607+ }
1608+
1609+ void RdmaEndpoint::PollerAddCqSid () {
1610+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1611+ auto & group = _poller_groups[bthread_self_tag ()];
1612+ auto & pollers = group.pollers ;
1613+ auto & poller = pollers[index];
1614+ if (_cq_sid != INVALID_SOCKET_ID) {
1615+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::ADD});
1616+ }
1617+ }
1618+
1619+ void RdmaEndpoint::PollerRemoveCqSid () {
1620+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1621+ auto & group = _poller_groups[bthread_self_tag ()];
1622+ auto & pollers = group.pollers ;
1623+ auto & poller = pollers[index];
1624+ if (_cq_sid != INVALID_SOCKET_ID) {
1625+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::REMOVE});
1626+ }
1627+ }
1628+
14911629} // namespace rdma
14921630} // namespace brpc
14931631
0 commit comments