Skip to content

Commit 88a7a19

Browse files
amoxicfeng-yu1
authored andcommitted
Handle response stream bind failures gracefully
1 parent 0565d8d commit 88a7a19

3 files changed

Lines changed: 125 additions & 10 deletions

File tree

src/brpc/policy/baidu_rpc_protocol.cpp

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,19 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
360360
Stream* s = (Stream *) stream_ptr->conn();
361361
StreamSettings *stream_settings = meta.mutable_stream_settings();
362362
s->FillSettings(stream_settings);
363-
s->SetHostSocket(sock);
363+
if (s->SetHostSocket(sock) != 0) {
364+
const int errcode = sock->non_zero_error_code();
365+
LOG_IF(WARNING, errcode != EPIPE)
366+
<< "Fail to bind response stream=" << response_stream_id
367+
<< " to " << sock->description() << ": "
368+
<< berror(errcode);
369+
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
370+
sock->description().c_str());
371+
Stream::SetFailed(response_stream_ids, errcode,
372+
"Fail to bind response stream to %s",
373+
sock->description().c_str());
374+
return;
375+
}
364376
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
365377
stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
366378
}
@@ -390,6 +402,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
390402

391403
ResponseWriteInfo args;
392404
bthread_id_t response_id = INVALID_BTHREAD_ID;
405+
auto response_write_guard = butil::MakeScopeGuard([&response_id, &args, span] {
406+
if (response_id == INVALID_BTHREAD_ID) {
407+
return;
408+
}
409+
bthread_id_join(response_id);
410+
// Do not care about the result of background writing.
411+
// TODO: this is not sent
412+
span->set_sent_us(args.sent_us);
413+
});
393414
if (span) {
394415
span->set_response_size(res_buf.size());
395416
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
@@ -426,7 +447,21 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
426447
SocketUniquePtr extra_stream_ptr;
427448
if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
428449
Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
429-
extra_stream->SetHostSocket(sock);
450+
if (extra_stream->SetHostSocket(sock) != 0) {
451+
const int errcode = sock->non_zero_error_code();
452+
LOG_IF(WARNING, errcode != EPIPE)
453+
<< "Fail to bind response stream=" << extra_stream_id
454+
<< " to " << sock->description() << ": "
455+
<< berror(errcode);
456+
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
457+
sock->description().c_str());
458+
StreamIds remaining_stream_ids(response_stream_ids.begin() + i,
459+
response_stream_ids.end());
460+
Stream::SetFailed(remaining_stream_ids, errcode,
461+
"Fail to bind response stream to %s",
462+
sock->description().c_str());
463+
return;
464+
}
430465
extra_stream->SetConnected();
431466
} else {
432467
LOG(WARNING) << "Stream=" << extra_stream_id
@@ -451,12 +486,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
451486
}
452487
}
453488

454-
if (span) {
455-
bthread_id_join(response_id);
456-
// Do not care about the result of background writing.
457-
// TODO: this is not sent
458-
span->set_sent_us(args.sent_us);
459-
}
460489
}
461490

462491
namespace {

src/brpc/stream.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -640,16 +640,21 @@ void Stream::SendFeedback() {
640640
}
641641

642642
int Stream::SetHostSocket(Socket *host_socket) {
643-
std::call_once(_set_host_socket_flag, [this, host_socket]() {
643+
int error_code = 0;
644+
std::call_once(_set_host_socket_flag, [this, host_socket, &error_code]() {
644645
SocketUniquePtr ptr;
645646
host_socket->ReAddress(&ptr);
646647
// TODO add *this to host socke
647648
if (ptr->AddStream(id()) != 0) {
648-
CHECK(false) << id() << " fail to add stream to host socket";
649+
error_code = ptr->non_zero_error_code();
649650
return;
650651
}
651652
_host_socket = ptr.release();
652653
});
654+
if (_host_socket == NULL) {
655+
errno = error_code ? error_code : EFAILEDSOCKET;
656+
return -1;
657+
}
653658
return 0;
654659
}
655660

test/brpc_streaming_rpc_unittest.cpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
// Date: 2015/10/22 16:28:44
2121

2222
#include <gtest/gtest.h>
23+
#include <errno.h>
2324
#include "brpc/server.h"
2425

2526
#include "brpc/controller.h"
2627
#include "brpc/channel.h"
2728
#include "brpc/socket.h"
29+
#include "brpc/details/controller_private_accessor.h"
2830
#include "brpc/stream_impl.h"
2931
#include "brpc/policy/streaming_rpc_protocol.h"
3032
#include "echo.pb.h"
@@ -78,6 +80,52 @@ class StreamingRpcTest : public testing::Test {
7880
test::EchoResponse response;
7981
};
8082

83+
class MyServiceWithStreamAndFailedSocket : public test::EchoService {
84+
public:
85+
explicit MyServiceWithStreamAndFailedSocket(const brpc::StreamOptions& options)
86+
: _options(options) {}
87+
88+
void Echo(::google::protobuf::RpcController* controller,
89+
const ::test::EchoRequest* request,
90+
::test::EchoResponse* response,
91+
::google::protobuf::Closure* done) override {
92+
brpc::ClosureGuard done_guard(done);
93+
response->set_message(request->message());
94+
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
95+
brpc::StreamId response_stream;
96+
ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options));
97+
brpc::ControllerPrivateAccessor accessor(cntl);
98+
ASSERT_TRUE(accessor.get_sending_socket() != NULL);
99+
accessor.get_sending_socket()->SetFailed();
100+
}
101+
102+
private:
103+
brpc::StreamOptions _options;
104+
};
105+
106+
TEST_F(StreamingRpcTest, set_host_socket_returns_error_when_socket_is_failed) {
107+
brpc::SocketOptions socket_options;
108+
brpc::SocketId host_socket_id;
109+
ASSERT_EQ(0, brpc::Socket::Create(socket_options, &host_socket_id));
110+
brpc::SocketUniquePtr host_socket;
111+
ASSERT_EQ(0, brpc::Socket::Address(host_socket_id, &host_socket));
112+
ASSERT_EQ(0, host_socket->SetFailed());
113+
114+
brpc::StreamId stream_id;
115+
brpc::StreamOptions stream_options;
116+
ASSERT_EQ(0, brpc::Stream::Create(stream_options, NULL, &stream_id, false));
117+
brpc::ScopedStream stream_guard(stream_id);
118+
119+
brpc::SocketUniquePtr stream_socket;
120+
ASSERT_EQ(0, brpc::Socket::Address(stream_id, &stream_socket));
121+
brpc::Stream* stream = static_cast<brpc::Stream*>(stream_socket->conn());
122+
123+
errno = 0;
124+
ASSERT_EQ(-1, stream->SetHostSocket(host_socket.get()));
125+
ASSERT_NE(0, errno);
126+
ASSERT_TRUE(stream->_host_socket == NULL);
127+
}
128+
81129
TEST_F(StreamingRpcTest, sanity) {
82130
brpc::Server server;
83131
MyServiceWithStream service;
@@ -159,6 +207,39 @@ class OrderedInputHandler : public brpc::StreamInputHandler {
159207
HandlerControl* _cntl;
160208
};
161209

210+
TEST_F(StreamingRpcTest, server_failed_socket_before_response_closes_stream_without_abort) {
211+
OrderedInputHandler handler;
212+
brpc::StreamOptions response_stream_options;
213+
response_stream_options.handler = &handler;
214+
brpc::Server server;
215+
MyServiceWithStreamAndFailedSocket service(response_stream_options);
216+
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
217+
ASSERT_EQ(0, server.Start(9007, NULL));
218+
219+
brpc::Channel channel;
220+
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
221+
brpc::Controller cntl;
222+
brpc::StreamId request_stream;
223+
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL));
224+
brpc::ScopedStream stream_guard(request_stream);
225+
226+
test::EchoService_Stub stub(&channel);
227+
stub.Echo(&cntl, &request, &response, NULL);
228+
ASSERT_TRUE(cntl.Failed());
229+
230+
for (int i = 0; i < 10000 && !handler.stopped(); ++i) {
231+
usleep(100);
232+
}
233+
234+
server.Stop(0);
235+
server.Join();
236+
237+
ASSERT_TRUE(handler.stopped());
238+
ASSERT_TRUE(handler.failed());
239+
ASSERT_EQ(0, handler.idle_times());
240+
ASSERT_EQ(0, handler._expected_next_value);
241+
}
242+
162243
TEST_F(StreamingRpcTest, received_in_order) {
163244
OrderedInputHandler handler;
164245
brpc::StreamOptions opt;

0 commit comments

Comments
 (0)