Skip to content

Commit c2f26ea

Browse files
committed
Add flatbuffers message construction logic
Key components: - flatbuffers_common.h: Defines abstract interfaces (RpcChannel and Service) for FlatBuffers-based RPC communication, similar to protobuf's RPC interfaces. - flatbuffers_impl.h: Implements FlatBuffers-specific message handling: * SlabAllocator: Custom allocator using SingleIOBuf for zero-copy operations. * Message: Wrapper for FlatBuffers messages with SingleIOBuf storage. * MessageBuilder: BRPC-specific FlatBufferBuilder with SlabAllocator. * ServiceDescriptor/MethodDescriptor: Service introspection support. - flatbuffers_impl.cpp: Implementation of allocation, serialization, and service descriptor initialization logic
1 parent 39a3436 commit c2f26ea

11 files changed

Lines changed: 1419 additions & 2 deletions

File tree

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
cmake_minimum_required(VERSION 2.8.10)
2+
project(benchmark_fb C CXX)
3+
4+
option(LINK_SO "Whether examples are linked dynamically" OFF)
5+
option(WITH_ASAN "With AddressSanitizer" OFF)
6+
7+
execute_process(
8+
COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
9+
OUTPUT_VARIABLE OUTPUT_PATH
10+
)
11+
12+
set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
13+
14+
include(FindThreads)
15+
include(FindProtobuf)
16+
17+
# include current directory for generated files
18+
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
19+
20+
# Search for libthrift* by best effort. If it is not found and brpc is
21+
# compiled with thrift protocol enabled, a link error would be reported.
22+
find_library(THRIFT_LIB NAMES thrift)
23+
if (NOT THRIFT_LIB)
24+
set(THRIFT_LIB "")
25+
endif()
26+
27+
find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
28+
if(LINK_SO)
29+
find_library(BRPC_LIB NAMES brpc)
30+
else()
31+
find_library(BRPC_LIB NAMES libbrpc.a brpc)
32+
endif()
33+
if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
34+
message(FATAL_ERROR "Fail to find brpc")
35+
endif()
36+
include_directories(${BRPC_INCLUDE_PATH})
37+
38+
find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
39+
find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
40+
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
41+
message(FATAL_ERROR "Fail to find gflags")
42+
endif()
43+
include_directories(${GFLAGS_INCLUDE_PATH})
44+
45+
# Find FlatBuffers
46+
find_path(FLATBUFFERS_INCLUDE_PATH flatbuffers/flatbuffers.h)
47+
find_library(FLATBUFFERS_LIBRARY NAMES flatbuffers)
48+
if((NOT FLATBUFFERS_INCLUDE_PATH) OR (NOT FLATBUFFERS_LIBRARY))
49+
message(FATAL_ERROR "Fail to find flatbuffers")
50+
endif()
51+
include_directories(${FLATBUFFERS_INCLUDE_PATH})
52+
53+
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
54+
include(CheckFunctionExists)
55+
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
56+
if(NOT HAVE_CLOCK_GETTIME)
57+
set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
58+
endif()
59+
endif()
60+
61+
# set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -g -O0 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
62+
set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
63+
64+
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
65+
66+
if (WITH_ASAN)
67+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
68+
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
69+
endif()
70+
71+
if(CMAKE_VERSION VERSION_LESS "3.1.3")
72+
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
73+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
74+
endif()
75+
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
76+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
77+
endif()
78+
else()
79+
set(CMAKE_CXX_STANDARD 11)
80+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
81+
endif()
82+
83+
find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
84+
find_library(LEVELDB_LIB NAMES leveldb)
85+
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
86+
message(FATAL_ERROR "Fail to find leveldb")
87+
endif()
88+
include_directories(${LEVELDB_INCLUDE_PATH})
89+
90+
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
91+
set(OPENSSL_ROOT_DIR
92+
"/usr/local/opt/openssl" # Homebrew installed OpenSSL
93+
)
94+
endif()
95+
96+
find_package(OpenSSL)
97+
include_directories(${OPENSSL_INCLUDE_DIR})
98+
99+
set(DYNAMIC_LIB
100+
${CMAKE_THREAD_LIBS_INIT}
101+
${GFLAGS_LIBRARY}
102+
${PROTOBUF_LIBRARIES}
103+
${LEVELDB_LIB}
104+
${OPENSSL_CRYPTO_LIBRARY}
105+
${OPENSSL_SSL_LIBRARY}
106+
${FLATBUFFERS_LIBRARY}
107+
${THRIFT_LIB}
108+
dl
109+
)
110+
111+
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
112+
set(DYNAMIC_LIB ${DYNAMIC_LIB}
113+
pthread
114+
"-framework CoreFoundation"
115+
"-framework CoreGraphics"
116+
"-framework CoreData"
117+
"-framework CoreText"
118+
"-framework Security"
119+
"-framework Foundation"
120+
"-Wl,-U,_MallocExtension_ReleaseFreeMemory"
121+
"-Wl,-U,_ProfilerStart"
122+
"-Wl,-U,_ProfilerStop"
123+
"-Wl,-U,__Z13GetStackTracePPvii"
124+
"-Wl,-U,_mallctl"
125+
"-Wl,-U,_malloc_stats_print"
126+
)
127+
endif()
128+
129+
set(FLATBUFFERS_SOURCES
130+
test.brpc.fb.cpp
131+
test_generated.h
132+
test.brpc.fb.h
133+
)
134+
135+
add_executable(client client.cpp ${FLATBUFFERS_SOURCES})
136+
add_executable(server server.cpp ${FLATBUFFERS_SOURCES})
137+
138+
target_link_libraries(client ${BRPC_LIB} ${DYNAMIC_LIB})
139+
target_link_libraries(server ${BRPC_LIB} ${DYNAMIC_LIB})

example/benchmark_fb/client.cpp

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#include <gflags/gflags.h>
2+
#include <bthread/bthread.h>
3+
#include <butil/logging.h>
4+
#include <brpc/server.h>
5+
#include <brpc/channel.h>
6+
#include <bvar/bvar.h>
7+
8+
#include "test.brpc.fb.h"
9+
10+
11+
DEFINE_int32(thread_num, 1, "Number of threads to send requests");
12+
DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests");
13+
DEFINE_int32(request_size, 16, "Bytes of each request");
14+
DEFINE_string(servers, "0.0.0.0:8002", "IP Address of server");
15+
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
16+
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
17+
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
18+
19+
std::string g_request;
20+
butil::IOBuf g_attachment;
21+
22+
bvar::LatencyRecorder g_latency_recorder("client");
23+
bvar::LatencyRecorder g_msg_recorder("msg");
24+
bvar::Adder<int> g_error_count("client_error_count");
25+
26+
static void* sender(void* arg) {
27+
test::BenchmarkServiceStub stub(static_cast<brpc::Channel*>(arg));
28+
int log_id = 0;
29+
while (!brpc::IsAskedToQuit()) {
30+
brpc::Controller cntl;
31+
brpc::flatbuffers::Message response;
32+
33+
cntl.set_log_id(log_id++);
34+
cntl.request_attachment().append(g_attachment);
35+
36+
uint64_t msg_begin_ns = butil::cpuwide_time_ns();
37+
brpc::flatbuffers::MessageBuilder mb;
38+
auto message = mb.CreateString(g_request);
39+
auto req = test::CreateBenchmarkRequest(mb, 123, 333, 1111, 2222, 0, message);
40+
mb.Finish(req);
41+
brpc::flatbuffers::Message request = mb.ReleaseMessage();
42+
43+
uint64_t msg_end_ns = butil::cpuwide_time_ns();
44+
stub.Test(&cntl, &request, &response, NULL);
45+
46+
if (!cntl.Failed()) {
47+
g_latency_recorder << cntl.latency_us();
48+
g_msg_recorder << (msg_end_ns - msg_begin_ns);
49+
} else {
50+
g_error_count << 1;
51+
CHECK(brpc::IsAskedToQuit())
52+
<< "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us();
53+
// We can't connect to the server, sleep a while. Notice that this
54+
// is a specific sleeping to prevent this thread from spinning too
55+
// fast. You should continue the business logic in a production
56+
// server rather than sleeping.
57+
bthread_usleep(50000);
58+
}
59+
}
60+
return NULL;
61+
}
62+
63+
int main(int argc, char* argv[]) {
64+
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
65+
// Print parameter information in one line
66+
LOG(INFO) << "Parameters - request_size : " << FLAGS_request_size
67+
<< ", attachment_size: " << FLAGS_attachment_size
68+
<< ", thread_num: " << FLAGS_thread_num;
69+
70+
// A Channel represents a communication line to a Server. Notice that
71+
// Channel is thread-safe and can be shared by all threads in your program.
72+
brpc::Channel channel;
73+
74+
// Initialize the channel, NULL means using default options.
75+
brpc::ChannelOptions options;
76+
options.protocol = "fb_rpc";
77+
options.connection_type = "";
78+
options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
79+
options.timeout_ms = FLAGS_timeout_ms;
80+
options.max_retry = FLAGS_max_retry;
81+
if (channel.Init(FLAGS_servers.c_str(), &options) != 0) {
82+
LOG(ERROR) << "Fail to initialize channel";
83+
return -1;
84+
}
85+
if (FLAGS_attachment_size > 0) {
86+
void* _attachment_addr = malloc(FLAGS_attachment_size);
87+
if (!_attachment_addr) {
88+
LOG(ERROR) << "Fail to alloc _attachment from system heap";
89+
return -1;
90+
}
91+
g_attachment.append(_attachment_addr, FLAGS_attachment_size);
92+
free(_attachment_addr);
93+
}
94+
if (FLAGS_request_size < 0) {
95+
LOG(ERROR) << "Bad request_size=" << FLAGS_request_size;
96+
return -1;
97+
}
98+
g_request.resize(FLAGS_request_size, 'r');
99+
100+
if (FLAGS_dummy_port >= 0) {
101+
brpc::StartDummyServerAt(FLAGS_dummy_port);
102+
}
103+
104+
std::vector<bthread_t> bids;
105+
bids.resize(FLAGS_thread_num);
106+
for (int i = 0; i < FLAGS_thread_num; ++i) {
107+
if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) {
108+
LOG(ERROR) << "Fail to create bthread";
109+
return -1;
110+
}
111+
}
112+
113+
while (!brpc::IsAskedToQuit()) {
114+
sleep(1);
115+
LOG(INFO) << "Sending EchoRequest at qps=" << (g_latency_recorder.qps(1) / 1000)
116+
<< "k latency=" << g_latency_recorder.latency(1) << "us"
117+
<< " msg latency=" << g_msg_recorder.latency(1) << "ns";
118+
}
119+
120+
LOG(INFO) << "EchoClient is going to quit";
121+
for (int i = 0; i < FLAGS_thread_num; ++i) {
122+
bthread_join(bids[i], NULL);
123+
}
124+
125+
LOG(INFO) << "Average QPS: " << (g_latency_recorder.qps()/1000) << "k"
126+
<< " Average latency: " << g_latency_recorder.latency() << "us"
127+
<< " msg latency: " << g_msg_recorder.latency() << "ns";
128+
129+
return 0;
130+
}

example/benchmark_fb/server.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include <gflags/gflags.h>
2+
#include <butil/logging.h>
3+
#include <brpc/server.h>
4+
#include "test.brpc.fb.h"
5+
6+
DEFINE_bool(echo_attachment, true, "Echo attachment as well");
7+
DEFINE_int32(port, 8080, "TCP Port of this server");
8+
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
9+
"read/write operations during the last `idle_timeout_s'");
10+
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
11+
DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");
12+
13+
namespace test{
14+
class BenchmarkServiceImpl : public BenchmarkService {
15+
public:
16+
BenchmarkServiceImpl() {}
17+
~BenchmarkServiceImpl() {}
18+
19+
void Test(google::protobuf::RpcController* controller,
20+
const brpc::flatbuffers::Message* request_base,
21+
brpc::flatbuffers::Message* response,
22+
google::protobuf::Closure* done) {
23+
brpc::ClosureGuard done_guard(done);
24+
brpc::Controller* cntl =
25+
static_cast<brpc::Controller*>(controller);
26+
const test::BenchmarkRequest* request = request_base->GetRoot<test::BenchmarkRequest>();
27+
// Set Response Message
28+
brpc::flatbuffers::MessageBuilder mb_;
29+
const char *req_str = request->message()->c_str();
30+
auto message = mb_.CreateString(req_str);
31+
auto resp = test::CreateBenchmarkResponse(mb_, request->opcode(),
32+
request->echo_attachment(), request->attachment_size(),
33+
request->request_id(),request->reserved(), message);
34+
mb_.Finish(resp);
35+
*response = mb_.ReleaseMessage();
36+
if (FLAGS_echo_attachment) {
37+
cntl->response_attachment().append(cntl->request_attachment());
38+
}
39+
}
40+
};
41+
42+
}
43+
44+
int main(int argc, char* argv[]) {
45+
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
46+
47+
// Generally you only need one Server.
48+
brpc::Server server;
49+
50+
// Instance of your service.
51+
test::BenchmarkServiceImpl benchmark_service_impl;
52+
53+
if (server.AddService(&benchmark_service_impl,
54+
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
55+
LOG(ERROR) << "Fail to add service";
56+
return -1;
57+
}
58+
59+
// Start the server.
60+
brpc::ServerOptions options;
61+
options.idle_timeout_sec = FLAGS_idle_timeout_s;
62+
options.max_concurrency = FLAGS_max_concurrency;
63+
options.internal_port = FLAGS_internal_port;
64+
65+
if (server.Start(FLAGS_port, &options) != 0) {
66+
LOG(ERROR) << "Fail to start EchoServer";
67+
return -1;
68+
}
69+
70+
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
71+
server.RunUntilAskedToQuit();
72+
return 0;
73+
74+
}

0 commit comments

Comments
 (0)