Skip to content

Commit c94b6d4

Browse files
committed
fix(spanner): fix grpc-gcp affinity cleanup and multiplexed channel usage leaks
1 parent 15faaaa commit c94b6d4

File tree

8 files changed

+400
-26
lines changed

8 files changed

+400
-26
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ ByteString getTransactionId() {
425425
public void close() {
426426
ByteString id = getTransactionId();
427427
if (id != null && !id.isEmpty()) {
428-
rpc.clearTransactionAffinity(id);
428+
rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint));
429429
}
430430
super.close();
431431
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,17 @@ public void close() {
161161
* Keeps track of which channels have been 'given' to single-use transactions for a given Spanner
162162
* instance.
163163
*/
164-
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
164+
private static final class SharedChannelUsage {
165+
private final BitSet channelUsage;
166+
private int referenceCount;
167+
168+
private SharedChannelUsage(int numChannels) {
169+
this.channelUsage = new BitSet(numChannels);
170+
this.referenceCount = 0;
171+
}
172+
}
173+
174+
private static final Map<SpannerImpl, SharedChannelUsage> CHANNEL_USAGE = new HashMap<>();
165175

166176
private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
167177
EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);
@@ -182,6 +192,8 @@ public void close() {
182192

183193
private final SessionClient sessionClient;
184194

195+
private final SpannerImpl spanner;
196+
185197
private final TraceWrapper tracer;
186198

187199
/** The current multiplexed session that is used by this client. */
@@ -213,15 +225,20 @@ public void close() {
213225

214226
@VisibleForTesting
215227
MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) {
216-
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
228+
this.spanner = sessionClient.getSpanner();
229+
this.numChannels = spanner.getOptions().getNumChannels();
217230
synchronized (CHANNEL_USAGE) {
218-
CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels));
219-
this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner());
231+
SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
232+
if (sharedChannelUsage == null) {
233+
sharedChannelUsage = new SharedChannelUsage(numChannels);
234+
CHANNEL_USAGE.put(this.spanner, sharedChannelUsage);
235+
}
236+
sharedChannelUsage.referenceCount++;
237+
this.channelUsage = sharedChannelUsage.channelUsage;
220238
}
221239
this.sessionExpirationDuration =
222240
Duration.ofMillis(
223-
sessionClient
224-
.getSpanner()
241+
spanner
225242
.getOptions()
226243
.getSessionPoolOptions()
227244
.getMultiplexedSessionMaintenanceDuration()
@@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() {
354371
}
355372

356373
void close() {
374+
boolean releaseChannelUsage = false;
357375
synchronized (this) {
358376
if (!this.isClosed) {
359377
this.isClosed = true;
360378
this.maintainer.stop();
379+
releaseChannelUsage = true;
380+
}
381+
}
382+
if (releaseChannelUsage) {
383+
synchronized (CHANNEL_USAGE) {
384+
SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
385+
if (sharedChannelUsage != null) {
386+
sharedChannelUsage.referenceCount--;
387+
if (sharedChannelUsage.referenceCount == 0) {
388+
CHANNEL_USAGE.remove(this.spanner);
389+
}
390+
}
361391
}
362392
}
363393
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import com.google.cloud.grpc.GcpManagedChannel;
6565
import com.google.cloud.grpc.GcpManagedChannelBuilder;
6666
import com.google.cloud.grpc.GcpManagedChannelOptions;
67+
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
6768
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
6869
import com.google.cloud.grpc.GrpcTransportOptions;
6970
import com.google.cloud.grpc.fallback.GcpFallbackChannel;
@@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc {
301302
private final boolean isGrpcGcpExtensionEnabled;
302303
private final boolean isDynamicChannelPoolEnabled;
303304
@Nullable private final KeyAwareChannel keyAwareChannel;
305+
@Nullable private final GcpManagedChannel grpcGcpChannel;
304306

305307
private final GrpcCallContext baseGrpcCallContext;
306308

@@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
420422
.build();
421423
ClientContext clientContext = ClientContext.create(spannerStubSettings);
422424
this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel());
425+
this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel());
423426
this.spannerStub =
424427
GrpcSpannerStubWithStubSettingsAndClientContext.create(
425428
spannerStubSettings, clientContext);
@@ -540,6 +543,7 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
540543
}
541544
} else {
542545
this.keyAwareChannel = null;
546+
this.grpcGcpChannel = null;
543547
this.databaseAdminStub = null;
544548
this.instanceAdminStub = null;
545549
this.spannerStub = null;
@@ -589,13 +593,51 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport
589593
return null;
590594
}
591595

596+
@Nullable
597+
private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) {
598+
if (!(transportChannel instanceof GrpcTransportChannel)) {
599+
return null;
600+
}
601+
Channel channel = ((GrpcTransportChannel) transportChannel).getChannel();
602+
if (channel instanceof GcpManagedChannel) {
603+
return (GcpManagedChannel) channel;
604+
}
605+
return null;
606+
}
607+
592608
@Override
593609
public void clearTransactionAffinity(ByteString transactionId) {
594610
if (keyAwareChannel != null) {
595611
keyAwareChannel.clearTransactionAffinity(transactionId);
596612
}
597613
}
598614

615+
@Override
616+
public void clearTransactionAndChannelAffinity(
617+
ByteString transactionId, @Nullable Long channelHint) {
618+
if (keyAwareChannel != null) {
619+
keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint);
620+
return;
621+
}
622+
clearTransactionAffinity(transactionId);
623+
clearChannelHintAffinity(grpcGcpChannel, channelHint);
624+
}
625+
626+
@VisibleForTesting
627+
static void clearChannelHintAffinity(
628+
@Nullable ManagedChannel channel, @Nullable Long channelHint) {
629+
if (!(channel instanceof GcpManagedChannel) || channelHint == null) {
630+
return;
631+
}
632+
ClientCall<ExecuteSqlRequest, ResultSet> call =
633+
channel.newCall(
634+
SpannerGrpc.getExecuteSqlMethod(),
635+
CallOptions.DEFAULT
636+
.withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint))
637+
.withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
638+
call.cancel("Cloud Spanner transaction closed", null);
639+
}
640+
599641
private static String parseGrpcGcpApiConfig() {
600642
try {
601643
return Resources.toString(
@@ -772,16 +814,30 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO
772814
}
773815
optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build());
774816

775-
// Configure dynamic channel pool options if enabled.
776-
// Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults
777-
// or user-provided configuration.
778-
if (options.isDynamicChannelPoolEnabled()) {
779-
optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions());
817+
// Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are
818+
// applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only
819+
// propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling.
820+
if (options.isGrpcGcpExtensionEnabled()) {
821+
optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options));
780822
}
781823

782824
return optionsBuilder.build();
783825
}
784826

827+
@VisibleForTesting
828+
static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
829+
GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
830+
if (options.isDynamicChannelPoolEnabled()) {
831+
return channelPoolOptions;
832+
}
833+
834+
return GcpChannelPoolOptions.newBuilder()
835+
.disableDynamicScaling()
836+
.setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
837+
.setCleanupInterval(channelPoolOptions.getCleanupInterval())
838+
.build();
839+
}
840+
785841
@SuppressWarnings("rawtypes")
786842
private static void maybeEnableGrpcGcpExtension(
787843
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder,
@@ -2275,20 +2331,9 @@ <ReqT, RespT> GrpcCallContext newCallContext(
22752331
Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options);
22762332
if (affinity != null) {
22772333
if (this.isGrpcGcpExtensionEnabled) {
2278-
// Set channel affinity in gRPC-GCP.
2279-
String affinityKey;
2280-
if (this.isDynamicChannelPoolEnabled) {
2281-
// When dynamic channel pooling is enabled, we use the raw affinity value as the key.
2282-
// This allows grpc-gcp to use round-robin for new keys, enabling new channels
2283-
// (created during scale-up) to receive requests. The affinity key lifetime setting
2284-
// ensures the affinity map doesn't grow unbounded.
2285-
affinityKey = String.valueOf(affinity);
2286-
} else {
2287-
// When DCP is disabled, compute bounded channel hint to prevent
2288-
// gRPC-GCP affinity map from getting unbounded.
2289-
int boundedChannelHint = affinity.intValue() % this.numChannels;
2290-
affinityKey = String.valueOf(boundedChannelHint);
2291-
}
2334+
// Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key.
2335+
// Cleanup is handled explicitly by unbind on terminal/single-use operations.
2336+
String affinityKey = String.valueOf(affinity);
22922337
context =
22932338
context.withCallOptions(
22942339
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.api.core.InternalApi;
2222
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
23+
import com.google.cloud.grpc.GcpManagedChannel;
2324
import com.google.cloud.spanner.XGoogSpannerRequestId;
2425
import com.google.common.cache.Cache;
2526
import com.google.common.cache.CacheBuilder;
@@ -32,6 +33,7 @@
3233
import com.google.spanner.v1.ReadRequest;
3334
import com.google.spanner.v1.ResultSet;
3435
import com.google.spanner.v1.RollbackRequest;
36+
import com.google.spanner.v1.SpannerGrpc;
3537
import com.google.spanner.v1.Transaction;
3638
import com.google.spanner.v1.TransactionSelector;
3739
import io.grpc.CallOptions;
@@ -294,6 +296,34 @@ void clearTransactionAffinity(ByteString transactionId) {
294296
clearAffinity(transactionId);
295297
}
296298

299+
void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) {
300+
if (channelHint != null) {
301+
ManagedChannel channel = defaultChannel;
302+
String address = transactionAffinities.get(transactionId);
303+
if (address != null) {
304+
ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
305+
if (endpoint != null) {
306+
channel = endpoint.getChannel();
307+
}
308+
}
309+
clearChannelHintAffinity(channel, channelHint);
310+
}
311+
clearAffinity(transactionId);
312+
}
313+
314+
private static void clearChannelHintAffinity(ManagedChannel channel, long channelHint) {
315+
if (!(channel instanceof GcpManagedChannel)) {
316+
return;
317+
}
318+
ClientCall<ExecuteSqlRequest, ResultSet> call =
319+
channel.newCall(
320+
SpannerGrpc.getExecuteSqlMethod(),
321+
CallOptions.DEFAULT
322+
.withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint))
323+
.withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
324+
call.cancel("Cloud Spanner transaction closed", null);
325+
}
326+
297327
private void maybeExcludeEndpointOnNextCall(
298328
@Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) {
299329
if (endpoint == null || logicalRequestKey == null) {

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,15 @@ default RequestIdCreator getRequestIdCreator() {
199199
/** Clears any client-side affinity associated with the given transaction id. */
200200
default void clearTransactionAffinity(ByteString transactionId) {}
201201

202+
/**
203+
* Clears any client-side transaction affinity and transport-level channel affinity associated
204+
* with the given transaction.
205+
*/
206+
default void clearTransactionAndChannelAffinity(
207+
ByteString transactionId, @Nullable Long channelHint) {
208+
clearTransactionAffinity(transactionId);
209+
}
210+
202211
// Instance admin APIs.
203212
Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable String pageToken)
204213
throws SpannerException;

0 commit comments

Comments
 (0)