Skip to content

Commit 24a9a64

Browse files
committed
Add flatbuffers protocol support for brpc
Key components: - Protocol Implementation * Added flatbuffers_protocol.cpp/h for parsing, serialization and request/response handling. * Registered "fb_rpc" protocol in global.cpp - Core Infrastructure Updates * Channel: Added FBCallMethod and CallMethodInternal<bool is_pb> template for dual pb/fb support. * Controller: Added _fb_method, _fb_response fields and is_use_flatbuffer() method for flatbuffers state tracking. * Server: Added FlatBuffersMethodProperty/ServiceProperty structures and AddService() overloads for flatbuffers::Service registration. - Protocol Handler Refactoring * Updated all protocol handlers (baidu_rpc, http, http2, redis, etc.) to use generic void* pointers instead of google::protobuf types, enabling protocol-agnostic message handling.
1 parent be01d10 commit 24a9a64

13 files changed

Lines changed: 871 additions & 20 deletions

src/brpc/channel.cpp

Lines changed: 85 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -443,11 +443,18 @@ static void HandleBackupRequest(void* arg) {
443443
bthread_id_error(correlation_id, EBACKUPREQUEST);
444444
}
445445

446-
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
447-
google::protobuf::RpcController* controller_base,
448-
const google::protobuf::Message* request,
449-
google::protobuf::Message* response,
450-
google::protobuf::Closure* done) {
446+
template <bool is_pb>
447+
void Channel::CallMethodInternal(const typename std::conditional<is_pb,
448+
google::protobuf::MethodDescriptor,
449+
brpc::flatbuffers::MethodDescriptor>::type* method,
450+
google::protobuf::RpcController* controller_base,
451+
const typename std::conditional<is_pb,
452+
google::protobuf::Message,
453+
brpc::flatbuffers::Message>::type* request,
454+
typename std::conditional<is_pb,
455+
google::protobuf::Message,
456+
brpc::flatbuffers::Message>::type* response,
457+
google::protobuf::Closure* done) {
451458
const int64_t start_send_real_us = butil::gettimeofday_us();
452459
Controller* cntl = static_cast<Controller*>(controller_base);
453460
cntl->OnRPCBegin(start_send_real_us);
@@ -507,22 +514,38 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
507514
const int64_t start_send_us = butil::cpuwide_time_us();
508515
std::string method_name;
509516
if (_get_method_name) {
510-
method_name = butil::EnsureString(_get_method_name(method, cntl));
517+
if (is_pb) {
518+
auto pb_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
519+
method_name = butil::EnsureString(_get_method_name(pb_method, cntl));
520+
} else {
521+
// FlatBuffers doesn't support _get_method_name yet
522+
method_name = "";
523+
}
524+
511525
} else if (method) {
512-
method_name = butil::EnsureString(method->full_name());
526+
if (is_pb) {
527+
auto pb_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
528+
method_name = butil::EnsureString(pb_method->full_name());
529+
} else {
530+
auto fb_method = reinterpret_cast<const brpc::flatbuffers::MethodDescriptor*>(method);
531+
method_name = butil::EnsureString(fb_method->full_name());
532+
}
533+
513534
} else {
514535
const static std::string NULL_METHOD_STR = "null-method";
515536
method_name = NULL_METHOD_STR;
516537
}
517-
std::shared_ptr<Span> span = Span::CreateClientSpan(
538+
if (!method_name.empty()) {
539+
std::shared_ptr<Span> span = Span::CreateClientSpan(
518540
method_name, start_send_real_us - start_send_us);
519-
if (span) {
520-
ControllerPrivateAccessor accessor(cntl);
521-
span->set_log_id(cntl->log_id());
522-
span->set_base_cid(correlation_id);
523-
span->set_protocol(_options.protocol);
524-
span->set_start_send_us(start_send_us);
525-
accessor.set_span(span);
541+
if (span) {
542+
ControllerPrivateAccessor accessor(cntl);
543+
span->set_log_id(cntl->log_id());
544+
span->set_base_cid(correlation_id);
545+
span->set_protocol(_options.protocol);
546+
span->set_start_send_us(start_send_us);
547+
accessor.set_span(span);
548+
}
526549
}
527550
}
528551
// Override some options if they haven't been set by Controller
@@ -541,11 +564,20 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
541564
if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
542565
cntl->set_connection_type(_options.connection_type);
543566
}
544-
cntl->_response = response;
567+
545568
cntl->_done = done;
546569
cntl->_pack_request = _pack_request;
547-
cntl->_method = method;
548570
cntl->_auth = _options.auth;
571+
// Use reinterpret_cast to avoid template instantiation errors
572+
// The actual type is guaranteed by the is_pb parameter
573+
if (is_pb) {
574+
cntl->_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
575+
cntl->_response = reinterpret_cast<google::protobuf::Message*>(response);
576+
} else {
577+
cntl->_fb_method = reinterpret_cast<const brpc::flatbuffers::MethodDescriptor*>(method);
578+
cntl->_fb_response = reinterpret_cast<brpc::flatbuffers::Message*>(response);
579+
cntl->set_use_flatbuffer();
580+
}
549581

550582
if (SingleServer()) {
551583
cntl->_single_server_id = _server_id;
@@ -629,6 +661,22 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
629661
}
630662
}
631663

664+
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
665+
google::protobuf::RpcController* controller_base,
666+
const google::protobuf::Message* request,
667+
google::protobuf::Message* response,
668+
google::protobuf::Closure* done) {
669+
CallMethodInternal<true>(method, controller_base, request, response, done);
670+
}
671+
672+
void Channel::FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method,
673+
google::protobuf::RpcController* controller_base,
674+
const brpc::flatbuffers::Message* request,
675+
brpc::flatbuffers::Message* response,
676+
google::protobuf::Closure* done) {
677+
CallMethodInternal<false>(method, controller_base, request, response, done);
678+
}
679+
632680
void Channel::Describe(std::ostream& os, const DescribeOptions& opt) const {
633681
os << "Channel[";
634682
if (SingleServer()) {
@@ -658,4 +706,24 @@ int Channel::CheckHealth() {
658706
}
659707
}
660708

709+
// CallMethodInternal instance for pb and fb
710+
template
711+
void Channel::CallMethodInternal<true>(
712+
const google::protobuf::MethodDescriptor* method,
713+
google::protobuf::RpcController* controller_base,
714+
const google::protobuf::Message* request,
715+
google::protobuf::Message* response,
716+
google::protobuf::Closure* done
717+
);
718+
719+
// CallMethodInternal instance for pb and fb
720+
template
721+
void Channel::CallMethodInternal<false>(
722+
const brpc::flatbuffers::MethodDescriptor* method,
723+
google::protobuf::RpcController* controller_base,
724+
const brpc::flatbuffers::Message* request,
725+
brpc::flatbuffers::Message* response,
726+
google::protobuf::Closure* done
727+
);
728+
661729
} // namespace brpc

src/brpc/channel.h

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "brpc/naming_service_filter.h"
3939
#include "brpc/health_check_option.h"
4040
#include "brpc/socket_mode.h"
41+
#include "brpc/details/flatbuffers_impl.h" // Flatbuffers Protocol
4142

4243
namespace brpc {
4344

@@ -175,7 +176,8 @@ struct ChannelOptions {
175176
// channel.Init("bns://rdev.matrix.all", "rr", NULL/*default options*/);
176177
// MyService_Stub stub(&channel);
177178
// stub.MyMethod(&controller, &request, &response, NULL);
178-
class Channel : public ChannelBase {
179+
class Channel : public ChannelBase,
180+
public brpc::flatbuffers::RpcChannel {
179181
friend class Controller;
180182
friend class SelectiveChannel;
181183
public:
@@ -225,6 +227,12 @@ friend class SelectiveChannel;
225227
google::protobuf::Closure* done);
226228

227229
// Get current options.
230+
void FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method,
231+
google::protobuf::RpcController* controller_base,
232+
const brpc::flatbuffers::Message* request,
233+
brpc::flatbuffers::Message* response,
234+
google::protobuf::Closure* done);
235+
228236
const ChannelOptions& options() const { return _options; }
229237

230238
void Describe(std::ostream&, const DescribeOptions&) const;
@@ -250,6 +258,19 @@ friend class SelectiveChannel;
250258
const char* raw_server_address,
251259
const ChannelOptions* options,
252260
int raw_port = -1);
261+
262+
template <bool is_pb>
263+
inline void CallMethodInternal(const typename std::conditional<is_pb,
264+
google::protobuf::MethodDescriptor,
265+
brpc::flatbuffers::MethodDescriptor>::type* method,
266+
google::protobuf::RpcController* controller_base,
267+
const typename std::conditional<is_pb,
268+
google::protobuf::Message,
269+
brpc::flatbuffers::Message>::type* request,
270+
typename std::conditional<is_pb,
271+
google::protobuf::Message,
272+
brpc::flatbuffers::Message>::type* response,
273+
google::protobuf::Closure* done);
253274

254275
std::string _service_name;
255276
std::string _scheme;

src/brpc/channel_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "butil/logging.h"
2525
#include <google/protobuf/service.h> // google::protobuf::RpcChannel
2626
#include "brpc/describable.h"
27+
#include "brpc/details/flatbuffers_common.h"
2728

2829
// To brpc developers: This is a header included by user, don't depend
2930
// on internal structures, use opaque pointers instead.

src/brpc/controller.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ void Controller::ResetPods() {
276276
_inheritable.Reset();
277277
_pchan_sub_count = 0;
278278
_response = NULL;
279+
_fb_response = NULL;
279280
_done = NULL;
280281
_sender = NULL;
281282
_request_code = 0;
@@ -285,6 +286,7 @@ void Controller::ResetPods() {
285286
_accessed = NULL;
286287
_pack_request = NULL;
287288
_method = NULL;
289+
_fb_method = NULL;
288290
_auth = NULL;
289291
_idl_names = idl_single_req_single_res;
290292
_idl_result = IDL_VOID_RESULT;
@@ -1211,7 +1213,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
12111213
// Make request
12121214
butil::IOBuf packet;
12131215
SocketMessage* user_packet = NULL;
1214-
_pack_request(&packet, &user_packet, cid.value, _method, this,
1216+
const void *method_desc = is_use_flatbuffer()? (const void*)_fb_method :
1217+
(const void*)_method;
1218+
_pack_request(&packet, &user_packet, cid.value, method_desc, this,
12151219
_request_buf, using_auth);
12161220
// TODO: PackRequest may accept SocketMessagePtr<>?
12171221
SocketMessagePtr<> user_packet_guard(user_packet);

src/brpc/controller.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "brpc/grpc.h"
4949
#include "brpc/kvmap.h"
5050
#include "brpc/rpc_dump.h"
51+
#include "brpc/details/flatbuffers_common.h"
5152

5253
// EAUTH is defined in MAC
5354
#ifndef EAUTH
@@ -152,6 +153,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
152153
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
153154
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
154155
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
156+
static const uint32_t FLAGS_USE_FLATBUFFER = (1 << 23);
155157

156158
public:
157159
struct Inheritable {
@@ -220,6 +222,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
220222

221223
// Response of the RPC call (passed to CallMethod)
222224
google::protobuf::Message* response() const { return _response; }
225+
226+
brpc::flatbuffers::Message* fb_response() const { return _fb_response; }
223227

224228
// An identifier to send to server along with request. This is widely used
225229
// throughout baidu's servers to tag a searching session (a series of
@@ -293,6 +297,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
293297
// Get the called method. May-be NULL for non-pb services.
294298
const google::protobuf::MethodDescriptor* method() const { return _method; }
295299

300+
const brpc::flatbuffers::MethodDescriptor* fb_method() const { return _fb_method; }
301+
296302
// Get the controllers for accessing sub channels in combo channels.
297303
// Ordinary channel:
298304
// sub_count() is 0 and sub() is always NULL.
@@ -650,6 +656,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
650656
// the received time of RPC is not recorded in the controller.
651657
int64_t get_rpc_received_us() const { return _rpc_received_us; }
652658

659+
void set_use_flatbuffer() { add_flag(FLAGS_USE_FLATBUFFER); }
660+
bool is_use_flatbuffer() const { return has_flag(FLAGS_USE_FLATBUFFER); }
661+
653662
private:
654663
struct CompletionInfo {
655664
CallId id; // call_id of the corresponding request
@@ -861,6 +870,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
861870
Inheritable _inheritable;
862871
int _pchan_sub_count;
863872
google::protobuf::Message* _response;
873+
brpc::flatbuffers::Message* _fb_response;
864874
google::protobuf::Closure* _done;
865875
RPCSender* _sender;
866876
uint64_t _request_code;
@@ -879,6 +889,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
879889
// Fields will be used when making requests
880890
Protocol::PackRequest _pack_request;
881891
const google::protobuf::MethodDescriptor* _method;
892+
const brpc::flatbuffers::MethodDescriptor* _fb_method;
882893
const Authenticator* _auth;
883894
butil::IOBuf _request_buf;
884895
IdlNames _idl_names;

src/brpc/details/controller_private_accessor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ class ControllerPrivateAccessor {
125125
void set_method(const google::protobuf::MethodDescriptor* method)
126126
{ _cntl->_method = method; }
127127

128+
void set_fb_method(const brpc::flatbuffers::MethodDescriptor* method)
129+
{ _cntl->_fb_method = method; }
130+
128131
void set_readable_progressive_attachment(ReadableProgressiveAttachment* s)
129132
{ _cntl->_rpa.reset(s); }
130133

src/brpc/details/server_private_accessor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ class ServerPrivateAccessor {
8686
return _server->FindServicePropertyByName(name);
8787
}
8888

89+
const Server::FlatBuffersMethodProperty* FindFlatBuffersMethodPropertyByIndex(
90+
uint32_t server_index, int method_index) const {
91+
return _server->FindFlatBuffersMethodPropertyByIndex(server_index, method_index);
92+
}
93+
8994
const Server::ServiceProperty*
9095
FindServicePropertyAdaptively(const butil::StringPiece& service_name) const {
9196
if (service_name.find('.') == butil::StringPiece::npos) {

src/brpc/global.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
#include "brpc/policy/nshead_mcpack_protocol.h"
8484
#include "brpc/policy/rtmp_protocol.h"
8585
#include "brpc/policy/esp_protocol.h"
86+
#include "brpc/policy/flatbuffers_protocol.h"
8687
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
8788
# include "brpc/policy/thrift_protocol.h"
8889
#endif
@@ -427,6 +428,15 @@ static void GlobalInitializeOrDieImpl() {
427428
exit(1);
428429
}
429430

431+
Protocol fb_protocol = { ParseFlatBuffersMessage,
432+
SerializeFlatBuffersRequest, PackFlatBuffersRequest,
433+
ProcessFlatBuffersRequest, ProcessFlatBuffersResponse,
434+
NULL, NULL, NULL,
435+
CONNECTION_TYPE_SINGLE, "fb_rpc" };
436+
if (RegisterProtocol(PROTOCOL_FLATBUFFERS_RPC, fb_protocol) != 0) {
437+
exit(1);
438+
}
439+
430440
Protocol streaming_protocol = { ParseStreamingMessage,
431441
NULL, NULL, ProcessStreamingMessage,
432442
ProcessStreamingMessage,

src/brpc/options.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ enum ProtocolType {
6565
PROTOCOL_ESP = 25; // Client side only
6666
PROTOCOL_H2 = 26;
6767
PROTOCOL_COUCHBASE = 27;
68+
PROTOCOL_FLATBUFFERS_RPC = 28;
6869
}
6970

7071
enum CompressType {

0 commit comments

Comments
 (0)