Skip to content

Commit 519fdbd

Browse files
committed
Refactor implementation of compress
1 parent 30945c3 commit 519fdbd

16 files changed

Lines changed: 394 additions & 467 deletions

src/brpc/compress.cpp

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
#include "json2pb/json_to_pb.h"
2121
#include "brpc/compress.h"
2222
#include "brpc/protocol.h"
23+
#include "brpc/proto_base.pb.h"
2324

2425
namespace brpc {
2526

2627
static const int MAX_HANDLER_SIZE = 1024;
27-
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = {
28-
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
29-
};
28+
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { { NULL, NULL, NULL } };
3029

3130
int RegisterCompressHandler(CompressType type,
3231
CompressHandler handler) {
@@ -85,10 +84,14 @@ bool ParseFromCompressedData(const butil::IOBuf& data,
8584
return ParsePbFromIOBuf(msg, data);
8685
}
8786
const CompressHandler* handler = FindCompressHandler(compress_type);
88-
if (NULL != handler) {
89-
return handler->Decompress(data, msg);
87+
if (NULL == handler) {
88+
return false;
9089
}
91-
return false;
90+
91+
Deserializer deserializer([msg](google::protobuf::io::ZeroCopyInputStream* input) {
92+
return msg->ParseFromZeroCopyStream(input);
93+
});
94+
return handler->Decompress(data, &deserializer);
9295
}
9396

9497
bool SerializeAsCompressedData(const google::protobuf::Message& msg,
@@ -98,10 +101,28 @@ bool SerializeAsCompressedData(const google::protobuf::Message& msg,
98101
return msg.SerializeToZeroCopyStream(&wrapper);
99102
}
100103
const CompressHandler* handler = FindCompressHandler(compress_type);
101-
if (NULL != handler) {
102-
return handler->Compress(msg, buf);
104+
if (NULL == handler) {
105+
return false;
103106
}
104-
return false;
107+
108+
Serializer serializer([&msg](google::protobuf::io::ZeroCopyOutputStream* output) {
109+
return msg.SerializeToZeroCopyStream(output);
110+
});
111+
return handler->Compress(serializer, buf);
112+
}
113+
114+
::google::protobuf::Metadata Serializer::GetMetadata() const {
115+
::google::protobuf::Metadata metadata{};
116+
metadata.descriptor = SerializerBase::descriptor();
117+
metadata.reflection = nullptr;
118+
return metadata;
119+
}
120+
121+
::google::protobuf::Metadata Deserializer::GetMetadata() const {
122+
::google::protobuf::Metadata metadata{};
123+
metadata.descriptor = DeserializerBase::descriptor();
124+
metadata.reflection = nullptr;
125+
return metadata;
105126
}
106127

107128
} // namespace brpc

src/brpc/compress.h

Lines changed: 133 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,30 +21,151 @@
2121

2222
#include <google/protobuf/message.h> // Message
2323
#include "butil/iobuf.h" // butil::IOBuf
24-
#include "json2pb/pb_to_json.h"
25-
#include "json2pb/json_to_pb.h"
24+
#include "butil/logging.h"
2625
#include "brpc/options.pb.h" // CompressType
26+
#include "brpc/nonreflectable_message.h"
2727

2828
namespace brpc {
2929

30+
// Serializer can be used to implement custom serialization
31+
// before compression with user callback.
32+
class Serializer : public NonreflectableMessage<Serializer> {
33+
public:
34+
using Callback = std::function<bool(google::protobuf::io::ZeroCopyOutputStream*)>;
35+
36+
Serializer() :Serializer(NULL) {}
37+
38+
explicit Serializer(Callback callback)
39+
:_callback(std::move(callback)) {
40+
SharedCtor();
41+
}
42+
43+
~Serializer() override {
44+
SharedDtor();
45+
}
46+
47+
Serializer(const Serializer& from)
48+
: NonreflectableMessage(from) {
49+
SharedCtor();
50+
MergeFrom(from);
51+
}
52+
53+
Serializer& operator=(const Serializer& from) {
54+
CopyFrom(from);
55+
return *this;
56+
}
57+
58+
void Swap(Serializer* other) {
59+
if (other != this) {
60+
}
61+
}
62+
63+
void MergeFrom(const Serializer& from) override {
64+
CHECK_NE(&from, this);
65+
}
66+
67+
// implements Message ----------------------------------------------
68+
void Clear() override {
69+
_callback = nullptr;
70+
}
71+
size_t ByteSizeLong() const override { return 0; }
72+
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }
73+
74+
::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;
75+
76+
// Converts the data into `output' for later compression.
77+
bool SerializeTo(google::protobuf::io::ZeroCopyOutputStream* output) const {
78+
if (!_callback) {
79+
LOG(WARNING) << "CompressCallback::SerializeTo() called without converter";
80+
return false;
81+
}
82+
return _callback(output);
83+
}
84+
85+
void SetCallback(Callback callback) {
86+
_callback = std::move(callback);
87+
}
88+
89+
private:
90+
void SharedCtor() {}
91+
void SharedDtor() {}
92+
93+
Callback _callback;
94+
};
95+
96+
// Deserializer can be used to implement custom deserialization
97+
// after decompression with user callback.
98+
class Deserializer : public NonreflectableMessage<Deserializer> {
99+
public:
100+
public:
101+
using Callback = std::function<bool(google::protobuf::io::ZeroCopyInputStream*)>;
102+
103+
Deserializer() :Deserializer(NULL) {}
104+
105+
explicit Deserializer(Callback callback) : _callback(std::move(callback)) {
106+
SharedCtor();
107+
}
108+
109+
~Deserializer() override {
110+
SharedDtor();
111+
}
112+
113+
Deserializer(const Deserializer& from)
114+
: NonreflectableMessage(from) {
115+
SharedCtor();
116+
MergeFrom(from);
117+
}
118+
119+
Deserializer& operator=(const Deserializer& from) {
120+
CopyFrom(from);
121+
return *this;
122+
}
123+
124+
void Swap(Deserializer* other) {
125+
if (other != this) {
126+
_callback.swap(other->_callback);
127+
}
128+
}
129+
130+
void MergeFrom(const Deserializer& from) override {
131+
CHECK_NE(&from, this);
132+
_callback = from._callback;
133+
}
134+
135+
// implements Message ----------------------------------------------
136+
void Clear() override { _callback = nullptr; }
137+
size_t ByteSizeLong() const override { return 0; }
138+
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }
139+
140+
::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;
141+
142+
// Converts the decompressed `input'.
143+
bool DeserializeFrom(google::protobuf::io::ZeroCopyInputStream* intput) const {
144+
if (!_callback) {
145+
LOG(WARNING) << "Deserializer::DeserializeFrom() called without callback";
146+
return false;
147+
}
148+
return _callback(intput);
149+
}
150+
void SetCallback(Callback callback) {
151+
_callback = std::move(callback);
152+
}
153+
154+
private:
155+
void SharedCtor() {}
156+
void SharedDtor() {}
157+
158+
Callback _callback;
159+
};
160+
30161
struct CompressHandler {
31162
// Compress serialized `msg' into `buf'.
32163
// Returns true on success, false otherwise
33164
bool (*Compress)(const google::protobuf::Message& msg, butil::IOBuf* buf);
34-
bool (*Compress2Json)(const google::protobuf::Message& msg, butil::IOBuf* buf,
35-
const json2pb::Pb2JsonOptions& options);
36-
bool (*Compress2ProtoJson)(const google::protobuf::Message& msg, butil::IOBuf* buf,
37-
const json2pb::Pb2ProtoJsonOptions& options);
38-
bool (*Compress2ProtoText)(const google::protobuf::Message& msg, butil::IOBuf* buf);
39165

40166
// Parse decompressed `data' as `msg'.
41167
// Returns true on success, false otherwise
42168
bool (*Decompress)(const butil::IOBuf& data, google::protobuf::Message* msg);
43-
bool (*DecompressFromJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
44-
const json2pb::Json2PbOptions& options);
45-
bool (*DecompressFromProtoJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
46-
const json2pb::ProtoJson2PbOptions& options);
47-
bool (*DecompressFromProtoText)(const butil::IOBuf& data, google::protobuf::Message* msg);
48169

49170
// Name of the compression algorithm, must be string constant.
50171
const char* name;

src/brpc/global.cpp

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -388,27 +388,15 @@ static void GlobalInitializeOrDieImpl() {
388388
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389389

390390
// Compress Handlers
391-
CompressHandler gzip_compress = {
392-
GzipCompress, GzipCompress2Json, GzipCompress2ProtoJson,
393-
GzipCompress2ProtoText, GzipDecompress, GzipDecompressFromJson,
394-
GzipDecompressFromProtoJson, GzipDecompressFromProtoText, "gzip"
395-
};
391+
CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
396392
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
397393
exit(1);
398394
}
399-
CompressHandler zlib_compress = {
400-
ZlibCompress, ZlibCompress2Json, ZlibCompress2ProtoJson,
401-
ZlibCompress2ProtoText, ZlibDecompress, ZlibDecompressFromJson,
402-
ZlibDecompressFromProtoJson, ZlibDecompressFromProtoText, "zlib"
403-
};
395+
CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" };
404396
if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
405397
exit(1);
406398
}
407-
CompressHandler snappy_compress = {
408-
SnappyCompress, SnappyCompress2Json, SnappyCompress2ProtoJson,
409-
SnappyCompress2ProtoText, SnappyDecompress, SnappyDecompressFromJson,
410-
SnappyDecompressFromProtoJson, SnappyDecompressFromProtoText, "snappy"
411-
};
399+
CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" };
412400
if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
413401
exit(1);
414402
}

src/brpc/memcache.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ MemcacheRequest::MemcacheRequest()
3232
}
3333

3434
MemcacheRequest::MemcacheRequest(const MemcacheRequest& from)
35-
: NonreflectableMessage<MemcacheRequest>() {
35+
: NonreflectableMessage<MemcacheRequest>(from) {
3636
SharedCtor();
3737
MergeFrom(from);
3838
}
@@ -143,7 +143,7 @@ MemcacheResponse::MemcacheResponse()
143143
}
144144

145145
MemcacheResponse::MemcacheResponse(const MemcacheResponse& from)
146-
: NonreflectableMessage<MemcacheResponse>() {
146+
: NonreflectableMessage<MemcacheResponse>(from) {
147147
SharedCtor();
148148
MergeFrom(from);
149149
}

src/brpc/nonreflectable_message.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <google/protobuf/generated_message_reflection.h>
2222
#include <google/protobuf/message.h>
2323

24-
#include "pb_compat.h"
24+
#include "brpc/pb_compat.h"
2525

2626
namespace brpc {
2727

src/brpc/nshead_message.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ NsheadMessage::NsheadMessage()
2828
}
2929

3030
NsheadMessage::NsheadMessage(const NsheadMessage& from)
31-
: NonreflectableMessage<NsheadMessage>() {
31+
: NonreflectableMessage<NsheadMessage>(from) {
3232
SharedCtor();
3333
MergeFrom(from);
3434
}

0 commit comments

Comments
 (0)