3131import io .grpc .Status ;
3232import io .grpc .xds .client .Bootstrapper ;
3333import io .grpc .xds .client .XdsTransportFactory ;
34+ import java .util .Map ;
35+ import java .util .concurrent .ConcurrentHashMap ;
3436import java .util .concurrent .TimeUnit ;
3537
3638final class GrpcXdsTransportFactory implements XdsTransportFactory {
3739
3840 private final CallCredentials callCredentials ;
41+ // The map of xDS server info to its corresponding gRPC xDS transport.
42+ // This enables reusing and sharing the same underlying gRPC channel.
43+ private static final Map <Bootstrapper .ServerInfo , GrpcXdsTransport > xdsServerInfoToTransportMap =
44+ new ConcurrentHashMap <>();
3945
4046 GrpcXdsTransportFactory (CallCredentials callCredentials ) {
4147 this .callCredentials = callCredentials ;
4248 }
4349
4450 @ Override
4551 public XdsTransport create (Bootstrapper .ServerInfo serverInfo ) {
46- return new GrpcXdsTransport (serverInfo , callCredentials );
52+ return xdsServerInfoToTransportMap .compute (
53+ serverInfo ,
54+ (info , transport ) -> {
55+ if (transport == null ) {
56+ transport = new GrpcXdsTransport (serverInfo , callCredentials );
57+ }
58+ ++transport .refCount ;
59+ return transport ;
60+ });
4761 }
4862
4963 @ VisibleForTesting
5064 public XdsTransport createForTest (ManagedChannel channel ) {
51- return new GrpcXdsTransport (channel , callCredentials );
65+ return new GrpcXdsTransport (channel , callCredentials , null );
66+ }
67+
68+ @ VisibleForTesting
69+ static boolean hasTransport (Bootstrapper .ServerInfo serverInfo ) {
70+ return xdsServerInfoToTransportMap .containsKey (serverInfo );
5271 }
5372
5473 @ VisibleForTesting
5574 static class GrpcXdsTransport implements XdsTransport {
5675
5776 private final ManagedChannel channel ;
5877 private final CallCredentials callCredentials ;
78+ private final Bootstrapper .ServerInfo serverInfo ;
79+ // Must only be accessed within the provided atomic methods of ConcurrentHashMap.
80+ private int refCount = 0 ;
5981
6082 public GrpcXdsTransport (Bootstrapper .ServerInfo serverInfo ) {
6183 this (serverInfo , null );
6284 }
6385
6486 @ VisibleForTesting
6587 public GrpcXdsTransport (ManagedChannel channel ) {
66- this (channel , null );
88+ this (channel , null , null );
6789 }
6890
6991 public GrpcXdsTransport (Bootstrapper .ServerInfo serverInfo , CallCredentials callCredentials ) {
@@ -73,12 +95,17 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials call
7395 .keepAliveTime (5 , TimeUnit .MINUTES )
7496 .build ();
7597 this .callCredentials = callCredentials ;
98+ this .serverInfo = serverInfo ;
7699 }
77100
78101 @ VisibleForTesting
79- public GrpcXdsTransport (ManagedChannel channel , CallCredentials callCredentials ) {
102+ public GrpcXdsTransport (
103+ ManagedChannel channel ,
104+ CallCredentials callCredentials ,
105+ Bootstrapper .ServerInfo serverInfo ) {
80106 this .channel = checkNotNull (channel , "channel" );
81107 this .callCredentials = callCredentials ;
108+ this .serverInfo = serverInfo ;
82109 }
83110
84111 @ Override
@@ -98,7 +125,19 @@ public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
98125
99126 @ Override
100127 public void shutdown () {
101- channel .shutdown ();
128+ if (serverInfo == null ) {
129+ channel .shutdown ();
130+ return ;
131+ }
132+ xdsServerInfoToTransportMap .computeIfPresent (
133+ serverInfo ,
134+ (info , transport ) -> {
135+ if (--transport .refCount == 0 ) { // Prefix decrement and return the updated value.
136+ transport .channel .shutdown ();
137+ return null ; // Remove mapping.
138+ }
139+ return transport ;
140+ });
102141 }
103142
104143 private class XdsStreamingCall <ReqT , RespT > implements
0 commit comments