3131#include " brpc/rdma/rdma_helper.h"
3232#include " brpc/rdma/rdma_endpoint.h"
3333
34+ DECLARE_int32 (task_group_ntags);
3435
3536namespace brpc {
3637namespace rdma {
@@ -58,6 +59,11 @@ DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
5859DEFINE_int32 (rdma_prepared_qp_cnt, 1024 , " Initial count of prepared QP." );
5960DEFINE_bool (rdma_trace_verbose, false , " Print log message verbosely" );
6061BRPC_VALIDATE_GFLAG (rdma_trace_verbose, brpc::PassValidate);
62+ DEFINE_bool (rdma_use_polling, false , " Use polling mode for RDMA." );
63+ DEFINE_int32 (rdma_poller_num, 1 , " Poller number in RDMA polling mode." );
64+ DEFINE_bool (rdma_poller_yield, false , " Yield thread in RDMA polling mode." );
65+ DEFINE_bool (rdma_edisp_unsched, false , " Disable event dispatcher schedule" );
66+ DEFINE_bool (rdma_disable_bthread, false , " Disable bthread in RDMA" );
6167
6268static const size_t IOBUF_BLOCK_HEADER_LEN = 32 ; // implementation-dependent
6369
@@ -699,7 +705,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
699705 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700706 << " Handshake ends (use tcp) on " << s->description ();
701707 }
702-
703708 ep->TryReadOnTcp ();
704709
705710 return NULL ;
@@ -1041,26 +1046,36 @@ static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
10411046 return NULL ;
10421047 }
10431048
1044- res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1045- if (!res->comp_channel ) {
1046- PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1047- delete res;
1048- return NULL ;
1049- }
1049+ if (!FLAGS_rdma_use_polling) {
1050+ res->comp_channel = IbvCreateCompChannel (GetRdmaContext ());
1051+ if (!res->comp_channel ) {
1052+ PLOG (WARNING) << " Fail to create comp channel for CQ" ;
1053+ delete res;
1054+ return NULL ;
1055+ }
10501056
1051- butil::make_close_on_exec (res->comp_channel ->fd );
1052- if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1053- PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1054- delete res;
1055- return NULL ;
1056- }
1057+ butil::make_close_on_exec (res->comp_channel ->fd );
1058+ if (butil::make_non_blocking (res->comp_channel ->fd ) < 0 ) {
1059+ PLOG (WARNING) << " Fail to set comp channel nonblocking" ;
1060+ delete res;
1061+ return NULL ;
1062+ }
10571063
1058- res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1059- NULL , res->comp_channel , GetRdmaCompVector ());
1060- if (!res->cq ) {
1061- PLOG (WARNING) << " Fail to create CQ" ;
1062- delete res;
1063- return NULL ;
1064+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1065+ NULL , res->comp_channel , GetRdmaCompVector ());
1066+ if (!res->cq ) {
1067+ PLOG (WARNING) << " Fail to create CQ" ;
1068+ delete res;
1069+ return NULL ;
1070+ }
1071+ } else {
1072+ res->cq = IbvCreateCq (GetRdmaContext (), 2 * FLAGS_rdma_prepared_qp_size,
1073+ NULL , NULL , 0 );
1074+ if (!res->cq ) {
1075+ PLOG (WARNING) << " Fail to create CQ" ;
1076+ delete res;
1077+ return NULL ;
1078+ }
10641079 }
10651080
10661081 ibv_qp_init_attr attr;
@@ -1117,19 +1132,30 @@ int RdmaEndpoint::AllocateResources() {
11171132 return -1 ;
11181133 }
11191134
1120- SocketOptions options;
1121- options.user = this ;
1122- options.keytable_pool = _socket->_keytable_pool ;
1123- options.fd = _resource->comp_channel ->fd ;
1124- options.on_edge_triggered_events = PollCq;
1125- if (Socket::Create (options, &_cq_sid) < 0 ) {
1126- PLOG (WARNING) << " Fail to create socket for cq" ;
1127- return -1 ;
1128- }
1135+ if (!FLAGS_rdma_use_polling) {
1136+ SocketOptions options;
1137+ options.user = this ;
1138+ options.keytable_pool = _socket->_keytable_pool ;
1139+ options.fd = _resource->comp_channel ->fd ;
1140+ options.on_edge_triggered_events = PollCq;
1141+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1142+ PLOG (WARNING) << " Fail to create socket for cq" ;
1143+ return -1 ;
1144+ }
11291145
1130- if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1131- PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1132- return -1 ;
1146+ if (ibv_req_notify_cq (_resource->cq , 1 ) < 0 ) {
1147+ PLOG (WARNING) << " Fail to arm CQ comp channel" ;
1148+ return -1 ;
1149+ }
1150+ } else {
1151+ SocketOptions options;
1152+ options.user = this ;
1153+ options.keytable_pool = _socket->_keytable_pool ;
1154+ if (Socket::Create (options, &_cq_sid) < 0 ) {
1155+ PLOG (WARNING) << " Fail to create socket for cq" ;
1156+ return -1 ;
1157+ }
1158+ PollerAddCqSid ();
11331159 }
11341160
11351161 _sbuf.resize (_sq_size - RESERVED_WR_NUM);
@@ -1227,6 +1253,9 @@ void RdmaEndpoint::DeallocateResources() {
12271253 if (!_resource) {
12281254 return ;
12291255 }
1256+ if (FLAGS_rdma_use_polling) {
1257+ PollerRemoveCqSid ();
1258+ }
12301259 bool move_to_rdma_resource_list = false ;
12311260 if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
12321261 _rq_size <= FLAGS_rdma_prepared_qp_size &&
@@ -1237,7 +1266,10 @@ void RdmaEndpoint::DeallocateResources() {
12371266 move_to_rdma_resource_list = true ;
12381267 }
12391268 }
1240- int fd = _resource->comp_channel ->fd ;
1269+ int fd = -1 ;
1270+ if (_resource->comp_channel ) {
1271+ fd = _resource->comp_channel ->fd ;
1272+ }
12411273 if (!move_to_rdma_resource_list) {
12421274 if (_resource->qp ) {
12431275 if (IbvDestroyQp (_resource->qp ) < 0 ) {
@@ -1327,12 +1359,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
13271359 }
13281360 CHECK (ep == s->_rdma_ep );
13291361
1330- if (ep->GetAndAckEvents () < 0 ) {
1331- const int saved_errno = errno;
1332- PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1333- s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1334- s->description ().c_str (), berror (saved_errno));
1335- return ;
1362+ if (!FLAGS_rdma_use_polling) {
1363+ if (ep->GetAndAckEvents () < 0 ) {
1364+ const int saved_errno = errno;
1365+ PLOG (ERROR) << " Fail to get cq event: " << s->description ();
1366+ s->SetFailed (saved_errno, " Fail to get cq event from %s: %s" ,
1367+ s->description ().c_str (), berror (saved_errno));
1368+ return ;
1369+ }
13361370 }
13371371
13381372 int progress = Socket::PROGRESS_INIT;
@@ -1349,6 +1383,9 @@ void RdmaEndpoint::PollCq(Socket* m) {
13491383 return ;
13501384 }
13511385 if (cnt == 0 ) {
1386+ if (FLAGS_rdma_use_polling) {
1387+ return ;
1388+ }
13521389 if (!notified) {
13531390 // Since RDMA only provides one shot event, we have to call the
13541391 // notify function every time. Because there is a possibility
@@ -1370,7 +1407,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
13701407 }
13711408 if (ep->GetAndAckEvents () < 0 ) {
13721409 s->SetFailed (errno, " Fail to ack CQ event on %s" ,
1373- s->description ().c_str ());
1410+ s->description ().c_str ());
13741411 return ;
13751412 }
13761413 notified = false ;
@@ -1474,6 +1511,10 @@ int RdmaEndpoint::GlobalInitialize() {
14741511 g_rdma_resource_list = res;
14751512 }
14761513
1514+ if (FLAGS_rdma_use_polling) {
1515+ _poller_groups = std::vector<PollerGroup>(FLAGS_task_group_ntags);
1516+ }
1517+
14771518 return 0 ;
14781519}
14791520
@@ -1486,6 +1527,123 @@ void RdmaEndpoint::GlobalRelease() {
14861527 delete res;
14871528 }
14881529 }
1530+ // release polling mode at exit or call RdmaEndpoint::PollingModeRelease
1531+ // explicitly
1532+ if (FLAGS_rdma_use_polling) {
1533+ for (int i = 0 ; i < FLAGS_task_group_ntags; ++i) {
1534+ PollingModeRelease (i);
1535+ }
1536+ }
1537+ }
1538+
1539+ std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
1540+
1541+ int RdmaEndpoint::PollingModeInitialize (bthread_tag_t tag,
1542+ std::function<void (void )> callback,
1543+ std::function<void(void )> init_fn,
1544+ std::function<void(void )> release_fn) {
1545+ if (!FLAGS_rdma_use_polling) {
1546+ return 0 ;
1547+ }
1548+ auto & group = _poller_groups[tag];
1549+ auto & pollers = group.pollers ;
1550+ auto & running = group.running ;
1551+ bool expected = false ;
1552+ if (!running.compare_exchange_strong (expected, true )) {
1553+ return 0 ;
1554+ }
1555+ struct FnArgs {
1556+ Poller* poller;
1557+ std::atomic<bool >* running;
1558+ };
1559+ auto fn = [](void * p) -> void * {
1560+ std::unique_ptr<FnArgs> args (static_cast <FnArgs*>(p));
1561+ auto poller = args->poller ;
1562+ auto running = args->running ;
1563+ std::unordered_set<SocketId> cq_sids;
1564+ CqSidOp op;
1565+
1566+ if (!poller->init_fn ) {
1567+ poller->init_fn ();
1568+ }
1569+
1570+ while (running->load (std::memory_order_relaxed)) {
1571+ while (poller->op_queue .Dequeue (op)) {
1572+ if (op.type == CqSidOp::ADD) {
1573+ cq_sids.emplace (op.sid );
1574+ } else if (op.type == CqSidOp::REMOVE) {
1575+ cq_sids.erase (op.sid );
1576+ }
1577+ }
1578+ for (auto sid : cq_sids) {
1579+ SocketUniquePtr s;
1580+ if (Socket::Address (sid, &s) < 0 ) {
1581+ continue ;
1582+ }
1583+ PollCq (s.get ());
1584+ }
1585+ if (poller->callback ) {
1586+ poller->callback ();
1587+ }
1588+ if (FLAGS_rdma_poller_yield) {
1589+ bthread_yield ();
1590+ }
1591+ }
1592+
1593+ if (poller->release_fn ) {
1594+ poller->release_fn ();
1595+ }
1596+
1597+ return nullptr ;
1598+ };
1599+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1600+ auto args = new FnArgs{&pollers[i], &running};
1601+ auto attr = FLAGS_rdma_disable_bthread ? BTHREAD_ATTR_PTHREAD
1602+ : BTHREAD_ATTR_NORMAL;
1603+ attr.tag = tag;
1604+ pollers[i].callback = callback;
1605+ pollers[i].init_fn = init_fn;
1606+ pollers[i].release_fn = release_fn;
1607+ auto rc = bthread_start_background (&pollers[i].tid , &attr, fn, args);
1608+ if (rc != 0 ) {
1609+ LOG (ERROR) << " Fail to start rdma polling bthread" ;
1610+ return -1 ;
1611+ }
1612+ }
1613+ return 0 ;
1614+ }
1615+
1616+ void RdmaEndpoint::PollingModeRelease (bthread_tag_t tag) {
1617+ if (!FLAGS_rdma_use_polling) {
1618+ return ;
1619+ }
1620+ auto & group = _poller_groups[tag];
1621+ auto & pollers = group.pollers ;
1622+ auto & running = group.running ;
1623+ running.store (false , std::memory_order_relaxed);
1624+ for (int i = 0 ; i < FLAGS_rdma_poller_num; ++i) {
1625+ bthread_join (pollers[i].tid , nullptr );
1626+ }
1627+ }
1628+
1629+ void RdmaEndpoint::PollerAddCqSid () {
1630+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1631+ auto & group = _poller_groups[bthread_self_tag ()];
1632+ auto & pollers = group.pollers ;
1633+ auto & poller = pollers[index];
1634+ if (_cq_sid != INVALID_SOCKET_ID) {
1635+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::ADD});
1636+ }
1637+ }
1638+
1639+ void RdmaEndpoint::PollerRemoveCqSid () {
1640+ auto index = butil::fmix32 (_cq_sid) % FLAGS_rdma_poller_num;
1641+ auto & group = _poller_groups[bthread_self_tag ()];
1642+ auto & pollers = group.pollers ;
1643+ auto & poller = pollers[index];
1644+ if (_cq_sid != INVALID_SOCKET_ID) {
1645+ poller.op_queue .Enqueue (CqSidOp{_cq_sid, CqSidOp::REMOVE});
1646+ }
14891647}
14901648
14911649} // namespace rdma
0 commit comments