diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 271ccfa485..1cd9dbc8ec 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1437,12 +1437,12 @@ void Controller::HandleStreamConnection(Socket *host_socket) { Stream* s = (Stream*)ptrs[0]->conn(); s->SetConnected(_remote_stream_settings); if (stream_num > 1) { - const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids(); + auto extra_stream_ids = std::move(*_remote_stream_settings->mutable_extra_stream_ids()); _remote_stream_settings->clear_extra_stream_ids(); for (size_t i = 1; i < stream_num; ++i) { Stream* extra_stream = (Stream *) ptrs[i]->conn(); _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); - s->ShareHostSocket(*extra_stream); + s->SetHostSocket(host_socket); extra_stream->SetConnected(_remote_stream_settings); } } diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 68397b57ec..2a4430548f 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -640,17 +640,16 @@ void Stream::SendFeedback() { } int Stream::SetHostSocket(Socket *host_socket) { - if (_host_socket != NULL) { - CHECK(false) << "SetHostSocket has already been called"; - return -1; - } - SocketUniquePtr ptr; - host_socket->ReAddress(&ptr); - // TODO add *this to host socke - if (ptr->AddStream(id()) != 0) { - return -1; - } - _host_socket = ptr.release(); + std::call_once(_set_host_socket_flag, [this, host_socket]() { + SocketUniquePtr ptr; + host_socket->ReAddress(&ptr); + // TODO add *this to host socke + if (ptr->AddStream(id()) != 0) { + CHECK(false) << id() << " fail to add stream to host socket"; + return; + } + _host_socket = ptr.release(); + }); return 0; } @@ -710,10 +709,6 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } -int Stream::ShareHostSocket(Stream& other_stream) { - return other_stream.SetHostSocket(_host_socket); -} - int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index 66e0d7191b..5ff7cb04a2 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -19,6 +19,7 @@ #ifndef BRPC_STREAM_IMPL_H #define BRPC_STREAM_IMPL_H +#include #include "bthread/bthread.h" #include "bthread/execution_queue.h" #include "brpc/socket.h" @@ -67,7 +68,6 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection { __attribute__ ((__format__ (__printf__, 3, 4))); void Close(int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); - int ShareHostSocket(Stream& other_stream); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, @@ -134,6 +134,7 @@ friend struct butil::DefaultDeleter; butil::IOBuf *_pending_buf; int64_t _start_idle_timer_us; bthread_timer_t _idle_timer; + std::once_flag _set_host_socket_flag; }; } // namespace brpc