Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ ByteString getTransactionId() {
public void close() {
ByteString id = getTransactionId();
if (id != null && !id.isEmpty()) {
rpc.clearTransactionAffinity(id);
rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint));
}
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,17 @@ public void close() {
* Keeps track of which channels have been 'given' to single-use transactions for a given Spanner
* instance.
*/
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
private static final class SharedChannelUsage {
private final BitSet channelUsage;
private int referenceCount;

private SharedChannelUsage(int numChannels) {
this.channelUsage = new BitSet(numChannels);
this.referenceCount = 0;
}
}

private static final Map<SpannerImpl, SharedChannelUsage> CHANNEL_USAGE = new HashMap<>();

private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);
Expand All @@ -182,6 +192,8 @@ public void close() {

private final SessionClient sessionClient;

private final SpannerImpl spanner;

private final TraceWrapper tracer;

/** The current multiplexed session that is used by this client. */
Expand Down Expand Up @@ -213,15 +225,20 @@ public void close() {

@VisibleForTesting
MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) {
this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
this.spanner = sessionClient.getSpanner();
this.numChannels = spanner.getOptions().getNumChannels();
synchronized (CHANNEL_USAGE) {
CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels));
this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner());
SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should seriously consider removing this manual channel distribution logic from this class. It was introduced when using the Gax channel pool to prevent low-QPS from sticking to just one channel. My understanding is that grpc-gcp will do that automatically, as it falls back to a round-robin scheme when there is low load. That would significantly simplify this class without breaking the purpose of what this was intended to do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customers still have an option to disable grpc-gcp and switch to GAX so cleanup task should be done later.

if (sharedChannelUsage == null) {
sharedChannelUsage = new SharedChannelUsage(numChannels);
CHANNEL_USAGE.put(this.spanner, sharedChannelUsage);
}
sharedChannelUsage.referenceCount++;
this.channelUsage = sharedChannelUsage.channelUsage;
}
this.sessionExpirationDuration =
Duration.ofMillis(
sessionClient
.getSpanner()
spanner
.getOptions()
.getSessionPoolOptions()
.getMultiplexedSessionMaintenanceDuration()
Expand Down Expand Up @@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() {
}

void close() {
boolean releaseChannelUsage = false;
synchronized (this) {
if (!this.isClosed) {
this.isClosed = true;
this.maintainer.stop();
releaseChannelUsage = true;
}
}
if (releaseChannelUsage) {
synchronized (CHANNEL_USAGE) {
SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner);
if (sharedChannelUsage != null) {
sharedChannelUsage.referenceCount--;
if (sharedChannelUsage.referenceCount == 0) {
CHANNEL_USAGE.remove(this.spanner);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.google.cloud.grpc.GcpManagedChannel;
import com.google.cloud.grpc.GcpManagedChannelBuilder;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.fallback.GcpFallbackChannel;
Expand Down Expand Up @@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private final boolean isGrpcGcpExtensionEnabled;
private final boolean isDynamicChannelPoolEnabled;
@Nullable private final KeyAwareChannel keyAwareChannel;
@Nullable private final GcpManagedChannel grpcGcpChannel;

private final GrpcCallContext baseGrpcCallContext;

Expand Down Expand Up @@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
.build();
ClientContext clientContext = ClientContext.create(spannerStubSettings);
this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel());
this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel());
this.spannerStub =
GrpcSpannerStubWithStubSettingsAndClientContext.create(
spannerStubSettings, clientContext);
Expand Down Expand Up @@ -540,6 +543,7 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
}
} else {
this.keyAwareChannel = null;
this.grpcGcpChannel = null;
this.databaseAdminStub = null;
this.instanceAdminStub = null;
this.spannerStub = null;
Expand Down Expand Up @@ -589,13 +593,51 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport
return null;
}

@Nullable
private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) {
if (!(transportChannel instanceof GrpcTransportChannel)) {
return null;
}
Channel channel = ((GrpcTransportChannel) transportChannel).getChannel();
if (channel instanceof GcpManagedChannel) {
return (GcpManagedChannel) channel;
}
return null;
}

@Override
public void clearTransactionAffinity(ByteString transactionId) {
if (keyAwareChannel != null) {
keyAwareChannel.clearTransactionAffinity(transactionId);
}
}

@Override
public void clearTransactionAndChannelAffinity(
ByteString transactionId, @Nullable Long channelHint) {
if (keyAwareChannel != null) {
keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint);
return;
}
clearTransactionAffinity(transactionId);
Comment thread
rahul2393 marked this conversation as resolved.
Outdated
clearChannelHintAffinity(grpcGcpChannel, channelHint);
}

@VisibleForTesting
static void clearChannelHintAffinity(
@Nullable ManagedChannel channel, @Nullable Long channelHint) {
if (!(channel instanceof GcpManagedChannel) || channelHint == null) {
return;
}
ClientCall<ExecuteSqlRequest, ResultSet> call =
Comment thread
rahul2393 marked this conversation as resolved.
Outdated
channel.newCall(
SpannerGrpc.getExecuteSqlMethod(),
CallOptions.DEFAULT
.withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint))
.withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
call.cancel("Cloud Spanner transaction closed", null);
}

private static String parseGrpcGcpApiConfig() {
try {
return Resources.toString(
Expand Down Expand Up @@ -772,16 +814,35 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO
}
optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build());

// Configure dynamic channel pool options if enabled.
// Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults
// or user-provided configuration.
if (options.isDynamicChannelPoolEnabled()) {
optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions());
// Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are
// applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only
// propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling.
if (options.isGrpcGcpExtensionEnabled()) {
optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options));
}

return optionsBuilder.build();
}

@VisibleForTesting
static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
if (options.isDynamicChannelPoolEnabled()) {
return channelPoolOptions;
}

// When DCP is disabled, Spanner's numChannels should still produce a fixed grpc-gcp channel
// pool instead of only capping the pool's maximum size.
return GcpChannelPoolOptions.newBuilder()
.setMaxSize(options.getNumChannels())
.setMinSize(options.getNumChannels())
.setInitSize(options.getNumChannels())
.disableDynamicScaling()
.setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
.setCleanupInterval(channelPoolOptions.getCleanupInterval())
.build();
Comment on lines +813 to +827
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The method getGrpcGcpChannelPoolOptions assumes that options.getGcpChannelPoolOptions() returns a non-null value. If it returns null, a NullPointerException will occur when calling getAffinityKeyLifetime() or getCleanupInterval(). While SpannerOptions typically provides a default, adding a null check or ensuring a non-null value is safer for robustness.

  static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) {
    GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
    if (options.isDynamicChannelPoolEnabled()) {
      return channelPoolOptions;
    }

    GcpChannelPoolOptions.Builder builder = GcpChannelPoolOptions.newBuilder().disableDynamicScaling();
    if (channelPoolOptions != null) {
      builder.setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime())
          .setCleanupInterval(channelPoolOptions.getCleanupInterval());
    }
    return builder.build();
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gcpChannelPoolOptions is always initialized to either merged user options. The builder setter also rejects null, adding a defensive null branch would just hide invariant breaks.

}

@SuppressWarnings("rawtypes")
private static void maybeEnableGrpcGcpExtension(
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder,
Expand All @@ -793,21 +854,19 @@ private static void maybeEnableGrpcGcpExtension(
final String jsonApiConfig = parseGrpcGcpApiConfig();
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);

// When dynamic channel pool is enabled, use the DCP initial size as the pool size.
// When disabled, use the explicitly configured numChannels.
final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> baseConfigurator =
defaultChannelProviderBuilder.getChannelConfigurator();
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
channelBuilder -> {
if (baseConfigurator != null) {
channelBuilder = baseConfigurator.apply(channelBuilder);
}
// The grpc-gcp pool is configured entirely through GcpChannelPoolOptions above. Avoid
// the deprecated setPoolSize path, which only adjusts maxSize and does not eagerly create
// a fixed-size pool.
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(grpcGcpOptions)
.setPoolSize(poolSize);
.withOptions(grpcGcpOptions);
};

// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
Expand Down Expand Up @@ -2275,20 +2334,9 @@ <ReqT, RespT> GrpcCallContext newCallContext(
Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options);
if (affinity != null) {
if (this.isGrpcGcpExtensionEnabled) {
// Set channel affinity in gRPC-GCP.
String affinityKey;
if (this.isDynamicChannelPoolEnabled) {
// When dynamic channel pooling is enabled, we use the raw affinity value as the key.
// This allows grpc-gcp to use round-robin for new keys, enabling new channels
// (created during scale-up) to receive requests. The affinity key lifetime setting
// ensures the affinity map doesn't grow unbounded.
affinityKey = String.valueOf(affinity);
} else {
// When DCP is disabled, compute bounded channel hint to prevent
// gRPC-GCP affinity map from getting unbounded.
int boundedChannelHint = affinity.intValue() % this.numChannels;
affinityKey = String.valueOf(boundedChannelHint);
}
// Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key.
// Cleanup is handled explicitly by unbind on terminal/single-use operations.
String affinityKey = String.valueOf(affinity);
context =
context.withCallOptions(
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.grpc.GcpManagedChannel;
import com.google.cloud.spanner.XGoogSpannerRequestId;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -32,6 +33,7 @@
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -304,6 +306,34 @@ void clearTransactionAffinity(ByteString transactionId) {
clearAffinity(transactionId);
}

void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) {
if (channelHint != null) {
ManagedChannel channel = defaultChannel;
String address = transactionAffinities.get(transactionId);
Comment thread
rahul2393 marked this conversation as resolved.
Outdated
if (address != null) {
ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
if (endpoint != null) {
channel = endpoint.getChannel();
}
}
clearChannelHintAffinity(channel, channelHint);
}
clearAffinity(transactionId);
}

private static void clearChannelHintAffinity(ManagedChannel channel, long channelHint) {
if (!(channel instanceof GcpManagedChannel)) {
return;
}
ClientCall<ExecuteSqlRequest, ResultSet> call =
channel.newCall(
SpannerGrpc.getExecuteSqlMethod(),
CallOptions.DEFAULT
.withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint))
.withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
call.cancel("Cloud Spanner transaction closed", null);
}
Comment thread
rahul2393 marked this conversation as resolved.
Outdated

private void maybeExcludeEndpointOnNextCall(
@Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) {
if (endpoint == null || logicalRequestKey == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ default RequestIdCreator getRequestIdCreator() {
/** Clears any client-side affinity associated with the given transaction id. */
default void clearTransactionAffinity(ByteString transactionId) {}

/**
* Clears any client-side transaction affinity and transport-level channel affinity associated
* with the given transaction.
*/
default void clearTransactionAndChannelAffinity(
ByteString transactionId, @Nullable Long channelHint) {
clearTransactionAffinity(transactionId);
}

// Instance admin APIs.
Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable String pageToken)
throws SpannerException;
Expand Down
Loading
Loading