3131#include " brpc/rdma/rdma_helper.h"
3232#include " brpc/rdma/rdma_endpoint.h"
3333
34+ DECLARE_int32 (task_group_ntags);
3435
3536namespace brpc {
3637namespace rdma {
@@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859DEFINE_int32 (rdma_prepared_qp_cnt, 1024 , " Initial count of prepared QP." );
5960DEFINE_bool (rdma_trace_verbose, false , " Print log message verbosely" );
6061BRPC_VALIDATE_GFLAG (rdma_trace_verbose, brpc::PassValidate);
62+ DEFINE_bool (rdma_use_polling, false , " Use polling mode for RDMA." );
63+ DEFINE_int32 (rdma_poller_num, 1 , " Poller number in RDMA polling mode." );
64+ DEFINE_bool (rdma_poller_yield, false , " Yield thread in RDMA polling mode." );
65+ DEFINE_bool (rdma_enable_edisp_schedule, true ,
66+ " Enable event dispatcher schedule in RDMA" );
6167
6268static const size_t IOBUF_BLOCK_HEADER_LEN = 32 ; // implementation-dependent
6369
@@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699705 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700706 << " Handshake ends (use tcp) on " << s->description ();
701707 }
702-
703708 ep->TryReadOnTcp ();
704709
705710 return NULL ;
@@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411046 return NULL ;
10421047 }
10431048
1044- res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1045- if (!res->comp_channel ) {
1046- PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1047- delete res;
1048- return NULL ;
1049- }
1049+ if (!FLAGS_rdma_use_polling) {
1050+ res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1051+ if (!res->comp_channel ) {
1052+ PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1053+ delete res;
1054+ return NULL ;
1055+ }
10501056
1051- butil::make_close_on_exec (res->comp_channel ->fd );
1052- if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1053- PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1054- delete res;
1055- return NULL ;
1056- }
1057+ butil::make_close_on_exec (res->comp_channel ->fd );
1058+ if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1059+ PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1060+ delete res;
1061+ return NULL ;
1062+ }
10571063
1058- res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1059- NULL , res->comp_channel , GetRdmaCompVector ());
1060- if (!res->cq ) {
1061- PLOG (WARNING) << " Fail to create CQ" ;
1062- delete res;
1063- return NULL ;
1064+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1065+ NULL , res->comp_channel , GetRdmaCompVector ());
1066+ if (!res->cq ) {
1067+ PLOG (WARNING) << " Fail to create CQ" ;
1068+ delete res;
1069+ return NULL ;
1070+ }
1071+ } else {
1072+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1073+ NULL , NULL , 0 );
1074+ if (!res->cq ) {
1075+ PLOG (WARNING) << " Fail to create CQ" ;
1076+ delete res;
1077+ return NULL ;
1078+ }
10641079 }
10651080
10661081 ibv_qp_init_attr attr;
@@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
11171132 return -1 ;
11181133 }
11191134
1120- SocketOptions options;
1121- options.user = this ;
1122- options.keytable_pool = _socket->_keytable_pool ;
1123- options.fd = _resource->comp_channel ->fd ;
1124- options.on_edge_triggered_events = PollCq;
1125- if (Socket::Create (options, &_cq_sid) < 0 ) {
1126- PLOG (WARNING) << " Fail to create socket for cq" ;
1127- return -1 ;
1128- }
1135+ if (!FLAGS_rdma_use_polling) {
1136+ SocketOptions options;
1137+ options.user = this ;
1138+ options.keytable_pool = _socket->_keytable_pool ;
1139+ options.fd = _resource->comp_channel ->fd ;
1140+ options.on_edge_triggered_events = PollCq;
1141+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1142+ PLOG (WARNING) << " Fail to create socket for cq" ;
1143+ return -1 ;
1144+ }
11291145
1130- if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1131- PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1132- return -1 ;
1146+ if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1147+ PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1148+ return -1 ;
1149+ }
1150+ } else {
1151+ SocketOptions options;
1152+ options.user = this ;
1153+ options.keytable_pool = _socket->_keytable_pool ;
1154+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1155+ PLOG (WARNING) << " Fail to create socket for cq" ;
1156+ return -1 ;
1157+ }
1158+ PollerAddCqSid ();
11331159 }
11341160
11351161 _sbuf.resize (_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
12271253 if (!_resource) {
12281254 return ;
12291255 }
1256+ if (FLAGS_rdma_use_polling) {
1257+ PollerRemoveCqSid ();
1258+ }
12301259 bool move_to_rdma_resource_list = false ;
12311260 if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321261 _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
12371266 move_to_rdma_resource_list = true ;
12381267 }
12391268 }
1240- int fd = _resource->comp_channel ->fd ;
1269+ int fd = -1 ;
1270+ if (_resource->comp_channel ) {
1271+ fd = _resource->comp_channel ->fd ;
1272+ }
12411273 if (!move_to_rdma_resource_list) {
12421274 if (_resource->qp ) {
12431275 if (IbvDestroyQp (_resource->qp ) < 0 ) {
@@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271359 }
13281360 CHECK (ep == s->_rdma_ep );
13291361
1330- if (ep->GetAndAckEvents () < 0 ) {
1331- const int saved_errno = errno;
1332- PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1333- s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1334- s->description ().c_str (), berror (saved_errno));
1335- return ;
1362+ if (!FLAGS_rdma_use_polling) {
1363+ if (ep->GetAndAckEvents () < 0 ) {
1364+ const int saved_errno = errno;
1365+ PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1366+ s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1367+ s->description ().c_str (), berror (saved_errno));
1368+ return ;
1369+ }
13361370 }
13371371
13381372 int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491383 return ;
13501384 }
13511385 if (cnt == 0 ) {
1386+ if (FLAGS_rdma_use_polling) {
1387+ return ;
1388+ }
13521389 if (!notified) {
13531390 // Since RDMA only provides one shot event, we have to call the
13541391 // notify function every time. Because there is a possibility
@@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701407 }
13711408 if (ep->GetAndAckEvents () < 0 ) {
13721409 s->SetFailed (errno, " Fail to ack CQ event on %s" ,
1373- s->description ().c_str ());
1410+ s->description ().c_str ());
13741411 return ;
13751412 }
13761413 notified = false ;
@@ -1437,7 +1474,7 @@ std::string RdmaEndpoint::GetStateStr() const {
14371474
14381475void RdmaEndpoint::DebugInfo (std::ostream& os) const {
14391476 os << " \n rdma_state=ON"
1440- << " \n handshake_state =" << GetStateStr ()
1477+ << " \n rdma_handshake_state =" << GetStateStr ()
14411478 << " \n rdma_window_size=" << _window_size.load (butil::memory_order_relaxed)
14421479 << " \n rdma_local_window_capacity=" << _local_window_capacity
14431480 << " \n rdma_remote_window_capacity=" << _remote_window_capacity
@@ -1447,8 +1484,7 @@ void RdmaEndpoint::DebugInfo(std::ostream& os) const {
14471484 << " \n rdma_unacked_rq_wr=" << _new_rq_wrs
14481485 << " \n rdma_received_ack=" << _accumulated_ack
14491486 << " \n rdma_unsolicited_sent=" << _unsolicited
1450- << " \n rdma_unsignaled_sq_wr=" << _sq_unsignaled
1451- << " \n " ;
1487+ << " \n rdma_unsignaled_sq_wr=" << _sq_unsignaled;
14521488}
14531489
14541490int RdmaEndpoint::GlobalInitialize () {
@@ -1474,6 +1510,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741510 g_rdma_resource_list = res;
14751511 }
14761512
1513+ if (FLAGS_rdma_use_polling) {
1514+ _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1515+ }
1516+
14771517 return 0 ;
14781518}
14791519
@@ -1486,6 +1526,119 @@ void RdmaEndpoint::GlobalRelease() {
14861526 delete res;
14871527 }
14881528 }
1529+ // release polling mode at exit or call RdmaEndpoint::PollingModeRelease
1530+ // explicitly
1531+ if (FLAGS_rdma_use_polling) {
1532+ for (int i = 0 ; i < FLAGS_task_group_ntags; ++i) {
1533+ PollingModeRelease (i);
1534+ }
1535+ }
1536+ }
1537+
1538+ std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1539+
1540+ void RdmaEndpoint::SetCallbackFn (std::function<void ()> cb, bthread_tag_t tag) {
1541+ auto & group = _poller_groups[tag];
1542+ auto & pollers = group.pollers ;
1543+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1544+ auto & poller = pollers[i];
1545+ std::unique_lock<bthread::Mutex> lk (poller.callback_mutex );
1546+ poller.callback = cb;
1547+ }
1548+ }
1549+
1550+ int RdmaEndpoint::PollingModeInitialize (bthread_tag_t tag) {
1551+ if (!FLAGS_rdma_use_polling) {
1552+ return 0 ;
1553+ }
1554+ auto & group = _poller_groups[tag];
1555+ auto & pollers = group.pollers ;
1556+ auto & running = group.running ;
1557+ bool expected = false ;
1558+ if (!running.compare_exchange_strong (expected, true )) {
1559+ return 0 ;
1560+ }
1561+ struct FnArgs {
1562+ Poller* poller;
1563+ std::atomic<bool >* running;
1564+ };
1565+ auto fn = [](void * p) -> void * {
1566+ std::unique_ptr<FnArgs> args (static_cast <FnArgs*>(p));
1567+ auto poller = args->poller ;
1568+ auto running = args->running ;
1569+ std::unordered_set<SocketId> cq_sids;
1570+ CqSidOp op;
1571+ while (running->load (std::memory_order_relaxed)) {
1572+ while (poller->op_queue .Dequeue (op)) {
1573+ if (op.type == CqSidOp::ADD) {
1574+ cq_sids.emplace (op.sid );
1575+ } else if (op.type == CqSidOp::REMOVE) {
1576+ cq_sids.erase (op.sid );
1577+ }
1578+ }
1579+ for (auto sid : cq_sids) {
1580+ SocketUniquePtr s;
1581+ if (Socket::Address (sid, &s) < 0 ) {
1582+ continue ;
1583+ }
1584+ PollCq (s.get ());
1585+ }
1586+ {
1587+ std::unique_lock<bthread::Mutex> lk (poller->callback_mutex );
1588+ if (poller->callback ) {
1589+ poller->callback ();
1590+ }
1591+ }
1592+ if (FLAGS_rdma_poller_yield) {
1593+ bthread_yield ();
1594+ }
1595+ }
1596+ return nullptr ;
1597+ };
1598+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1599+ auto args = new FnArgs{&pollers[i], &running};
1600+ bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
1601+ attr.tag = tag;
1602+ auto rc = bthread_start_background (&pollers[i].tid , &attr, fn, args);
1603+ if (rc != 0 ) {
1604+ LOG (ERROR) << " Fail to start rdma polling bthread" ;
1605+ return -1 ;
1606+ }
1607+ }
1608+ return 0 ;
1609+ }
1610+
1611+ void RdmaEndpoint::PollingModeRelease (bthread_tag_t tag) {
1612+ if (!FLAGS_rdma_use_polling) {
1613+ return ;
1614+ }
1615+ auto & group = _poller_groups[tag];
1616+ auto & pollers = group.pollers ;
1617+ auto & running = group.running ;
1618+ running.store (false , std::memory_order_relaxed);
1619+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1620+ bthread_join (pollers[i].tid , nullptr );
1621+ }
1622+ }
1623+
1624+ void RdmaEndpoint::PollerAddCqSid () {
1625+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1626+ auto & group = _poller_groups[bthread_self_tag ()];
1627+ auto & pollers = group.pollers ;
1628+ auto & poller = pollers[index];
1629+ if (_cq_sid != INVALID_SOCKET_ID) {
1630+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::ADD});
1631+ }
1632+ }
1633+
1634+ void RdmaEndpoint::PollerRemoveCqSid () {
1635+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1636+ auto & group = _poller_groups[bthread_self_tag ()];
1637+ auto & pollers = group.pollers ;
1638+ auto & poller = pollers[index];
1639+ if (_cq_sid != INVALID_SOCKET_ID) {
1640+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::REMOVE});
1641+ }
14891642}
14901643
14911644} // namespace rdma
0 commit comments