Skip to content

Commit 73d007a

Browse files
authored
perf(gRPC): reduce object allocations on the gRPC client side for unified cancel scenarios (#1950)
1 parent a86ecf4 commit 73d007a

3 files changed

Lines changed: 283 additions & 18 deletions

File tree

pkg/remote/trans/nphttp2/grpc/http2_client.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,15 @@ func (task *closeStreamTask) Tick() {
312312
trans.mu.Unlock()
313313

314314
for i, stream := range task.toCloseStreams {
315-
// uniformly converted to status error
316-
sErr := ContextErr(stream.Context().Err())
317-
trans.closeStream(stream, sErr, true, http2.ErrCodeCancel, status.Convert(sErr), nil, false)
315+
if !trans.casStreamDone(stream) {
316+
// stream has been closed.
317+
// there is no need to do following processing to reduce objects allocation
318+
task.toCloseStreams[i] = nil
319+
continue
320+
}
321+
// uniformly converted to *status.Status and related error
322+
st, stReused, sErr := contextStatusAndErr(stream.Context().Err())
323+
trans.doCloseStream(stream, sErr, true, http2.ErrCodeCancel, st, stReused, nil)
318324
task.toCloseStreams[i] = nil
319325
}
320326
task.toCloseStreams = task.toCloseStreams[:0]
@@ -612,23 +618,38 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
612618
}
613619
klog.CtxInfof(s.ctx, "KITEX: stream closed by ctx canceled, err: %v, rstCode: %d"+sendRSTStreamFrameSuffix, err, rstCode)
614620
}
615-
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
621+
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil)
616622
}
617623

618624
// before invoking closeStream, pls do not hold the t.mu
619625
// because accessing the controlbuf while holding t.mu will cause a deadlock.
620-
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
621-
// Set stream status to done.
626+
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string) {
627+
if !t.casStreamDone(s) {
628+
return
629+
}
630+
t.doCloseStream(s, err, rst, rstCode, st, false, mdata)
631+
}
632+
633+
// casStreamDone atomically marks s as done. Returns true if the caller wins the race
634+
// and should proceed with doCloseStream; false if another goroutine already closed it.
635+
func (t *http2Client) casStreamDone(s *Stream) bool {
622636
if s.swapState(streamDone) == streamDone {
623637
// If it was already done, return. If multiple closeStream calls
624638
// happen simultaneously, wait for the first to finish.
625639
<-s.done
626-
return
640+
return false
627641
}
642+
return true
643+
}
644+
645+
// doCloseStream performs the actual stream cleanup. Caller must have won casStreamDone first.
646+
func (t *http2Client) doCloseStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, reuseSt bool, mdata map[string][]string) {
628647
// status and trailers can be updated here without any synchronization because the stream goroutine will
629648
// only read it after it sees an io.EOF error from read or write and we'll write those errors
630649
// only after updating this.
631650
s.status = st
651+
// if reuseSt == true, retrieves status by Stream.Status() must copy a new one
652+
s.reuseStatus = reuseSt
632653
if len(mdata) > 0 {
633654
s.trailer = mdata
634655
}
@@ -713,7 +734,7 @@ func (t *http2Client) Close(err error) error {
713734

714735
// Notify all active streams.
715736
for _, s := range streams {
716-
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
737+
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil)
717738
}
718739
return cErr
719740
}
@@ -857,7 +878,7 @@ func (t *http2Client) handleData(f *grpcframe.DataFrame) {
857878
if size > 0 {
858879
if err := s.fc.onData(size); err != nil {
859880
klog.CtxInfof(s.ctx, "KITEX: http2Client.handleData inflow control err: %v, rstCode: %d"+sendRSTStreamFrameSuffix, err, http2.ErrCodeFlowControl)
860-
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
881+
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil)
861882
return
862883
}
863884
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -878,7 +899,7 @@ func (t *http2Client) handleData(f *grpcframe.DataFrame) {
878899
// The server has closed the stream without sending trailers. Record that
879900
// the read direction is closed, and set the status appropriately.
880901
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
881-
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
902+
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil)
882903
}
883904
}
884905

@@ -909,7 +930,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
909930
} else {
910931
msg = fmt.Sprintf("stream terminated by RST_STREAM with error code: %v", f.ErrCode)
911932
}
912-
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(statusCode, msg), nil, false)
933+
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(statusCode, msg), nil)
913934
}
914935

915936
func (t *http2Client) handleSettings(f *grpcframe.SettingsFrame, isFirst bool) {
@@ -1049,7 +1070,7 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
10491070
// Pls refer to checkForStreamQuota in NewStream, it gets the controlbuf.mu and
10501071
// wants to get the t.mu.
10511072
for _, stream := range unprocessedStream {
1052-
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1073+
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil)
10531074
}
10541075
}
10551076

@@ -1094,7 +1115,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
10941115
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
10951116
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
10961117
klog.CtxInfof(s.ctx, "KITEX: http2Client.operateHeaders received HEADERS frame in the middle of a stream, rstCode: %d"+sendRSTStreamFrameSuffix, http2.ErrCodeProtocol)
1097-
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1118+
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil)
10981119
return
10991120
}
11001121

@@ -1103,7 +1124,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
11031124
state.data.isGRPC = !initialHeader
11041125
if err := state.decodeHeader(frame); err != nil {
11051126
klog.CtxInfof(s.ctx, "KITEX: http2Client.operateHeaders decode HEADERS frame failed, err: %v, rstCode: %d"+sendRSTStreamFrameSuffix, err, http2.ErrCodeProtocol)
1106-
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
1127+
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil)
11071128
return
11081129
}
11091130

@@ -1134,7 +1155,7 @@ func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
11341155
// if client received END_STREAM from server while stream was still active, send RST_STREAM
11351156
rst := s.getState() == streamActive
11361157
s.SetBizStatusErr(state.bizStatusErr())
1137-
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
1158+
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata)
11381159
}
11391160

11401161
// reader runs as a separate goroutine in charge of reading data from network
@@ -1188,7 +1209,7 @@ func (t *http2Client) reader() {
11881209
msg = err.Error()
11891210
}
11901211
klog.CtxInfof(s.ctx, "KITEX: http2Client.reader encountered http2.StreamError: %v, rstCode: %d"+sendRSTStreamFrameSuffix, se, http2.ErrCodeProtocol)
1191-
t.closeStream(s, status.New(code, msg).Err(), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1212+
t.closeStream(s, status.New(code, msg).Err(), true, http2.ErrCodeProtocol, status.New(code, msg), nil)
11921213
}
11931214
continue
11941215
} else {

pkg/remote/trans/nphttp2/grpc/transport.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ type Stream struct {
278278
// On client-side it is the status error received from the server.
279279
// On server-side it is unused.
280280
status *status.Status
281+
reuseStatus bool
281282
bizStatusErr kerrors.BizStatusErrorIface
282283

283284
bytesReceived uint32 // indicates whether any bytes have been received on this stream
@@ -454,6 +455,11 @@ func (s *Stream) Method() string {
454455
// Status can be read safely only after the stream has ended,
455456
// that is, after Done() is closed.
456457
func (s *Stream) Status() *status.Status {
458+
// When the internal status is a shared pre-defined instance (reuseStatus == true),
459+
// a deep copy is returned to prevent callers from mutating the shared object via AppendMessage().
460+
if s.reuseStatus && s.status != nil {
461+
return status.FromProto(s.status.Proto())
462+
}
457463
return s.status
458464
}
459465

@@ -875,13 +881,20 @@ const (
875881
GoAwayTooManyPings GoAwayReason = 2
876882
)
877883

884+
var (
885+
statusDeadlineExceeded = status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
886+
errDeadlineExceeded = statusDeadlineExceeded.Err()
887+
statusCanceled = status.New(codes.Canceled, context.Canceled.Error())
888+
errCanceled = statusCanceled.Err()
889+
)
890+
878891
// ContextErr converts the error from context package into a status error.
879892
func ContextErr(err error) error {
880893
switch err {
881894
case context.DeadlineExceeded:
882-
return status.New(codes.DeadlineExceeded, err.Error()).Err()
895+
return errDeadlineExceeded
883896
case context.Canceled:
884-
return status.New(codes.Canceled, err.Error()).Err()
897+
return errCanceled
885898
}
886899
statusErr, ok := err.(*status.Error)
887900
if ok { // only returned by contextWithCancelReason
@@ -890,6 +903,26 @@ func ContextErr(err error) error {
890903
return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
891904
}
892905

906+
// contextStatusAndErr converts err to (*status.Status, reused, error).
907+
// For context.Canceled / context.DeadlineExceeded, both the *status.Status and *status.Error
908+
// are pre-defined shared singletons (reused=true).
909+
// The caller must use reused flag to ensure Stream.Status() returns a copy instead of the shared instance,
910+
// preventing mutation via AppendMessage().
911+
func contextStatusAndErr(err error) (*status.Status, bool, error) {
912+
switch err {
913+
case context.DeadlineExceeded:
914+
return statusDeadlineExceeded, true, errDeadlineExceeded
915+
case context.Canceled:
916+
return statusCanceled, true, errCanceled
917+
}
918+
stErr, ok := err.(*status.Error)
919+
if ok {
920+
return stErr.GRPCStatus(), false, stErr
921+
}
922+
st := status.Newf(codes.Internal, "Unexpected error from context packet: %v", err)
923+
return st, false, st.Err()
924+
}
925+
893926
// IsStreamDoneErr returns true if the error indicates that the stream is done.
894927
func IsStreamDoneErr(err error) bool {
895928
return errors.Is(err, errStreamDone)

0 commit comments

Comments
 (0)