@@ -184,37 +184,39 @@ struct BaiduProxyPBMessages : public RpcPBMessages {
184184}
185185
186186// Used by UT, can't be static.
187- void SendRpcResponse (int64_t correlation_id,
188- Controller* cntl,
189- RpcPBMessages* messages,
190- const Server* server,
191- MethodStatus* method_status,
192- int64_t received_us) {
187+ void SendRpcResponse (int64_t correlation_id, Controller* cntl,
188+ RpcPBMessages* messages, const Server* server,
189+ MethodStatus* method_status, int64_t received_us) {
193190 ControllerPrivateAccessor accessor (cntl);
194191 Span* span = accessor.span ();
195192 if (span) {
196193 span->set_start_send_us (butil::cpuwide_time_us ());
197194 }
198195 Socket* sock = accessor.get_sending_socket ();
199196
200- std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl (cntl);
201- ConcurrencyRemover concurrency_remover (method_status, cntl, received_us);
197+ const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request ();
198+ const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response ();
199+
200+ // Recycle resources at the end of this function.
201+ BRPC_SCOPE_EXIT {
202+ {
203+ // Remove concurrency and record latency at first.
204+ ConcurrencyRemover concurrency_remover (method_status, cntl, received_us);
205+ }
206+
207+ std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl (cntl);
202208
203- auto messages_guard = butil::MakeScopeGuard ([server, messages] {
204209 if (NULL == messages) {
205210 return ;
206211 }
207- if ( NULL != server-> options (). baidu_master_service ) {
208- BaiduProxyPBMessages::Return ( static_cast <BaiduProxyPBMessages*>(messages) );
209- } else {
212+
213+ cntl-> CallAfterRpcResp (req, res );
214+ if ( NULL == server-> options (). baidu_master_service ) {
210215 server->options ().rpc_pb_message_factory ->Return (messages);
216+ } else {
217+ BaiduProxyPBMessages::Return (static_cast <BaiduProxyPBMessages*>(messages));
211218 }
212- });
213-
214- const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request ();
215- const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response ();
216- ClosureGuard guard (brpc::NewCallback (
217- cntl, &Controller::CallAfterRpcResp, req, res));
219+ };
218220
219221 StreamIds response_stream_ids = accessor.response_streams ();
220222
@@ -299,32 +301,37 @@ void SendRpcResponse(int64_t correlation_id,
299301 }
300302 }
301303
304+ ResponseWriteInfo args;
305+ bthread_id_t response_id = INVALID_BTHREAD_ID ;
302306 if (span) {
303307 span->set_response_size (res_buf.size ());
308+ CHECK_EQ (0 , bthread_id_create (&response_id, &args, HandleResponseWritten));
304309 }
310+
305311 // Send rpc response over stream even if server side failed to create
306312 // stream for some reason.
307- if (cntl->has_remote_stream ()){
313+ if (cntl->has_remote_stream ()) {
308314 // Send the response over stream to notify that this stream connection
309315 // is successfully built.
310316 // Response_stream can be INVALID_STREAM_ID when error occurs.
311317 if (SendStreamData (sock, &res_buf,
312318 accessor.remote_stream_settings ()->stream_id (),
313- response_stream_id) != 0 ) {
314- const int errcode = errno;
315- std::string error_text = butil::string_printf (64 , " Fail to write into %s" ,
316- sock->description ().c_str ());
317- PLOG_IF (WARNING , errcode != EPIPE ) << error_text;
318- cntl->SetFailed (errcode, " %s" , error_text.c_str ());
319- Stream::SetFailed (response_stream_ids, errcode, " %s" ,
320- error_text.c_str ());
319+ response_stream_id, response_id) != 0 ) {
320+ error_code = errno;
321+ PLOG_IF (WARNING , error_code != EPIPE )
322+ << " Fail to write into " << sock->description ();
323+ cntl->SetFailed (error_code, " Fail to write into %s" ,
324+ sock->description ().c_str ());
325+ Stream::SetFailed (response_stream_ids, error_code,
326+ " Fail to write into %s" ,
327+ sock->description ().c_str ());
321328 return ;
322329 }
323330
324331 // Now it's ok the mark these server-side streams as connected as all the
325332 // written user data would follower the RPC response.
326333 // Reuse stream_ptr to avoid address first stream id again
327- if (stream_ptr) {
334+ if (stream_ptr) {
328335 ((Stream*)stream_ptr->conn ())->SetConnected ();
329336 }
330337 for (size_t i = 1 ; i < response_stream_ids.size (); ++i) {
@@ -344,6 +351,10 @@ void SendRpcResponse(int64_t correlation_id,
344351 // users to set max_concurrency.
345352 Socket::WriteOptions wopt;
346353 wopt.ignore_eovercrowded = true ;
354+ if (INVALID_BTHREAD_ID != response_id) {
355+ wopt.id_wait = response_id;
356+ wopt.notify_on_success = true ;
357+ }
347358 if (sock->Write (&res_buf, &wopt) != 0 ) {
348359 const int errcode = errno;
349360 PLOG_IF (WARNING , errcode != EPIPE ) << " Fail to write into " << *sock;
@@ -354,8 +365,10 @@ void SendRpcResponse(int64_t correlation_id,
354365 }
355366
356367 if (span) {
368+ bthread_id_join (response_id);
369+ // Do not care about the result of background writing.
357370 // TODO: this is not sent
358- span->set_sent_us (butil::cpuwide_time_us () );
371+ span->set_sent_us (args. sent_us );
359372 }
360373}
361374
0 commit comments