Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion java-spanner/.gitignore
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ ByteString getTransactionId() {
public void close() {
ByteString id = getTransactionId();
if (id != null && !id.isEmpty()) {
rpc.clearTransactionAffinity(id);
rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint));
}
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,17 @@ public void close() {
* Keeps track of which channels have been 'given' to single-use transactions for a given Spanner
* instance.
*/
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
private static final class SharedChannelUsage {
private final BitSet channelUsage;
private int referenceCount;

private SharedChannelUsage(int numChannels) {
this.channelUsage = new BitSet(numChannels);
this.referenceCount = 0;
}
}

private static final Map<SpannerImpl, SharedChannelUsage> CHANNEL_USAGE = new HashMap<>();

private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);
Expand All @@ -182,6 +192,8 @@ public void close() {

private final SessionClient sessionClient;

private final SpannerImpl spanner;

private final TraceWrapper tracer;

/** The current multiplexed session that is used by this client. */
Expand Down Expand Up @@ -213,15 +225,20 @@ public void close() {

@VisibleForTesting
MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) {
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
this.spanner = sessionClient.getSpanner();
this.numChannels = spanner.getOptions().getNumChannels();
synchronized (CHANNEL_USAGE) {
CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels));
this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner());
SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should seriously consider removing this manual channel distribution logic from this class. It was introduced when using the Gax channel pool to prevent low-QPS from sticking to just one channel. My understanding is that grpc-gcp will do that automatically, as it falls back to a round-robin scheme when there is low load. That would significantly simplify this class without breaking the purpose of what this was intended to do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customers still have an option to disable grpc-gcp and switch to GAX so cleanup task should be done later.

if (sharedChannelUsage == null) {
sharedChannelUsage = new SharedChannelUsage(numChannels);
CHANNEL_USAGE.put(this.spanner, sharedChannelUsage);
}
sharedChannelUsage.referenceCount++;
this.channelUsage = sharedChannelUsage.channelUsage;
}
this.sessionExpirationDuration =
Duration.ofMillis(
sessionClient
.getSpanner()
spanner
.getOptions()
.getSessionPoolOptions()
.getMultiplexedSessionMaintenanceDuration()
Expand Down Expand Up @@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() {
}

void close() {
boolean releaseChannelUsage = false;
synchronized (this) {
if (!this.isClosed) {
this.isClosed = true;
this.maintainer.stop();
releaseChannelUsage = true;
}
}
if (releaseChannelUsage) {
synchronized (CHANNEL_USAGE) {
SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
if (sharedChannelUsage != null) {
sharedChannelUsage.referenceCount--;
if (sharedChannelUsage.referenceCount == 0) {
CHANNEL_USAGE.remove(this.spanner);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.google.cloud.grpc.GcpManagedChannel;
import com.google.cloud.grpc.GcpManagedChannelBuilder;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.fallback.GcpFallbackChannel;
Expand Down Expand Up @@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private final boolean isGrpcGcpExtensionEnabled;
private final boolean isDynamicChannelPoolEnabled;
@Nullable private final KeyAwareChannel keyAwareChannel;
@Nullable private final GcpManagedChannel grpcGcpChannel;

private final GrpcCallContext baseGrpcCallContext;

Expand Down Expand Up @@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
.build();
ClientContext clientContext = ClientContext.create(spannerStubSettings);
this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel());
this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel());
this.spannerStub =
GrpcSpannerStubWithStubSettingsAndClientContext.create(
spannerStubSettings, clientContext);
Expand Down Expand Up @@ -540,6 +543,7 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
}
} else {
this.keyAwareChannel = null;
this.grpcGcpChannel = null;
this.databaseAdminStub = null;
this.instanceAdminStub = null;
this.spannerStub = null;
Expand Down Expand Up @@ -589,13 +593,35 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport
return null;
}

@Nullable
private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) {
if (!(transportChannel instanceof GrpcTransportChannel)) {
return null;
}
Channel channel = ((GrpcTransportChannel) transportChannel).getChannel();
if (channel instanceof GcpManagedChannel) {
return (GcpManagedChannel) channel;
}
return null;
}

@Override
public void clearTransactionAffinity(ByteString transactionId) {
if (keyAwareChannel != null) {
keyAwareChannel.clearTransactionAffinity(transactionId);
}
}

@Override
public void clearTransactionAndChannelAffinity(
ByteString transactionId, @Nullable Long channelHint) {
if (keyAwareChannel != null) {
keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint);
return;
}
GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint);
}

private static String parseGrpcGcpApiConfig() {
try {
return Resources.toString(
Expand Down Expand Up @@ -772,16 +798,35 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO
}
optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build());

// Configure dynamic channel pool options if enabled.
// Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults
// or user-provided configuration.
if (options.isDynamicChannelPoolEnabled()) {
optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions());
// Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are
// applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only
// propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling.
if (options.isGrpcGcpExtensionEnabled()) {
optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options));
}

return optionsBuilder.build();
}

@VisibleForTesting
static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
if (options.isDynamicChannelPoolEnabled()) {
return channelPoolOptions;
}

// When DCP is disabled, Spanner's numChannels should still produce a fixed grpc-gcp channel
// pool instead of only capping the pool's maximum size.
return GcpChannelPoolOptions.newBuilder()
.setMaxSize(options.getNumChannels())
.setMinSize(options.getNumChannels())
.setInitSize(options.getNumChannels())
.disableDynamicScaling()
.setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
.setCleanupInterval(channelPoolOptions.getCleanupInterval())
.build();
Comment on lines +813 to +827
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The method getGrpcGcpChannelPoolOptions assumes that options.getGcpChannelPoolOptions() returns a non-null value. If it returns null, a NullPointerException will occur when calling getAffinityKeyLifetime() or getCleanupInterval(). While SpannerOptions typically provides a default, adding a null check or ensuring a non-null value is safer for robustness.

  static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
    GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
    if (options.isDynamicChannelPoolEnabled()) {
      return channelPoolOptions;
    }

    GcpChannelPoolOptions.Builder builder = GcpChannelPoolOptions.newBuilder().disableDynamicScaling();
    if (channelPoolOptions != null) {
      builder.setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
          .setCleanupInterval(channelPoolOptions.getCleanupInterval());
    }
    return builder.build();
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gcpChannelPoolOptions is always initialized to either merged user options. The builder setter also rejects null, adding a defensive null branch would just hide invariant breaks.

}

@SuppressWarnings("rawtypes")
private static void maybeEnableGrpcGcpExtension(
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder,
Expand All @@ -793,21 +838,19 @@ private static void maybeEnableGrpcGcpExtension(
final String jsonApiConfig = parseGrpcGcpApiConfig();
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);

// When dynamic channel pool is enabled, use the DCP initial size as the pool size.
// When disabled, use the explicitly configured numChannels.
final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> baseConfigurator =
defaultChannelProviderBuilder.getChannelConfigurator();
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
channelBuilder -> {
if (baseConfigurator != null) {
channelBuilder = baseConfigurator.apply(channelBuilder);
}
// The grpc-gcp pool is configured entirely through GcpChannelPoolOptions above. Avoid
// the deprecated setPoolSize path, which only adjusts maxSize and does not eagerly create
// a fixed-size pool.
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(grpcGcpOptions)
.setPoolSize(poolSize);
.withOptions(grpcGcpOptions);
};

// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
Expand Down Expand Up @@ -2275,20 +2318,9 @@ <ReqT, RespT> GrpcCallContext newCallContext(
Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options);
if (affinity != null) {
if (this.isGrpcGcpExtensionEnabled) {
// Set channel affinity in gRPC-GCP.
String affinityKey;
if (this.isDynamicChannelPoolEnabled) {
// When dynamic channel pooling is enabled, we use the raw affinity value as the key.
// This allows grpc-gcp to use round-robin for new keys, enabling new channels
// (created during scale-up) to receive requests. The affinity key lifetime setting
// ensures the affinity map doesn't grow unbounded.
affinityKey = String.valueOf(affinity);
} else {
// When DCP is disabled, compute bounded channel hint to prevent
// gRPC-GCP affinity map from getting unbounded.
int boundedChannelHint = affinity.intValue() % this.numChannels;
affinityKey = String.valueOf(boundedChannelHint);
}
// Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key.
// Cleanup is handled explicitly by unbind on terminal/single-use operations.
String affinityKey = String.valueOf(affinity);
context =
context.withCallOptions(
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.spi.v1;

import com.google.cloud.grpc.GcpManagedChannel;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.SpannerGrpc;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import javax.annotation.Nullable;

final class GrpcGcpAffinityUtil {
private GrpcGcpAffinityUtil() {}

static void clearChannelHintAffinity(
@Nullable ManagedChannel channel, @Nullable Long channelHint) {
if (!(channel instanceof GcpManagedChannel) || channelHint == null) {
return;
}
// TODO: Replace this synthetic call once grpc-gcp exposes a direct API for unbinding
// affinity keys without creating and immediately cancelling a ClientCall.
ClientCall<ExecuteSqlRequest, ResultSet> call =
channel.newCall(
SpannerGrpc.getExecuteSqlMethod(),
CallOptions.DEFAULT
.withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint))
.withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
call.cancel("Cloud Spanner transaction closed", null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,21 @@ void clearTransactionAffinity(ByteString transactionId) {
clearAffinity(transactionId);
}

void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) {
String address = transactionAffinities.remove(transactionId);
readOnlyTxPreferLeader.invalidate(transactionId);
if (channelHint != null) {
ManagedChannel channel = defaultChannel;
if (address != null) {
ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
if (endpoint != null) {
channel = endpoint.getChannel();
}
}
GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, channelHint);
}
}

private void maybeExcludeEndpointOnNextCall(
@Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) {
if (endpoint == null || logicalRequestKey == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ default RequestIdCreator getRequestIdCreator() {
/** Clears any client-side affinity associated with the given transaction id. */
default void clearTransactionAffinity(ByteString transactionId) {}

/**
* Clears any client-side transaction affinity and transport-level channel affinity associated
* with the given transaction.
*/
default void clearTransactionAndChannelAffinity(
ByteString transactionId, @Nullable Long channelHint) {
clearTransactionAffinity(transactionId);
}

// Instance admin APIs.
Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable String pageToken)
throws SpannerException;
Expand Down
Loading
Loading