Skip to content

Commit 02879c8

Browse files
zchuangowenjiecn
andauthored
Add Transport to support more communication protocol extensions (#3199)
* Add The transport layer to support communication protocols of different device vendors. * Refine the SocketMode name style and clean some unused code * Refine Transport Debug method param and RdmaTransport WaitEpollOut code * format the code, remove indentation for top class and variables in new file * review code --------- Co-authored-by: wenjiecn <3252896864@qq.com>
1 parent d92e7cf commit 02879c8

25 files changed

Lines changed: 775 additions & 338 deletions

example/rdma_performance/client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class PerformanceTest {
102102

103103
int Init() {
104104
brpc::ChannelOptions options;
105-
options.use_rdma = FLAGS_use_rdma;
105+
options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP;
106106
options.protocol = FLAGS_protocol;
107107
options.connection_type = FLAGS_connection_type;
108108
options.timeout_ms = FLAGS_rpc_timeout_ms;

example/rdma_performance/server.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ int main(int argc, char* argv[]) {
7676
g_last_time.store(0, butil::memory_order_relaxed);
7777

7878
brpc::ServerOptions options;
79-
options.use_rdma = FLAGS_use_rdma;
79+
options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP;
8080
if (server.Start(FLAGS_port, &options) != 0) {
8181
LOG(ERROR) << "Fail to start EchoServer";
8282
return -1;

src/brpc/acceptor.cpp

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
#include "butil/fd_guard.h" // fd_guard
2222
#include "butil/fd_utility.h" // make_close_on_exec
2323
#include "butil/time.h" // gettimeofday_us
24-
#include "brpc/rdma/rdma_endpoint.h"
2524
#include "brpc/acceptor.h"
25+
#include "brpc/transport_factory.h"
2626

2727

2828
namespace brpc {
@@ -40,7 +40,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
4040
, _empty_cond(&_map_mutex)
4141
, _force_ssl(false)
4242
, _ssl_ctx(NULL)
43-
, _use_rdma(false)
43+
, _socket_mode(SOCKET_MODE_TCP)
4444
, _bthread_tag(BTHREAD_TAG_DEFAULT) {
4545
}
4646

@@ -282,18 +282,10 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
282282
options.fd = in_fd;
283283
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
284284
options.user = acception->user();
285+
options.need_on_edge_trigger = true;
285286
options.force_ssl = am->_force_ssl;
286287
options.initial_ssl_ctx = am->_ssl_ctx;
287-
#if BRPC_WITH_RDMA
288-
if (am->_use_rdma) {
289-
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
290-
} else {
291-
#else
292-
{
293-
#endif
294-
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
295-
}
296-
options.use_rdma = am->_use_rdma;
288+
options.socket_mode = am->_socket_mode;
297289
options.bthread_tag = am->_bthread_tag;
298290
if (Socket::Create(options, &socket_id) != 0) {
299291
LOG(ERROR) << "Fail to create Socket";

src/brpc/acceptor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "butil/synchronization/condition_variable.h"
2323
#include "butil/containers/flat_map.h"
2424
#include "brpc/input_messenger.h"
25+
#include "brpc/socket_mode.h"
2526

2627

2728
namespace brpc {
@@ -110,8 +111,8 @@ friend class Server;
110111
bool _force_ssl;
111112
std::shared_ptr<SocketSSLContext> _ssl_ctx;
112113

113-
// Whether to use rdma or not
114-
bool _use_rdma;
114+
// Choose to use a certain socket: 0 TCP, 1 RDMA
115+
SocketMode _socket_mode;
115116

116117
// Acceptor belongs to this tag
117118
bthread_tag_t _bthread_tag;

src/brpc/channel.cpp

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
3838
#include "brpc/rdma/rdma_helper.h"
3939
#include "brpc/policy/esp_authenticator.h"
40+
#include "brpc/transport_factory.h"
4041

4142
namespace brpc {
4243

@@ -60,7 +61,7 @@ ChannelOptions::ChannelOptions()
6061
, connection_type(CONNECTION_TYPE_UNKNOWN)
6162
, succeed_without_server(true)
6263
, log_succeed_without_server(true)
63-
, use_rdma(false)
64+
, socket_mode(SOCKET_MODE_TCP)
6465
, auth(NULL)
6566
, backup_request_policy(NULL)
6667
, retry_policy(NULL)
@@ -130,7 +131,7 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
130131
} else {
131132
// All disabled ChannelSSLOptions are the same
132133
}
133-
if (opt.use_rdma) {
134+
if (opt.socket_mode == SOCKET_MODE_RDMA) {
134135
buf.append("|rdma");
135136
}
136137
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
@@ -173,20 +174,6 @@ Channel::~Channel() {
173174
}
174175
}
175176

176-
#if BRPC_WITH_RDMA
177-
static bool OptionsAvailableForRdma(const ChannelOptions* opt) {
178-
if (opt->has_ssl_options()) {
179-
LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
180-
return false;
181-
}
182-
if (!rdma::SupportedByRdma(opt->protocol.name())) {
183-
LOG(WARNING) << "Cannot use " << opt->protocol.name()
184-
<< " over RDMA";
185-
return false;
186-
}
187-
return true;
188-
}
189-
#endif
190177

191178
int Channel::InitChannelOptions(const ChannelOptions* options) {
192179
if (options) { // Override default options if user provided one.
@@ -201,19 +188,10 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
201188
_options.hc_option.health_check_path = FLAGS_health_check_path;
202189
_options.hc_option.health_check_timeout_ms = FLAGS_health_check_timeout_ms;
203190
}
204-
if (_options.use_rdma) {
205-
#if BRPC_WITH_RDMA
206-
if (!OptionsAvailableForRdma(&_options)) {
207-
return -1;
208-
}
209-
rdma::GlobalRdmaInitializeOrDie();
210-
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
211-
return -1;
212-
}
213-
#else
214-
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
191+
auto ret = TransportFactory::ContextInitOrDie(_options.socket_mode, false, &_options);
192+
if (ret != 0) {
193+
LOG(ERROR) << "Fail to initialize transport context for channel, ret=" << ret;
215194
return -1;
216-
#endif
217195
}
218196

219197
_serialize_request = protocol->serialize_request;
@@ -388,7 +366,7 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
388366
SocketOptions opt;
389367
opt.local_side = client_endpoint;
390368
opt.initial_ssl_ctx = ssl_ctx;
391-
opt.use_rdma = _options.use_rdma;
369+
opt.socket_mode = _options.socket_mode;
392370
opt.hc_option = _options.hc_option;
393371
opt.device_name = _options.device_name;
394372
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
@@ -436,7 +414,7 @@ int Channel::Init(const char* ns_url,
436414
GetNamingServiceThreadOptions ns_opt;
437415
ns_opt.succeed_without_server = _options.succeed_without_server;
438416
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
439-
ns_opt.socket_option.use_rdma = _options.use_rdma;
417+
ns_opt.socket_option.socket_mode = _options.socket_mode;
440418
ns_opt.channel_signature = ComputeChannelSignature(_options);
441419
ns_opt.socket_option.hc_option = _options.hc_option;
442420
ns_opt.socket_option.local_side = client_endpoint;

src/brpc/channel.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "brpc/backup_request_policy.h"
3838
#include "brpc/naming_service_filter.h"
3939
#include "brpc/health_check_option.h"
40+
#include "brpc/socket_mode.h"
4041

4142
namespace brpc {
4243

@@ -105,9 +106,9 @@ struct ChannelOptions {
105106
const ChannelSSLOptions& ssl_options() const { return *_ssl_options; }
106107
ChannelSSLOptions* mutable_ssl_options();
107108

108-
// Let this channel use rdma rather than tcp.
109-
// Default: false
110-
bool use_rdma;
109+
// Let this channel Choose to use a certain socket: 0 SOCKET_MODE_TCP, 1 SOCKET_MODE_RDMA.
110+
// Default: SOCKET_MODE_TCP
111+
SocketMode socket_mode;
111112

112113
// Turn on authentication for this channel if `auth' is not NULL.
113114
// Note `auth' will not be deleted by channel and must remain valid when

src/brpc/details/naming_service_thread.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ void NamingServiceThread::Actions::ResetServers(
125125
// Socket. SocketMapKey may be passed through AddWatcher. Make sure
126126
// to pick those Sockets with the right settings during OnAddedServers
127127
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
128-
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
128+
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
129129
_owner->_options.socket_option));
130130
_added_sockets.push_back(tagged_id);
131131
}

src/brpc/details/naming_service_thread.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "brpc/naming_service.h" // NamingService
2828
#include "brpc/naming_service_filter.h" // NamingServiceFilter
2929
#include "brpc/socket_map.h"
30+
#include "brpc/socket_mode.h"
3031

3132
namespace brpc {
3233

@@ -45,7 +46,7 @@ struct GetNamingServiceThreadOptions {
4546
GetNamingServiceThreadOptions()
4647
: succeed_without_server(false)
4748
, log_succeed_without_server(true) {
48-
socket_option.use_rdma = false;
49+
socket_option.socket_mode = SOCKET_MODE_TCP;
4950
}
5051

5152
bool succeed_without_server;

src/brpc/input_message_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class InputMessageBase : public Destroyable {
5555
friend class InputMessenger;
5656
friend void* ProcessInputMessage(void*);
5757
friend class Stream;
58+
friend class Transport;
5859
int64_t _received_us;
5960
int64_t _base_real_us;
6061
SocketUniquePtr _socket;

src/brpc/input_messenger.cpp

Lines changed: 11 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
#include "brpc/protocol.h" // ListProtocols
3030
#include "brpc/rdma/rdma_endpoint.h"
3131
#include "brpc/input_messenger.h"
32-
32+
#include "brpc/transport_factory.h"
3333

3434
namespace brpc {
3535

@@ -112,8 +112,7 @@ ParseResult InputMessenger::CutInputMessage(
112112
// The length of `data' must be PROTO_DUMMY_LEN + 1 to store extra ending char '\0'
113113
char data[PROTO_DUMMY_LEN + 1];
114114
m->_read_buf.copy_to_cstr(data, PROTO_DUMMY_LEN);
115-
if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0 &&
116-
m->_rdma_state == Socket::RDMA_OFF) {
115+
if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0) {
117116
// To avoid timeout when client uses RDMA but server uses TCP
118117
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
119118
}
@@ -191,46 +190,13 @@ struct RunLastMessage {
191190
}
192191
};
193192

194-
static void QueueMessage(InputMessageBase* to_run_msg,
195-
int* num_bthread_created,
196-
bthread_keytable_pool_t* keytable_pool) {
197-
if (!to_run_msg) {
198-
return;
199-
}
200-
201-
#if BRPC_WITH_RDMA
202-
if (rdma::FLAGS_rdma_disable_bthread) {
203-
ProcessInputMessage(to_run_msg);
204-
return;
205-
}
206-
#endif
207-
// Create bthread for last_msg. The bthread is not scheduled
208-
// until bthread_flush() is called (in the worse case).
209-
210-
// TODO(gejun): Join threads.
211-
bthread_t th;
212-
bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
213-
BTHREAD_ATTR_PTHREAD :
214-
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
215-
tmp.keytable_pool = keytable_pool;
216-
tmp.tag = bthread_self_tag();
217-
bthread_attr_set_name(&tmp, "ProcessInputMessage");
218-
219-
if (!FLAGS_usercode_in_coroutine && bthread_start_background(
220-
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
221-
++*num_bthread_created;
222-
} else {
223-
ProcessInputMessage(to_run_msg);
224-
}
225-
}
226-
227-
InputMessenger::InputMessageClosure::~InputMessageClosure() noexcept(false) {
193+
InputMessageClosure::~InputMessageClosure() noexcept(false) {
228194
if (_msg) {
229195
ProcessInputMessage(_msg);
230196
}
231197
}
232198

233-
void InputMessenger::InputMessageClosure::reset(InputMessageBase* m) {
199+
void InputMessageClosure::reset(InputMessageBase* m) {
234200
if (_msg) {
235201
ProcessInputMessage(_msg);
236202
}
@@ -303,7 +269,7 @@ int InputMessenger::ProcessNewMessage(
303269
// This unique_ptr prevents msg to be lost before transfering
304270
// ownership to last_msg
305271
DestroyingPtr<InputMessageBase> msg(pr.message());
306-
QueueMessage(last_msg.release(), &num_bthread_created, m->_keytable_pool);
272+
m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
307273
if (_handlers[index].process == NULL) {
308274
LOG(ERROR) << "process of index=" << index << " is NULL";
309275
continue;
@@ -336,22 +302,19 @@ int InputMessenger::ProcessNewMessage(
336302
// Transfer ownership to last_msg
337303
last_msg.reset(msg.release());
338304
} else {
339-
QueueMessage(msg.release(), &num_bthread_created,
340-
m->_keytable_pool);
305+
last_msg.reset(msg.release());
306+
m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
341307
bthread_flush();
342308
num_bthread_created = 0;
343309
}
344310
}
345-
#if BRPC_WITH_RDMA
346311
// In RDMA polling mode, all messages must be executed in a new bthread and
347312
// not in the bthread where the polling bthread is located, because the
348313
// method for processing messages may call synchronization primitives,
349314
// causing the polling bthread to be scheduled out.
350-
if (rdma::FLAGS_rdma_use_polling) {
351-
QueueMessage(last_msg.release(), &num_bthread_created,
352-
m->_keytable_pool);
315+
if (m->_socket_mode == SOCKET_MODE_RDMA) {
316+
m->_transport->QueueMessage(last_msg, &num_bthread_created, true);
353317
}
354-
#endif
355318
if (num_bthread_created) {
356319
bthread_flush();
357320
}
@@ -414,8 +377,7 @@ void InputMessenger::OnNewMessages(Socket* m) {
414377
}
415378
}
416379

417-
if (m->_rdma_state == Socket::RDMA_OFF && messenger->ProcessNewMessage(
418-
m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
380+
if (messenger->ProcessNewMessage(m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
419381
return;
420382
}
421383
}
@@ -533,16 +495,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
533495

534496
int InputMessenger::Create(SocketOptions options, SocketId* id) {
535497
options.user = this;
536-
#if BRPC_WITH_RDMA
537-
if (options.use_rdma) {
538-
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
539-
options.app_connect = std::make_shared<rdma::RdmaConnect>();
540-
} else {
541-
#else
542-
{
543-
#endif
544-
options.on_edge_triggered_events = OnNewMessages;
545-
}
498+
options.need_on_edge_trigger = true;
546499
// Enable keepalive by options or Gflag.
547500
// Priority: options > Gflag.
548501
if (options.keepalive_options || FLAGS_socket_keepalive) {

0 commit comments

Comments
 (0)