Commit 55c9857

fix(spanner): fix grpc-gcp affinity cleanup and multiplexed channel usage leaks (#12726)
## Summary

This change fixes two regressions in the Spanner Java client introduced around multiplexed session initialization and grpc-gcp affinity handling.

### 1. Fix static `CHANNEL_USAGE` leak in `MultiplexedSessionDatabaseClient`

Fixes: #12693

`MultiplexedSessionDatabaseClient` stored per-`SpannerImpl` channel usage state in a static map but never removed entries on close. After multiplexed client creation became unconditional in `SpannerImpl.getDatabaseClient()`, applications that repeatedly created and closed `Spanner` instances could retain closed `SpannerImpl` objects, gRPC channels, and related transport state indefinitely.

This change:

- replaces the static `Map<SpannerImpl, BitSet>` with reference-counted shared state
- removes the map entry when the last `MultiplexedSessionDatabaseClient` for a given `SpannerImpl` closes
- preserves sharing semantics for multiple database clients created from the same `SpannerImpl`

### 2. Stop using bitset / bounded `% numChannels` affinity for grpc-gcp

For grpc-gcp-enabled paths, channel affinity should use the raw random channel hint and rely on explicit unbind / cleanup, rather than:

- bitset reservation
- collapsing the hint with `% numChannels`

This change:

- keeps grpc-gcp on raw random affinity keys in `GapicSpannerRpc`
- removes `% numChannels` mapping from the grpc-gcp call path
- keeps the old non-grpc-gcp GAX affinity behavior unchanged

### 3. Add explicit grpc-gcp affinity cleanup for multi-use read-only transaction close

Multi-use read-only transactions reuse a single random channel hint for the lifetime of the transaction. That hint should remain stable across all reads in the transaction, then be explicitly cleaned up when the transaction closes.

This change:

- adds a new RPC cleanup hook that carries both transaction id and channel hint
- invokes that cleanup from multi-use read-only transaction `close()`
- unbinds grpc-gcp affinity on transaction close without issuing an `ExecuteSql` RPC to Spanner
- handles both:
  - location API disabled: cleanup via the underlying grpc-gcp channel
  - location API enabled: cleanup via `KeyAwareChannel`, using the routed endpoint associated with the transaction

### 4. Apply grpc-gcp affinity cleanup settings without implicitly enabling DCP

grpc-gcp affinity cleanup settings (`affinityKeyLifetime`, `cleanupInterval`) should be applied whether or not dynamic channel pool (DCP) is enabled.

This change:

- always passes grpc-gcp channel-pool options when grpc-gcp is enabled
- preserves full pool settings only when DCP is enabled
- passes cleanup-only channel-pool options when DCP is disabled:
  - `affinityKeyLifetime`
  - `cleanupInterval`
  - dynamic scaling explicitly disabled

1 parent b7e34d2 commit 55c9857

12 files changed: +554 -104 lines changed

java-spanner/.gitignore

Lines changed: 0 additions & 1 deletion
This file was deleted.

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 1 addition & 1 deletion
@@ -466,7 +466,7 @@ ByteString getTransactionId() {
     public void close() {
       ByteString id = getTransactionId();
       if (id != null && !id.isEmpty()) {
-        rpc.clearTransactionAffinity(id);
+        rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint));
       }
       super.close();
     }

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 36 additions & 6 deletions
@@ -161,7 +161,17 @@ public void close() {
    * Keeps track of which channels have been 'given' to single-use transactions for a given Spanner
    * instance.
    */
-  private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
+  private static final class SharedChannelUsage {
+    private final BitSet channelUsage;
+    private int referenceCount;
+
+    private SharedChannelUsage(int numChannels) {
+      this.channelUsage = new BitSet(numChannels);
+      this.referenceCount = 0;
+    }
+  }
+
+  private static final Map<SpannerImpl, SharedChannelUsage> CHANNEL_USAGE = new HashMap<>();
 
   private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
       EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);
@@ -182,6 +192,8 @@ public void close() {
 
   private final SessionClient sessionClient;
 
+  private final SpannerImpl spanner;
+
   private final TraceWrapper tracer;
 
   /** The current multiplexed session that is used by this client. */
@@ -213,15 +225,20 @@ public void close() {
 
   @VisibleForTesting
   MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) {
-    this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
+    this.spanner = sessionClient.getSpanner();
+    this.numChannels = spanner.getOptions().getNumChannels();
     synchronized (CHANNEL_USAGE) {
-      CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels));
-      this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner());
+      SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
+      if (sharedChannelUsage == null) {
+        sharedChannelUsage = new SharedChannelUsage(numChannels);
+        CHANNEL_USAGE.put(this.spanner, sharedChannelUsage);
+      }
+      sharedChannelUsage.referenceCount++;
+      this.channelUsage = sharedChannelUsage.channelUsage;
     }
     this.sessionExpirationDuration =
         Duration.ofMillis(
-            sessionClient
-                .getSpanner()
+            spanner
                 .getOptions()
                 .getSessionPoolOptions()
                 .getMultiplexedSessionMaintenanceDuration()
@@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() {
   }
 
   void close() {
+    boolean releaseChannelUsage = false;
     synchronized (this) {
       if (!this.isClosed) {
         this.isClosed = true;
         this.maintainer.stop();
+        releaseChannelUsage = true;
+      }
+    }
+    if (releaseChannelUsage) {
+      synchronized (CHANNEL_USAGE) {
+        SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
+        if (sharedChannelUsage != null) {
+          sharedChannelUsage.referenceCount--;
+          if (sharedChannelUsage.referenceCount == 0) {
+            CHANNEL_USAGE.remove(this.spanner);
+          }
+        }
       }
     }
   }
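
The reference-counting pattern in the `MultiplexedSessionDatabaseClient` change above can be tried in isolation. The following is a minimal sketch, not the client's code: `RefCountedChannelUsage`, `acquire`, `release`, and `size` are hypothetical names, and a plain `Object` stands in for `SpannerImpl`.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the reference-counted CHANNEL_USAGE map in the diff. */
final class RefCountedChannelUsage {
  /** Shared state plus the number of clients currently holding it. */
  private static final class Shared {
    final BitSet usage;
    int refCount;

    Shared(int numChannels) {
      this.usage = new BitSet(numChannels);
    }
  }

  private static final Map<Object, Shared> REGISTRY = new HashMap<>();

  private RefCountedChannelUsage() {}

  /** Returns the BitSet shared by all holders of {@code owner}, creating it on first use. */
  static BitSet acquire(Object owner, int numChannels) {
    synchronized (REGISTRY) {
      Shared shared = REGISTRY.computeIfAbsent(owner, k -> new Shared(numChannels));
      shared.refCount++;
      return shared.usage;
    }
  }

  /** Drops one reference and removes the entry once the last holder releases it. */
  static void release(Object owner) {
    synchronized (REGISTRY) {
      Shared shared = REGISTRY.get(owner);
      if (shared != null && --shared.refCount == 0) {
        REGISTRY.remove(owner);
      }
    }
  }

  /** Number of owners that still have live entries. */
  static int size() {
    synchronized (REGISTRY) {
      return REGISTRY.size();
    }
  }
}
```

Two `acquire` calls for the same owner return the same `BitSet`, and the entry disappears only after the second `release`, which is the sharing behavior the commit message says it preserves.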

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 57 additions & 25 deletions
@@ -64,6 +64,7 @@
 import com.google.cloud.grpc.GcpManagedChannel;
 import com.google.cloud.grpc.GcpManagedChannelBuilder;
 import com.google.cloud.grpc.GcpManagedChannelOptions;
+import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
 import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
 import com.google.cloud.grpc.GrpcTransportOptions;
 import com.google.cloud.grpc.fallback.GcpFallbackChannel;
@@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc {
   private final boolean isGrpcGcpExtensionEnabled;
   private final boolean isDynamicChannelPoolEnabled;
   @Nullable private final KeyAwareChannel keyAwareChannel;
+  @Nullable private final GcpManagedChannel grpcGcpChannel;
 
   private final GrpcCallContext baseGrpcCallContext;
 
@@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
               .build();
       ClientContext clientContext = ClientContext.create(spannerStubSettings);
       this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel());
+      this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel());
       this.spannerStub =
           GrpcSpannerStubWithStubSettingsAndClientContext.create(
               spannerStubSettings, clientContext);
@@ -540,6 +543,7 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
       }
     } else {
       this.keyAwareChannel = null;
+      this.grpcGcpChannel = null;
       this.databaseAdminStub = null;
       this.instanceAdminStub = null;
       this.spannerStub = null;
@@ -589,13 +593,35 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport
     return null;
   }
 
+  @Nullable
+  private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) {
+    if (!(transportChannel instanceof GrpcTransportChannel)) {
+      return null;
+    }
+    Channel channel = ((GrpcTransportChannel) transportChannel).getChannel();
+    if (channel instanceof GcpManagedChannel) {
+      return (GcpManagedChannel) channel;
+    }
+    return null;
+  }
+
   @Override
   public void clearTransactionAffinity(ByteString transactionId) {
     if (keyAwareChannel != null) {
       keyAwareChannel.clearTransactionAffinity(transactionId);
     }
   }
 
+  @Override
+  public void clearTransactionAndChannelAffinity(
+      ByteString transactionId, @Nullable Long channelHint) {
+    if (keyAwareChannel != null) {
+      keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint);
+      return;
+    }
+    GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint);
+  }
+
   private static String parseGrpcGcpApiConfig() {
     try {
       return Resources.toString(
@@ -772,16 +798,35 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO
     }
     optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build());
 
-    // Configure dynamic channel pool options if enabled.
-    // Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults
-    // or user-provided configuration.
-    if (options.isDynamicChannelPoolEnabled()) {
-      optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions());
+    // Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are
+    // applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only
+    // propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling.
+    if (options.isGrpcGcpExtensionEnabled()) {
+      optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options));
     }
 
     return optionsBuilder.build();
   }
 
+  @VisibleForTesting
+  static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
+    GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
+    if (options.isDynamicChannelPoolEnabled()) {
+      return channelPoolOptions;
+    }
+
+    // When DCP is disabled, Spanner's numChannels should still produce a fixed grpc-gcp channel
+    // pool instead of only capping the pool's maximum size.
+    return GcpChannelPoolOptions.newBuilder()
+        .setMaxSize(options.getNumChannels())
+        .setMinSize(options.getNumChannels())
+        .setInitSize(options.getNumChannels())
+        .disableDynamicScaling()
+        .setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
+        .setCleanupInterval(channelPoolOptions.getCleanupInterval())
+        .build();
+  }
+
   @SuppressWarnings("rawtypes")
   private static void maybeEnableGrpcGcpExtension(
       InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder,
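
The selection logic in `getGrpcGcpChannelPoolOptions` above can be illustrated without the grpc-gcp dependency. This is a rough sketch under stated assumptions: `PoolOptionsSketch` and `effective` are hypothetical names, and the class is a simplified stand-in for `GcpChannelPoolOptions`, not the library's real type.

```java
import java.time.Duration;

/** Hypothetical, simplified stand-in for grpc-gcp's channel-pool options. */
final class PoolOptionsSketch {
  final int minSize;
  final int maxSize;
  final int initSize;
  final boolean dynamicScaling;
  final Duration affinityKeyLifetime;
  final Duration cleanupInterval;

  PoolOptionsSketch(
      int minSize,
      int maxSize,
      int initSize,
      boolean dynamicScaling,
      Duration affinityKeyLifetime,
      Duration cleanupInterval) {
    this.minSize = minSize;
    this.maxSize = maxSize;
    this.initSize = initSize;
    this.dynamicScaling = dynamicScaling;
    this.affinityKeyLifetime = affinityKeyLifetime;
    this.cleanupInterval = cleanupInterval;
  }

  /**
   * With DCP enabled, the configured options pass through untouched. With DCP disabled, the pool
   * is pinned to numChannels, scaling is off, and only the cleanup settings are carried over.
   */
  static PoolOptionsSketch effective(
      PoolOptionsSketch configured, boolean dcpEnabled, int numChannels) {
    if (dcpEnabled) {
      return configured;
    }
    return new PoolOptionsSketch(
        numChannels,
        numChannels,
        numChannels,
        false,
        configured.affinityKeyLifetime,
        configured.cleanupInterval);
  }
}
```

The point of the non-DCP branch is that `affinityKeyLifetime` and `cleanupInterval` survive even though every sizing field is overridden.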
@@ -793,21 +838,19 @@ private static void maybeEnableGrpcGcpExtension(
     final String jsonApiConfig = parseGrpcGcpApiConfig();
     final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
 
-    // When dynamic channel pool is enabled, use the DCP initial size as the pool size.
-    // When disabled, use the explicitly configured numChannels.
-    final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels();
-
     ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> baseConfigurator =
         defaultChannelProviderBuilder.getChannelConfigurator();
     ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
         channelBuilder -> {
           if (baseConfigurator != null) {
             channelBuilder = baseConfigurator.apply(channelBuilder);
           }
+          // The grpc-gcp pool is configured entirely through GcpChannelPoolOptions above. Avoid
+          // the deprecated setPoolSize path, which only adjusts maxSize and does not eagerly create
+          // a fixed-size pool.
           return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
               .withApiConfigJsonString(jsonApiConfig)
-              .withOptions(grpcGcpOptions)
-              .setPoolSize(poolSize);
+              .withOptions(grpcGcpOptions);
         };
 
     // Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
@@ -2275,20 +2318,9 @@ <ReqT, RespT> GrpcCallContext newCallContext(
     Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options);
     if (affinity != null) {
       if (this.isGrpcGcpExtensionEnabled) {
-        // Set channel affinity in gRPC-GCP.
-        String affinityKey;
-        if (this.isDynamicChannelPoolEnabled) {
-          // When dynamic channel pooling is enabled, we use the raw affinity value as the key.
-          // This allows grpc-gcp to use round-robin for new keys, enabling new channels
-          // (created during scale-up) to receive requests. The affinity key lifetime setting
-          // ensures the affinity map doesn't grow unbounded.
-          affinityKey = String.valueOf(affinity);
-        } else {
-          // When DCP is disabled, compute bounded channel hint to prevent
-          // gRPC-GCP affinity map from getting unbounded.
-          int boundedChannelHint = affinity.intValue() % this.numChannels;
-          affinityKey = String.valueOf(boundedChannelHint);
-        }
+        // Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key.
+        // Cleanup is handled explicitly by unbind on terminal/single-use operations.
+        String affinityKey = String.valueOf(affinity);
         context =
             context.withCallOptions(
                 context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
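
To make the difference between the removed bounded key and the raw key concrete, here is a small illustrative sketch. `AffinityKeySketch`, `boundedKey`, and `rawKey` are hypothetical helper names; only the two key expressions mirror the hunk above.

```java
/** Hypothetical helpers mirroring the two affinity-key computations in the hunk above. */
final class AffinityKeySketch {
  private AffinityKeySketch() {}

  /** Removed non-DCP behavior: collapse the hint into the range of numChannels. */
  static String boundedKey(long hint, int numChannels) {
    // The cast binds tighter than %, matching affinity.intValue() % numChannels.
    return String.valueOf((int) hint % numChannels);
  }

  /** New behavior for all grpc-gcp paths: use the hint itself as the key. */
  static String rawKey(long hint) {
    return String.valueOf(hint);
  }
}
```

With `numChannels = 4`, the hints 10 and 14 both collapse to the bounded key `"2"`, whereas their raw keys stay distinct. Distinct raw keys mean the key set is no longer capped by `numChannels`, which is why the change pairs them with explicit unbind and the cleanup settings.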
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner.spi.v1;
+
+import com.google.cloud.grpc.GcpManagedChannel;
+import com.google.spanner.v1.ExecuteSqlRequest;
+import com.google.spanner.v1.ResultSet;
+import com.google.spanner.v1.SpannerGrpc;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import javax.annotation.Nullable;
+
+final class GrpcGcpAffinityUtil {
+  private GrpcGcpAffinityUtil() {}
+
+  static void clearChannelHintAffinity(
+      @Nullable ManagedChannel channel, @Nullable Long channelHint) {
+    if (!(channel instanceof GcpManagedChannel) || channelHint == null) {
+      return;
+    }
+    // TODO: Replace this synthetic call once grpc-gcp exposes a direct API for unbinding
+    // affinity keys without creating and immediately cancelling a ClientCall.
+    ClientCall<ExecuteSqlRequest, ResultSet> call =
+        channel.newCall(
+            SpannerGrpc.getExecuteSqlMethod(),
+            CallOptions.DEFAULT
+                .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint))
+                .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
+    call.cancel("Cloud Spanner transaction closed", null);
+  }
+}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 15 additions & 0 deletions
@@ -304,6 +304,21 @@ void clearTransactionAffinity(ByteString transactionId) {
     clearAffinity(transactionId);
   }
 
+  void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) {
+    String address = transactionAffinities.remove(transactionId);
+    readOnlyTxPreferLeader.invalidate(transactionId);
+    if (channelHint != null) {
+      ManagedChannel channel = defaultChannel;
+      if (address != null) {
+        ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
+        if (endpoint != null) {
+          channel = endpoint.getChannel();
+        }
+      }
+      GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, channelHint);
+    }
+  }
+
   private void maybeExcludeEndpointOnNextCall(
       @Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) {
     if (endpoint == null || logicalRequestKey == null) {
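
The channel-selection step in `clearTransactionAndChannelAffinity` above (use the routed endpoint if one is still cached, otherwise fall back to the default channel) can be sketched with plain maps. Everything here is an assumption made for illustration: `RoutedCleanupSketch`, `route`, and `channelToUnbind` are hypothetical, and strings stand in for transaction ids, endpoint addresses, and channels.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of picking the channel on which a channel hint is unbound. */
final class RoutedCleanupSketch {
  /** Transaction id to routed endpoint address. */
  private final Map<String, String> transactionAffinities = new ConcurrentHashMap<>();
  /** Endpoint address to channel name; an entry may be missing if it was evicted. */
  private final Map<String, String> endpointChannels = new ConcurrentHashMap<>();

  private final String defaultChannel;

  RoutedCleanupSketch(String defaultChannel) {
    this.defaultChannel = defaultChannel;
  }

  /** Records that a transaction was routed to an endpoint served by the given channel. */
  void route(String transactionId, String address, String channel) {
    transactionAffinities.put(transactionId, address);
    endpointChannels.put(address, channel);
  }

  /**
   * Removes the transaction affinity and returns the channel to unbind on, or null when there is
   * no channel hint and therefore nothing to unbind.
   */
  String channelToUnbind(String transactionId, Long channelHint) {
    String address = transactionAffinities.remove(transactionId);
    if (channelHint == null) {
      return null;
    }
    String channel = defaultChannel;
    if (address != null) {
      String routed = endpointChannels.get(address);
      if (routed != null) {
        channel = routed;
      }
    }
    return channel;
  }
}
```

Note that the transaction affinity is removed even when the hint is null, matching the order of operations in the diff.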

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

Lines changed: 9 additions & 0 deletions
@@ -199,6 +199,15 @@ default RequestIdCreator getRequestIdCreator() {
   /** Clears any client-side affinity associated with the given transaction id. */
   default void clearTransactionAffinity(ByteString transactionId) {}
 
+  /**
+   * Clears any client-side transaction affinity and transport-level channel affinity associated
+   * with the given transaction.
+   */
+  default void clearTransactionAndChannelAffinity(
+      ByteString transactionId, @Nullable Long channelHint) {
+    clearTransactionAffinity(transactionId);
+  }
+
   // Instance admin APIs.
   Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable String pageToken)
       throws SpannerException;
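
Because the new `SpannerRpc` method is a `default` that delegates to `clearTransactionAffinity`, implementations that only override the older hook keep working. A minimal sketch of that delegation follows, with hypothetical names (`AffinityRpcSketch`, `LegacyRpcSketch`) and `String` standing in for `ByteString`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical cut-down version of the two affinity hooks on the RPC interface. */
interface AffinityRpcSketch {
  default void clearTransactionAffinity(String transactionId) {}

  /** New hook: falls back to the transaction-only cleanup unless overridden. */
  default void clearTransactionAndChannelAffinity(String transactionId, Long channelHint) {
    clearTransactionAffinity(transactionId);
  }
}

/** An implementation written before the new hook existed; it overrides only the old one. */
final class LegacyRpcSketch implements AffinityRpcSketch {
  final List<String> cleared = new ArrayList<>();

  @Override
  public void clearTransactionAffinity(String transactionId) {
    cleared.add(transactionId);
  }
}
```

Calling `clearTransactionAndChannelAffinity("tx-1", 42L)` on a `LegacyRpcSketch` still records `tx-1`; the channel hint is simply ignored by the default.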
