Skip to content

Commit f91cc2b

Browse files
committed
support checksum
1 parent c47ec71 commit f91cc2b

15 files changed

Lines changed: 457 additions & 60 deletions

example/echo_c++/client.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing");
3131
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
3232
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
3333
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
34+
DEFINE_bool(enable_checksum, false, "Enable checksum or not");
3435

3536
int main(int argc, char* argv[]) {
3637
// Parse gflags. We recommend you to use gflags as well.
@@ -71,6 +72,11 @@ int main(int argc, char* argv[]) {
7172
// being serialized into protobuf messages.
7273
cntl.request_attachment().append(FLAGS_attachment);
7374

75+
// Use checksum, only support CRC32C now.
76+
if (FLAGS_enable_checksum) {
77+
cntl.set_request_checksum_type(brpc::CHECKSUM_TYPE_CRC32C);
78+
}
79+
7480
// Because `done'(last parameter) is NULL, this function waits until
7581
// the response comes back or error occurs(including timedout).
7682
stub.Echo(&cntl, &request, &response, NULL);

example/echo_c++/server.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
2929
" If this is set, the flag port will be ignored");
3030
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
3131
"read/write operations during the last `idle_timeout_s'");
32+
DEFINE_bool(enable_checksum, false, "Enable checksum or not");
3233

3334
// Your implementation of example::EchoService
3435
// Notice that implementing brpc::Describable grants the ability to put
@@ -75,6 +76,11 @@ class EchoServiceImpl : public EchoService {
7576
// being serialized into protobuf messages.
7677
cntl->response_attachment().append(cntl->request_attachment());
7778
}
79+
80+
// Use checksum, only support CRC32C now.
81+
if (FLAGS_enable_checksum) {
82+
cntl->set_response_checksum_type(brpc::CHECKSUM_TYPE_CRC32C);
83+
}
7884
}
7985

8086
// optional

src/brpc/checksum.cpp

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "brpc/checksum.h"
19+
20+
#include "brpc/protocol.h"
21+
#include "butil/logging.h"
22+
23+
namespace brpc {
24+
25+
static const int MAX_HANDLER_SIZE = 1024;
26+
static ChecksumHandler s_handler_map[MAX_HANDLER_SIZE] = {{NULL, NULL, NULL}};
27+
28+
int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler) {
29+
if (NULL == handler.Compute) {
30+
LOG(FATAL) << "Invalid parameter: handler function is NULL";
31+
return -1;
32+
}
33+
int index = type;
34+
if (index < 0 || index >= MAX_HANDLER_SIZE) {
35+
LOG(FATAL) << "ChecksumType=" << type << " is out of range";
36+
return -1;
37+
}
38+
if (s_handler_map[index].Compute != NULL) {
39+
LOG(FATAL) << "ChecksumType=" << type << " was registered";
40+
return -1;
41+
}
42+
s_handler_map[index] = handler;
43+
return 0;
44+
}
45+
46+
// Find ChecksumHandler by type.
47+
// Returns NULL if not found
48+
inline const ChecksumHandler* FindChecksumHandler(ChecksumType type) {
49+
int index = type;
50+
if (index < 0 || index >= MAX_HANDLER_SIZE) {
51+
LOG(ERROR) << "ChecksumType=" << type << " is out of range";
52+
return NULL;
53+
}
54+
if (NULL == s_handler_map[index].Compute) {
55+
return NULL;
56+
}
57+
return &s_handler_map[index];
58+
}
59+
60+
const char* ChecksumTypeToCStr(ChecksumType type) {
61+
if (type == CHECKSUM_TYPE_NONE) {
62+
return "none";
63+
}
64+
const ChecksumHandler* handler = FindChecksumHandler(type);
65+
return (handler != NULL ? handler->name : "unknown");
66+
}
67+
68+
void ListChecksumHandler(std::vector<ChecksumHandler>* vec) {
69+
vec->clear();
70+
for (int i = 0; i < MAX_HANDLER_SIZE; ++i) {
71+
if (s_handler_map[i].Compute != NULL) {
72+
vec->push_back(s_handler_map[i]);
73+
}
74+
}
75+
}
76+
77+
// Compute `data' checksum
78+
void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) {
79+
if (checksum_type == CHECKSUM_TYPE_NONE) {
80+
return;
81+
}
82+
const ChecksumHandler* handler = FindChecksumHandler(checksum_type);
83+
if (NULL != handler) {
84+
handler->Compute(in);
85+
}
86+
}
87+
88+
// Verify `data' checksum Returns true on success, false otherwise
89+
bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type) {
90+
if (checksum_type == CHECKSUM_TYPE_NONE) {
91+
return true;
92+
}
93+
const ChecksumHandler* handler = FindChecksumHandler(checksum_type);
94+
if (NULL != handler) {
95+
return handler->Verify(in);
96+
}
97+
return true;
98+
}
99+
100+
} // namespace brpc

src/brpc/checksum.h

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#ifndef BRPC_CHECKSUM_H
19+
#define BRPC_CHECKSUM_H
20+
21+
#include <google/protobuf/message.h> // Message
22+
23+
#include "brpc/controller.h"
24+
#include "brpc/options.pb.h" // ChecksumType
25+
#include "butil/iobuf.h" // butil::IOBuf
26+
27+
namespace brpc {
28+
29+
struct ChecksumIn {
30+
const butil::IOBuf* buf;
31+
Controller* cntl;
32+
};
33+
34+
struct ChecksumHandler {
35+
// checksum `buf'.
36+
// Returns checksum value
37+
void (*Compute)(const ChecksumIn& in);
38+
39+
// verify buf checksum
40+
// Rerturn true on success, false otherwise
41+
bool (*Verify)(const ChecksumIn& in);
42+
43+
// Name of the checksum algorithm, must be string constant.
44+
const char* name;
45+
};
46+
47+
// [NOT thread-safe] Register `handler' using key=`type'
48+
// Returns 0 on success, -1 otherwise
49+
int RegisterChecksumHandler(ChecksumType type, ChecksumHandler handler);
50+
51+
// Returns the `name' of the checksumType if registered
52+
const char* ChecksumTypeToCStr(ChecksumType type);
53+
54+
// Put all registered handlers into `vec'.
55+
void ListChecksumHandler(std::vector<ChecksumHandler>* vec);
56+
57+
// Compute `data' checksum and set to controller
58+
void ComputeDataChecksum(const ChecksumIn& in, ChecksumType checksum_type);
59+
60+
// Verify `data' checksum Returns true on success, false otherwise
61+
bool VerifyDataChecksum(const ChecksumIn& in, ChecksumType checksum_type);
62+
63+
} // namespace brpc
64+
65+
#endif // BRPC_CHECKSUM_H

src/brpc/controller.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ void Controller::ResetPods() {
269269
_preferred_index = -1;
270270
_request_compress_type = COMPRESS_TYPE_NONE;
271271
_response_compress_type = COMPRESS_TYPE_NONE;
272+
_request_checksum_type = CHECKSUM_TYPE_NONE;
273+
_response_checksum_type = CHECKSUM_TYPE_NONE;
272274
_fail_limit = UNSET_MAGIC_NUM;
273275
_pipelined_count = 0;
274276
_inheritable.Reset();
@@ -1346,6 +1348,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const {
13461348
s->tos = _tos;
13471349
s->connection_type = _connection_type;
13481350
s->request_compress_type = _request_compress_type;
1351+
s->request_checksum_type = _request_checksum_type;
13491352
s->log_id = log_id();
13501353
s->has_request_code = has_request_code();
13511354
s->request_code = _request_code;
@@ -1359,6 +1362,7 @@ void Controller::ApplyClientSettings(const ClientSettings& s) {
13591362
set_type_of_service(s.tos);
13601363
set_connection_type(s.connection_type);
13611364
set_request_compress_type(s.request_compress_type);
1365+
set_request_checksum_type(s.request_checksum_type);
13621366
set_log_id(s.log_id);
13631367
set_flag(FLAGS_REQUEST_CODE, s.has_request_code);
13641368
_request_code = s.request_code;

src/brpc/controller.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
241241
// Set compression method for request.
242242
void set_request_compress_type(CompressType t) { _request_compress_type = t; }
243243

244+
// Set checksum type for request.
245+
void set_request_checksum_type(ChecksumType t) { _request_checksum_type = t; }
246+
244247
// Required by some load balancers.
245248
void set_request_code(uint64_t request_code) {
246249
add_flag(FLAGS_REQUEST_CODE);
@@ -464,6 +467,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
464467

465468
// Set compression method for response.
466469
void set_response_compress_type(CompressType t) { _response_compress_type = t; }
470+
471+
// Set checksum type for response.
472+
void set_response_checksum_type(ChecksumType t) { _response_checksum_type = t; }
467473

468474
// Non-zero when this RPC call is traced (by rpcz or rig).
469475
// NOTE: Only valid at server-side, always zero at client-side.
@@ -552,6 +558,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
552558
const std::string& request_id() const { return _inheritable.request_id; }
553559
CompressType request_compress_type() const { return _request_compress_type; }
554560
CompressType response_compress_type() const { return _response_compress_type; }
561+
ChecksumType request_checksum_type() const { return _request_checksum_type; }
562+
ChecksumType response_checksum_type() const { return _response_checksum_type; }
555563
const HttpHeader& http_request() const
556564
{ return _http_request != NULL ? *_http_request : DefaultHttpHeader(); }
557565

@@ -693,6 +701,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
693701
int32_t tos;
694702
ConnectionType connection_type;
695703
CompressType request_compress_type;
704+
ChecksumType request_checksum_type;
696705
uint64_t log_id;
697706
bool has_request_code;
698707
int64_t request_code;
@@ -834,6 +843,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
834843
int _preferred_index;
835844
CompressType _request_compress_type;
836845
CompressType _response_compress_type;
846+
ChecksumType _request_checksum_type;
847+
ChecksumType _response_checksum_type;
848+
std::string _checksum_value;
837849
Inheritable _inheritable;
838850
int _pchan_sub_count;
839851
google::protobuf::Message* _response;

src/brpc/details/controller_private_accessor.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ class ControllerPrivateAccessor {
152152
return *this;
153153
}
154154

155+
void set_checksum_value(const char* c, size_t size) {
156+
_cntl->_checksum_value.assign(c, size);
157+
}
158+
159+
void set_checksum_value(const std::string& c) {
160+
_cntl->_checksum_value = c;
161+
}
162+
163+
const std::string& checksum_value() const { return _cntl->_checksum_value; }
164+
155165
private:
156166
Controller* _cntl;
157167
};

src/brpc/global.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@
6060
#include "brpc/policy/gzip_compress.h"
6161
#include "brpc/policy/snappy_compress.h"
6262

63+
// Checksum handlers
64+
#include "brpc/checksum.h"
65+
#include "brpc/policy/crc32c_checksum.h"
66+
6367
// Protocols
6468
#include "brpc/protocol.h"
6569
#include "brpc/policy/baidu_rpc_protocol.h"
@@ -401,6 +405,13 @@ static void GlobalInitializeOrDieImpl() {
401405
exit(1);
402406
}
403407

408+
// Checksum Handlers
409+
const ChecksumHandler crc32c_checksum = {Crc32cCompute, Crc32cVerify,
410+
"crc32c"};
411+
if (RegisterChecksumHandler(CHECKSUM_TYPE_CRC32C, crc32c_checksum) != 0) {
412+
exit(1);
413+
}
414+
404415
// Protocols
405416
Protocol baidu_protocol = { ParseRpcMessage,
406417
SerializeRpcRequest, PackRpcRequest,

src/brpc/options.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ enum CompressType {
7474
COMPRESS_TYPE_LZ4 = 4;
7575
}
7676

77+
enum ChecksumType {
78+
CHECKSUM_TYPE_NONE = 0;
79+
CHECKSUM_TYPE_CRC32C = 1;
80+
}
81+
7782
enum ContentType {
7883
CONTENT_TYPE_PB = 0;
7984
CONTENT_TYPE_JSON = 1;

src/brpc/policy/baidu_rpc_meta.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ message RpcMeta {
3434
optional StreamSettings stream_settings = 8;
3535
map<string, string> user_fields = 9;
3636
optional ContentType content_type = 10;
37+
optional int32 checksum_type = 11;
38+
optional bytes checksum_value = 12;
3739
}
3840

3941
message RpcRequestMeta {

0 commit comments

Comments
 (0)