-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(spanner): fix grpc-gcp affinity cleanup and multiplexed channel usage leaks #12726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c94b6d4
ce64c9e
1b90d0d
c8db2c1
2711774
bb03696
0dc3d69
3288019
d7e900f
5c4891d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -161,7 +161,17 @@ public void close() { | |
| * Keeps track of which channels have been 'given' to single-use transactions for a given Spanner | ||
| * instance. | ||
| */ | ||
| private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>(); | ||
| private static final class SharedChannelUsage { | ||
| private final BitSet channelUsage; | ||
| private int referenceCount; | ||
|
|
||
| private SharedChannelUsage(int numChannels) { | ||
| this.channelUsage = new BitSet(numChannels); | ||
| this.referenceCount = 0; | ||
| } | ||
| } | ||
|
|
||
| private static final Map<SpannerImpl, SharedChannelUsage> CHANNEL_USAGE = new HashMap<>(); | ||
|
|
||
| private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES = | ||
| EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE); | ||
|
|
@@ -182,6 +192,8 @@ public void close() { | |
|
|
||
| private final SessionClient sessionClient; | ||
|
|
||
| private final SpannerImpl spanner; | ||
|
|
||
| private final TraceWrapper tracer; | ||
|
|
||
| /** The current multiplexed session that is used by this client. */ | ||
|
|
@@ -213,15 +225,20 @@ public void close() { | |
|
|
||
| @VisibleForTesting | ||
| MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) { | ||
| this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); | ||
| this.spanner = sessionClient.getSpanner(); | ||
| this.numChannels = spanner.getOptions().getNumChannels(); | ||
| synchronized (CHANNEL_USAGE) { | ||
| CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels)); | ||
| this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner()); | ||
| SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that we should seriously consider removing this manual channel distribution logic from this class. It was introduced when using the Gax channel pool to prevent low-QPS from sticking to just one channel. My understanding is that grpc-gcp will do that automatically, as it falls back to a round-robin scheme when there is low load. That would significantly simplify this class without breaking the purpose of what this was intended to do.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Customers still have an option to disable grpc-gcp and switch to GAX so cleanup task should be done later. |
||
| if (sharedChannelUsage == null) { | ||
| sharedChannelUsage = new SharedChannelUsage(numChannels); | ||
| CHANNEL_USAGE.put(this.spanner, sharedChannelUsage); | ||
| } | ||
| sharedChannelUsage.referenceCount++; | ||
| this.channelUsage = sharedChannelUsage.channelUsage; | ||
| } | ||
| this.sessionExpirationDuration = | ||
| Duration.ofMillis( | ||
| sessionClient | ||
| .getSpanner() | ||
| spanner | ||
| .getOptions() | ||
| .getSessionPoolOptions() | ||
| .getMultiplexedSessionMaintenanceDuration() | ||
|
|
@@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() { | |
| } | ||
|
|
||
| void close() { | ||
| boolean releaseChannelUsage = false; | ||
| synchronized (this) { | ||
| if (!this.isClosed) { | ||
| this.isClosed = true; | ||
| this.maintainer.stop(); | ||
| releaseChannelUsage = true; | ||
| } | ||
| } | ||
| if (releaseChannelUsage) { | ||
| synchronized (CHANNEL_USAGE) { | ||
| SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner); | ||
| if (sharedChannelUsage != null) { | ||
| sharedChannelUsage.referenceCount--; | ||
| if (sharedChannelUsage.referenceCount == 0) { | ||
| CHANNEL_USAGE.remove(this.spanner); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,6 +64,7 @@ | |
| import com.google.cloud.grpc.GcpManagedChannel; | ||
| import com.google.cloud.grpc.GcpManagedChannelBuilder; | ||
| import com.google.cloud.grpc.GcpManagedChannelOptions; | ||
| import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; | ||
| import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; | ||
| import com.google.cloud.grpc.GrpcTransportOptions; | ||
| import com.google.cloud.grpc.fallback.GcpFallbackChannel; | ||
|
|
@@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc { | |
| private final boolean isGrpcGcpExtensionEnabled; | ||
| private final boolean isDynamicChannelPoolEnabled; | ||
| @Nullable private final KeyAwareChannel keyAwareChannel; | ||
| @Nullable private final GcpManagedChannel grpcGcpChannel; | ||
|
|
||
| private final GrpcCallContext baseGrpcCallContext; | ||
|
|
||
|
|
@@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) { | |
| .build(); | ||
| ClientContext clientContext = ClientContext.create(spannerStubSettings); | ||
| this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel()); | ||
| this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel()); | ||
| this.spannerStub = | ||
| GrpcSpannerStubWithStubSettingsAndClientContext.create( | ||
| spannerStubSettings, clientContext); | ||
|
|
@@ -540,6 +543,7 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla | |
| } | ||
| } else { | ||
| this.keyAwareChannel = null; | ||
| this.grpcGcpChannel = null; | ||
| this.databaseAdminStub = null; | ||
| this.instanceAdminStub = null; | ||
| this.spannerStub = null; | ||
|
|
@@ -589,13 +593,35 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport | |
| return null; | ||
| } | ||
|
|
||
| @Nullable | ||
| private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) { | ||
| if (!(transportChannel instanceof GrpcTransportChannel)) { | ||
| return null; | ||
| } | ||
| Channel channel = ((GrpcTransportChannel) transportChannel).getChannel(); | ||
| if (channel instanceof GcpManagedChannel) { | ||
| return (GcpManagedChannel) channel; | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public void clearTransactionAffinity(ByteString transactionId) { | ||
| if (keyAwareChannel != null) { | ||
| keyAwareChannel.clearTransactionAffinity(transactionId); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void clearTransactionAndChannelAffinity( | ||
| ByteString transactionId, @Nullable Long channelHint) { | ||
| if (keyAwareChannel != null) { | ||
| keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint); | ||
| return; | ||
| } | ||
| GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint); | ||
| } | ||
|
|
||
| private static String parseGrpcGcpApiConfig() { | ||
| try { | ||
| return Resources.toString( | ||
|
|
@@ -772,16 +798,35 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO | |
| } | ||
| optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build()); | ||
|
|
||
| // Configure dynamic channel pool options if enabled. | ||
| // Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults | ||
| // or user-provided configuration. | ||
| if (options.isDynamicChannelPoolEnabled()) { | ||
| optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions()); | ||
| // Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are | ||
| // applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only | ||
| // propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling. | ||
| if (options.isGrpcGcpExtensionEnabled()) { | ||
| optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options)); | ||
| } | ||
|
|
||
| return optionsBuilder.build(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) { | ||
| GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions(); | ||
| if (options.isDynamicChannelPoolEnabled()) { | ||
| return channelPoolOptions; | ||
| } | ||
|
|
||
| // When DCP is disabled, Spanner's numChannels should still produce a fixed grpc-gcp channel | ||
| // pool instead of only capping the pool's maximum size. | ||
| return GcpChannelPoolOptions.newBuilder() | ||
| .setMaxSize(options.getNumChannels()) | ||
| .setMinSize(options.getNumChannels()) | ||
| .setInitSize(options.getNumChannels()) | ||
| .disableDynamicScaling() | ||
| .setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime()) | ||
| .setCleanupInterval(channelPoolOptions.getCleanupInterval()) | ||
| .build(); | ||
|
Comment on lines
+813
to
+827
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
if (options.isDynamicChannelPoolEnabled()) {
return channelPoolOptions;
}
GcpChannelPoolOptions.Builder builder = GcpChannelPoolOptions.newBuilder().disableDynamicScaling();
if (channelPoolOptions != null) {
builder.setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
.setCleanupInterval(channelPoolOptions.getCleanupInterval());
}
return builder.build();
}
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gcpChannelPoolOptions is always initialized to either merged user options. The builder setter also rejects null, adding a defensive null branch would just hide invariant breaks. |
||
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| private static void maybeEnableGrpcGcpExtension( | ||
| InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder, | ||
|
|
@@ -793,21 +838,19 @@ private static void maybeEnableGrpcGcpExtension( | |
| final String jsonApiConfig = parseGrpcGcpApiConfig(); | ||
| final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options); | ||
|
|
||
| // When dynamic channel pool is enabled, use the DCP initial size as the pool size. | ||
| // When disabled, use the explicitly configured numChannels. | ||
| final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels(); | ||
|
|
||
| ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> baseConfigurator = | ||
| defaultChannelProviderBuilder.getChannelConfigurator(); | ||
| ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction = | ||
| channelBuilder -> { | ||
| if (baseConfigurator != null) { | ||
| channelBuilder = baseConfigurator.apply(channelBuilder); | ||
| } | ||
| // The grpc-gcp pool is configured entirely through GcpChannelPoolOptions above. Avoid | ||
| // the deprecated setPoolSize path, which only adjusts maxSize and does not eagerly create | ||
| // a fixed-size pool. | ||
| return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder) | ||
| .withApiConfigJsonString(jsonApiConfig) | ||
| .withOptions(grpcGcpOptions) | ||
| .setPoolSize(poolSize); | ||
| .withOptions(grpcGcpOptions); | ||
| }; | ||
|
|
||
| // Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1. | ||
|
|
@@ -2275,20 +2318,9 @@ <ReqT, RespT> GrpcCallContext newCallContext( | |
| Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options); | ||
| if (affinity != null) { | ||
| if (this.isGrpcGcpExtensionEnabled) { | ||
| // Set channel affinity in gRPC-GCP. | ||
| String affinityKey; | ||
| if (this.isDynamicChannelPoolEnabled) { | ||
| // When dynamic channel pooling is enabled, we use the raw affinity value as the key. | ||
| // This allows grpc-gcp to use round-robin for new keys, enabling new channels | ||
| // (created during scale-up) to receive requests. The affinity key lifetime setting | ||
| // ensures the affinity map doesn't grow unbounded. | ||
| affinityKey = String.valueOf(affinity); | ||
| } else { | ||
| // When DCP is disabled, compute bounded channel hint to prevent | ||
| // gRPC-GCP affinity map from getting unbounded. | ||
| int boundedChannelHint = affinity.intValue() % this.numChannels; | ||
| affinityKey = String.valueOf(boundedChannelHint); | ||
| } | ||
| // Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key. | ||
| // Cleanup is handled explicitly by unbind on terminal/single-use operations. | ||
| String affinityKey = String.valueOf(affinity); | ||
| context = | ||
| context.withCallOptions( | ||
| context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.spanner.spi.v1; | ||
|
|
||
| import com.google.cloud.grpc.GcpManagedChannel; | ||
| import com.google.spanner.v1.ExecuteSqlRequest; | ||
| import com.google.spanner.v1.ResultSet; | ||
| import com.google.spanner.v1.SpannerGrpc; | ||
| import io.grpc.CallOptions; | ||
| import io.grpc.ClientCall; | ||
| import io.grpc.ManagedChannel; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| final class GrpcGcpAffinityUtil { | ||
| private GrpcGcpAffinityUtil() {} | ||
|
|
||
| static void clearChannelHintAffinity( | ||
| @Nullable ManagedChannel channel, @Nullable Long channelHint) { | ||
| if (!(channel instanceof GcpManagedChannel) || channelHint == null) { | ||
| return; | ||
| } | ||
| // TODO: Replace this synthetic call once grpc-gcp exposes a direct API for unbinding | ||
| // affinity keys without creating and immediately cancelling a ClientCall. | ||
| ClientCall<ExecuteSqlRequest, ResultSet> call = | ||
| channel.newCall( | ||
| SpannerGrpc.getExecuteSqlMethod(), | ||
| CallOptions.DEFAULT | ||
| .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) | ||
| .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); | ||
| call.cancel("Cloud Spanner transaction closed", null); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@olavloite This was added in the commit https://github.com/googleapis/google-cloud-java/pull/12715/changes#diff-abde1c3268c5f4417d5742c6c8a5541c6cf3e7d05bfc0be134e044891f9dd181 causing github check failure