Skip to content

Commit d88ea67

Browse files
committed
Support various payload of baidu-std: json, proto-json and proto-text
1 parent 724e2fd commit d88ea67

23 files changed

Lines changed: 760 additions & 167 deletions

src/brpc/compress.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818

1919
#include "butil/logging.h"
20+
#include "json2pb/json_to_pb.h"
2021
#include "brpc/compress.h"
2122
#include "brpc/protocol.h"
2223

23-
2424
namespace brpc {
2525

2626
static const int MAX_HANDLER_SIZE = 1024;
27-
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { { NULL, NULL, NULL } };
27+
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = {
28+
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
29+
};
2830

2931
int RegisterCompressHandler(CompressType type,
3032
CompressHandler handler) {
@@ -47,7 +49,7 @@ int RegisterCompressHandler(CompressType type,
4749

4850
// Find CompressHandler by type.
4951
// Returns NULL if not found
50-
inline const CompressHandler* FindCompressHandler(CompressType type) {
52+
const CompressHandler* FindCompressHandler(CompressType type) {
5153
int index = type;
5254
if (index < 0 || index >= MAX_HANDLER_SIZE) {
5355
LOG(ERROR) << "CompressType=" << type << " is out of range";

src/brpc/compress.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
#include <google/protobuf/message.h> // Message
2323
#include "butil/iobuf.h" // butil::IOBuf
24+
#include "json2pb/pb_to_json.h"
25+
#include "json2pb/json_to_pb.h"
2426
#include "brpc/options.pb.h" // CompressType
2527

2628
namespace brpc {
@@ -29,10 +31,20 @@ struct CompressHandler {
2931
// Compress serialized `msg' into `buf'.
3032
// Returns true on success, false otherwise
3133
bool (*Compress)(const google::protobuf::Message& msg, butil::IOBuf* buf);
34+
bool (*Compress2Json)(const google::protobuf::Message& msg, butil::IOBuf* buf,
35+
const json2pb::Pb2JsonOptions& options);
36+
bool (*Compress2ProtoJson)(const google::protobuf::Message& msg, butil::IOBuf* buf,
37+
const json2pb::Pb2ProtoJsonOptions& options);
38+
bool (*Compress2ProtoText)(const google::protobuf::Message& msg, butil::IOBuf* buf);
3239

3340
// Parse decompressed `data' as `msg'.
3441
// Returns true on success, false otherwise
3542
bool (*Decompress)(const butil::IOBuf& data, google::protobuf::Message* msg);
43+
bool (*DecompressFromJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
44+
const json2pb::Json2PbOptions& options);
45+
bool (*DecompressFromProtoJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
46+
const json2pb::ProtoJson2PbOptions& options);
47+
bool (*DecompressFromProtoText)(const butil::IOBuf& data, google::protobuf::Message* msg);
3648

3749
// Name of the compression algorithm, must be string constant.
3850
const char* name;
@@ -42,6 +54,9 @@ struct CompressHandler {
4254
// Returns 0 on success, -1 otherwise
4355
int RegisterCompressHandler(CompressType type, CompressHandler handler);
4456

57+
// Returns CompressHandler pointer of `type' if registered, NULL otherwise.
58+
const CompressHandler* FindCompressHandler(CompressType type);
59+
4560
// Returns the `name' of the CompressType if registered
4661
const char* CompressTypeToCStr(CompressType type);
4762

src/brpc/controller.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ void Controller::ResetPods() {
290290
_http_response = NULL;
291291
_request_user_fields = NULL;
292292
_response_user_fields = NULL;
293+
_request_content_type = CONTENT_TYPE_PB;
294+
_response_content_type = CONTENT_TYPE_PB;
293295
_request_streams.clear();
294296
_response_streams.clear();
295297
_remote_stream_settings = NULL;

src/brpc/controller.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,20 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
616616

617617
void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);
618618

619+
void set_request_content_type(ContentType type) {
620+
_request_content_type = type;
621+
}
622+
ContentType request_content_type() const {
623+
return _request_content_type;
624+
}
625+
626+
void set_response_content_type(ContentType type) {
627+
_response_content_type = type;
628+
}
629+
ContentType response_content_type() const {
630+
return _response_content_type;
631+
}
632+
619633
private:
620634
struct CompletionInfo {
621635
CallId id; // call_id of the corresponding request
@@ -859,6 +873,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
859873
butil::IOBuf _request_attachment;
860874
butil::IOBuf _response_attachment;
861875

876+
// Only SerializedRequest supports `_request_content_type'.
877+
ContentType _request_content_type;
878+
// Only SerializedResponse supports `_response_content_type'.
879+
ContentType _response_content_type;
880+
862881
// Writable progressive attachment
863882
butil::intrusive_ptr<ProgressiveAttachment> _wpa;
864883
// Readable progressive attachment

src/brpc/global.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -388,25 +388,34 @@ static void GlobalInitializeOrDieImpl() {
388388
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389389

390390
// Compress Handlers
391-
const CompressHandler gzip_compress =
392-
{ GzipCompress, GzipDecompress, "gzip" };
391+
CompressHandler gzip_compress = {
392+
GzipCompress, GzipCompress2Json, GzipCompress2ProtoJson,
393+
GzipCompress2ProtoText, GzipDecompress, GzipDecompressFromJson,
394+
GzipDecompressFromProtoJson, GzipDecompressFromProtoText, "gzip"
395+
};
393396
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
394397
exit(1);
395398
}
396-
const CompressHandler zlib_compress =
397-
{ ZlibCompress, ZlibDecompress, "zlib" };
399+
CompressHandler zlib_compress = {
400+
ZlibCompress, ZlibCompress2Json, ZlibCompress2ProtoJson,
401+
ZlibCompress2ProtoText, ZlibDecompress, ZlibDecompressFromJson,
402+
ZlibDecompressFromProtoJson, ZlibDecompressFromProtoText, "zlib"
403+
};
398404
if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
399405
exit(1);
400406
}
401-
const CompressHandler snappy_compress =
402-
{ SnappyCompress, SnappyDecompress, "snappy" };
407+
CompressHandler snappy_compress = {
408+
SnappyCompress, SnappyCompress2Json, SnappyCompress2ProtoJson,
409+
SnappyCompress2ProtoText, SnappyDecompress, SnappyDecompressFromJson,
410+
SnappyDecompressFromProtoJson, SnappyDecompressFromProtoText, "snappy"
411+
};
403412
if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
404413
exit(1);
405414
}
406415

407416
// Protocols
408417
Protocol baidu_protocol = { ParseRpcMessage,
409-
SerializeRequestDefault, PackRpcRequest,
418+
SerializeRpcRequest, PackRpcRequest,
410419
ProcessRpcRequest, ProcessRpcResponse,
411420
VerifyRpcRequest, NULL, NULL,
412421
CONNECTION_TYPE_ALL, "baidu_std" };

src/brpc/options.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ enum CompressType {
7474
COMPRESS_TYPE_LZ4 = 4;
7575
}
7676

77+
enum ContentType {
78+
CONTENT_TYPE_PB = 0;
79+
CONTENT_TYPE_JSON = 1;
80+
CONTENT_TYPE_PROTO_JSON = 2;
81+
CONTENT_TYPE_PROTO_TEXT = 3;
82+
}
83+
7784
message ChunkInfo {
7885
required int64 stream_id = 1;
7986
required int64 chunk_id = 2;

src/brpc/policy/baidu_rpc_meta.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ message RpcMeta {
3333
optional bytes authentication_data = 7;
3434
optional StreamSettings stream_settings = 8;
3535
map<string, string> user_fields = 9;
36+
optional ContentType content_type = 10;
3637
}
3738

3839
message RpcRequestMeta {

0 commit comments

Comments
 (0)