@@ -33,6 +33,7 @@ import (
3333 "github.com/cloudwego/kitex/pkg/diagnosis"
3434 "github.com/cloudwego/kitex/pkg/discovery"
3535 "github.com/cloudwego/kitex/pkg/endpoint"
36+ "github.com/cloudwego/kitex/pkg/event"
3637 "github.com/cloudwego/kitex/pkg/kerrors"
3738 "github.com/cloudwego/kitex/pkg/klog"
3839 "github.com/cloudwego/kitex/pkg/loadbalance"
@@ -46,7 +47,6 @@ import (
4647 "github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"
4748 "github.com/cloudwego/kitex/pkg/rpctimeout"
4849 "github.com/cloudwego/kitex/pkg/serviceinfo"
49- "github.com/cloudwego/kitex/pkg/transmeta"
5050 "github.com/cloudwego/kitex/pkg/utils"
5151 "github.com/cloudwego/kitex/pkg/warmup"
5252 "github.com/cloudwego/kitex/transport"
@@ -74,21 +74,27 @@ type kClient struct {
7474 closed bool
7575}
7676
77+ // Set finalizer on kClient does not take effect, because kClient has a circular reference problem
78+ // when construct the endpoint.Endpoint in the invokeHandleEndpoint,
79+ // so wrapping kClient as kcFinalizerClient, and set finalizer on kcFinalizerClient, it can solve this problem.
80+ type kcFinalizerClient struct {
81+ * kClient
82+ }
83+
7784// NewClient creates a kitex.Client with the given ServiceInfo, it is from generated code.
7885func NewClient (svcInfo * serviceinfo.ServiceInfo , opts ... Option ) (Client , error ) {
7986 if svcInfo == nil {
8087 return nil , errors .New ("NewClient: no service info" )
8188 }
82- kc := & kClient {
83- svcInfo : svcInfo ,
84- opt : client .NewOptions (opts ),
85- }
89+ kc := & kcFinalizerClient {kClient : & kClient {}}
90+ kc .svcInfo = svcInfo
91+ kc .opt = client .NewOptions (opts )
8692 if err := kc .init (); err != nil {
8793 return nil , err
8894 }
8995 // like os.File, if kc is garbage-collected, but Close is not called, call Close.
90- runtime .SetFinalizer (kc , func (c * kClient ) {
91- c .Close ()
96+ runtime .SetFinalizer (kc , func (c * kcFinalizerClient ) {
97+ _ = c .Close ()
9298 })
9399 return kc , nil
94100}
@@ -107,6 +113,9 @@ func (kc *kClient) init() (err error) {
107113 if err = kc .initProxy (); err != nil {
108114 return err
109115 }
116+ if err = kc .initConnPool (); err != nil {
117+ return err
118+ }
110119 if err = kc .initLBCache (); err != nil {
111120 return err
112121 }
@@ -181,6 +190,33 @@ func (kc *kClient) initProxy() error {
181190 return nil
182191}
183192
193+ func (kc * kClient ) initConnPool () error {
194+ pool := kc .opt .RemoteOpt .ConnPool
195+ kc .opt .CloseCallbacks = append (kc .opt .CloseCallbacks , pool .Close )
196+
197+ if df , ok := pool .(interface { Dump () interface {} }); ok {
198+ kc .opt .DebugService .RegisterProbeFunc (diagnosis .ConnPoolKey , df .Dump )
199+ }
200+ if r , ok := pool .(remote.ConnPoolReporter ); ok && kc .opt .RemoteOpt .EnableConnPoolReporter {
201+ r .EnableReporter ()
202+ }
203+
204+ if long , ok := pool .(remote.LongConnPool ); ok {
205+ kc .opt .Bus .Watch (discovery .ChangeEventName , func (ev * event.Event ) {
206+ ch , ok := ev .Extra .(* discovery.Change )
207+ if ! ok {
208+ return
209+ }
210+ for _ , inst := range ch .Removed {
211+ if addr := inst .Address (); addr != nil {
212+ long .Clean (addr .Network (), addr .String ())
213+ }
214+ }
215+ })
216+ }
217+ return nil
218+ }
219+
184220func (kc * kClient ) initLBCache () error {
185221 if kc .opt .Proxy != nil && kc .opt .Resolver == nil {
186222 return nil
@@ -226,7 +262,7 @@ func (kc *kClient) initMiddlewares(ctx context.Context) {
226262 if kc .opt .XDSEnabled && kc .opt .XDSRouterMiddleware != nil && kc .opt .Proxy == nil {
227263 kc .mws = append (kc .mws , kc .opt .XDSRouterMiddleware )
228264 }
229- kc .mws = append (kc .mws , kc .opt .CBSuite .ServiceCBMW (), rpcTimeoutMW (ctx ))
265+ kc .mws = append (kc .mws , kc .opt .CBSuite .ServiceCBMW (), rpcTimeoutMW (ctx ), contextMW )
230266 kc .mws = append (kc .mws , builderMWs ... )
231267 kc .mws = append (kc .mws , acl .NewACLMiddleware (kc .opt .ACLRules ))
232268 if kc .opt .Proxy == nil {
@@ -330,6 +366,7 @@ func (kc *kClient) Call(ctx context.Context, method string, request, response in
330366 err := kc .eps (ctx , request , response )
331367 kc .opt .TracerCtl .DoFinish (ctx , ri , err )
332368 if err == nil {
369+ err = ri .Invocation ().BizStatusErr ()
333370 rpcinfo .PutRPCInfo (ri )
334371 }
335372 return err
@@ -357,6 +394,9 @@ func (kc *kClient) Call(ctx context.Context, method string, request, response in
357394
358395 kc .opt .TracerCtl .DoFinish (ctx , ri , err )
359396 callOpts .Recycle ()
397+ if err == nil {
398+ err = ri .Invocation ().BizStatusErr ()
399+ }
360400 if recycleRI {
361401 // why need check recycleRI to decide if recycle RPCInfo?
362402 // 1. no retry, rpc timeout happen will cause panic when response return
@@ -380,21 +420,14 @@ func (kc *kClient) initDebugService() {
380420
381421func (kc * kClient ) richRemoteOption () {
382422 kc .opt .RemoteOpt .SvcInfo = kc .svcInfo
383- // add default meta handler
384- if len (kc .opt .MetaHandlers ) == 0 {
385- if kc .opt .Configs .TransportProtocol ()& transport .GRPC == transport .GRPC {
386- kc .opt .MetaHandlers = append (kc .opt .MetaHandlers , transmeta .ClientHTTP2Handler )
387- }
388- if kc .opt .Configs .TransportProtocol ()& transport .TTHeader == transport .TTHeader {
389- kc .opt .MetaHandlers = append (kc .opt .MetaHandlers , transmeta .ClientTTHeaderHandler )
390- }
391- }
392423 // for client trans info handler
393- // TODO in stream situations, meta is only assembled when the stream creates
394- // metaHandler needs to be called separately.
395- // (newClientStreamer: call WriteMeta before remotecli.NewClient)
396- transInfoHdlr := bound .NewTransMetaHandler (kc .opt .MetaHandlers )
397- kc .opt .RemoteOpt .PrependBoundHandler (transInfoHdlr )
424+ if len (kc .opt .MetaHandlers ) > 0 {
425+ // TODO in stream situations, meta is only assembled when the stream creates
426+ // metaHandler needs to be called separately.
427+ // (newClientStreamer: call WriteMeta before remotecli.NewClient)
428+ transInfoHdlr := bound .NewTransMetaHandler (kc .opt .MetaHandlers )
429+ kc .opt .RemoteOpt .PrependBoundHandler (transInfoHdlr )
430+ }
398431}
399432
400433func (kc * kClient ) buildInvokeChain () error {
@@ -461,7 +494,7 @@ func (kc *kClient) invokeHandleEndpoint() (endpoint.Endpoint, error) {
461494// Close is not concurrency safe.
462495func (kc * kClient ) Close () error {
463496 if kc .closed {
464- panic ( "client is already closed" )
497+ return nil
465498 }
466499 kc .closed = true
467500 var errs utils.ErrChain
@@ -475,7 +508,6 @@ func (kc *kClient) Close() error {
475508 errs .Append (err )
476509 }
477510 }
478- runtime .SetFinalizer (kc , nil )
479511 if errs .HasError () {
480512 return errs
481513 }
0 commit comments