Skip to content

Commit c7add74

Browse files
authored
Support ProtoJson formatted http body (#2921)
1 parent 0c4f2a8 commit c7add74

16 files changed

Lines changed: 536 additions & 92 deletions

src/brpc/extension.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class Extension {
4848
void List(std::ostream& os, char separator);
4949

5050
private:
51-
friend class butil::GetLeakySingleton<Extension<T> >;
51+
template <typename U> friend U* butil::create_leaky_singleton_obj();
5252
Extension() = default;
5353
butil::CaseIgnoredFlatMap<T*> _instance_map;
5454
butil::Mutex _map_mutex;

src/brpc/policy/http_rpc_protocol.cpp

Lines changed: 114 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,26 @@
1919
#include <google/protobuf/descriptor.h> // MethodDescriptor
2020
#include <google/protobuf/text_format.h>
2121
#include <gflags/gflags.h>
22-
#include <json2pb/pb_to_json.h> // ProtoMessageToJson
23-
#include <json2pb/json_to_pb.h> // JsonToProtoMessage
2422
#include <string>
25-
2623
#include "brpc/policy/http_rpc_protocol.h"
2724
#include "butil/unique_ptr.h" // std::unique_ptr
2825
#include "butil/string_splitter.h" // StringMultiSplitter
2926
#include "butil/string_printf.h"
3027
#include "butil/time.h"
3128
#include "butil/sys_byteorder.h"
29+
#include "json2pb/pb_to_json.h" // ProtoMessageToJson
30+
#include "json2pb/json_to_pb.h" // JsonToProtoMessage
3231
#include "brpc/compress.h"
33-
#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
34-
#include "brpc/controller.h" // Controller
35-
#include "brpc/server.h" // Server
32+
#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
33+
#include "brpc/controller.h" // Controller
34+
#include "brpc/server.h" // Server
3635
#include "brpc/details/server_private_accessor.h"
3736
#include "brpc/span.h"
38-
#include "brpc/socket.h" // Socket
39-
#include "brpc/rpc_dump.h" // SampledRequest
40-
#include "brpc/http_status_code.h" // HTTP_STATUS_*
37+
#include "brpc/socket.h" // Socket
38+
#include "brpc/rpc_dump.h" // SampledRequest
39+
#include "brpc/http_status_code.h" // HTTP_STATUS_*
4140
#include "brpc/details/controller_private_accessor.h"
42-
#include "brpc/builtin/index_service.h" // IndexService
41+
#include "brpc/builtin/index_service.h" // IndexService
4342
#include "brpc/policy/gzip_compress.h"
4443
#include "brpc/policy/http2_rpc_protocol.h"
4544
#include "brpc/details/usercode_backup_pool.h"
@@ -203,6 +202,9 @@ HttpContentType ParseContentType(butil::StringPiece ct, bool* is_grpc_ct) {
203202
if (ct.starts_with("json")) {
204203
type = HTTP_CONTENT_JSON;
205204
ct.remove_prefix(4);
205+
} else if (ct.starts_with("proto-json")) {
206+
type = HTTP_CONTENT_PROTO_JSON;
207+
ct.remove_prefix(10);
206208
} else if (ct.starts_with("proto-text")) {
207209
type = HTTP_CONTENT_PROTO_TEXT;
208210
ct.remove_prefix(10);
@@ -271,6 +273,79 @@ static bool RemoveGrpcPrefix(butil::IOBuf* body, bool* compressed) {
271273
return (message_length + 5 == sz);
272274
}
273275

276+
static bool JsonToProtoMessage(const butil::IOBuf& body,
277+
google::protobuf::Message* message,
278+
Controller* cntl, int error_code) {
279+
butil::IOBufAsZeroCopyInputStream wrapper(body);
280+
json2pb::Json2PbOptions options;
281+
options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
282+
options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array();
283+
std::string error;
284+
bool ok = json2pb::JsonToProtoMessage(&wrapper, message, options, &error);
285+
if (!ok) {
286+
cntl->SetFailed(error_code, "Fail to parse http json body as %s: %s",
287+
message->GetDescriptor()->full_name().c_str(),
288+
error.c_str());
289+
}
290+
return ok;
291+
}
292+
293+
static bool ProtoMessageToJson(const google::protobuf::Message& message,
294+
butil::IOBufAsZeroCopyOutputStream* wrapper,
295+
Controller* cntl, int error_code) {
296+
json2pb::Pb2JsonOptions options;
297+
options.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
298+
options.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
299+
options.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
300+
options.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();
301+
options.enum_option = FLAGS_pb_enum_as_number
302+
? json2pb::OUTPUT_ENUM_BY_NUMBER
303+
: json2pb::OUTPUT_ENUM_BY_NAME;
304+
std::string error;
305+
bool ok = json2pb::ProtoMessageToJson(message, wrapper, options, &error);
306+
if (!ok) {
307+
cntl->SetFailed(error_code, "Fail to convert %s to json: %s",
308+
message.GetDescriptor()->full_name().c_str(),
309+
error.c_str());
310+
}
311+
return ok;
312+
}
313+
314+
static bool ProtoJsonToProtoMessage(const butil::IOBuf& body,
315+
google::protobuf::Message* message,
316+
Controller* cntl, int error_code) {
317+
json2pb::ProtoJson2PbOptions options;
318+
options.ignore_unknown_fields = true;
319+
butil::IOBufAsZeroCopyInputStream wrapper(body);
320+
std::string error;
321+
bool ok = json2pb::ProtoJsonToProtoMessage(&wrapper, message, options, &error);
322+
if (!ok) {
323+
cntl->SetFailed(error_code, "Fail to parse http proto-json body as %s: %s",
324+
message->GetDescriptor()->full_name().c_str(),
325+
error.c_str());
326+
}
327+
return ok;
328+
}
329+
330+
static bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
331+
butil::IOBufAsZeroCopyOutputStream* wrapper,
332+
Controller* cntl, int error_code) {
333+
json2pb::Pb2ProtoJsonOptions options;
334+
#if GOOGLE_PROTOBUF_VERSION >= 5026002
335+
options.always_print_fields_with_no_presence = cntl->has_always_print_primitive_fields();
336+
#else
337+
options.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
338+
#endif
339+
options.always_print_enums_as_ints = FLAGS_pb_enum_as_number;
340+
std::string error;
341+
bool ok = json2pb::ProtoMessageToProtoJson(message, wrapper, options, &error);
342+
if (!ok) {
343+
cntl->SetFailed(error_code, "Fail to convert %s to proto-json: %s",
344+
message.GetDescriptor()->full_name().c_str(), error.c_str());
345+
}
346+
return ok;
347+
}
348+
274349
void ProcessHttpResponse(InputMessageBase* msg) {
275350
const int64_t start_parse_us = butil::cpuwide_time_us();
276351
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
@@ -435,8 +510,8 @@ void ProcessHttpResponse(InputMessageBase* msg) {
435510
if (grpc_compressed) {
436511
encoding = res_header->GetHeader(common->GRPC_ENCODING);
437512
if (encoding == NULL) {
438-
cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding'"
439-
" in compressed gRPC response");
513+
cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding' "
514+
"in compressed gRPC response");
440515
break;
441516
}
442517
}
@@ -455,23 +530,24 @@ void ProcessHttpResponse(InputMessageBase* msg) {
455530
}
456531
if (content_type == HTTP_CONTENT_PROTO) {
457532
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
458-
cntl->SetFailed(ERESPONSE, "Fail to parse content");
533+
cntl->SetFailed(ERESPONSE, "Fail to parse content as %s",
534+
cntl->response()->GetDescriptor()->full_name().c_str());
459535
break;
460536
}
461537
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
462538
if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) {
463-
cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content");
539+
cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content as %s",
540+
cntl->response()->GetDescriptor()->full_name().c_str());
464541
break;
465542
}
466543
} else if (content_type == HTTP_CONTENT_JSON) {
467-
// message body is json
468-
butil::IOBufAsZeroCopyInputStream wrapper(res_body);
469-
std::string err;
470-
json2pb::Json2PbOptions options;
471-
options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
472-
options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array();
473-
if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(), options, &err)) {
474-
cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str());
544+
// Message body is json.
545+
if (!JsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
546+
break;
547+
}
548+
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
549+
// Message body is json.
550+
if (!ProtoJsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
475551
break;
476552
}
477553
} else {
@@ -530,8 +606,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
530606
}
531607
} else {
532608
bool is_grpc_ct = false;
533-
content_type = ParseContentType(hreq.content_type(),
534-
&is_grpc_ct);
609+
content_type = ParseContentType(hreq.content_type(), &is_grpc_ct);
535610
is_grpc = (is_http2 && is_grpc_ct);
536611
}
537612

@@ -549,21 +624,15 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
549624
return cntl->SetFailed(EREQUEST, "Fail to print %s as proto-text",
550625
pbreq->GetTypeName().c_str());
551626
}
627+
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
628+
if (!ProtoMessageToProtoJson(*pbreq, &wrapper, cntl, EREQUEST)) {
629+
cntl->request_attachment().clear();
630+
return;
631+
}
552632
} else if (content_type == HTTP_CONTENT_JSON) {
553-
std::string err;
554-
json2pb::Pb2JsonOptions opt;
555-
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
556-
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
557-
opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
558-
opt.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();
559-
560-
opt.enum_option = (FLAGS_pb_enum_as_number
561-
? json2pb::OUTPUT_ENUM_BY_NUMBER
562-
: json2pb::OUTPUT_ENUM_BY_NAME);
563-
if (!json2pb::ProtoMessageToJson(*pbreq, &wrapper, opt, &err)) {
633+
if (!ProtoMessageToJson(*pbreq, &wrapper, cntl, EREQUEST)) {
564634
cntl->request_attachment().clear();
565-
return cntl->SetFailed(
566-
EREQUEST, "Fail to convert request to json, %s", err.c_str());
635+
return;
567636
}
568637
} else {
569638
return cntl->SetFailed(
@@ -819,19 +888,10 @@ HttpResponseSender::~HttpResponseSender() {
819888
if (!google::protobuf::TextFormat::Print(*res, &wrapper)) {
820889
cntl->SetFailed(ERESPONSE, "Fail to print %s as proto-text", res->GetTypeName().c_str());
821890
}
891+
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
892+
ProtoMessageToProtoJson(*res, &wrapper, cntl, ERESPONSE);
822893
} else {
823-
std::string err;
824-
json2pb::Pb2JsonOptions opt;
825-
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
826-
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
827-
opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
828-
opt.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();
829-
opt.enum_option = (FLAGS_pb_enum_as_number
830-
? json2pb::OUTPUT_ENUM_BY_NUMBER
831-
: json2pb::OUTPUT_ENUM_BY_NAME);
832-
if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
833-
cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
834-
}
894+
ProtoMessageToJson(*res, &wrapper, cntl, ERESPONSE);
835895
}
836896
}
837897

@@ -1610,17 +1670,14 @@ void ProcessHttpRequest(InputMessageBase *msg) {
16101670
req->GetDescriptor()->full_name().c_str());
16111671
return;
16121672
}
1673+
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
1674+
if (!ProtoJsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
1675+
return;
1676+
}
16131677
} else {
1614-
butil::IOBufAsZeroCopyInputStream wrapper(req_body);
1615-
std::string err;
1616-
json2pb::Json2PbOptions options;
1617-
options.base64_to_bytes = mp->params.pb_bytes_to_base64;
1618-
options.array_to_single_repeated = mp->params.pb_single_repeated_to_array;
16191678
cntl->set_pb_bytes_to_base64(mp->params.pb_bytes_to_base64);
16201679
cntl->set_pb_single_repeated_to_array(mp->params.pb_single_repeated_to_array);
1621-
if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
1622-
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
1623-
req->GetDescriptor()->full_name().c_str(), err.c_str());
1680+
if (!JsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
16241681
return;
16251682
}
16261683
}

src/brpc/policy/http_rpc_protocol.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ enum HttpContentType {
149149
HTTP_CONTENT_JSON = 1,
150150
HTTP_CONTENT_PROTO = 2,
151151
HTTP_CONTENT_PROTO_TEXT = 3,
152+
HTTP_CONTENT_PROTO_JSON = 4,
152153
};
153154

154155
// Parse from the textual content type. One type may have more than one literals.

src/bthread/key.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,7 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTable {
222222
class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
223223
public:
224224
KeyTableList() :
225-
_head(NULL), _tail(NULL), _length(0) {
226-
}
225+
_head(NULL), _tail(NULL), _length(0) {}
227226

228227
~KeyTableList() {
229228
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
@@ -305,7 +304,7 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
305304
return count;
306305
}
307306

308-
inline uint32_t get_length() {
307+
inline uint32_t get_length() const {
309308
return _length;
310309
}
311310

src/butil/logging.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ bool InitializeLogFileHandle() {
395395
#elif defined(OS_POSIX)
396396
log_file = fopen(log_file_name->c_str(), "a");
397397
if (log_file == NULL) {
398-
fprintf(stderr, "Fail to fopen %s", log_file_name->c_str());
398+
fprintf(stderr, "Fail to fopen %s: %s", log_file_name->c_str(), berror());
399399
return false;
400400
}
401401
#endif

src/butil/memory/singleton_on_pthread_once.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@
2525

2626
namespace butil {
2727

28-
template <typename T> class GetLeakySingleton {
28+
template <typename T>
29+
T* create_leaky_singleton_obj() {
30+
return new T();
31+
}
32+
33+
template <typename T>
34+
class GetLeakySingleton {
2935
public:
3036
static butil::subtle::AtomicWord g_leaky_singleton_untyped;
3137
static pthread_once_t g_create_leaky_singleton_once;
@@ -39,7 +45,7 @@ pthread_once_t GetLeakySingleton<T>::g_create_leaky_singleton_once = PTHREAD_ONC
3945

4046
template <typename T>
4147
void GetLeakySingleton<T>::create_leaky_singleton() {
42-
T* obj = new T;
48+
T* obj = create_leaky_singleton_obj<T>();
4349
butil::subtle::Release_Store(
4450
&g_leaky_singleton_untyped,
4551
reinterpret_cast<butil::subtle::AtomicWord>(obj));

src/json2pb/json_to_pb.cpp

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
#include "butil/strings/string_number_conversions.h"
2828
#include "butil/third_party/rapidjson/error/error.h"
2929
#include "butil/third_party/rapidjson/rapidjson.h"
30-
#include "json_to_pb.h"
31-
#include "zero_copy_stream_reader.h" // ZeroCopyStreamReader
32-
#include "encode_decode.h"
30+
#include "json2pb/json_to_pb.h"
31+
#include "json2pb/zero_copy_stream_reader.h" // ZeroCopyStreamReader
32+
#include "json2pb/encode_decode.h"
33+
#include "json2pb/protobuf_map.h"
34+
#include "json2pb/rapidjson.h"
35+
#include "json2pb/protobuf_type_resolver.h"
3336
#include "butil/base64.h"
34-
#include "butil/string_printf.h"
35-
#include "protobuf_map.h"
36-
#include "rapidjson.h"
37-
37+
#include "butil/iobuf.h"
3838

3939
#ifdef __GNUC__
4040
// Ignore -Wnonnull for `(::google::protobuf::Message*)nullptr' of J2PERROR by design.
@@ -712,6 +712,40 @@ bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream *stream,
712712
std::string* error) {
713713
return JsonToProtoMessage(stream, message, Json2PbOptions(), error, nullptr);
714714
}
715+
716+
bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json,
717+
google::protobuf::Message* message,
718+
const ProtoJson2PbOptions& options,
719+
std::string* error) {
720+
TypeResolverUniqueptr type_resolver = GetTypeResolver(*message);
721+
butil::IOBuf buf;
722+
butil::IOBufAsZeroCopyOutputStream output_stream(&buf);
723+
std::string type_url = GetTypeUrl(*message);
724+
auto st = google::protobuf::util::JsonToBinaryStream(
725+
type_resolver.get(), type_url, json, &output_stream, options);
726+
727+
butil::IOBufAsZeroCopyInputStream input_stream(buf);
728+
google::protobuf::io::CodedInputStream decoder(&input_stream);
729+
if (!st.ok()) {
730+
if (NULL != error) {
731+
*error = st.ToString();
732+
}
733+
return false;
734+
}
735+
736+
bool ok = message->ParseFromCodedStream(&decoder);
737+
if (!ok && NULL != error) {
738+
*error = "Fail to ParseFromCodedStream";
739+
}
740+
return ok;
741+
}
742+
743+
bool ProtoJsonToProtoMessage(const std::string& json, google::protobuf::Message* message,
744+
const ProtoJson2PbOptions& options, std::string* error) {
745+
google::protobuf::io::ArrayInputStream input_stream(json.data(), json.size());
746+
return ProtoJsonToProtoMessage(&input_stream, message, options, error);
747+
}
748+
715749
} //namespace json2pb
716750

717751
#undef J2PERROR

0 commit comments

Comments
 (0)