Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -202,9 +204,19 @@ private SingleReadContext(Builder builder) {
// of a channel hint. GAX will automatically choose a hint when used
// with a multiplexed session to perform a round-robin channel selection. We are
// passing a hint here to prefer random channel selection instead of doing GAX round-robin.
// Also signal unbind so the grpc-gcp affinity map entry is cleaned up once the call
// completes. The unbind flag is preserved on retries via prepareRetryOnDifferentGrpcChannel.
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
session.getOptions(),
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
if (this.channelHint != null) {
Map<SpannerRpc.Option, Object> mutable = new EnumMap<>(SpannerRpc.Option.class);
mutable.putAll(this.channelHint);
mutable.put(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE);
this.channelHint = Collections.unmodifiableMap(mutable);
}
}

@Override
Expand Down Expand Up @@ -241,7 +253,10 @@ TransactionSelector getTransactionSelector() {
boolean prepareRetryOnDifferentGrpcChannel() {
if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) {
long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L;
channelHint = optionMap(SessionOption.channelHint(channelHintForTransaction));
channelHint =
optionMap(
SessionOption.channelHint(channelHintForTransaction),
SessionOption.unbindChannelHint());
return true;
}
return super.prepareRetryOnDifferentGrpcChannel();
Expand Down Expand Up @@ -360,7 +375,9 @@ static Builder newBuilder() {
}
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
session.getOptions(),
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
}

@Override
Expand Down Expand Up @@ -862,6 +879,19 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {

static Map<SpannerRpc.Option, ?> getChannelHintOptions(
Map<SpannerRpc.Option, ?> channelHintForSession, Long channelHintForTransaction) {
return getChannelHintOptions(
channelHintForSession, channelHintForTransaction, /* useTransactionHint= */ false);
}

static Map<SpannerRpc.Option, ?> getChannelHintOptions(
Map<SpannerRpc.Option, ?> channelHintForSession,
Long channelHintForTransaction,
boolean useTransactionHint) {
// grpc-gcp uses a per-operation/per-transaction random hint instead of reusing the session
// hint so requests distribute independently from session affinity.
if (useTransactionHint && channelHintForTransaction != null) {
return optionMap(SessionOption.channelHint(channelHintForTransaction));
}
if (channelHintForSession != null) {
return channelHintForSession;
} else if (channelHintForTransaction != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {
private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
boolean singleUse) {
try {
int singleUseChannelHint =
singleUse && !sessionClient.getSpanner().getOptions().isGrpcGcpExtensionEnabled()
? getSingleUseChannelHint()
: NO_CHANNEL_HINT;
return new MultiplexedSessionTransaction(
this,
tracer.getCurrentSpan(),
Expand All @@ -406,7 +410,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
// session, such as for example a DatabaseNotFound exception. We therefore do not need
// any special handling of such errors.
multiplexedSessionReference.get().get(),
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
singleUseChannelHint,
singleUse);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ static SessionOption channelHint(long hint) {
return new SessionOption(SpannerRpc.Option.CHANNEL_HINT, hint);
}

static SessionOption unbindChannelHint() {
return new SessionOption(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE);
}

SpannerRpc.Option rpcOption() {
return rpcOption;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -299,10 +300,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
CommitRequest request = requestBuilder.build();
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
Map<SpannerRpc.Option, ?> singleUseWriteOptions = getSingleUseWriteChannelHintOptions();

try (IScope s = tracer.withSpan(span)) {
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
() -> new CommitResponse(spanner.getRpc().commit(request, singleUseWriteOptions)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -326,6 +328,14 @@ private RequestOptions getRequestOptions(TransactionOption... transactionOptions
return null;
}

private Map<SpannerRpc.Option, ?> getSingleUseWriteChannelHintOptions() {
if (!spanner.getOptions().isGrpcGcpExtensionEnabled()) {
return getOptions();
}
return optionMap(
SessionOption.channelHint(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)));
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... transactionOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ private TransactionContextImpl(Builder builder) {
this.clock = builder.clock;
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
session.getOptions(),
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
this.previousTransactionId = builder.previousTransactionId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ long getNthRequest() {
return this.nthRequest;
}

/**
* Returns a stable identifier for the logical request, excluding channel and attempt.
*
* <p>This can be used to associate retry-local state across attempts of the same RPC without
* affecting unrelated requests.
*/
@InternalApi
public String getLogicalRequestKey() {
return this.nthClientId + "." + this.nthRequest;
}

@VisibleForTesting
static final Pattern REGEX =
Pattern.compile("^(\\d)\\.([0-9a-z]{16})\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

/**
* Finds a server for a request using location-aware routing metadata.
Expand All @@ -40,6 +41,8 @@
*/
@InternalApi
public final class ChannelFinder {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;

private final Object updateLock = new Object();
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
Expand Down Expand Up @@ -71,47 +74,83 @@ public void update(CacheUpdate update) {
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
}

public ChannelEndpoint findServer(
ReadRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), excludedEndpoints);
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) {
return findServer(reqBuilder, preferLeader, NO_EXCLUDED_ENDPOINTS);
}

public ChannelEndpoint findServer(
ReadRequest.Builder reqBuilder, boolean preferLeader, Predicate<String> excludedEndpoints) {
recipeCache.computeKeys(reqBuilder);
return fillRoutingHint(
preferLeader,
KeyRangeCache.RangeMode.COVERING_SPLIT,
reqBuilder.getDirectedReadOptions(),
reqBuilder.getRoutingHintBuilder());
reqBuilder.getRoutingHintBuilder(),
excludedEndpoints);
}

public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
}

public ChannelEndpoint findServer(
ExecuteSqlRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), excludedEndpoints);
}

public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) {
return findServer(reqBuilder, preferLeader, NO_EXCLUDED_ENDPOINTS);
}

public ChannelEndpoint findServer(
ExecuteSqlRequest.Builder reqBuilder,
boolean preferLeader,
Predicate<String> excludedEndpoints) {
recipeCache.computeKeys(reqBuilder);
return fillRoutingHint(
preferLeader,
KeyRangeCache.RangeMode.PICK_RANDOM,
reqBuilder.getDirectedReadOptions(),
reqBuilder.getRoutingHintBuilder());
reqBuilder.getRoutingHintBuilder(),
excludedEndpoints);
}

public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) {
return findServer(reqBuilder, NO_EXCLUDED_ENDPOINTS);
}

public ChannelEndpoint findServer(
BeginTransactionRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
if (!reqBuilder.hasMutationKey()) {
return null;
}
return routeMutation(
reqBuilder.getMutationKey(),
preferLeader(reqBuilder.getOptions()),
reqBuilder.getRoutingHintBuilder());
reqBuilder.getRoutingHintBuilder(),
excludedEndpoints);
}

public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) {
return fillRoutingHint(reqBuilder, NO_EXCLUDED_ENDPOINTS);
}

public ChannelEndpoint fillRoutingHint(
CommitRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
Mutation mutation = selectMutationForRouting(reqBuilder.getMutationsList());
if (mutation == null) {
return null;
}
return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder());
return routeMutation(
mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder(), excludedEndpoints);
}

private static Mutation selectMutationForRouting(List<Mutation> mutations) {
Expand Down Expand Up @@ -139,7 +178,10 @@ private static Mutation selectMutationForRouting(List<Mutation> mutations) {
}

private ChannelEndpoint routeMutation(
Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) {
Mutation mutation,
boolean preferLeader,
RoutingHint.Builder hintBuilder,
Predicate<String> excludedEndpoints) {
recipeCache.applySchemaGeneration(hintBuilder);
TargetRange target = recipeCache.mutationToTargetRange(mutation);
if (target == null) {
Expand All @@ -150,20 +192,23 @@ private ChannelEndpoint routeMutation(
preferLeader,
KeyRangeCache.RangeMode.COVERING_SPLIT,
DirectedReadOptions.getDefaultInstance(),
hintBuilder);
hintBuilder,
excludedEndpoints);
}

private ChannelEndpoint fillRoutingHint(
boolean preferLeader,
KeyRangeCache.RangeMode rangeMode,
DirectedReadOptions directedReadOptions,
RoutingHint.Builder hintBuilder) {
RoutingHint.Builder hintBuilder,
Predicate<String> excludedEndpoints) {
long id = databaseId.get();
if (id == 0) {
return null;
}
hintBuilder.setDatabaseId(id);
return rangeCache.fillRoutingHint(preferLeader, rangeMode, directedReadOptions, hintBuilder);
return rangeCache.fillRoutingHint(
preferLeader, rangeMode, directedReadOptions, hintBuilder, excludedEndpoints);
}

private static boolean preferLeader(TransactionSelector selector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,13 @@ public ApiFuture<CommitResponse> commitAsync(
CommitRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true);
// Signal grpc-gcp to unbind the affinity key after this call completes.
// Commit is a terminal RPC — no more RPCs will use this transaction's affinity key.
if (this.isGrpcGcpExtensionEnabled) {
context =
context.withCallOptions(
context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
}
return spannerStub.commitCallable().futureCall(request, context);
}

Expand All @@ -2078,6 +2085,13 @@ public ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Opt
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getRollbackMethod(), true);
// Signal grpc-gcp to unbind the affinity key after this call completes.
// Rollback is a terminal RPC — no more RPCs will use this transaction's affinity key.
if (this.isGrpcGcpExtensionEnabled) {
context =
context.withCallOptions(
context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
}
return spannerStub.rollbackCallable().futureCall(request, context);
}

Expand Down Expand Up @@ -2278,6 +2292,13 @@ <ReqT, RespT> GrpcCallContext newCallContext(
context =
context.withCallOptions(
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
// Check if the caller wants to unbind the affinity key after this call completes.
Boolean unbind = Option.UNBIND_CHANNEL_HINT.get(options);
if (Boolean.TRUE.equals(unbind)) {
context =
context.withCallOptions(
context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
}
} else {
// Set channel affinity in GAX.
context = context.withChannelAffinity(affinity.intValue());
Expand Down
Loading
Loading