Skip to content

Commit fb24ab3

Browse files
committed
Refactor implementation of compress
1 parent 30945c3 commit fb24ab3

17 files changed

Lines changed: 394 additions & 458 deletions

src/brpc/compress.cpp

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
#include "json2pb/json_to_pb.h"
2121
#include "brpc/compress.h"
2222
#include "brpc/protocol.h"
23+
#include "brpc/proto_base.pb.h"
2324

2425
namespace brpc {
2526

2627
static const int MAX_HANDLER_SIZE = 1024;
27-
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = {
28-
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
29-
};
28+
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { { NULL, NULL, NULL } };
3029

3130
int RegisterCompressHandler(CompressType type,
3231
CompressHandler handler) {
@@ -85,10 +84,14 @@ bool ParseFromCompressedData(const butil::IOBuf& data,
8584
return ParsePbFromIOBuf(msg, data);
8685
}
8786
const CompressHandler* handler = FindCompressHandler(compress_type);
88-
if (NULL != handler) {
89-
return handler->Decompress(data, msg);
87+
if (NULL == handler) {
88+
return false;
9089
}
91-
return false;
90+
auto converter = [msg](google::protobuf::io::ZeroCopyInputStream* input) {
91+
return msg->ParseFromZeroCopyStream(input);
92+
};
93+
Deserializer callback(converter);
94+
return handler->Decompress(data, &callback);
9295
}
9396

9497
bool SerializeAsCompressedData(const google::protobuf::Message& msg,
@@ -98,10 +101,28 @@ bool SerializeAsCompressedData(const google::protobuf::Message& msg,
98101
return msg.SerializeToZeroCopyStream(&wrapper);
99102
}
100103
const CompressHandler* handler = FindCompressHandler(compress_type);
101-
if (NULL != handler) {
102-
return handler->Compress(msg, buf);
104+
if (NULL == handler) {
105+
return false;
103106
}
104-
return false;
107+
auto converter = [&msg](google::protobuf::io::ZeroCopyOutputStream* output) {
108+
return msg.SerializeToZeroCopyStream(output);
109+
};
110+
Serializer callback(converter);
111+
return handler->Compress(callback, buf);
112+
}
113+
114+
::google::protobuf::Metadata Serializer::GetMetadata() const {
115+
::google::protobuf::Metadata metadata{};
116+
metadata.descriptor = SerializerBase::descriptor();
117+
metadata.reflection = nullptr;
118+
return metadata;
119+
}
120+
121+
::google::protobuf::Metadata Deserializer::GetMetadata() const {
122+
::google::protobuf::Metadata metadata{};
123+
metadata.descriptor = DeserializerBase::descriptor();
124+
metadata.reflection = nullptr;
125+
return metadata;
105126
}
106127

107128
} // namespace brpc

src/brpc/compress.h

Lines changed: 132 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,149 @@
2424
#include "json2pb/pb_to_json.h"
2525
#include "json2pb/json_to_pb.h"
2626
#include "brpc/options.pb.h" // CompressType
27+
#include "brpc/nonreflectable_message.h"
2728

2829
namespace brpc {
2930

31+
// Serializer can be used to implement custom serialization
32+
// before compression with user callback.
33+
class Serializer : public NonreflectableMessage<Serializer> {
34+
public:
35+
using Callback = std::function<bool(google::protobuf::io::ZeroCopyOutputStream*)>;
36+
37+
Serializer() :Serializer(NULL) {}
38+
39+
explicit Serializer(Callback callback)
40+
:_callback(std::move(callback)) {
41+
SharedCtor();
42+
}
43+
44+
~Serializer() override {
45+
SharedDtor();
46+
}
47+
48+
Serializer(const Serializer& from)
49+
: NonreflectableMessage(from) {
50+
SharedCtor();
51+
MergeFrom(from);
52+
}
53+
54+
Serializer& operator=(const Serializer& from) {
55+
CopyFrom(from);
56+
return *this;
57+
}
58+
59+
void Swap(Serializer* other) {
60+
if (other != this) {
61+
}
62+
}
63+
64+
void MergeFrom(const Serializer& from) override {
65+
CHECK_NE(&from, this);
66+
}
67+
68+
// implements Message ----------------------------------------------
69+
void Clear() override {
70+
_callback = nullptr;
71+
}
72+
size_t ByteSizeLong() const override { return 0; }
73+
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }
74+
75+
::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;
76+
77+
// Converts the data into `output' for later compression.
78+
bool SerializeTo(google::protobuf::io::ZeroCopyOutputStream* output) const {
79+
if (!_callback) {
80+
LOG(WARNING) << "CompressCallback::SerializeTo() called without converter";
81+
return false;
82+
}
83+
return _callback(output);
84+
}
85+
86+
void SetCallback(Callback callback) {
87+
_callback = std::move(callback);
88+
}
89+
90+
private:
91+
void SharedCtor() {}
92+
void SharedDtor() {}
93+
94+
Callback _callback;
95+
};
96+
97+
// Deserializer can be used to implement custom deserialization
98+
// after decompression with user callback.
99+
class Deserializer : public NonreflectableMessage<Deserializer> {
100+
public:
101+
public:
102+
using Callback = std::function<bool(google::protobuf::io::ZeroCopyInputStream*)>;
103+
104+
Deserializer() :Deserializer(NULL) {}
105+
106+
explicit Deserializer(Callback callback) : _callback(std::move(callback)) {
107+
SharedCtor();
108+
}
109+
110+
~Deserializer() override {
111+
SharedDtor();
112+
}
113+
114+
Deserializer(const Deserializer& from)
115+
: NonreflectableMessage(from) {
116+
SharedCtor();
117+
MergeFrom(from);
118+
}
119+
120+
Deserializer& operator=(const Deserializer& from) {
121+
CopyFrom(from);
122+
return *this;
123+
}
124+
125+
void Swap(Deserializer* other) {
126+
if (other != this) {
127+
_callback.swap(other->_callback);
128+
}
129+
}
130+
131+
void MergeFrom(const Deserializer& from) override {
132+
CHECK_NE(&from, this);
133+
_callback = from._callback;
134+
}
135+
136+
// implements Message ----------------------------------------------
137+
void Clear() override { _callback = nullptr; }
138+
size_t ByteSizeLong() const override { return 0; }
139+
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }
140+
141+
::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;
142+
143+
// Converts the decompressed `input'.
144+
bool DeserializeFrom(google::protobuf::io::ZeroCopyInputStream* intput) const {
145+
if (!_callback) {
146+
LOG(WARNING) << "Deserializer::DeserializeFrom() called without callback";
147+
return false;
148+
}
149+
return _callback(intput);
150+
}
151+
void SetCallback(Callback callback) {
152+
_callback = std::move(callback);
153+
}
154+
155+
private:
156+
void SharedCtor() {}
157+
void SharedDtor() {}
158+
159+
Callback _callback;
160+
};
161+
30162
struct CompressHandler {
31163
// Compress serialized `msg' into `buf'.
32164
// Returns true on success, false otherwise
33165
bool (*Compress)(const google::protobuf::Message& msg, butil::IOBuf* buf);
34-
bool (*Compress2Json)(const google::protobuf::Message& msg, butil::IOBuf* buf,
35-
const json2pb::Pb2JsonOptions& options);
36-
bool (*Compress2ProtoJson)(const google::protobuf::Message& msg, butil::IOBuf* buf,
37-
const json2pb::Pb2ProtoJsonOptions& options);
38-
bool (*Compress2ProtoText)(const google::protobuf::Message& msg, butil::IOBuf* buf);
39166

40167
// Parse decompressed `data' as `msg'.
41168
// Returns true on success, false otherwise
42169
bool (*Decompress)(const butil::IOBuf& data, google::protobuf::Message* msg);
43-
bool (*DecompressFromJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
44-
const json2pb::Json2PbOptions& options);
45-
bool (*DecompressFromProtoJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
46-
const json2pb::ProtoJson2PbOptions& options);
47-
bool (*DecompressFromProtoText)(const butil::IOBuf& data, google::protobuf::Message* msg);
48170

49171
// Name of the compression algorithm, must be string constant.
50172
const char* name;

src/brpc/global.cpp

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -388,27 +388,15 @@ static void GlobalInitializeOrDieImpl() {
388388
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389389

390390
// Compress Handlers
391-
CompressHandler gzip_compress = {
392-
GzipCompress, GzipCompress2Json, GzipCompress2ProtoJson,
393-
GzipCompress2ProtoText, GzipDecompress, GzipDecompressFromJson,
394-
GzipDecompressFromProtoJson, GzipDecompressFromProtoText, "gzip"
395-
};
391+
CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
396392
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
397393
exit(1);
398394
}
399-
CompressHandler zlib_compress = {
400-
ZlibCompress, ZlibCompress2Json, ZlibCompress2ProtoJson,
401-
ZlibCompress2ProtoText, ZlibDecompress, ZlibDecompressFromJson,
402-
ZlibDecompressFromProtoJson, ZlibDecompressFromProtoText, "zlib"
403-
};
395+
CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" };
404396
if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
405397
exit(1);
406398
}
407-
CompressHandler snappy_compress = {
408-
SnappyCompress, SnappyCompress2Json, SnappyCompress2ProtoJson,
409-
SnappyCompress2ProtoText, SnappyDecompress, SnappyDecompressFromJson,
410-
SnappyDecompressFromProtoJson, SnappyDecompressFromProtoText, "snappy"
411-
};
399+
CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" };
412400
if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
413401
exit(1);
414402
}

src/brpc/memcache.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ MemcacheRequest::MemcacheRequest()
3232
}
3333

3434
MemcacheRequest::MemcacheRequest(const MemcacheRequest& from)
35-
: NonreflectableMessage<MemcacheRequest>() {
35+
: NonreflectableMessage<MemcacheRequest>(from) {
3636
SharedCtor();
3737
MergeFrom(from);
3838
}
@@ -143,7 +143,7 @@ MemcacheResponse::MemcacheResponse()
143143
}
144144

145145
MemcacheResponse::MemcacheResponse(const MemcacheResponse& from)
146-
: NonreflectableMessage<MemcacheResponse>() {
146+
: NonreflectableMessage<MemcacheResponse>(from) {
147147
SharedCtor();
148148
MergeFrom(from);
149149
}

src/brpc/nonreflectable_message.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <google/protobuf/generated_message_reflection.h>
2222
#include <google/protobuf/message.h>
2323

24-
#include "pb_compat.h"
24+
#include "brpc/pb_compat.h"
2525

2626
namespace brpc {
2727

src/brpc/nshead_message.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ NsheadMessage::NsheadMessage()
2828
}
2929

3030
NsheadMessage::NsheadMessage(const NsheadMessage& from)
31-
: NonreflectableMessage<NsheadMessage>() {
31+
: NonreflectableMessage<NsheadMessage>(from) {
3232
SharedCtor();
3333
MergeFrom(from);
3434
}

0 commit comments

Comments
 (0)