Skip to content

Commit aff88f7

Browse files
committed
Revert "xds: reuse GrpcXdsTransport and underlying gRPC channel to the same xDS server by ref-counting"
This reverts commit e39c38b. The reverted change causes xDS hangs when the xDS server's MAX_CONCURRENT_STREAMS limit is too low for the number of targets being used. Each XdsClient will often use 2 streams (ADS and LRS) on the shared channel, so a concurrency limit of 100 will likely cause problems beyond about 50 targets. Once MAX_CONCURRENT_STREAMS is hit, the ADS RPC never becomes ready, which prevents the resource timer from being started, so users see a hang instead of an xDS resource timeout after 15 seconds. b/529399722
1 parent 2c5a7ca commit aff88f7

2 files changed

Lines changed: 5 additions & 114 deletions

File tree

xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java

Lines changed: 5 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -31,74 +31,39 @@
3131
import io.grpc.Status;
3232
import io.grpc.xds.client.Bootstrapper;
3333
import io.grpc.xds.client.XdsTransportFactory;
34-
import java.util.Map;
35-
import java.util.concurrent.ConcurrentHashMap;
3634
import java.util.concurrent.TimeUnit;
3735

38-
/**
39-
* A factory for creating gRPC-based transports for xDS communication.
40-
*
41-
* <p>WARNING: This class reuses channels when possible, based on the provided {@link
42-
* Bootstrapper.ServerInfo} with important considerations. The {@link Bootstrapper.ServerInfo}
43-
* includes {@link ChannelCredentials}, which is compared by reference equality. This means every
44-
* {@link Bootstrapper.BootstrapInfo} would have non-equal copies of {@link
45-
* Bootstrapper.ServerInfo}, even if they all represent the same xDS server configuration. For gRPC
46-
* name resolution with the {@code xds} and {@code google-c2p} scheme, this transport sharing works
47-
* as expected as it internally reuses a single {@link Bootstrapper.BootstrapInfo} instance.
48-
* Otherwise, new transports would be created for each {@link Bootstrapper.ServerInfo} despite them
49-
* possibly representing the same xDS server configuration and defeating the purpose of transport
50-
* sharing.
51-
*/
5236
final class GrpcXdsTransportFactory implements XdsTransportFactory {
5337

5438
private final CallCredentials callCredentials;
55-
// The map of xDS server info to its corresponding gRPC xDS transport.
56-
// This enables reusing and sharing the same underlying gRPC channel.
57-
//
58-
// NOTE: ConcurrentHashMap is used as a per-entry lock and all reads and writes must be a mutation
59-
// via the ConcurrentHashMap APIs to acquire the per-entry lock in order to ensure thread safety
60-
// for reference counting of each GrpcXdsTransport instance.
61-
private static final Map<Bootstrapper.ServerInfo, GrpcXdsTransport> xdsServerInfoToTransportMap =
62-
new ConcurrentHashMap<>();
6339

6440
GrpcXdsTransportFactory(CallCredentials callCredentials) {
6541
this.callCredentials = callCredentials;
6642
}
6743

6844
@Override
6945
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
70-
return xdsServerInfoToTransportMap.compute(
71-
serverInfo,
72-
(info, transport) -> {
73-
if (transport == null) {
74-
transport = new GrpcXdsTransport(serverInfo, callCredentials);
75-
}
76-
++transport.refCount;
77-
return transport;
78-
});
46+
return new GrpcXdsTransport(serverInfo, callCredentials);
7947
}
8048

8149
@VisibleForTesting
8250
public XdsTransport createForTest(ManagedChannel channel) {
83-
return new GrpcXdsTransport(channel, callCredentials, null);
51+
return new GrpcXdsTransport(channel, callCredentials);
8452
}
8553

8654
@VisibleForTesting
8755
static class GrpcXdsTransport implements XdsTransport {
8856

8957
private final ManagedChannel channel;
9058
private final CallCredentials callCredentials;
91-
private final Bootstrapper.ServerInfo serverInfo;
92-
// Must only be accessed via the ConcurrentHashMap APIs which act as the locking methods.
93-
private int refCount = 0;
9459

9560
public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
9661
this(serverInfo, null);
9762
}
9863

9964
@VisibleForTesting
10065
public GrpcXdsTransport(ManagedChannel channel) {
101-
this(channel, null, null);
66+
this(channel, null);
10267
}
10368

10469
public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials callCredentials) {
@@ -108,17 +73,12 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials call
10873
.keepAliveTime(5, TimeUnit.MINUTES)
10974
.build();
11075
this.callCredentials = callCredentials;
111-
this.serverInfo = serverInfo;
11276
}
11377

11478
@VisibleForTesting
115-
public GrpcXdsTransport(
116-
ManagedChannel channel,
117-
CallCredentials callCredentials,
118-
Bootstrapper.ServerInfo serverInfo) {
79+
public GrpcXdsTransport(ManagedChannel channel, CallCredentials callCredentials) {
11980
this.channel = checkNotNull(channel, "channel");
12081
this.callCredentials = callCredentials;
121-
this.serverInfo = serverInfo;
12282
}
12383

12484
@Override
@@ -138,19 +98,7 @@ public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
13898

13999
@Override
140100
public void shutdown() {
141-
if (serverInfo == null) {
142-
channel.shutdown();
143-
return;
144-
}
145-
xdsServerInfoToTransportMap.computeIfPresent(
146-
serverInfo,
147-
(info, transport) -> {
148-
if (--transport.refCount == 0) { // Prefix decrement and return the updated value.
149-
transport.channel.shutdown();
150-
return null; // Remove mapping.
151-
}
152-
return transport;
153-
});
101+
channel.shutdown();
154102
}
155103

156104
private class XdsStreamingCall<ReqT, RespT> implements

xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,20 @@
3030
import io.grpc.Server;
3131
import io.grpc.Status;
3232
import io.grpc.stub.StreamObserver;
33-
import io.grpc.testing.GrpcCleanupRule;
3433
import io.grpc.xds.client.Bootstrapper;
3534
import io.grpc.xds.client.XdsTransportFactory;
3635
import java.util.concurrent.BlockingQueue;
3736
import java.util.concurrent.LinkedBlockingQueue;
3837
import java.util.concurrent.TimeUnit;
3938
import org.junit.After;
4039
import org.junit.Before;
41-
import org.junit.Rule;
4240
import org.junit.Test;
4341
import org.junit.runner.RunWith;
4442
import org.junit.runners.JUnit4;
4543

4644
@RunWith(JUnit4.class)
4745
public class GrpcXdsTransportFactoryTest {
4846

49-
@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
50-
5147
private Server server;
5248

5349
@Before
@@ -122,59 +118,6 @@ public void callApis() throws Exception {
122118
xdsTransport.shutdown();
123119
}
124120

125-
@Test
126-
public void refCountedXdsTransport_sameXdsServerAddress_returnsExistingTransport() {
127-
Bootstrapper.ServerInfo xdsServerInfo =
128-
Bootstrapper.ServerInfo.create(
129-
"localhost:" + server.getPort(), InsecureChannelCredentials.create());
130-
GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null);
131-
// Calling create() for the first time creates a new GrpcXdsTransport instance.
132-
// The ref count was previously 0 and now is 1.
133-
XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo);
134-
// Calling create() for the second time to the same xDS server address returns the same
135-
// GrpcXdsTransport instance. The ref count was previously 1 and now is 2.
136-
XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo);
137-
assertThat(transport1).isSameInstanceAs(transport2);
138-
// Calling shutdown() for the first time does not shut down the GrpcXdsTransport instance.
139-
// The ref count was previously 2 and now is 1.
140-
transport1.shutdown();
141-
// Calling shutdown() for the second time shuts down the GrpcXdsTransport instance.
142-
// The ref count was previously 1 and now is 0.
143-
transport2.shutdown();
144-
}
145-
146-
@Test
147-
public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTransport()
148-
throws Exception {
149-
// Create and start a second xDS server on a different port.
150-
Server server2 =
151-
grpcCleanupRule.register(
152-
Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create())
153-
.addService(echoAdsService())
154-
.build()
155-
.start());
156-
Bootstrapper.ServerInfo xdsServerInfo1 =
157-
Bootstrapper.ServerInfo.create(
158-
"localhost:" + server.getPort(), InsecureChannelCredentials.create());
159-
Bootstrapper.ServerInfo xdsServerInfo2 =
160-
Bootstrapper.ServerInfo.create(
161-
"localhost:" + server2.getPort(), InsecureChannelCredentials.create());
162-
GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null);
163-
// Calling create() to the first xDS server creates a new GrpcXdsTransport instance.
164-
// The ref count was previously 0 and now is 1.
165-
XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo1);
166-
// Calling create() to the second xDS server creates a different GrpcXdsTransport instance.
167-
// The ref count was previously 0 and now is 1.
168-
XdsTransportFactory.XdsTransport transport2 = xdsTransportFactory.create(xdsServerInfo2);
169-
assertThat(transport1).isNotSameInstanceAs(transport2);
170-
// Calling shutdown() shuts down the GrpcXdsTransport instance for the first xDS server.
171-
// The ref count was previously 1 and now is 0.
172-
transport1.shutdown();
173-
// Calling shutdown() shuts down the GrpcXdsTransport instance for the second xDS server.
174-
// The ref count was previously 1 and now is 0.
175-
transport2.shutdown();
176-
}
177-
178121
private static class FakeEventHandler implements
179122
XdsTransportFactory.EventHandler<DiscoveryResponse> {
180123
private final BlockingQueue<DiscoveryResponse> respQ = new LinkedBlockingQueue<>();

0 commit comments

Comments
 (0)