Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions src/brpc/compress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@


#include "butil/logging.h"
#include "json2pb/json_to_pb.h"
#include "brpc/compress.h"
#include "brpc/protocol.h"

#include "brpc/proto_base.pb.h"

namespace brpc {

Expand Down Expand Up @@ -47,7 +48,7 @@ int RegisterCompressHandler(CompressType type,

// Find CompressHandler by type.
// Returns NULL if not found
inline const CompressHandler* FindCompressHandler(CompressType type) {
const CompressHandler* FindCompressHandler(CompressType type) {
int index = type;
if (index < 0 || index >= MAX_HANDLER_SIZE) {
LOG(ERROR) << "CompressType=" << type << " is out of range";
Expand Down Expand Up @@ -83,10 +84,14 @@ bool ParseFromCompressedData(const butil::IOBuf& data,
return ParsePbFromIOBuf(msg, data);
}
const CompressHandler* handler = FindCompressHandler(compress_type);
if (NULL != handler) {
return handler->Decompress(data, msg);
if (NULL == handler) {
return false;
}
return false;

Deserializer deserializer([msg](google::protobuf::io::ZeroCopyInputStream* input) {
return msg->ParseFromZeroCopyStream(input);
});
return handler->Decompress(data, &deserializer);
}

bool SerializeAsCompressedData(const google::protobuf::Message& msg,
Expand All @@ -96,10 +101,28 @@ bool SerializeAsCompressedData(const google::protobuf::Message& msg,
return msg.SerializeToZeroCopyStream(&wrapper);
}
const CompressHandler* handler = FindCompressHandler(compress_type);
if (NULL != handler) {
return handler->Compress(msg, buf);
if (NULL == handler) {
return false;
}
return false;

Serializer serializer([&msg](google::protobuf::io::ZeroCopyOutputStream* output) {
return msg.SerializeToZeroCopyStream(output);
});
return handler->Compress(serializer, buf);
}

::google::protobuf::Metadata Serializer::GetMetadata() const {
::google::protobuf::Metadata metadata{};
metadata.descriptor = SerializerBase::descriptor();
metadata.reflection = nullptr;
return metadata;
}

::google::protobuf::Metadata Deserializer::GetMetadata() const {
::google::protobuf::Metadata metadata{};
metadata.descriptor = DeserializerBase::descriptor();
metadata.reflection = nullptr;
return metadata;
}

} // namespace brpc
136 changes: 136 additions & 0 deletions src/brpc/compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,143 @@

#include <google/protobuf/message.h> // Message
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/logging.h"
#include "brpc/options.pb.h" // CompressType
#include "brpc/nonreflectable_message.h"

namespace brpc {

// Serializer can be used to implement custom serialization
// before compression with user callback.
class Serializer : public NonreflectableMessage<Serializer> {
public:
using Callback = std::function<bool(google::protobuf::io::ZeroCopyOutputStream*)>;

Serializer() :Serializer(NULL) {}

explicit Serializer(Callback callback)
:_callback(std::move(callback)) {
SharedCtor();
}

~Serializer() override {
SharedDtor();
}

Serializer(const Serializer& from)
: NonreflectableMessage(from) {
SharedCtor();
MergeFrom(from);
}

Serializer& operator=(const Serializer& from) {
CopyFrom(from);
return *this;
}

void Swap(Serializer* other) {
if (other != this) {
}
}

void MergeFrom(const Serializer& from) override {
CHECK_NE(&from, this);
}

// implements Message ----------------------------------------------
void Clear() override {
_callback = nullptr;
}
size_t ByteSizeLong() const override { return 0; }
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }

::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;

// Converts the data into `output' for later compression.
bool SerializeTo(google::protobuf::io::ZeroCopyOutputStream* output) const {
if (!_callback) {
LOG(WARNING) << "Serializer::SerializeTo() called without callback";
return false;
}
return _callback(output);
}

void SetCallback(Callback callback) {
_callback = std::move(callback);
}

private:
void SharedCtor() {}
void SharedDtor() {}

Callback _callback;
};

// Deserializer can be used to implement custom deserialization
// after decompression with user callback.
class Deserializer : public NonreflectableMessage<Deserializer> {
public:
public:
using Callback = std::function<bool(google::protobuf::io::ZeroCopyInputStream*)>;

Deserializer() :Deserializer(NULL) {}

explicit Deserializer(Callback callback) : _callback(std::move(callback)) {
SharedCtor();
}

~Deserializer() override {
SharedDtor();
}

Deserializer(const Deserializer& from)
: NonreflectableMessage(from) {
SharedCtor();
MergeFrom(from);
}

Deserializer& operator=(const Deserializer& from) {
CopyFrom(from);
return *this;
}

void Swap(Deserializer* other) {
if (other != this) {
_callback.swap(other->_callback);
}
}

void MergeFrom(const Deserializer& from) override {
CHECK_NE(&from, this);
_callback = from._callback;
}

// implements Message ----------------------------------------------
void Clear() override { _callback = nullptr; }
size_t ByteSizeLong() const override { return 0; }
int GetCachedSize() const PB_425_OVERRIDE { return ByteSize(); }

::google::protobuf::Metadata GetMetadata() const PB_527_OVERRIDE;

// Converts the decompressed `input'.
bool DeserializeFrom(google::protobuf::io::ZeroCopyInputStream* intput) const {
if (!_callback) {
LOG(WARNING) << "Deserializer::DeserializeFrom() called without callback";
return false;
}
return _callback(intput);
}
void SetCallback(Callback callback) {
_callback = std::move(callback);
}

private:
void SharedCtor() {}
void SharedDtor() {}

Callback _callback;
};

struct CompressHandler {
// Compress serialized `msg' into `buf'.
// Returns true on success, false otherwise
Expand All @@ -42,6 +175,9 @@ struct CompressHandler {
// Returns 0 on success, -1 otherwise
int RegisterCompressHandler(CompressType type, CompressHandler handler);

// Returns CompressHandler pointer of `type' if registered, NULL otherwise.
const CompressHandler* FindCompressHandler(CompressType type);

// Returns the `name' of the CompressType if registered
const char* CompressTypeToCStr(CompressType type);

Expand Down
2 changes: 2 additions & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ void Controller::ResetPods() {
_http_response = NULL;
_request_user_fields = NULL;
_response_user_fields = NULL;
_request_content_type = CONTENT_TYPE_PB;
_response_content_type = CONTENT_TYPE_PB;
_request_streams.clear();
_response_streams.clear();
_remote_stream_settings = NULL;
Expand Down
19 changes: 19 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,20 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);

void set_request_content_type(ContentType type) {
_request_content_type = type;
}
ContentType request_content_type() const {
return _request_content_type;
}

void set_response_content_type(ContentType type) {
_response_content_type = type;
}
ContentType response_content_type() const {
return _response_content_type;
}

private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
Expand Down Expand Up @@ -859,6 +873,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
butil::IOBuf _request_attachment;
butil::IOBuf _response_attachment;

// Only SerializedRequest supports `_request_content_type'.
ContentType _request_content_type;
// Only SerializedResponse supports `_response_content_type'.
ContentType _response_content_type;

// Writable progressive attachment
butil::intrusive_ptr<ProgressiveAttachment> _wpa;
// Readable progressive attachment
Expand Down
11 changes: 4 additions & 7 deletions src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,25 +388,22 @@ static void GlobalInitializeOrDieImpl() {
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);

// Compress Handlers
const CompressHandler gzip_compress =
{ GzipCompress, GzipDecompress, "gzip" };
CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
exit(1);
}
const CompressHandler zlib_compress =
{ ZlibCompress, ZlibDecompress, "zlib" };
CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" };
if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
exit(1);
}
const CompressHandler snappy_compress =
{ SnappyCompress, SnappyDecompress, "snappy" };
CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" };
if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
exit(1);
}

// Protocols
Protocol baidu_protocol = { ParseRpcMessage,
SerializeRequestDefault, PackRpcRequest,
SerializeRpcRequest, PackRpcRequest,
ProcessRpcRequest, ProcessRpcResponse,
VerifyRpcRequest, NULL, NULL,
CONNECTION_TYPE_ALL, "baidu_std" };
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/memcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ MemcacheRequest::MemcacheRequest()
}

MemcacheRequest::MemcacheRequest(const MemcacheRequest& from)
: NonreflectableMessage<MemcacheRequest>() {
: NonreflectableMessage<MemcacheRequest>(from) {
SharedCtor();
MergeFrom(from);
}
Expand Down Expand Up @@ -143,7 +143,7 @@ MemcacheResponse::MemcacheResponse()
}

MemcacheResponse::MemcacheResponse(const MemcacheResponse& from)
: NonreflectableMessage<MemcacheResponse>() {
: NonreflectableMessage<MemcacheResponse>(from) {
SharedCtor();
MergeFrom(from);
}
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/nonreflectable_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <google/protobuf/generated_message_reflection.h>
#include <google/protobuf/message.h>

#include "pb_compat.h"
#include "brpc/pb_compat.h"

namespace brpc {

Expand Down
2 changes: 1 addition & 1 deletion src/brpc/nshead_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ NsheadMessage::NsheadMessage()
}

NsheadMessage::NsheadMessage(const NsheadMessage& from)
: NonreflectableMessage<NsheadMessage>() {
: NonreflectableMessage<NsheadMessage>(from) {
SharedCtor();
MergeFrom(from);
}
Expand Down
7 changes: 7 additions & 0 deletions src/brpc/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ enum CompressType {
COMPRESS_TYPE_LZ4 = 4;
}

enum ContentType {
CONTENT_TYPE_PB = 0;
CONTENT_TYPE_JSON = 1;
CONTENT_TYPE_PROTO_JSON = 2;
CONTENT_TYPE_PROTO_TEXT = 3;
}

message ChunkInfo {
required int64 stream_id = 1;
required int64 chunk_id = 2;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/baidu_rpc_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ message RpcMeta {
optional bytes authentication_data = 7;
optional StreamSettings stream_settings = 8;
map<string, string> user_fields = 9;
optional ContentType content_type = 10;
}

message RpcRequestMeta {
Expand Down
Loading
Loading