Skip to content

Commit ab666d4

Browse files
committed
Refactor implementation of compress
1 parent 30945c3 commit ab666d4

8 files changed

Lines changed: 375 additions & 397 deletions

File tree

src/brpc/compress.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
namespace brpc {
2525

2626
static const int MAX_HANDLER_SIZE = 1024;
27-
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = {
28-
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
29-
};
27+
static CompressHandler s_handler_map[MAX_HANDLER_SIZE] = { { NULL, NULL, NULL } };
3028

3129
int RegisterCompressHandler(CompressType type,
3230
CompressHandler handler) {
@@ -85,10 +83,11 @@ bool ParseFromCompressedData(const butil::IOBuf& data,
8583
return ParsePbFromIOBuf(msg, data);
8684
}
8785
const CompressHandler* handler = FindCompressHandler(compress_type);
88-
if (NULL != handler) {
89-
return handler->Decompress(data, msg);
86+
if (NULL == handler) {
87+
return false;
9088
}
91-
return false;
89+
PBDecompressCallback callback(data, msg);
90+
return handler->Decompress(callback);
9291
}
9392

9493
bool SerializeAsCompressedData(const google::protobuf::Message& msg,
@@ -98,10 +97,11 @@ bool SerializeAsCompressedData(const google::protobuf::Message& msg,
9897
return msg.SerializeToZeroCopyStream(&wrapper);
9998
}
10099
const CompressHandler* handler = FindCompressHandler(compress_type);
101-
if (NULL != handler) {
102-
return handler->Compress(msg, buf);
100+
if (NULL == handler) {
101+
return false;
103102
}
104-
return false;
103+
PBCompressCallback callback(msg, buf);
104+
return handler->Compress(callback);
105105
}
106106

107107
} // namespace brpc

src/brpc/compress.h

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,51 @@
2727

2828
namespace brpc {
2929

30+
// Base class for CompressCallback and DecompressCallback.
31+
class CompressBase {
32+
public:
33+
void append_error(const std::string& error) {
34+
if (_error.empty()) {
35+
_error = error;
36+
} else {
37+
_error.append(", ").append(error);
38+
}
39+
}
40+
41+
const std::string& get_error() const {
42+
return _error;
43+
}
44+
protected:
45+
// Error messages.
46+
std::string _error;
47+
};
48+
49+
// CompressCallback provides raw data for compression,
50+
// and a buffer for storing compressed data.
51+
class CompressCallback : public CompressBase {
52+
public:
53+
// Converts the data into `output' for later compression.
54+
virtual bool Convert(google::protobuf::io::ZeroCopyOutputStream* output) = 0;
55+
// Returns the buffer for storing compressed data.
56+
virtual butil::IOBuf& Buffer() = 0;
57+
};
58+
59+
// DecompressCallback provides raw data stored in a buffer for decompression,
60+
// and handles the decompressed data.
61+
class DecompressCallback : public CompressBase {
62+
public:
63+
// Converts the decompressed `input'.
64+
virtual bool Convert(google::protobuf::io::ZeroCopyInputStream* intput) = 0;
65+
// Returns the buffer containing compressed data.
66+
virtual const butil::IOBuf& Buffer() = 0;
67+
};
68+
3069
struct CompressHandler {
31-
// Compress serialized `msg' into `buf'.
32-
// Returns true on success, false otherwise
33-
bool (*Compress)(const google::protobuf::Message& msg, butil::IOBuf* buf);
34-
bool (*Compress2Json)(const google::protobuf::Message& msg, butil::IOBuf* buf,
35-
const json2pb::Pb2JsonOptions& options);
36-
bool (*Compress2ProtoJson)(const google::protobuf::Message& msg, butil::IOBuf* buf,
37-
const json2pb::Pb2ProtoJsonOptions& options);
38-
bool (*Compress2ProtoText)(const google::protobuf::Message& msg, butil::IOBuf* buf);
39-
40-
// Parse decompressed `data' as `msg'.
41-
// Returns true on success, false otherwise
42-
bool (*Decompress)(const butil::IOBuf& data, google::protobuf::Message* msg);
43-
bool (*DecompressFromJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
44-
const json2pb::Json2PbOptions& options);
45-
bool (*DecompressFromProtoJson)(const butil::IOBuf& data, google::protobuf::Message* msg,
46-
const json2pb::ProtoJson2PbOptions& options);
47-
bool (*DecompressFromProtoText)(const butil::IOBuf& data, google::protobuf::Message* msg);
70+
// Compress data from CompressCallback::Convert() into CompressCallback::Buffer().
71+
bool (*Compress)(CompressCallback& callback);
72+
73+
// Decompress data from DecompressCallback::Buffer() into DecompressCallback::Convert().
74+
bool (*Decompress)(DecompressCallback& callback);
4875

4976
// Name of the compression algorithm, must be string constant.
5077
const char* name;
@@ -63,6 +90,35 @@ const char* CompressTypeToCStr(CompressType type);
6390
// Put all registered handlers into `vec'.
6491
void ListCompressHandler(std::vector<CompressHandler>* vec);
6592

93+
// CompressCallback for Protobuf messages.
94+
class PBCompressCallback : public CompressCallback {
95+
public:
96+
PBCompressCallback(const google::protobuf::Message& msg, butil::IOBuf* buf)
97+
: _msg(msg), _buf(buf) {}
98+
bool Convert(google::protobuf::io::ZeroCopyOutputStream* output) override {
99+
return _msg.SerializeToZeroCopyStream(output);
100+
}
101+
butil::IOBuf& Buffer() override { return *_buf; }
102+
103+
private:
104+
const google::protobuf::Message& _msg;
105+
butil::IOBuf* _buf;
106+
};
107+
108+
// DecompressCallback for Protobuf messages.
109+
class PBDecompressCallback : public DecompressCallback {
110+
public:
111+
PBDecompressCallback(const butil::IOBuf& buf, google::protobuf::Message* msg) : _buf(buf), _msg(msg) {}
112+
bool Convert(google::protobuf::io::ZeroCopyInputStream* input) override {
113+
return _msg->ParseFromZeroCopyStream(input);
114+
}
115+
const butil::IOBuf& Buffer() override { return _buf; }
116+
117+
private:
118+
const butil::IOBuf& _buf;
119+
google::protobuf::Message* _msg;
120+
};
121+
66122
// Parse decompressed `data' as `msg' using registered `compress_type'.
67123
// Returns true on success, false otherwise
68124
bool ParseFromCompressedData(const butil::IOBuf& data,

src/brpc/global.cpp

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -388,27 +388,15 @@ static void GlobalInitializeOrDieImpl() {
388388
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389389

390390
// Compress Handlers
391-
CompressHandler gzip_compress = {
392-
GzipCompress, GzipCompress2Json, GzipCompress2ProtoJson,
393-
GzipCompress2ProtoText, GzipDecompress, GzipDecompressFromJson,
394-
GzipDecompressFromProtoJson, GzipDecompressFromProtoText, "gzip"
395-
};
391+
CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
396392
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
397393
exit(1);
398394
}
399-
CompressHandler zlib_compress = {
400-
ZlibCompress, ZlibCompress2Json, ZlibCompress2ProtoJson,
401-
ZlibCompress2ProtoText, ZlibDecompress, ZlibDecompressFromJson,
402-
ZlibDecompressFromProtoJson, ZlibDecompressFromProtoText, "zlib"
403-
};
395+
CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" };
404396
if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
405397
exit(1);
406398
}
407-
CompressHandler snappy_compress = {
408-
SnappyCompress, SnappyCompress2Json, SnappyCompress2ProtoJson,
409-
SnappyCompress2ProtoText, SnappyDecompress, SnappyDecompressFromJson,
410-
SnappyDecompressFromProtoJson, SnappyDecompressFromProtoText, "snappy"
411-
};
399+
CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" };
412400
if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
413401
exit(1);
414402
}

0 commit comments

Comments
 (0)