Skip to content

Commit 1f307f2

Browse files
authored
Merge pull request #852 from cloudwego/release/v0.5.0
chore: release v0.5.0
2 parents dd39b82 + bb0bcb3 commit 1f307f2

131 files changed

Lines changed: 5317 additions & 1857 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/CODEOWNERS

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# For more information, please refer to https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners
2+
3+
* @cloudwego/Kitex-reviewers @cloudwego/Kitex-approvers @cloudwego/Kitex-maintainers

.github/workflows/pr-check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
1515

1616
- name: Check Spell
17-
uses: crate-ci/typos@master
17+
uses: crate-ci/typos@v1.13.14
1818

1919
staticcheck:
2020
runs-on: [ self-hosted, X64 ]

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,4 @@ jobs:
4848
with:
4949
go-version: ${{ matrix.go }}
5050
- name: Unit Test
51-
run: go test -gcflags=-l -race -covermode=atomic -coverprofile=coverage.txt ./...
51+
run: go test -gcflags=-l -race -covermode=atomic ./...

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Before you submit your Pull Request (PR) consider the following guidelines:
3131
```
3232
git checkout -b my-fix-branch develop
3333
```
34-
6. Create your patch, including appropriate test cases.
34+
6. Create your patch, including appropriate test cases. Please refer to [Go-UT](https://pkg.go.dev/testing#pkg-overview) for writing guides. [Go-Mock](https://github.com/golang/mock) is recommended to mock interface, please refer to internal/mocks/readme.md for more details, and [Mockey](https://github.com/bytedance/mockey) is recommended to mock functions, please refer to its readme doc for specific usage.
3535
7. Follow our [Style Guides](#code-style-guides).
3636
8. Commit your changes using a descriptive commit message that follows [AngularJS Git Commit Message Conventions](https://docs.google.com/document/d/1QrDFcIiPjSLDn3EL15IJygNPiHORgU1_OOAqWjiDU5Y/edit).
3737
Adherence to these conventions is necessary because release notes are automatically generated from these messages.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ Kitex has built-in code generation tools that support generating **Thrift**, **P
5252

5353
- **Basic Features**
5454

55-
Including Message Type, Supported Protocols, Directly Invoke, Connection Pool, Timeout Control, Request Retry, LoadBalancer, Circuit Breaker, Rate Limiting, Instrumentation Control, Logging and HttpResolver.[[more]](https://www.cloudwego.io/docs/tutorials/basic-feature/)
55+
Including Message Type, Supported Protocols, Directly Invoke, Connection Pool, Timeout Control, Request Retry, LoadBalancer, Circuit Breaker, Rate Limiting, Instrumentation Control, Logging and HttpResolver.[[more]](https://www.cloudwego.io/docs/kitex/tutorials/basic-feature/)
5656

5757
- **Governance Features**
5858

client/callopt/options.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/cloudwego/kitex/internal/client"
2828
"github.com/cloudwego/kitex/pkg/discovery"
29+
"github.com/cloudwego/kitex/pkg/fallback"
2930
"github.com/cloudwego/kitex/pkg/http"
3031
"github.com/cloudwego/kitex/pkg/retry"
3132
"github.com/cloudwego/kitex/pkg/rpcinfo"
@@ -45,6 +46,7 @@ type CallOptions struct {
4546

4647
// export field for using in client
4748
RetryPolicy retry.Policy
49+
Fallback *fallback.Policy
4850
}
4951

5052
func newOptions() interface{} {
@@ -63,6 +65,7 @@ func (co *CallOptions) Recycle() {
6365
co.configs = nil
6466
co.svr = nil
6567
co.RetryPolicy = retry.Policy{}
68+
co.Fallback = nil
6669
co.locks.Zero()
6770
callOptionsPool.Put(co)
6871
}
@@ -171,14 +174,14 @@ func WithTag(key, val string) Option {
171174

172175
// WithRetryPolicy sets the retry policy for a RPC call.
173176
// Build retry.Policy with retry.BuildFailurePolicy or retry.BuildBackupRequest instead of building retry.Policy directly.
174-
// Below is use demo, eg:
177+
// Demos are provided below:
175178
//
176179
// demo1. call with failure retry policy, default retry error is Timeout
177-
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy())))
180+
// `resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy())))`
178181
// demo2. call with backup request policy
179-
// bp := retry.NewBackupPolicy(10)
182+
// `bp := retry.NewBackupPolicy(10)
180183
// bp.WithMaxRetryTimes(1)
181-
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildBackupRequest(bp)))
184+
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildBackupRequest(bp)))`
182185
func WithRetryPolicy(p retry.Policy) Option {
183186
return Option{f: func(o *CallOptions, di *strings.Builder) {
184187
if !p.Enable {
@@ -193,6 +196,23 @@ func WithRetryPolicy(p retry.Policy) Option {
193196
}}
194197
}
195198

199+
// WithFallback is used to set the fallback policy for a RPC call.
200+
// Demos are provided below:
201+
//
202+
// demo1. call with fallback for error
203+
// `resp, err := cli.Mock(ctx, req, callopt.WithFallback(fallback.ErrorFallback(yourFBFunc))`
204+
// demo2. call with fallback for error and enable reportAsFallback, which sets reportAsFallback to be true and will do report(metric) as Fallback result
205+
// `resp, err := cli.Mock(ctx, req, callopt.WithFallback(fallback.ErrorFallback(yourFBFunc).EnableReportAsFallback())`
206+
func WithFallback(fb *fallback.Policy) Option {
207+
return Option{f: func(o *CallOptions, di *strings.Builder) {
208+
if !fallback.IsPolicyValid(fb) {
209+
return
210+
}
211+
di.WriteString("WithFallback")
212+
o.Fallback = fb
213+
}}
214+
}
215+
196216
// Apply applies call options to the rpcinfo.RPCConfig and internal.RemoteInfo of kitex client.
197217
// The return value records the name and arguments of each option.
198218
// This function is for internal purpose only.

client/callopt/options_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package callopt
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"testing"
2223
"time"
2324

2425
"github.com/cloudwego/kitex/internal/client"
2526
"github.com/cloudwego/kitex/internal/test"
27+
"github.com/cloudwego/kitex/pkg/fallback"
2628
"github.com/cloudwego/kitex/pkg/http"
29+
"github.com/cloudwego/kitex/pkg/retry"
2730
"github.com/cloudwego/kitex/pkg/rpcinfo"
2831
"github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"
2932
)
@@ -89,4 +92,27 @@ func TestApply(t *testing.T) {
8992
v, exist := remoteInfo.Tag(mockKey)
9093
test.Assert(t, exist)
9194
test.Assert(t, v == mockVal, v)
95+
96+
// WithRetryPolicy
97+
option = WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy()))
98+
_, co := Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
99+
test.Assert(t, co.RetryPolicy.Enable)
100+
test.Assert(t, co.RetryPolicy.FailurePolicy != nil)
101+
102+
// WithRetryPolicy pass empty struct
103+
option = WithRetryPolicy(retry.Policy{})
104+
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
105+
test.Assert(t, !co.RetryPolicy.Enable)
106+
107+
// WithFallback
108+
option = WithFallback(fallback.ErrorFallback(fallback.UnwrapHelper(func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
109+
return
110+
})).EnableReportAsFallback())
111+
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
112+
test.Assert(t, co.Fallback != nil)
113+
114+
// WithFallback pass nil
115+
option = WithFallback(nil)
116+
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
117+
test.Assert(t, co.Fallback == nil)
92118
}

client/client.go

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"runtime"
24+
"runtime/debug"
2425
"strconv"
2526
"sync/atomic"
2627

@@ -34,6 +35,7 @@ import (
3435
"github.com/cloudwego/kitex/pkg/discovery"
3536
"github.com/cloudwego/kitex/pkg/endpoint"
3637
"github.com/cloudwego/kitex/pkg/event"
38+
"github.com/cloudwego/kitex/pkg/fallback"
3739
"github.com/cloudwego/kitex/pkg/kerrors"
3840
"github.com/cloudwego/kitex/pkg/klog"
3941
"github.com/cloudwego/kitex/pkg/loadbalance"
@@ -244,11 +246,14 @@ func (kc *kClient) initLBCache() error {
244246
NameFunc: func() string { return "no_resolver" },
245247
}
246248
}
249+
// because we cannot ensure that user's custom loadbalancer is cacheable, we need to disable it here
250+
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService, Cacheable: false}
247251
balancer := kc.opt.Balancer
248252
if balancer == nil {
253+
// default internal lb balancer is cacheable
254+
cacheOpts.Cacheable = true
249255
balancer = loadbalance.NewWeightedBalancer()
250256
}
251-
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService}
252257
if kc.opt.BalancerCacheOpt != nil {
253258
cacheOpts = *kc.opt.BalancerCacheOpt
254259
}
@@ -365,48 +370,67 @@ func (kc *kClient) Call(ctx context.Context, method string, request, response in
365370
ctx = kc.opt.TracerCtl.DoStart(ctx, ri)
366371

367372
var callOptRetry retry.Policy
368-
if callOpts != nil && callOpts.RetryPolicy.Enable {
369-
callOptRetry = callOpts.RetryPolicy
370-
}
371-
if kc.opt.RetryContainer == nil {
372-
if callOptRetry.Enable {
373-
// setup retry in callopt
374-
kc.opt.RetryContainer = retry.NewRetryContainer()
375-
} else {
376-
err := kc.eps(ctx, request, response)
377-
kc.opt.TracerCtl.DoFinish(ctx, ri, err)
378-
if err == nil {
379-
err = ri.Invocation().BizStatusErr()
380-
rpcinfo.PutRPCInfo(ri)
381-
}
382-
return err
373+
var callOptFallback *fallback.Policy
374+
if callOpts != nil {
375+
callOptFallback = callOpts.Fallback
376+
if callOpts.RetryPolicy.Enable {
377+
callOptRetry = callOpts.RetryPolicy
383378
}
384379
}
380+
if kc.opt.RetryContainer == nil && callOptRetry.Enable {
381+
// setup retry in callopt
382+
kc.opt.RetryContainer = retry.NewRetryContainer()
383+
}
385384

386-
var callTimes int32
387-
var prevRI rpcinfo.RPCInfo
388-
recycleRI, err := kc.opt.RetryContainer.WithRetryIfNeeded(ctx, callOptRetry, func(ctx context.Context, r retry.Retryer) (rpcinfo.RPCInfo, interface{}, error) {
389-
currCallTimes := int(atomic.AddInt32(&callTimes, 1))
390-
retryCtx := ctx
391-
cRI := ri
392-
if currCallTimes > 1 {
393-
retryCtx, cRI, _ = kc.initRPCInfo(ctx, method)
394-
retryCtx = metainfo.WithPersistentValue(retryCtx, retry.TransitKey, strconv.Itoa(currCallTimes-1))
395-
if prevRI == nil {
396-
prevRI = ri
385+
var err error
386+
var recycleRI bool
387+
if kc.opt.RetryContainer == nil {
388+
err = kc.eps(ctx, request, response)
389+
if err == nil {
390+
recycleRI = true
391+
}
392+
} else {
393+
var callTimes int32
394+
// prevRI represents a value of rpcinfo.RPCInfo type.
395+
var prevRI atomic.Value
396+
recycleRI, err = kc.opt.RetryContainer.WithRetryIfNeeded(ctx, callOptRetry, func(ctx context.Context, r retry.Retryer) (rpcinfo.RPCInfo, interface{}, error) {
397+
currCallTimes := int(atomic.AddInt32(&callTimes, 1))
398+
retryCtx := ctx
399+
cRI := ri
400+
if currCallTimes > 1 {
401+
retryCtx, cRI, _ = kc.initRPCInfo(ctx, method)
402+
retryCtx = metainfo.WithPersistentValue(retryCtx, retry.TransitKey, strconv.Itoa(currCallTimes-1))
403+
if prevRI.Load() == nil {
404+
prevRI.Store(ri)
405+
}
406+
r.Prepare(retryCtx, prevRI.Load().(rpcinfo.RPCInfo), cRI)
407+
prevRI.Store(cRI)
397408
}
398-
r.Prepare(retryCtx, prevRI, cRI)
399-
prevRI = cRI
409+
err := kc.eps(retryCtx, request, response)
410+
return cRI, response, err
411+
}, ri, request)
412+
}
413+
414+
// do fallback if with setup
415+
fallback, hasFallback := getFallbackPolicy(callOptFallback, kc.opt.Fallback)
416+
var fbErr error
417+
reportErr := err
418+
if hasFallback {
419+
reportAsFB := false
420+
// Notice: If rpc err is nil, rpcStatAsFB will always be false, even if it's set to true by user.
421+
fbErr, reportAsFB = fallback.DoIfNeeded(ctx, ri, request, response, err)
422+
if reportAsFB {
423+
reportErr = fbErr
400424
}
401-
err := kc.eps(retryCtx, request, response)
402-
return cRI, response, err
403-
}, ri, request)
425+
err = fbErr
426+
}
404427

405-
kc.opt.TracerCtl.DoFinish(ctx, ri, err)
428+
kc.opt.TracerCtl.DoFinish(ctx, ri, reportErr)
406429
callOpts.Recycle()
407-
if err == nil {
430+
if err == nil && !hasFallback {
408431
err = ri.Invocation().BizStatusErr()
409432
}
433+
410434
if recycleRI {
411435
// why need check recycleRI to decide if recycle RPCInfo?
412436
// 1. no retry, rpc timeout happen will cause panic when response return
@@ -503,6 +527,11 @@ func (kc *kClient) invokeHandleEndpoint() (endpoint.Endpoint, error) {
503527

504528
// Close is not concurrency safe.
505529
func (kc *kClient) Close() error {
530+
defer func() {
531+
if err := recover(); err != nil {
532+
klog.Warnf("KITEX: panic when close client, error=%s, stack=%s", err, string(debug.Stack()))
533+
}
534+
}()
506535
if kc.closed {
507536
return nil
508537
}
@@ -631,3 +660,14 @@ func (kc *kClient) warmingUp() error {
631660

632661
return nil
633662
}
663+
664+
// return fallback policy from call option and client option.
665+
func getFallbackPolicy(callOptFB, cliOptFB *fallback.Policy) (fb *fallback.Policy, hasFallback bool) {
666+
if callOptFB != nil {
667+
return callOptFB, true
668+
}
669+
if cliOptFB != nil {
670+
return cliOptFB, true
671+
}
672+
return nil, false
673+
}

0 commit comments

Comments
 (0)