Skip to content

Commit e95eee3

Browse files
authored
Merge pull request #902 from cloudwego/release/v0.5.2
chore: release v0.5.2
2 parents 9c76b55 + a05a710 commit e95eee3

38 files changed

Lines changed: 9745 additions & 119 deletions

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Please check your PR title with the below requirements:
2323
-->
2424
- [ ] This PR title match the format: \<type\>(optional scope): \<description\>
2525
- [ ] The description of this PR title is user-oriented and clear enough for others to understand.
26+
- [ ] Attach the PR updating the user documentation if the current PR requires user awareness at the usage level. [User docs repo](https://github.com/cloudwego/cloudwego.github.io)
2627

2728

2829
#### (Optional) Translate the PR title into Chinese.
@@ -35,8 +36,13 @@ Provide more detailed info for review(e.g., it's recommended to provide perf dat
3536
en:
3637
zh(optional):
3738

38-
#### Which issue(s) this PR fixes:
39+
#### (Optional) Which issue(s) this PR fixes:
3940
<!--
4041
Automatically closes linked issue when PR is merged.
4142
Eg: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
4243
-->
44+
45+
#### (optional) The PR that updates user documentation:
46+
<!--
47+
If the current PR requires user awareness at the usage level, please submit a PR to update user docs. [User docs repo](https://github.com/cloudwego/cloudwego.github.io)
48+
-->

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,6 @@ header:
3232
- pkg/generic/descriptor/tree_test.go
3333
- pkg/generic/httppb_test/idl/echo.pb.go
3434
- pkg/utils/json.go
35+
- pkg/protocol/bthrift/test/kitex_gen/**
3536

3637
comment: on-failure

client/client_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,15 +544,24 @@ func TestRetryWithResultRetry(t *testing.T) {
544544

545545
mockErr := errors.New("mock")
546546
retryWithMockErr := false
547-
var count int32
548547
errMW := func(next endpoint.Endpoint) endpoint.Endpoint {
548+
var count int32
549549
return func(ctx context.Context, req, resp interface{}) (err error) {
550550
if atomic.CompareAndSwapInt32(&count, 0, 1) {
551551
return mockErr
552552
}
553553
return nil
554554
}
555555
}
556+
mockTimeoutMW := func(next endpoint.Endpoint) endpoint.Endpoint {
557+
var count int32
558+
return func(ctx context.Context, req, resp interface{}) (err error) {
559+
if atomic.CompareAndSwapInt32(&count, 0, 1) {
560+
time.Sleep(300 * time.Millisecond)
561+
}
562+
return nil
563+
}
564+
}
556565
errRetryFunc := func(err error, ri rpcinfo.RPCInfo) bool {
557566
if errors.Is(err, mockErr) {
558567
retryWithMockErr = true
@@ -561,7 +570,7 @@ func TestRetryWithResultRetry(t *testing.T) {
561570
return false
562571
}
563572

564-
// should timeout
573+
// case 1: retry for mockErr
565574
cli := newMockClient(t, ctrl,
566575
WithMiddleware(errMW),
567576
WithRPCTimeout(100*time.Millisecond),
@@ -580,6 +589,40 @@ func TestRetryWithResultRetry(t *testing.T) {
580589
err := cli.Call(context.Background(), mtd, req, res)
581590
test.Assert(t, err == nil, err)
582591
test.Assert(t, retryWithMockErr)
592+
593+
// case 1: retry for timeout
594+
cli = newMockClient(t, ctrl,
595+
WithMiddleware(mockTimeoutMW),
596+
WithRPCTimeout(100*time.Millisecond),
597+
WithFailureRetry(&retry.FailurePolicy{
598+
StopPolicy: retry.StopPolicy{
599+
MaxRetryTimes: 3,
600+
CBPolicy: retry.CBPolicy{
601+
ErrorRate: 0.1,
602+
},
603+
},
604+
RetrySameNode: true,
605+
ShouldResultRetry: &retry.ShouldResultRetry{ErrorRetry: errRetryFunc},
606+
}))
607+
err = cli.Call(context.Background(), mtd, req, res)
608+
test.Assert(t, err == nil, err)
609+
610+
// case 2: set NotRetryForTimeout as true, won't do retry for timeout
611+
cli = newMockClient(t, ctrl,
612+
WithMiddleware(mockTimeoutMW),
613+
WithRPCTimeout(100*time.Millisecond),
614+
WithFailureRetry(&retry.FailurePolicy{
615+
StopPolicy: retry.StopPolicy{
616+
MaxRetryTimes: 3,
617+
CBPolicy: retry.CBPolicy{
618+
ErrorRate: 0.1,
619+
},
620+
},
621+
RetrySameNode: true,
622+
ShouldResultRetry: &retry.ShouldResultRetry{ErrorRetry: errRetryFunc, NotRetryForTimeout: true},
623+
}))
624+
err = cli.Call(context.Background(), mtd, req, res)
625+
test.Assert(t, errors.Is(err, kerrors.ErrRPCTimeout))
583626
}
584627

585628
func TestFallbackForError(t *testing.T) {

client/rpctimeout.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,14 @@ func makeTimeoutErr(ctx context.Context, start time.Time, timeout time.Duration)
6161
errMsg = fmt.Sprintf("%s, remote=%s", errMsg, target.String())
6262
}
6363

64+
needFineGrainedErrCode := rpctimeout.LoadGlobalNeedFineGrainedErrCode()
6465
// cancel error
6566
if ctx.Err() == context.Canceled {
66-
return kerrors.ErrRPCTimeout.WithCause(fmt.Errorf("%s: %w by business", errMsg, ctx.Err()))
67+
if needFineGrainedErrCode {
68+
return kerrors.ErrCanceledByBusiness
69+
} else {
70+
return kerrors.ErrRPCTimeout.WithCause(fmt.Errorf("%s: %w by business", errMsg, ctx.Err()))
71+
}
6772
}
6873

6974
if ddl, ok := ctx.Deadline(); !ok {
@@ -75,10 +80,19 @@ func makeTimeoutErr(ctx context.Context, start time.Time, timeout time.Duration)
7580
if roundTimeout >= 0 && ddl.Before(start.Add(roundTimeout)) {
7681
errMsg = fmt.Sprintf("%s, context deadline earlier than timeout, actual=%v", errMsg, ddl.Sub(start))
7782
}
83+
84+
if needFineGrainedErrCode && isBusinessTimeout(start, timeout, ddl, rpctimeout.LoadBusinessTimeoutThreshold()) {
85+
return kerrors.ErrTimeoutByBusiness
86+
}
7887
}
7988
return kerrors.ErrRPCTimeout.WithCause(errors.New(errMsg))
8089
}
8190

91+
func isBusinessTimeout(start time.Time, kitexTimeout time.Duration, actualDDL time.Time, threshold time.Duration) bool {
92+
kitexDDL := start.Add(kitexTimeout)
93+
return actualDDL.Add(threshold).Before(kitexDDL)
94+
}
95+
8296
func rpcTimeoutMW(mwCtx context.Context) endpoint.Middleware {
8397
var moreTimeout time.Duration
8498
if v, ok := mwCtx.Value(rpctimeout.TimeoutAdjustKey).(*time.Duration); ok && v != nil {

client/rpctimeout_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,121 @@ func TestNewRPCTimeoutMW(t *testing.T) {
9999
err = mw1(mw2(block))(cancelCtx, nil, nil)
100100
test.Assert(t, errors.Is(err, context.Canceled), err)
101101
}
102+
103+
func TestIsBusinessTimeout(t *testing.T) {
104+
type args struct {
105+
start time.Time
106+
kitexTimeout time.Duration
107+
actualDDL time.Time
108+
threshold time.Duration
109+
}
110+
start := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
111+
tests := []struct {
112+
name string
113+
args args
114+
want bool
115+
}{
116+
{
117+
name: "business ddl ahead of kitex ddl by more than threshold",
118+
args: args{
119+
start: start,
120+
kitexTimeout: time.Second * 2,
121+
actualDDL: start.Add(time.Second),
122+
threshold: time.Millisecond * 500,
123+
},
124+
want: true,
125+
},
126+
{
127+
name: "business ddl ahead of kitex ddl by less than threshold",
128+
args: args{
129+
start: start,
130+
kitexTimeout: time.Second,
131+
actualDDL: start.Add(time.Millisecond * 800),
132+
threshold: time.Millisecond * 500,
133+
},
134+
want: false,
135+
},
136+
}
137+
for _, tt := range tests {
138+
t.Run(tt.name, func(t *testing.T) {
139+
if got := isBusinessTimeout(tt.args.start, tt.args.kitexTimeout, tt.args.actualDDL, tt.args.threshold); got != tt.want {
140+
t.Errorf("isBusinessTimeout() = %v, want %v", got, tt.want)
141+
}
142+
})
143+
}
144+
}
145+
146+
func TestRpcTimeoutMWTimeoutByBusiness(t *testing.T) {
147+
runTimeoutMW := func() error {
148+
mw := rpcTimeoutMW(context.Background())
149+
processor := mw(func(ctx context.Context, req, rsp interface{}) error {
150+
time.Sleep(time.Millisecond * 100)
151+
return nil
152+
})
153+
154+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*30)
155+
defer cancel()
156+
157+
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, mockRPCInfo(time.Second))
158+
159+
return processor(ctx, nil, nil)
160+
}
161+
162+
t.Run("Timeout by business with no need fine grained err code", func(t *testing.T) {
163+
rpctimeout.DisableGlobalNeedFineGrainedErrCode()
164+
if err := runTimeoutMW(); !kerrors.IsTimeoutError(err) {
165+
t.Errorf("rpcTimeoutMW() = %v, want %v", err, kerrors.ErrRPCTimeout)
166+
}
167+
})
168+
169+
t.Run("Timeout by business with need fine grained err code", func(t *testing.T) {
170+
rpctimeout.EnableGlobalNeedFineGrainedErrCode()
171+
if err := runTimeoutMW(); err != kerrors.ErrTimeoutByBusiness {
172+
t.Errorf("rpcTimeoutMW() = %v, want %v", err, kerrors.ErrTimeoutByBusiness)
173+
}
174+
rpctimeout.DisableGlobalNeedFineGrainedErrCode()
175+
})
176+
}
177+
178+
func TestRpcTimeoutMWCancelByBusiness(t *testing.T) {
179+
runTimeoutMW := func() error {
180+
mw := rpcTimeoutMW(context.Background())
181+
processor := mw(func(ctx context.Context, req, rsp interface{}) error {
182+
time.Sleep(time.Millisecond * 100)
183+
return nil
184+
})
185+
186+
ctx, cancel := context.WithCancel(context.Background())
187+
go func() {
188+
time.Sleep(10 * time.Millisecond)
189+
cancel()
190+
}()
191+
192+
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, mockRPCInfo(time.Second))
193+
194+
return processor(ctx, nil, nil)
195+
}
196+
197+
t.Run("Cancel by business with no need fine grained err code", func(t *testing.T) {
198+
rpctimeout.DisableGlobalNeedFineGrainedErrCode()
199+
if err := runTimeoutMW(); !kerrors.IsTimeoutError(err) {
200+
t.Errorf("rpcTimeoutMW() = %v, want %v", err, kerrors.ErrRPCTimeout)
201+
}
202+
})
203+
204+
t.Run("Cancel by business with need fine grained err code", func(t *testing.T) {
205+
rpctimeout.EnableGlobalNeedFineGrainedErrCode()
206+
if err := runTimeoutMW(); err != kerrors.ErrCanceledByBusiness {
207+
t.Errorf("rpcTimeoutMW() = %v, want %v", err, kerrors.ErrCanceledByBusiness)
208+
}
209+
rpctimeout.DisableGlobalNeedFineGrainedErrCode()
210+
})
211+
}
212+
213+
func mockRPCInfo(timeout time.Duration) rpcinfo.RPCInfo {
214+
s := rpcinfo.NewEndpointInfo("mockService", "mockMethod", nil, nil)
215+
c := rpcinfo.NewRPCConfig()
216+
mc := rpcinfo.AsMutableRPCConfig(c)
217+
_ = mc.SetRPCTimeout(timeout)
218+
return rpcinfo.NewRPCInfo(nil, s, nil, c, rpcinfo.NewRPCStats())
219+
}

client/stream_test.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,24 @@ func TestStreaming(t *testing.T) {
102102
kc: kc,
103103
}
104104

105-
// recv nil msg
106-
err = stream.RecvMsg(nil)
107-
test.Assert(t, err == nil, err)
108-
109-
// send nil msg
110-
err = stream.SendMsg(nil)
111-
test.Assert(t, err == nil, err)
105+
for i := 0; i < 10; i++ {
106+
ch := make(chan struct{})
107+
// recv nil msg
108+
go func() {
109+
err := stream.RecvMsg(nil)
110+
test.Assert(t, err == nil, err)
111+
ch <- struct{}{}
112+
}()
113+
114+
// send nil msg
115+
go func() {
116+
err := stream.SendMsg(nil)
117+
test.Assert(t, err == nil, err)
118+
ch <- struct{}{}
119+
}()
120+
<-ch
121+
<-ch
122+
}
112123

113124
// close
114125
err = stream.Close()

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/cloudwego/fastpb v0.0.4
1111
github.com/cloudwego/frugal v0.1.6
1212
github.com/cloudwego/netpoll v0.3.2
13-
github.com/cloudwego/thriftgo v0.2.8
13+
github.com/cloudwego/thriftgo v0.2.9
1414
github.com/golang/mock v1.6.0
1515
github.com/google/pprof v0.0.0-20220608213341-c488b8fa1db3
1616
github.com/jhump/protoreflect v1.8.2

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,14 @@ github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4M
2626
github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic=
2727
github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
2828
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
29-
github.com/cloudwego/fastpb v0.0.4-0.20230131074846-6fc453d58b96 h1:61PQT0CXNUuQDiDKv/QQ+pFi9uthExZLQz8b5WfS7Qw=
30-
github.com/cloudwego/fastpb v0.0.4-0.20230131074846-6fc453d58b96/go.mod h1:/V13XFTq2TUkxj2qWReV8MwfPC4NnPcy6FsrojnsSG0=
3129
github.com/cloudwego/fastpb v0.0.4 h1:/ROVVfoFtpfc+1pkQLzGs+azjxUbSOsAqSY4tAAx4mg=
3230
github.com/cloudwego/fastpb v0.0.4/go.mod h1:/V13XFTq2TUkxj2qWReV8MwfPC4NnPcy6FsrojnsSG0=
3331
github.com/cloudwego/frugal v0.1.6 h1:aXJ7W0Omion1WTCe4JHAWinQmjXDYzHt03sabu3Rabo=
3432
github.com/cloudwego/frugal v0.1.6/go.mod h1:9ElktKsh5qd2zDBQ5ENhPSQV7F2dZ/mXlr1eaZGDBFs=
3533
github.com/cloudwego/netpoll v0.3.2 h1:/998ICrNMVBo4mlul4j7qcIeY7QnEfuCCPPwck9S3X4=
3634
github.com/cloudwego/netpoll v0.3.2/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ=
37-
github.com/cloudwego/thriftgo v0.2.8 h1:swwp+JQDeL8bBbvzJN3D3J5fluWP+chiUqVPbnToV0I=
38-
github.com/cloudwego/thriftgo v0.2.8/go.mod h1:dAyXHEmKXo0LfMCrblVEY3mUZsdeuA5+i0vF5f09j7E=
35+
github.com/cloudwego/thriftgo v0.2.9 h1:uN58Y6LkVXHWWEFcDFVi4gAnvmFTxTqgi26NZaRAubQ=
36+
github.com/cloudwego/thriftgo v0.2.9/go.mod h1:dAyXHEmKXo0LfMCrblVEY3mUZsdeuA5+i0vF5f09j7E=
3937
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
4038
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4139
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

pkg/kerrors/kerrors.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@ import (
2626

2727
// Basic error types
2828
var (
29-
ErrInternalException = &basicError{"internal exception"}
30-
ErrServiceDiscovery = &basicError{"service discovery error"}
31-
ErrGetConnection = &basicError{"get connection error"}
32-
ErrLoadbalance = &basicError{"loadbalance error"}
33-
ErrNoMoreInstance = &basicError{"no more instances to retry"}
34-
ErrRPCTimeout = &basicError{"rpc timeout"}
35-
ErrACL = &basicError{"request forbidden"}
36-
ErrCircuitBreak = &basicError{"forbidden by circuitbreaker"}
37-
ErrRemoteOrNetwork = &basicError{"remote or network error"}
38-
ErrOverlimit = &basicError{"request over limit"}
39-
ErrPanic = &basicError{"panic"}
40-
ErrBiz = &basicError{"biz error"}
29+
ErrInternalException = &basicError{"internal exception"}
30+
ErrServiceDiscovery = &basicError{"service discovery error"}
31+
ErrGetConnection = &basicError{"get connection error"}
32+
ErrLoadbalance = &basicError{"loadbalance error"}
33+
ErrNoMoreInstance = &basicError{"no more instances to retry"}
34+
ErrRPCTimeout = &basicError{"rpc timeout"}
35+
ErrCanceledByBusiness = &basicError{"canceled by business"}
36+
ErrTimeoutByBusiness = &basicError{"timeout by business"}
37+
ErrACL = &basicError{"request forbidden"}
38+
ErrCircuitBreak = &basicError{"forbidden by circuitbreaker"}
39+
ErrRemoteOrNetwork = &basicError{"remote or network error"}
40+
ErrOverlimit = &basicError{"request over limit"}
41+
ErrPanic = &basicError{"panic"}
42+
ErrBiz = &basicError{"biz error"}
4143

4244
ErrRetry = &basicError{"retry error"}
4345
// ErrRPCFinish happens when retry enabled and there is one call has finished

pkg/protocol/bthrift/test/gen.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
2+
3+
kitex -module github.com/cloudwego/kitex -thrift keep_unknown_fields test.thrift
4+

0 commit comments

Comments
 (0)