Skip to content

Commit fd71081

Browse files
committed
Add controller flag to manage ConcurrencyRemover lifecycle for CallAfterRpcResp
In SendRpcResponse, ConcurrencyRemover was destroyed before CallAfterRpcResp was called, meaning concurrency control didn't cover the after-response callback. This could lead to inaccurate concurrency tracking and latency measurements. This change adds a controller-level flag that is automatically set when set_after_rpc_resp_fn() is called. When the flag is true, ConcurrencyRemover's lifetime is extended to cover CallAfterRpcResp. Implementation: - Add _concurrency_remover_manages_after_rpc_resp flag to Controller - Automatically set flag to true in set_after_rpc_resp_fn() - Use unique_ptr with explicit reset() for clear control flow - Apply to both baidu_rpc_protocol and http_rpc_protocol When false (default): Original behavior - ConcurrencyRemover is released before CallAfterRpcResp via explicit reset(). When true (set_after_rpc_resp_fn called): ConcurrencyRemover lives until the end of BRPC_SCOPE_EXIT, covering the entire response lifecycle including after-response callbacks.
1 parent a47e349 commit fd71081

4 files changed

Lines changed: 35 additions & 9 deletions

File tree

src/brpc/controller.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ void Controller::ResetPods() {
298298
_response_streams.clear();
299299
_remote_stream_settings = NULL;
300300
_auth_flags = 0;
301+
_concurrency_remover_manages_after_rpc_resp = false;
301302
_rpc_received_us = 0;
302303
}
303304

src/brpc/controller.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,10 +621,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
621621
const google::protobuf::Message* req,
622622
const google::protobuf::Message* res)>;
623623

624-
void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn = fn; }
624+
void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) {
625+
_after_rpc_resp_fn = fn;
626+
// When after_rpc_resp_fn is set, enable concurrency management for it
627+
_concurrency_remover_manages_after_rpc_resp = true;
628+
}
625629

626630
void CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res);
627631

632+
// Check whether ConcurrencyRemover should manage the lifecycle of CallAfterRpcResp.
633+
bool concurrency_remover_manages_after_rpc_resp() const {
634+
return _concurrency_remover_manages_after_rpc_resp;
635+
}
636+
628637
void set_request_content_type(ContentType type) {
629638
_request_content_type = type;
630639
}
@@ -921,6 +930,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
921930
uint32_t _auth_flags;
922931

923932
AfterRpcRespFnType _after_rpc_resp_fn;
933+
bool _concurrency_remover_manages_after_rpc_resp;
924934

925935
// The point in time when the rpc is read from the socket
926936
int64_t _rpc_received_us;

src/brpc/policy/baidu_rpc_protocol.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,12 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
285285

286286
// Recycle resources at the end of this function.
287287
BRPC_SCOPE_EXIT {
288-
{
289-
// Remove concurrency and record latency at first.
290-
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
288+
std::unique_ptr<ConcurrencyRemover> concurrency_remover_ptr(
289+
new ConcurrencyRemover(method_status, cntl, received_us));
290+
291+
if (!cntl->concurrency_remover_manages_after_rpc_resp()) {
292+
// Original behavior: remove concurrency before CallAfterRpcResp
293+
concurrency_remover_ptr.reset();
291294
}
292295

293296
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
@@ -302,6 +305,8 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
302305
} else {
303306
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
304307
}
308+
// If concurrency_remover_manages_after_rpc_resp is true,
309+
// concurrency_remover_ptr will be destroyed here
305310
};
306311

307312
StreamIds response_stream_ids = accessor.response_streams();

src/brpc/policy/http_rpc_protocol.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -813,10 +813,7 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure {
813813
public:
814814
explicit HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
815815
void Run() override {
816-
if (NULL != _sender._messages) {
817-
_sender._cntl->CallAfterRpcResp(_sender._messages->Request(),
818-
_sender._messages->Response());
819-
}
816+
// CallAfterRpcResp is now called in HttpResponseSender::~HttpResponseSender()
820817
delete this;
821818
}
822819

@@ -827,9 +824,23 @@ class HttpResponseSenderAsDone : public google::protobuf::Closure {
827824
HttpResponseSender::~HttpResponseSender() {
828825
// Return messages to factory at the end.
829826
BRPC_SCOPE_EXIT {
827+
std::unique_ptr<ConcurrencyRemover> concurrency_remover_ptr;
828+
Controller* cntl = _cntl.get();
829+
if (cntl != NULL) {
830+
concurrency_remover_ptr.reset(new ConcurrencyRemover(_method_status, cntl, _received_us));
831+
832+
if (!cntl->concurrency_remover_manages_after_rpc_resp()) {
833+
// Original behavior: remove concurrency before CallAfterRpcResp
834+
concurrency_remover_ptr.reset();
835+
}
836+
}
837+
830838
if (NULL != _messages) {
839+
_cntl->CallAfterRpcResp(_messages->Request(), _messages->Response());
831840
_cntl->server()->options().rpc_pb_message_factory->Return(_messages);
832841
}
842+
// If concurrency_remover_manages_after_rpc_resp is true,
843+
// concurrency_remover_ptr will be destroyed here
833844
};
834845
Controller* cntl = _cntl.get();
835846
if (cntl == NULL) {
@@ -840,7 +851,6 @@ HttpResponseSender::~HttpResponseSender() {
840851
if (span) {
841852
span->set_start_send_us(butil::cpuwide_time_us());
842853
}
843-
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
844854
Socket* socket = accessor.get_sending_socket();
845855
const google::protobuf::Message* res = NULL != _messages ? _messages->Response() : NULL;
846856

0 commit comments

Comments
 (0)