Skip to content

Commit e39c38b

Browse files
xds: reuse GrpcXdsTransport and underlying gRPC channel to the same xDS server by ref-counting
This PR implements reusing the gRPC xDS transport (and underlying gRPC channel) to the same xDS server by ref-counting, which is already implemented in gRPC C++ ([link](https://github.com/grpc/grpc/blob/5a3a5d53145b94895610825e783a8896a61a3c73/src/core/xds/grpc/xds_transport_grpc.cc#L399-L414)) and gRPC Go ([link](https://github.com/grpc/grpc-go/blob/81c7924ec9f5f4a01c18b82c9d67691c1cd93bd5/internal/xds/clients/grpctransport/grpc_transport.go#L78-L120)). This optimization is expected to reduce memory footprint of the xDS management server and xDS enabled clients as channel establishment and lifecycle management of the connection is expensive. * Implemented a map to store `GrpcXdsTransport` instances keyed by the `Bootstrapper.ServerInfo` and each `GrpcXdsTransport` has a ref count. Note, the map cannot be simply keyed by the xDS server address as the client could have different channel credentials to the same xDS server, which should be counted as different transport instances. * When `GrpcXdsTransportFactory.create()` is called, the existing transport is reused if it already exists in the map and increment its ref count, otherwise create a new transport, store it in the map, and increment its ref count. * When `GrpcXdsTransport.shutdown()` is called, its ref count is decremented and the underlying gRPC channel is shut down when its ref count reaches zero. * Note this ref-counting of the `GrpcXdsTransport` is different and orthogonal to the ref-counting of the xDS client keyed by the xDS server target name to allow for xDS-based fallback per [gRFC A71](https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md). Prod risk level: Low * Reusing the underlying gRPC channel to the xDS server would not affect the gRPC xDS (ADS/LRS) streams which would be multiplexed on the same channel, however, this means new xDS (ADS/LRS) streams and RPCs may fail due to hitting the limit of `MAX_CONCURRENT_STREAMS`. Tested: * Verified end-to-end with a xDS enabled gRPC Java client communicating to multiple different gRPC backend servers behind *different targets* using the xDS management server for name resolution and endpoint discovery. Verified gRPC xDS transport creation, ref-counting, reuse, shutdown, deletion from map when ref count is zero all worked as expected. Implementation details / context: * Used `java.util.concurrent.ConcurrentHashMap` APIs `compute` and `computeIfPresent` where the entire method invocation is performed atomically to achieve a concurrent and thread-safe solution which follows Java best practices. Alternatives considered: * Write own synchronization logic with synchronized block and locks. After discussion internally, it was preferred to use existing concurrency libraries which is less error-prone and should offer better performance.
1 parent 36a43fc commit e39c38b

2 files changed

Lines changed: 114 additions & 5 deletions

File tree

xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,39 +31,74 @@
3131
import io.grpc.Status;
3232
import io.grpc.xds.client.Bootstrapper;
3333
import io.grpc.xds.client.XdsTransportFactory;
34+
import java.util.Map;
35+
import java.util.concurrent.ConcurrentHashMap;
3436
import java.util.concurrent.TimeUnit;
3537

38+
/**
39+
* A factory for creating gRPC-based transports for xDS communication.
40+
*
41+
* <p>WARNING: This class reuses channels when possible, based on the provided {@link
42+
* Bootstrapper.ServerInfo} with important considerations. The {@link Bootstrapper.ServerInfo}
43+
* includes {@link ChannelCredentials}, which is compared by reference equality. This means every
44+
* {@link Bootstrapper.BootstrapInfo} would have non-equal copies of {@link
45+
* Bootstrapper.ServerInfo}, even if they all represent the same xDS server configuration. For gRPC
46+
* name resolution with the {@code xds} and {@code google-c2p} scheme, this transport sharing works
47+
* as expected as it internally reuses a single {@link Bootstrapper.BootstrapInfo} instance.
48+
* Otherwise, new transports would be created for each {@link Bootstrapper.ServerInfo} despite them
49+
* possibly representing the same xDS server configuration and defeating the purpose of transport
50+
* sharing.
51+
*/
3652
final class GrpcXdsTransportFactory implements XdsTransportFactory {
3753

3854
private final CallCredentials callCredentials;
55+
// The map of xDS server info to its corresponding gRPC xDS transport.
56+
// This enables reusing and sharing the same underlying gRPC channel.
57+
//
58+
// NOTE: ConcurrentHashMap is used as a per-entry lock and all reads and writes must be a mutation
59+
// via the ConcurrentHashMap APIs to acquire the per-entry lock in order to ensure thread safety
60+
// for reference counting of each GrpcXdsTransport instance.
61+
private static final Map<Bootstrapper.ServerInfo, GrpcXdsTransport> xdsServerInfoToTransportMap =
62+
new ConcurrentHashMap<>();
3963

4064
GrpcXdsTransportFactory(CallCredentials callCredentials) {
4165
this.callCredentials = callCredentials;
4266
}
4367

4468
@Override
4569
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
46-
return new GrpcXdsTransport(serverInfo, callCredentials);
70+
return xdsServerInfoToTransportMap.compute(
71+
serverInfo,
72+
(info, transport) -> {
73+
if (transport == null) {
74+
transport = new GrpcXdsTransport(serverInfo, callCredentials);
75+
}
76+
++transport.refCount;
77+
return transport;
78+
});
4779
}
4880

4981
@VisibleForTesting
5082
public XdsTransport createForTest(ManagedChannel channel) {
51-
return new GrpcXdsTransport(channel, callCredentials);
83+
return new GrpcXdsTransport(channel, callCredentials, null);
5284
}
5385

5486
@VisibleForTesting
5587
static class GrpcXdsTransport implements XdsTransport {
5688

5789
private final ManagedChannel channel;
5890
private final CallCredentials callCredentials;
91+
private final Bootstrapper.ServerInfo serverInfo;
92+
// Must only be accessed via the ConcurrentHashMap APIs which act as the locking methods.
93+
private int refCount = 0;
5994

6095
public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
6196
this(serverInfo, null);
6297
}
6398

6499
@VisibleForTesting
65100
public GrpcXdsTransport(ManagedChannel channel) {
66-
this(channel, null);
101+
this(channel, null, null);
67102
}
68103

69104
public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials callCredentials) {
@@ -73,12 +108,17 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials call
73108
.keepAliveTime(5, TimeUnit.MINUTES)
74109
.build();
75110
this.callCredentials = callCredentials;
111+
this.serverInfo = serverInfo;
76112
}
77113

78114
@VisibleForTesting
79-
public GrpcXdsTransport(ManagedChannel channel, CallCredentials callCredentials) {
115+
public GrpcXdsTransport(
116+
ManagedChannel channel,
117+
CallCredentials callCredentials,
118+
Bootstrapper.ServerInfo serverInfo) {
80119
this.channel = checkNotNull(channel, "channel");
81120
this.callCredentials = callCredentials;
121+
this.serverInfo = serverInfo;
82122
}
83123

84124
@Override
@@ -98,7 +138,19 @@ public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
98138

99139
@Override
100140
public void shutdown() {
101-
channel.shutdown();
141+
if (serverInfo == null) {
142+
channel.shutdown();
143+
return;
144+
}
145+
xdsServerInfoToTransportMap.computeIfPresent(
146+
serverInfo,
147+
(info, transport) -> {
148+
if (--transport.refCount == 0) { // Prefix decrement and return the updated value.
149+
transport.channel.shutdown();
150+
return null; // Remove mapping.
151+
}
152+
return transport;
153+
});
102154
}
103155

104156
private class XdsStreamingCall<ReqT, RespT> implements

xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,24 @@
3030
import io.grpc.Server;
3131
import io.grpc.Status;
3232
import io.grpc.stub.StreamObserver;
33+
import io.grpc.testing.GrpcCleanupRule;
3334
import io.grpc.xds.client.Bootstrapper;
3435
import io.grpc.xds.client.XdsTransportFactory;
3536
import java.util.concurrent.BlockingQueue;
3637
import java.util.concurrent.LinkedBlockingQueue;
3738
import java.util.concurrent.TimeUnit;
3839
import org.junit.After;
3940
import org.junit.Before;
41+
import org.junit.Rule;
4042
import org.junit.Test;
4143
import org.junit.runner.RunWith;
4244
import org.junit.runners.JUnit4;
4345

4446
@RunWith(JUnit4.class)
4547
public class GrpcXdsTransportFactoryTest {
4648

49+
@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
50+
4751
private Server server;
4852

4953
@Before
@@ -118,6 +122,59 @@ public void callApis() throws Exception {
118122
xdsTransport.shutdown();
119123
}
120124

125+
@Test
126+
public void refCountedXdsTransport_sameXdsServerAddress_returnsExistingTransport() {
127+
Bootstrapper.ServerInfo xdsServerInfo =
128+
Bootstrapper.ServerInfo.create(
129+
"localhost:" + server.getPort(), InsecureChannelCredentials.create());
130+
GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null);
131+
// Calling create() for the first time creates a new GrpcXdsTransport instance.
132+
// The ref count was previously 0 and now is 1.
133+
XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo);
134+
// Calling create() for the second time to the same xDS server address returns the same
135+
// GrpcXdsTransport instance. The ref count was previously 1 and now is 2.
136+
XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo);
137+
assertThat(transport1).isSameInstanceAs(transport2);
138+
// Calling shutdown() for the first time does not shut down the GrpcXdsTransport instance.
139+
// The ref count was previously 2 and now is 1.
140+
transport1.shutdown();
141+
// Calling shutdown() for the second time shuts down the GrpcXdsTransport instance.
142+
// The ref count was previously 1 and now is 0.
143+
transport2.shutdown();
144+
}
145+
146+
@Test
147+
public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTransport()
148+
throws Exception {
149+
// Create and start a second xDS server on a different port.
150+
Server server2 =
151+
grpcCleanupRule.register(
152+
Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create())
153+
.addService(echoAdsService())
154+
.build()
155+
.start());
156+
Bootstrapper.ServerInfo xdsServerInfo1 =
157+
Bootstrapper.ServerInfo.create(
158+
"localhost:" + server.getPort(), InsecureChannelCredentials.create());
159+
Bootstrapper.ServerInfo xdsServerInfo2 =
160+
Bootstrapper.ServerInfo.create(
161+
"localhost:" + server2.getPort(), InsecureChannelCredentials.create());
162+
GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null);
163+
// Calling create() to the first xDS server creates a new GrpcXdsTransport instance.
164+
// The ref count was previously 0 and now is 1.
165+
XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo1);
166+
// Calling create() to the second xDS server creates a different GrpcXdsTransport instance.
167+
// The ref count was previously 0 and now is 1.
168+
XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo2);
169+
assertThat(transport1).isNotSameInstanceAs(transport2);
170+
// Calling shutdown() shuts down the GrpcXdsTransport instance for the first xDS server.
171+
// The ref count was previously 1 and now is 0.
172+
transport1.shutdown();
173+
// Calling shutdown() shuts down the GrpcXdsTransport instance for the second xDS server.
174+
// The ref count was previously 1 and now is 0.
175+
transport2.shutdown();
176+
}
177+
121178
private static class FakeEventHandler implements
122179
XdsTransportFactory.EventHandler<DiscoveryResponse> {
123180
private final BlockingQueue<DiscoveryResponse> respQ = new LinkedBlockingQueue<>();

0 commit comments

Comments
 (0)