diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 619ea42441a1..856f876a14de 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -55,6 +55,8 @@ import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TransactionOptions; import com.google.spanner.v1.TransactionSelector; +import java.util.Collections; +import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; @@ -202,9 +204,19 @@ private SingleReadContext(Builder builder) { // of a channel hint. GAX will automatically choose a hint when used // with a multiplexed session to perform a round-robin channel selection. We are // passing a hint here to prefer random channel selection instead of doing GAX round-robin. + // Also signal unbind so the grpc-gcp affinity map entry is cleaned up once the call + // completes. The unbind flag is preserved on retries via prepareRetryOnDifferentGrpcChannel. 
this.channelHint = getChannelHintOptions( - session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); + session.getOptions(), + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), + session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); + if (this.channelHint != null) { + Map mutable = new EnumMap<>(SpannerRpc.Option.class); + mutable.putAll(this.channelHint); + mutable.put(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); + this.channelHint = Collections.unmodifiableMap(mutable); + } } @Override @@ -241,7 +253,10 @@ TransactionSelector getTransactionSelector() { boolean prepareRetryOnDifferentGrpcChannel() { if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) { long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L; - channelHint = optionMap(SessionOption.channelHint(channelHintForTransaction)); + channelHint = + optionMap( + SessionOption.channelHint(channelHintForTransaction), + SessionOption.unbindChannelHint()); return true; } return super.prepareRetryOnDifferentGrpcChannel(); @@ -360,7 +375,9 @@ static Builder newBuilder() { } this.channelHint = getChannelHintOptions( - session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); + session.getOptions(), + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), + session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); } @Override @@ -862,6 +879,19 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() { static Map getChannelHintOptions( Map channelHintForSession, Long channelHintForTransaction) { + return getChannelHintOptions( + channelHintForSession, channelHintForTransaction, /* useTransactionHint= */ false); + } + + static Map getChannelHintOptions( + Map channelHintForSession, + Long channelHintForTransaction, + boolean useTransactionHint) { + // grpc-gcp uses a per-operation/per-transaction random hint instead of reusing the session + // hint so requests distribute independently from session 
affinity. + if (useTransactionHint && channelHintForTransaction != null) { + return optionMap(SessionOption.channelHint(channelHintForTransaction)); + } if (channelHintForSession != null) { return channelHintForSession; } else if (channelHintForTransaction != null) { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 1021e1c469d8..5bd94da32cc6 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -398,6 +398,10 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) { private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction( boolean singleUse) { try { + int singleUseChannelHint = + singleUse && !sessionClient.getSpanner().getOptions().isGrpcGcpExtensionEnabled() + ? getSingleUseChannelHint() + : NO_CHANNEL_HINT; return new MultiplexedSessionTransaction( this, tracer.getCurrentSpan(), @@ -406,7 +410,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction( // session, such as for example a DatabaseNotFound exception. We therefore do not need // any special handling of such errors. multiplexedSessionReference.get().get(), - singleUse ? 
getSingleUseChannelHint() : NO_CHANNEL_HINT, + singleUseChannelHint, singleUse); } catch (ExecutionException executionException) { throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 1fb49f2ced3f..dabc99b93cfc 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -89,6 +89,10 @@ static SessionOption channelHint(long hint) { return new SessionOption(SpannerRpc.Option.CHANNEL_HINT, hint); } + static SessionOption unbindChannelHint() { + return new SessionOption(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); + } + SpannerRpc.Option rpcOption() { return rpcOption; } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index e70ee390df11..e14d18fa8bf7 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; /** @@ -299,10 +300,11 @@ public CommitResponse writeAtLeastOnceWithOptions( } CommitRequest request = requestBuilder.build(); ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT); + Map singleUseWriteOptions = getSingleUseWriteChannelHintOptions(); try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( - () -> new CommitResponse(spanner.getRpc().commit(request, 
getOptions()))); + () -> new CommitResponse(spanner.getRpc().commit(request, singleUseWriteOptions))); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -326,6 +328,14 @@ private RequestOptions getRequestOptions(TransactionOption... transactionOptions return null; } + private Map getSingleUseWriteChannelHintOptions() { + if (!spanner.getOptions().isGrpcGcpExtensionEnabled()) { + return getOptions(); + } + return optionMap( + SessionOption.channelHint(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))); + } + @Override public ServerStream batchWriteAtLeastOnce( Iterable mutationGroups, TransactionOption... transactionOptions) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 3458b04e7a9f..b2615f240526 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -232,7 +232,9 @@ private TransactionContextImpl(Builder builder) { this.clock = builder.clock; this.channelHint = getChannelHintOptions( - session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); + session.getOptions(), + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), + session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); this.previousTransactionId = builder.previousTransactionId; } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index d858fdb9273b..e02f813ca643 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ 
b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -83,6 +83,17 @@ long getNthRequest() { return this.nthRequest; } + /** + * Returns a stable identifier for the logical request, excluding channel and attempt. + * + * <p>
This can be used to associate retry-local state across attempts of the same RPC without + * affecting unrelated requests. + */ + @InternalApi + public String getLogicalRequestKey() { + return this.nthClientId + "." + this.nthRequest; + } + @VisibleForTesting static final Pattern REGEX = Pattern.compile("^(\\d)\\.([0-9a-z]{16})\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$"); diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 4ced18eb9203..4fd95b305b5c 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -32,6 +32,7 @@ import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; /** * Finds a server for a request using location-aware routing metadata. 
@@ -40,6 +41,8 @@ */ @InternalApi public final class ChannelFinder { + private static final Predicate NO_EXCLUDED_ENDPOINTS = address -> false; + private final Object updateLock = new Object(); private final AtomicLong databaseId = new AtomicLong(); private final KeyRecipeCache recipeCache = new KeyRecipeCache(); @@ -71,47 +74,83 @@ public void update(CacheUpdate update) { } public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) { - return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction())); + return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS); + } + + public ChannelEndpoint findServer( + ReadRequest.Builder reqBuilder, Predicate excludedEndpoints) { + return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), excludedEndpoints); } public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) { + return findServer(reqBuilder, preferLeader, NO_EXCLUDED_ENDPOINTS); + } + + public ChannelEndpoint findServer( + ReadRequest.Builder reqBuilder, boolean preferLeader, Predicate excludedEndpoints) { recipeCache.computeKeys(reqBuilder); return fillRoutingHint( preferLeader, KeyRangeCache.RangeMode.COVERING_SPLIT, reqBuilder.getDirectedReadOptions(), - reqBuilder.getRoutingHintBuilder()); + reqBuilder.getRoutingHintBuilder(), + excludedEndpoints); } public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) { - return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction())); + return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS); + } + + public ChannelEndpoint findServer( + ExecuteSqlRequest.Builder reqBuilder, Predicate excludedEndpoints) { + return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), excludedEndpoints); } public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) { + return findServer(reqBuilder, preferLeader, NO_EXCLUDED_ENDPOINTS); + } 
+ + public ChannelEndpoint findServer( + ExecuteSqlRequest.Builder reqBuilder, + boolean preferLeader, + Predicate excludedEndpoints) { recipeCache.computeKeys(reqBuilder); return fillRoutingHint( preferLeader, KeyRangeCache.RangeMode.PICK_RANDOM, reqBuilder.getDirectedReadOptions(), - reqBuilder.getRoutingHintBuilder()); + reqBuilder.getRoutingHintBuilder(), + excludedEndpoints); } public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) { + return findServer(reqBuilder, NO_EXCLUDED_ENDPOINTS); + } + + public ChannelEndpoint findServer( + BeginTransactionRequest.Builder reqBuilder, Predicate excludedEndpoints) { if (!reqBuilder.hasMutationKey()) { return null; } return routeMutation( reqBuilder.getMutationKey(), preferLeader(reqBuilder.getOptions()), - reqBuilder.getRoutingHintBuilder()); + reqBuilder.getRoutingHintBuilder(), + excludedEndpoints); } public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) { + return fillRoutingHint(reqBuilder, NO_EXCLUDED_ENDPOINTS); + } + + public ChannelEndpoint fillRoutingHint( + CommitRequest.Builder reqBuilder, Predicate excludedEndpoints) { Mutation mutation = selectMutationForRouting(reqBuilder.getMutationsList()); if (mutation == null) { return null; } - return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder()); + return routeMutation( + mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder(), excludedEndpoints); } private static Mutation selectMutationForRouting(List mutations) { @@ -139,7 +178,10 @@ private static Mutation selectMutationForRouting(List mutations) { } private ChannelEndpoint routeMutation( - Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) { + Mutation mutation, + boolean preferLeader, + RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints) { recipeCache.applySchemaGeneration(hintBuilder); TargetRange target = recipeCache.mutationToTargetRange(mutation); if (target == null) { @@ 
-150,20 +192,23 @@ private ChannelEndpoint routeMutation( preferLeader, KeyRangeCache.RangeMode.COVERING_SPLIT, DirectedReadOptions.getDefaultInstance(), - hintBuilder); + hintBuilder, + excludedEndpoints); } private ChannelEndpoint fillRoutingHint( boolean preferLeader, KeyRangeCache.RangeMode rangeMode, DirectedReadOptions directedReadOptions, - RoutingHint.Builder hintBuilder) { + RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints) { long id = databaseId.get(); if (id == 0) { return null; } hintBuilder.setDatabaseId(id); - return rangeCache.fillRoutingHint(preferLeader, rangeMode, directedReadOptions, hintBuilder); + return rangeCache.fillRoutingHint( + preferLeader, rangeMode, directedReadOptions, hintBuilder, excludedEndpoints); } private static boolean preferLeader(TransactionSelector selector) { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 2faa3a62fb99..8e4fc2458420 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -2059,6 +2059,13 @@ public ApiFuture commitAsync( CommitRequest request, @Nullable Map options) { GrpcCallContext context = newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true); + // Signal grpc-gcp to unbind the affinity key after this call completes. + // Commit is a terminal RPC — no more RPCs will use this transaction's affinity key. 
+ if (this.isGrpcGcpExtensionEnabled) { + context = + context.withCallOptions( + context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); + } return spannerStub.commitCallable().futureCall(request, context); } @@ -2078,6 +2085,13 @@ public ApiFuture rollbackAsync(RollbackRequest request, @Nullable Map GrpcCallContext newCallContext( context = context.withCallOptions( context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey)); + // Check if the caller wants to unbind the affinity key after this call completes. + Boolean unbind = Option.UNBIND_CHANNEL_HINT.get(options); + if (Boolean.TRUE.equals(unbind)) { + context = + context.withCallOptions( + context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); + } } else { // Set channel affinity in GAX. context = context.withChannelAffinity(affinity.intValue()); diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 59fc03dfd80a..9205cf0eace0 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -16,8 +16,11 @@ package com.google.cloud.spanner.spi.v1; +import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_CALL_OPTIONS_KEY; + import com.google.api.core.InternalApi; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; @@ -38,11 +41,16 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; 
import java.io.IOException; import java.lang.ref.SoftReference; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import javax.annotation.Nullable; /** @@ -56,6 +64,8 @@ @InternalApi final class KeyAwareChannel extends ManagedChannel { private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L; + private static final long MAX_TRACKED_EXCLUDED_LOGICAL_REQUESTS = 100_000L; + private static final long EXCLUDED_LOGICAL_REQUEST_TTL_MINUTES = 10L; private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead"; private static final String STREAMING_SQL_METHOD = "google.spanner.v1.Spanner/ExecuteStreamingSql"; @@ -77,6 +87,14 @@ final class KeyAwareChannel extends ManagedChannel { // Bounded to prevent unbounded growth if application code does not close read-only transactions. private final Cache readOnlyTxPreferLeader = CacheBuilder.newBuilder().maximumSize(MAX_TRACKED_READ_ONLY_TRANSACTIONS).build(); + // If a routed endpoint returns RESOURCE_EXHAUSTED, the next retry attempt of that same logical + // request should avoid that endpoint once so other requests are unaffected. Bound and age out + // entries in case a caller gives up and never issues a retry. 
+ private final Cache> excludedEndpointsForLogicalRequest = + CacheBuilder.newBuilder() + .maximumSize(MAX_TRACKED_EXCLUDED_LOGICAL_REQUESTS) + .expireAfterWrite(EXCLUDED_LOGICAL_REQUEST_TTL_MINUTES, TimeUnit.MINUTES) + .build(); private KeyAwareChannel( InstantiatingGrpcChannelProvider channelProvider, @@ -178,12 +196,13 @@ private static boolean isKeyAware(MethodDescriptor methodDescriptor) { } @Nullable - private ChannelEndpoint affinityEndpoint(ByteString transactionId) { + private ChannelEndpoint affinityEndpoint( + ByteString transactionId, Predicate excludedEndpoints) { if (transactionId == null || transactionId.isEmpty()) { return null; } String address = transactionAffinities.get(transactionId); - if (address == null) { + if (address == null || excludedEndpoints.test(address)) { return null; } return endpointCache.get(address); @@ -201,6 +220,40 @@ void clearTransactionAffinity(ByteString transactionId) { clearAffinity(transactionId); } + private void maybeExcludeEndpointOnNextCall( + @Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) { + if (endpoint == null || logicalRequestKey == null) { + return; + } + String address = endpoint.getAddress(); + if (!defaultEndpointAddress.equals(address)) { + excludedEndpointsForLogicalRequest + .asMap() + .compute( + logicalRequestKey, + (ignored, excludedEndpoints) -> { + Set updated = + excludedEndpoints == null ? 
ConcurrentHashMap.newKeySet() : excludedEndpoints; + updated.add(address); + return updated; + }); + } + } + + private Predicate consumeExcludedEndpointsForCurrentCall( + @Nullable String logicalRequestKey) { + if (logicalRequestKey == null) { + return address -> false; + } + Set excludedEndpoints = + excludedEndpointsForLogicalRequest.asMap().remove(logicalRequestKey); + if (excludedEndpoints == null || excludedEndpoints.isEmpty()) { + return address -> false; + } + excludedEndpoints = new HashSet<>(excludedEndpoints); + return excludedEndpoints::contains; + } + private boolean isReadOnlyTransaction(ByteString transactionId) { return transactionId != null && !transactionId.isEmpty() @@ -265,15 +318,40 @@ private static ByteString transactionIdFromTransaction(Transaction transaction) return null; } + private static void recordRouteSelectionTrace( + MethodDescriptor methodDescriptor, + String target, + boolean usedDefaultEndpoint, + boolean hasChannelFinder) { + Span span = Span.current(); + if (!span.getSpanContext().isValid()) { + return; + } + span.setAttribute("spanner.target", target); + span.setAttribute("spanner.route.used_default_endpoint", usedDefaultEndpoint); + span.setAttribute("spanner.route.has_channel_finder", hasChannelFinder); + span.setAttribute("spanner.route.method", methodDescriptor.getFullMethodName()); + span.addEvent( + "spanner.route.selected", + Attributes.builder() + .put("spanner.target", target) + .put("spanner.route.used_default_endpoint", usedDefaultEndpoint) + .put("spanner.route.has_channel_finder", hasChannelFinder) + .put("spanner.route.method", methodDescriptor.getFullMethodName()) + .build()); + } + static final class KeyAwareClientCall extends ForwardingClientCall { private final KeyAwareChannel parentChannel; private final MethodDescriptor methodDescriptor; private final CallOptions callOptions; + @Nullable private final String logicalRequestKey; private Listener responseListener; private Metadata headers; @Nullable private 
ClientCall delegate; private ChannelFinder channelFinder; + @Nullable private Predicate excludedEndpoints; @Nullable private ChannelEndpoint selectedEndpoint; @Nullable private ByteString transactionIdToClear; private boolean allowDefaultAffinity; @@ -293,6 +371,8 @@ static final class KeyAwareClientCall this.parentChannel = parentChannel; this.methodDescriptor = methodDescriptor; this.callOptions = callOptions; + XGoogSpannerRequestId requestId = callOptions.getOption(REQUEST_ID_CALL_OPTIONS_KEY); + this.logicalRequestKey = requestId == null ? null : requestId.getLogicalRequestKey(); } @Override @@ -336,6 +416,7 @@ public void sendMessage(RequestT message) { if (responseListener == null || headers == null) { throw new IllegalStateException("start must be called before sendMessage"); } + Predicate excludedEndpoints = excludedEndpoints(); ChannelEndpoint endpoint = null; ChannelFinder finder = null; @@ -361,7 +442,7 @@ public void sendMessage(RequestT message) { finder = parentChannel.getOrCreateChannelFinder(databaseId); } if (finder != null && reqBuilder.hasMutationKey()) { - endpoint = finder.findServer(reqBuilder); + endpoint = finder.findServer(reqBuilder, excludedEndpoints); } if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) { isReadOnlyBegin = true; @@ -379,12 +460,12 @@ public void sendMessage(RequestT message) { CommitRequest.Builder reqBuilder = null; if (finder != null && request.getMutationsCount() > 0) { reqBuilder = request.toBuilder(); - endpoint = finder.fillRoutingHint(reqBuilder); + endpoint = finder.fillRoutingHint(reqBuilder, excludedEndpoints); request = reqBuilder.build(); } if (!request.getTransactionId().isEmpty()) { ChannelEndpoint affinityEndpoint = - parentChannel.affinityEndpoint(request.getTransactionId()); + parentChannel.affinityEndpoint(request.getTransactionId(), excludedEndpoints); if (affinityEndpoint != null) { endpoint = affinityEndpoint; } @@ -396,7 +477,8 @@ public void sendMessage(RequestT message) { } 
else if (message instanceof RollbackRequest) { RollbackRequest request = (RollbackRequest) message; if (!request.getTransactionId().isEmpty()) { - endpoint = parentChannel.affinityEndpoint(request.getTransactionId()); + endpoint = + parentChannel.affinityEndpoint(request.getTransactionId(), excludedEndpoints); transactionIdToClear = request.getTransactionId(); } } else { @@ -408,9 +490,16 @@ public void sendMessage(RequestT message) { if (endpoint == null) { endpoint = parentChannel.endpointCache.defaultChannel(); } + if (endpoint == null) { + throw new IllegalStateException("No default endpoint available for key-aware call"); + } selectedEndpoint = endpoint; this.channelFinder = finder; - + recordRouteSelectionTrace( + methodDescriptor, + endpoint.getAddress(), + parentChannel.defaultEndpointAddress.equals(endpoint.getAddress()), + finder != null); delegate = endpoint.getChannel().newCall(methodDescriptor, callOptions); if (pendingMessageCompression != null) { delegate.setMessageCompression(pendingMessageCompression); @@ -551,12 +640,21 @@ private void maybeTrackReadOnlyBegin(TransactionSelector selector) { } } + private Predicate excludedEndpoints() { + if (excludedEndpoints == null) { + excludedEndpoints = parentChannel.consumeExcludedEndpointsForCurrentCall(logicalRequestKey); + } + return excludedEndpoints; + } + private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) { String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession()); ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction()); // Skip affinity for read-only transactions so each read routes independently. boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId); - ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId); + Predicate excludedEndpoints = excludedEndpoints(); + ChannelEndpoint endpoint = + isReadOnly ? 
null : parentChannel.affinityEndpoint(transactionId, excludedEndpoints); ChannelFinder finder = null; if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); @@ -565,8 +663,8 @@ private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) { Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId); ChannelEndpoint routed = preferLeaderOverride != null - ? finder.findServer(reqBuilder, preferLeaderOverride) - : finder.findServer(reqBuilder); + ? finder.findServer(reqBuilder, preferLeaderOverride, excludedEndpoints) + : finder.findServer(reqBuilder, excludedEndpoints); endpoint = routed; } return new RoutingDecision(finder, endpoint); @@ -577,7 +675,9 @@ private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) { ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction()); // Skip affinity for read-only transactions so each query routes independently. boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId); - ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId); + Predicate excludedEndpoints = excludedEndpoints(); + ChannelEndpoint endpoint = + isReadOnly ? null : parentChannel.affinityEndpoint(transactionId, excludedEndpoints); ChannelFinder finder = null; if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); @@ -586,8 +686,8 @@ private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) { Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId); ChannelEndpoint routed = preferLeaderOverride != null - ? finder.findServer(reqBuilder, preferLeaderOverride) - : finder.findServer(reqBuilder); + ? 
finder.findServer(reqBuilder, preferLeaderOverride, excludedEndpoints) + : finder.findServer(reqBuilder, excludedEndpoints); endpoint = routed; } return new RoutingDecision(finder, endpoint); @@ -655,6 +755,10 @@ public void onMessage(ResponseT message) { @Override public void onClose(io.grpc.Status status, Metadata trailers) { + if (status.getCode() == io.grpc.Status.Code.RESOURCE_EXHAUSTED) { + call.parentChannel.maybeExcludeEndpointOnNextCall( + call.selectedEndpoint, call.logicalRequestKey); + } call.maybeClearAffinity(); super.onClose(status, trailers); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java index bdbd495aa58c..09ecf19625c6 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java @@ -37,11 +37,13 @@ import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import java.util.stream.IntStream; /** Cache for routing information used by location-aware routing. 
*/ @InternalApi public final class KeyRangeCache { + private static final Predicate NO_EXCLUDED_ENDPOINTS = address -> false; private static final int MAX_LOCAL_REPLICA_DISTANCE = 5; private static final int DEFAULT_MIN_ENTRIES_FOR_RANDOM_PICK = 1000; @@ -102,6 +104,16 @@ public ChannelEndpoint fillRoutingHint( RangeMode rangeMode, DirectedReadOptions directedReadOptions, RoutingHint.Builder hintBuilder) { + return fillRoutingHint( + preferLeader, rangeMode, directedReadOptions, hintBuilder, NO_EXCLUDED_ENDPOINTS); + } + + public ChannelEndpoint fillRoutingHint( + boolean preferLeader, + RangeMode rangeMode, + DirectedReadOptions directedReadOptions, + RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints) { ByteString key = hintBuilder.getKey(); if (key.isEmpty()) { return null; @@ -121,7 +133,8 @@ public ChannelEndpoint fillRoutingHint( hintBuilder.setKey(targetRange.startKey); hintBuilder.setLimitKey(targetRange.limitKey); - return targetRange.group.fillRoutingHint(preferLeader, directedReadOptions, hintBuilder); + return targetRange.group.fillRoutingHint( + preferLeader, directedReadOptions, hintBuilder, excludedEndpoints); } public void clear() { @@ -469,8 +482,11 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) { } } - boolean shouldSkip(RoutingHint.Builder hintBuilder) { - if (skip || serverAddress.isEmpty() || (endpoint != null && !endpoint.isHealthy())) { + boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate excludedEndpoints) { + if (skip + || serverAddress.isEmpty() + || excludedEndpoints.test(serverAddress) + || (endpoint != null && !endpoint.isHealthy())) { RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); skipped.setTabletUid(tabletUid); skipped.setIncarnation(incarnation); @@ -562,7 +578,8 @@ synchronized void update(Group groupIn) { ChannelEndpoint fillRoutingHint( boolean preferLeader, DirectedReadOptions directedReadOptions, - RoutingHint.Builder hintBuilder) { + 
RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints) { boolean hasDirectedReadOptions = directedReadOptions.getReplicasCase() != DirectedReadOptions.ReplicasCase.REPLICAS_NOT_SET; @@ -575,7 +592,11 @@ ChannelEndpoint fillRoutingHint( synchronized (this) { selected = selectTabletLocked( - preferLeader, hasDirectedReadOptions, hintBuilder, directedReadOptions); + preferLeader, + hasDirectedReadOptions, + hintBuilder, + directedReadOptions, + excludedEndpoints); if (selected == null) { return null; } @@ -605,19 +626,27 @@ private CachedTablet selectTabletLocked( boolean preferLeader, boolean hasDirectedReadOptions, RoutingHint.Builder hintBuilder, - DirectedReadOptions directedReadOptions) { + DirectedReadOptions directedReadOptions, + Predicate excludedEndpoints) { + boolean checkedLeader = false; if (preferLeader && !hasDirectedReadOptions && hasLeader() - && leader().distance <= MAX_LOCAL_REPLICA_DISTANCE - && !leader().shouldSkip(hintBuilder)) { - return leader(); + && leader().distance <= MAX_LOCAL_REPLICA_DISTANCE) { + checkedLeader = true; + if (!leader().shouldSkip(hintBuilder, excludedEndpoints)) { + return leader(); + } } - for (CachedTablet tablet : tablets) { + for (int index = 0; index < tablets.size(); index++) { + if (checkedLeader && index == leaderIndex) { + continue; + } + CachedTablet tablet = tablets.get(index); if (!tablet.matches(directedReadOptions)) { continue; } - if (tablet.shouldSkip(hintBuilder)) { + if (tablet.shouldSkip(hintBuilder, excludedEndpoints)) { continue; } return tablet; diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 7fd50f41c2d3..d75bdfa89b0e 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ 
-81,7 +81,8 @@ public interface SpannerRpc extends ServiceRpc { /** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */ enum Option { - CHANNEL_HINT("Channel Hint"); + CHANNEL_HINT("Channel Hint"), + UNBIND_CHANNEL_HINT("Unbind Channel Hint"); private final String value; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index b4bc7bf7bb6c..8cadf1fa09d8 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -210,6 +210,30 @@ public void testBuildRequestOptionsWithPriority() { assertEquals(RequestOptions.Priority.PRIORITY_LOW, requestOptionsLowPriority.getPriority()); } + @Test + public void getChannelHintOptionsPrefersSessionHintWhenGrpcGcpDisabled() { + Map sessionHint = + SessionClient.optionMap(SessionClient.SessionOption.channelHint(7L)); + + Map result = + AbstractReadContext.getChannelHintOptions(sessionHint, 11L, false); + + assertThat(result).isSameInstanceAs(sessionHint); + assertEquals(Long.valueOf(7L), Option.CHANNEL_HINT.getLong(result)); + } + + @Test + public void getChannelHintOptionsPrefersTransactionHintWhenGrpcGcpEnabled() { + Map sessionHint = + SessionClient.optionMap(SessionClient.SessionOption.channelHint(7L)); + + Map result = + AbstractReadContext.getChannelHintOptions(sessionHint, 11L, true); + + assertThat(result).isNotSameInstanceAs(sessionHint); + assertEquals(Long.valueOf(11L), Option.CHANNEL_HINT.getLong(result)); + } + @Test public void testGetExecuteSqlRequestBuilderWithPriority() { ExecuteSqlRequest.Builder request = diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java 
b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java index 9a43ad07cdfe..af178a4f1790 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFutures; import com.google.cloud.spanner.SessionClient.SessionConsumer; import java.io.PrintWriter; import java.io.StringWriter; @@ -36,6 +37,8 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -245,6 +248,60 @@ public void testThrowExceptionMultiplexedSessionEnvVarInvalidValues() throws Exc } } + @Test + @SuppressWarnings("unchecked") + public void testGrpcGcpSingleUseDoesNotReserveBitsetChannelHint() throws Exception { + assumeTrue(isJava8()); + SessionClient sessionClient = mock(SessionClient.class); + SpannerImpl spanner = mock(SpannerImpl.class); + SpannerOptions spannerOptions = mock(SpannerOptions.class); + SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class); + TraceWrapper tracer = mock(TraceWrapper.class); + ISpan span = mock(ISpan.class); + + when(sessionClient.getSpanner()).thenReturn(spanner); + when(spanner.getOptions()).thenReturn(spannerOptions); + when(spanner.getTracer()).thenReturn(tracer); + when(tracer.getCurrentSpan()).thenReturn(span); + when(spannerOptions.getNumChannels()).thenReturn(4); + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); + 
when(sessionPoolOptions.getMultiplexedSessionMaintenanceDuration()) + .thenReturn(Duration.ofDays(7)); + when(sessionPoolOptions.getWaitForMinSessions()).thenReturn(Duration.ZERO); + + MultiplexedSessionDatabaseClient client = + new MultiplexedSessionDatabaseClient(sessionClient, Clock.systemUTC()); + SessionReference sessionReference = + new SessionReference( + "projects/p/instances/i/databases/d/sessions/s1", + null, + com.google.protobuf.Timestamp.getDefaultInstance(), + true, + null); + + Field sessionFutureField = + MultiplexedSessionDatabaseClient.class.getDeclaredField("multiplexedSessionReference"); + sessionFutureField.setAccessible(true); + @SuppressWarnings("unchecked") + AtomicReference> sessionFutureRef = + (AtomicReference>) + sessionFutureField.get(client); + sessionFutureRef.set(ApiFutures.immediateFuture(sessionReference)); + + java.lang.reflect.Method method = + MultiplexedSessionDatabaseClient.class.getDeclaredMethod( + "createDirectMultiplexedSessionTransaction", boolean.class); + method.setAccessible(true); + method.invoke(client, true); + + Field field = + MultiplexedSessionDatabaseClient.class.getDeclaredField("numCurrentSingleUseTransactions"); + field.setAccessible(true); + AtomicInteger counter = (AtomicInteger) field.get(client); + assertEquals(0, counter.get()); + } + private boolean isJava8() { return JavaVersionUtil.getJavaMajorVersion() == 8; } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java index 5f7227592299..0802e31e74a2 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java @@ -32,6 +32,7 @@ import 
com.google.spanner.v1.BatchCreateSessionsRequest; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.SpannerGrpc; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -64,6 +65,9 @@ public class RetryOnDifferentGrpcChannelMockServerTest extends AbstractMockServe /** Tracks the logical affinity keys before grpc-gcp routes the request. */ private static final Map> LOGICAL_AFFINITY_KEYS = new HashMap<>(); + /** Tracks how many calls explicitly request grpc-gcp affinity unbind. */ + private static final Map UNBIND_AFFINITY_CALL_COUNTS = new HashMap<>(); + @BeforeClass public static void setupAndStartServer() throws Exception { System.setProperty("spanner.retry_deadline_exceeded_on_different_channel", "true"); @@ -79,6 +83,7 @@ public static void removeSystemProperty() { @After public void clearRequests() { LOGICAL_AFFINITY_KEYS.clear(); + UNBIND_AFFINITY_CALL_COUNTS.clear(); mockSpanner.clearRequests(); mockSpanner.removeAllExecutionTimes(); } @@ -95,16 +100,22 @@ static GrpcInterceptorProvider createAffinityKeyInterceptorProvider() { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - // Capture the AFFINITY_KEY before grpc-gcp processes it + String methodName = method.getFullMethodName(); + // Capture the AFFINITY_KEY before grpc-gcp processes it. 
String affinityKey = callOptions.getOption(GcpManagedChannel.AFFINITY_KEY); if (affinityKey != null) { - String methodName = method.getFullMethodName(); synchronized (LOGICAL_AFFINITY_KEYS) { Set keys = LOGICAL_AFFINITY_KEYS.computeIfAbsent(methodName, k -> new HashSet<>()); keys.add(affinityKey); } } + if (Boolean.TRUE.equals( + callOptions.getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY))) { + synchronized (UNBIND_AFFINITY_CALL_COUNTS) { + UNBIND_AFFINITY_CALL_COUNTS.merge(methodName, 1, Integer::sum); + } + } return next.newCall(method, callOptions); } }); @@ -311,6 +322,11 @@ public void testSingleUseQuery_retriesOnNewChannel() { LOGICAL_AFFINITY_KEYS .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>()) .size()); + assertEquals( + 2, + UNBIND_AFFINITY_CALL_COUNTS + .getOrDefault(SpannerGrpc.getExecuteStreamingSqlMethod().getFullMethodName(), 0) + .intValue()); } @Test @@ -344,6 +360,11 @@ public void testSingleUseQuery_stopsRetrying() { .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>()) .size(); assertEquals(totalRequests, distinctLogicalKeys); + assertEquals( + totalRequests, + UNBIND_AFFINITY_CALL_COUNTS + .getOrDefault(SpannerGrpc.getExecuteStreamingSqlMethod().getFullMethodName(), 0) + .intValue()); } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 53ab2c333d68..2b22a8c1d6b1 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -319,6 +319,112 @@ public void writeAtLeastOnceWithOptions() throws ParseException { assertThat(request.getMutationsList()).containsExactly(mutation); } + @SuppressWarnings("unchecked") + @Test + public void singleUseReadUsesRandomChannelHintWhenGrpcGcpEnabled() { + 
when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + ArgumentCaptor consumer = + ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); + ArgumentCaptor> readOptionsCaptor = + ArgumentCaptor.forClass((Class) Map.class); + PartialResultSet resultSet = + PartialResultSet.newBuilder() + .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) + .build(); + Mockito.when( + rpc.read( + Mockito.any(), consumer.capture(), readOptionsCaptor.capture(), any(), eq(false))) + .then( + invocation -> { + consumer.getValue().onPartialResultSet(resultSet); + consumer.getValue().onCompleted(); + return new NoOpStreamingCall(); + }); + + session + .singleUse(TimestampBound.strong()) + .readRow("Dummy", Key.of(), Collections.singletonList("C")); + + Map readOptions = readOptionsCaptor.getValue(); + assertThat(readOptions).isNotSameInstanceAs(options); + assertThat(readOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + assertThat(readOptions.get(SpannerRpc.Option.UNBIND_CHANNEL_HINT)).isEqualTo(Boolean.TRUE); + } + + @SuppressWarnings("unchecked") + @Test + public void multiUseReadOnlyTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() + throws ParseException { + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + ArgumentCaptor> beginOptionsCaptor = + ArgumentCaptor.forClass((Class) Map.class); + Transaction txnMetadata = + Transaction.newBuilder() + .setId(ByteString.copyFromUtf8("x")) + .setReadTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z")) + .build(); + Mockito.when(rpc.beginTransaction(Mockito.any(), beginOptionsCaptor.capture(), eq(false))) + .thenReturn(txnMetadata); + mockRead( + PartialResultSet.newBuilder() + .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) + .build()); + + session + .readOnlyTransaction(TimestampBound.strong()) + .readRow("Dummy", Key.of(), Collections.singletonList("C")); + + Map beginOptions = beginOptionsCaptor.getValue(); + 
assertThat(beginOptions).isNotSameInstanceAs(options); + assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + } + + @SuppressWarnings("unchecked") + @Test + public void readWriteTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() { + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + ArgumentCaptor> beginOptionsCaptor = + ArgumentCaptor.forClass((Class) Map.class); + Mockito.when( + rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), beginOptionsCaptor.capture(), eq(true))) + .thenReturn( + ApiFutures.immediateFuture( + Transaction.newBuilder().setId(ByteString.copyFromUtf8("TEST")).build())); + + session + .readWriteTransaction() + .run( + transaction -> { + transaction.buffer(Mutation.newInsertBuilder("T").set("C").to("x").build()); + return null; + }); + + Map beginOptions = beginOptionsCaptor.getValue(); + assertThat(beginOptions).isNotSameInstanceAs(options); + assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + } + + @SuppressWarnings("unchecked") + @Test + public void writeAtLeastOnceUsesRandomChannelHintWhenGrpcGcpEnabled() throws ParseException { + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + ArgumentCaptor> commitOptionsCaptor = + ArgumentCaptor.forClass((Class) Map.class); + CommitResponse response = + CommitResponse.newBuilder() + .setCommitTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z")) + .build(); + Mockito.when(rpc.commit(Mockito.any(), commitOptionsCaptor.capture())).thenReturn(response); + + session.writeAtLeastOnce( + Collections.singletonList(Mutation.newInsertBuilder("T").set("C").to("x").build())); + + Map commitOptions = commitOptionsCaptor.getValue(); + assertThat(commitOptions).isNotSameInstanceAs(options); + assertThat(commitOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); + } + private static long utcTimeSeconds(int year, int month, int day, int hour, int min, int secs) { GregorianCalendar calendar = new 
GregorianCalendar(TimeZone.getTimeZone("UTC")); calendar.set(year, month, day, hour, min, secs); diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index a4919389a85c..542d6e27ab6d 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThrows; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.ListValue; @@ -451,6 +452,180 @@ public void singleUseCommitUsesSameMutationSelectionHeuristicAsBeginTransaction( assertEquals(expectedRoutingHint, commitDelegate.lastMessage.getRoutingHint()); } + @Test + public void resourceExhaustedRoutedEndpointIsAvoidedOnRetry() throws Exception { + TestHarness harness = createHarness(); + seedCache(harness, createLeaderAndReplicaCacheUpdate()); + CallOptions retryCallOptions = retryCallOptions(1L); + + ExecuteSqlRequest request = + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build(); + + ClientCall firstCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + firstCall.start(new CapturingListener(), new Metadata()); + firstCall.sendMessage(request); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + + @SuppressWarnings("unchecked") + RecordingClientCall firstDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + firstDelegate.emitOnClose(Status.RESOURCE_EXHAUSTED, new 
Metadata()); + + ClientCall secondCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + secondCall.start(new CapturingListener(), new Metadata()); + secondCall.sendMessage(request); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1); + } + + @Test + public void resourceExhaustedAffinityEndpointIsAvoidedForSubsequentTransactionRequest() + throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("rw-tx-resource-exhausted"); + seedCache(harness, createLeaderAndReplicaCacheUpdate()); + CallOptions retryCallOptions = retryCallOptions(2L); + + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setTransaction( + TransactionSelector.newBuilder() + .setBegin( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()) + .build())) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + beginDelegate.emitOnMessage( + ResultSet.newBuilder() + .setMetadata( + ResultSetMetadata.newBuilder() + .setTransaction(Transaction.newBuilder().setId(transactionId))) + .build()); + beginDelegate.emitOnClose(Status.OK, new Metadata()); + + ExecuteSqlRequest transactionRequest = + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId)) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build(); + + ClientCall routedCall = + 
harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + routedCall.start(new CapturingListener(), new Metadata()); + routedCall.sendMessage(transactionRequest); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(2); + + @SuppressWarnings("unchecked") + RecordingClientCall routedDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + routedDelegate.emitOnClose(Status.RESOURCE_EXHAUSTED, new Metadata()); + + ClientCall retriedCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + retriedCall.start(new CapturingListener(), new Metadata()); + retriedCall.sendMessage(transactionRequest); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(2); + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1); + } + + @Test + public void resourceExhaustedRoutedEndpointFallsBackToDefaultWhenNoReplicaExists() + throws Exception { + TestHarness harness = createHarness(); + CallOptions retryCallOptions = retryCallOptions(3L); + seedCache( + harness, + createRangeCacheUpdateForHint(RoutingHint.newBuilder().setKey(bytes("a")).build())); + + ExecuteSqlRequest request = + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("a")).build()) + .build(); + + ClientCall firstCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + firstCall.start(new CapturingListener(), new Metadata()); + firstCall.sendMessage(request); + + @SuppressWarnings("unchecked") + RecordingClientCall firstDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + firstDelegate.emitOnClose(Status.RESOURCE_EXHAUSTED, new Metadata()); + + ClientCall secondCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), retryCallOptions); + secondCall.start(new CapturingListener(), new 
Metadata()); + secondCall.sendMessage(request); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(2); + } + + @Test + public void resourceExhaustedSkipDoesNotAffectDifferentLogicalRequest() throws Exception { + TestHarness harness = createHarness(); + seedCache(harness, createLeaderAndReplicaCacheUpdate()); + CallOptions firstLogicalRequest = retryCallOptions(4L); + CallOptions secondLogicalRequest = retryCallOptions(5L); + + ExecuteSqlRequest request = + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build(); + + ClientCall firstCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), firstLogicalRequest); + firstCall.start(new CapturingListener(), new Metadata()); + firstCall.sendMessage(request); + + @SuppressWarnings("unchecked") + RecordingClientCall firstDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + firstDelegate.emitOnClose(Status.RESOURCE_EXHAUSTED, new Metadata()); + + ClientCall unrelatedCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), secondLogicalRequest); + unrelatedCall.start(new CapturingListener(), new Metadata()); + unrelatedCall.sendMessage(request); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(2); + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(0); + + ClientCall retriedFirstCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), firstLogicalRequest); + retriedFirstCall.start(new CapturingListener(), new Metadata()); + retriedFirstCall.sendMessage(request); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(2); + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1); + } + @Test public void 
commitWithTransactionIdRoutesUsingRoutingHintWhenAffinityMissing() throws Exception { TestHarness harness = createHarness(); @@ -914,6 +1089,36 @@ private static CacheUpdate createTwoRangeCacheUpdate() { .build(); } + private static CacheUpdate createLeaderAndReplicaCacheUpdate() { + return CacheUpdate.newBuilder() + .setDatabaseId(7L) + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(1L) + .setSplitId(1L) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(1L) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1L) + .setServerAddress("server-a:1234") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2L) + .setServerAddress("server-b:1234") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + private static CacheUpdate createMutationRoutingCacheUpdate() throws TextFormat.ParseException { return createMutationRecipeCacheUpdate().toBuilder() .mergeFrom( @@ -1250,4 +1455,10 @@ void emitOnClose(Status status, Metadata trailers) { private static ByteString bytes(String value) { return ByteString.copyFromUtf8(value); } + + private static CallOptions retryCallOptions(long nthRequest) { + return CallOptions.DEFAULT.withOption( + XGoogSpannerRequestId.REQUEST_ID_CALL_OPTIONS_KEY, + XGoogSpannerRequestId.of(1L, 0L, nthRequest, 0L)); + } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java index 2405aa7a062b..027994a6f2fe 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java @@ -98,6 +98,54 @@ public void skipsUnhealthyTabletAfterItIsCached() { 
assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); } + @Test + public void skipsExplicitlyExcludedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + + cache.addRanges( + CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress("server1") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2) + .setServerAddress("server2") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build()); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + /* preferLeader= */ true, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint, + "server1"::equals); + + assertNotNull(server); + assertEquals("server2", server.getAddress()); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + } + @Test public void shrinkToEvictsRanges() { FakeEndpointCache endpointCache = new FakeEndpointCache(); diff --git a/java-spanner/google-cloud-spanner/src/test/resources/finder_test.textproto b/java-spanner/google-cloud-spanner/src/test/resources/finder_test.textproto index 5747c2aee0ef..f3b9adc68b79 100644 --- a/java-spanner/google-cloud-spanner/src/test/resources/finder_test.textproto +++ b/java-spanner/google-cloud-spanner/src/test/resources/finder_test.textproto @@ -7283,10 +7283,6 @@ test_case { tablet_uid: 195035137 incarnation: "\001\331" } - skipped_tablet_uid { - tablet_uid: 195035137 - incarnation: "\001\331" - } } } event { @@ -8372,10 +8368,6 @@ test_case { tablet_uid: 72351745 
incarnation: "\001H" } - skipped_tablet_uid { - tablet_uid: 72351745 - incarnation: "\001H" - } } } event {