Skip to content

Commit aca0428

Browse files
authored
fix(spanner): improve grpc-gcp affinity cleanup and location-aware retries (#12682)
2 parents ebad7e9 + 69fc961 commit aca0428

18 files changed

+771
-51
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
import com.google.spanner.v1.Transaction;
5656
import com.google.spanner.v1.TransactionOptions;
5757
import com.google.spanner.v1.TransactionSelector;
58+
import java.util.Collections;
59+
import java.util.EnumMap;
5860
import java.util.Map;
5961
import java.util.concurrent.ThreadLocalRandom;
6062
import java.util.concurrent.atomic.AtomicLong;
@@ -202,9 +204,19 @@ private SingleReadContext(Builder builder) {
202204
// of a channel hint. GAX will automatically choose a hint when used
203205
// with a multiplexed session to perform a round-robin channel selection. We are
204206
// passing a hint here to prefer random channel selection instead of doing GAX round-robin.
207+
// Also signal unbind so the grpc-gcp affinity map entry is cleaned up once the call
208+
// completes. The unbind flag is preserved on retries via prepareRetryOnDifferentGrpcChannel.
205209
this.channelHint =
206210
getChannelHintOptions(
207-
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
211+
session.getOptions(),
212+
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
213+
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
214+
if (this.channelHint != null) {
215+
Map<SpannerRpc.Option, Object> mutable = new EnumMap<>(SpannerRpc.Option.class);
216+
mutable.putAll(this.channelHint);
217+
mutable.put(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE);
218+
this.channelHint = Collections.unmodifiableMap(mutable);
219+
}
208220
}
209221

210222
@Override
@@ -241,7 +253,10 @@ TransactionSelector getTransactionSelector() {
241253
boolean prepareRetryOnDifferentGrpcChannel() {
242254
if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) {
243255
long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L;
244-
channelHint = optionMap(SessionOption.channelHint(channelHintForTransaction));
256+
channelHint =
257+
optionMap(
258+
SessionOption.channelHint(channelHintForTransaction),
259+
SessionOption.unbindChannelHint());
245260
return true;
246261
}
247262
return super.prepareRetryOnDifferentGrpcChannel();
@@ -360,7 +375,9 @@ static Builder newBuilder() {
360375
}
361376
this.channelHint =
362377
getChannelHintOptions(
363-
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
378+
session.getOptions(),
379+
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
380+
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
364381
}
365382

366383
@Override
@@ -862,6 +879,19 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
862879

863880
static Map<SpannerRpc.Option, ?> getChannelHintOptions(
864881
Map<SpannerRpc.Option, ?> channelHintForSession, Long channelHintForTransaction) {
882+
return getChannelHintOptions(
883+
channelHintForSession, channelHintForTransaction, /* useTransactionHint= */ false);
884+
}
885+
886+
static Map<SpannerRpc.Option, ?> getChannelHintOptions(
887+
Map<SpannerRpc.Option, ?> channelHintForSession,
888+
Long channelHintForTransaction,
889+
boolean useTransactionHint) {
890+
// grpc-gcp uses a per-operation/per-transaction random hint instead of reusing the session
891+
// hint so requests distribute independently from session affinity.
892+
if (useTransactionHint && channelHintForTransaction != null) {
893+
return optionMap(SessionOption.channelHint(channelHintForTransaction));
894+
}
865895
if (channelHintForSession != null) {
866896
return channelHintForSession;
867897
} else if (channelHintForTransaction != null) {

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,10 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {
398398
private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
399399
boolean singleUse) {
400400
try {
401+
int singleUseChannelHint =
402+
singleUse && !sessionClient.getSpanner().getOptions().isGrpcGcpExtensionEnabled()
403+
? getSingleUseChannelHint()
404+
: NO_CHANNEL_HINT;
401405
return new MultiplexedSessionTransaction(
402406
this,
403407
tracer.getCurrentSpan(),
@@ -406,7 +410,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
406410
// session, such as for example a DatabaseNotFound exception. We therefore do not need
407411
// any special handling of such errors.
408412
multiplexedSessionReference.get().get(),
409-
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
413+
singleUseChannelHint,
410414
singleUse);
411415
} catch (ExecutionException executionException) {
412416
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ static SessionOption channelHint(long hint) {
8989
return new SessionOption(SpannerRpc.Option.CHANNEL_HINT, hint);
9090
}
9191

92+
static SessionOption unbindChannelHint() {
93+
return new SessionOption(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE);
94+
}
95+
9296
SpannerRpc.Option rpcOption() {
9397
return rpcOption;
9498
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.List;
5252
import java.util.Map;
5353
import java.util.concurrent.ExecutionException;
54+
import java.util.concurrent.ThreadLocalRandom;
5455
import javax.annotation.Nullable;
5556

5657
/**
@@ -299,10 +300,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
299300
}
300301
CommitRequest request = requestBuilder.build();
301302
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
303+
Map<SpannerRpc.Option, ?> singleUseWriteOptions = getSingleUseWriteChannelHintOptions();
302304

303305
try (IScope s = tracer.withSpan(span)) {
304306
return SpannerRetryHelper.runTxWithRetriesOnAborted(
305-
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
307+
() -> new CommitResponse(spanner.getRpc().commit(request, singleUseWriteOptions)));
306308
} catch (RuntimeException e) {
307309
span.setStatus(e);
308310
throw e;
@@ -326,6 +328,14 @@ private RequestOptions getRequestOptions(TransactionOption... transactionOptions
326328
return null;
327329
}
328330

331+
private Map<SpannerRpc.Option, ?> getSingleUseWriteChannelHintOptions() {
332+
if (!spanner.getOptions().isGrpcGcpExtensionEnabled()) {
333+
return getOptions();
334+
}
335+
return optionMap(
336+
SessionOption.channelHint(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)));
337+
}
338+
329339
@Override
330340
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
331341
Iterable<MutationGroup> mutationGroups, TransactionOption... transactionOptions)

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ private TransactionContextImpl(Builder builder) {
232232
this.clock = builder.clock;
233233
this.channelHint =
234234
getChannelHintOptions(
235-
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
235+
session.getOptions(),
236+
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
237+
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
236238
this.previousTransactionId = builder.previousTransactionId;
237239
}
238240

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,17 @@ long getNthRequest() {
8383
return this.nthRequest;
8484
}
8585

86+
/**
87+
* Returns a stable identifier for the logical request, excluding channel and attempt.
88+
*
89+
* <p>This can be used to associate retry-local state across attempts of the same RPC without
90+
* affecting unrelated requests.
91+
*/
92+
@InternalApi
93+
public String getLogicalRequestKey() {
94+
return this.nthClientId + "." + this.nthRequest;
95+
}
96+
8697
@VisibleForTesting
8798
static final Pattern REGEX =
8899
Pattern.compile("^(\\d)\\.([0-9a-z]{16})\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$");

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Objects;
3333
import java.util.concurrent.ThreadLocalRandom;
3434
import java.util.concurrent.atomic.AtomicLong;
35+
import java.util.function.Predicate;
3536

3637
/**
3738
* Finds a server for a request using location-aware routing metadata.
@@ -40,6 +41,8 @@
4041
*/
4142
@InternalApi
4243
public final class ChannelFinder {
44+
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
45+
4346
private final Object updateLock = new Object();
4447
private final AtomicLong databaseId = new AtomicLong();
4548
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
@@ -71,47 +74,83 @@ public void update(CacheUpdate update) {
7174
}
7275

7376
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
74-
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
77+
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
78+
}
79+
80+
public ChannelEndpoint findServer(
81+
ReadRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
82+
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), excludedEndpoints);
7583
}
7684

7785
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) {
86+
return findServer(reqBuilder, preferLeader, NO_EXCLUDED_ENDPOINTS);
87+
}
88+
89+
public ChannelEndpoint findServer(
90+
ReadRequest.Builder reqBuilder, boolean preferLeader, Predicate<String> excludedEndpoints) {
7891
recipeCache.computeKeys(reqBuilder);
7992
return fillRoutingHint(
8093
preferLeader,
8194
KeyRangeCache.RangeMode.COVERING_SPLIT,
8295
reqBuilder.getDirectedReadOptions(),
83-
reqBuilder.getRoutingHintBuilder());
96+
reqBuilder.getRoutingHintBuilder(),
97+
excludedEndpoints);
8498
}
8599

86100
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
87-
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
101+
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
102+
}
103+
104+
public ChannelEndpoint findServer(
105+
ExecuteSqlRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
106+
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), excludedEndpoints);
88107
}
89108

90109
public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) {
110+
return findServer(reqBuilder, preferLeader, NO_EXCLUDED_ENDPOINTS);
111+
}
112+
113+
public ChannelEndpoint findServer(
114+
ExecuteSqlRequest.Builder reqBuilder,
115+
boolean preferLeader,
116+
Predicate<String> excludedEndpoints) {
91117
recipeCache.computeKeys(reqBuilder);
92118
return fillRoutingHint(
93119
preferLeader,
94120
KeyRangeCache.RangeMode.PICK_RANDOM,
95121
reqBuilder.getDirectedReadOptions(),
96-
reqBuilder.getRoutingHintBuilder());
122+
reqBuilder.getRoutingHintBuilder(),
123+
excludedEndpoints);
97124
}
98125

99126
public ChannelEndpoint findServer(BeginTransactionRequest.Builder reqBuilder) {
127+
return findServer(reqBuilder, NO_EXCLUDED_ENDPOINTS);
128+
}
129+
130+
public ChannelEndpoint findServer(
131+
BeginTransactionRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
100132
if (!reqBuilder.hasMutationKey()) {
101133
return null;
102134
}
103135
return routeMutation(
104136
reqBuilder.getMutationKey(),
105137
preferLeader(reqBuilder.getOptions()),
106-
reqBuilder.getRoutingHintBuilder());
138+
reqBuilder.getRoutingHintBuilder(),
139+
excludedEndpoints);
107140
}
108141

109142
public ChannelEndpoint fillRoutingHint(CommitRequest.Builder reqBuilder) {
143+
return fillRoutingHint(reqBuilder, NO_EXCLUDED_ENDPOINTS);
144+
}
145+
146+
public ChannelEndpoint fillRoutingHint(
147+
CommitRequest.Builder reqBuilder, Predicate<String> excludedEndpoints) {
110148
Mutation mutation = selectMutationForRouting(reqBuilder.getMutationsList());
111149
if (mutation == null) {
112150
return null;
113151
}
114-
return routeMutation(mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder());
152+
return routeMutation(
153+
mutation, /* preferLeader= */ true, reqBuilder.getRoutingHintBuilder(), excludedEndpoints);
115154
}
116155

117156
private static Mutation selectMutationForRouting(List<Mutation> mutations) {
@@ -139,7 +178,10 @@ private static Mutation selectMutationForRouting(List<Mutation> mutations) {
139178
}
140179

141180
private ChannelEndpoint routeMutation(
142-
Mutation mutation, boolean preferLeader, RoutingHint.Builder hintBuilder) {
181+
Mutation mutation,
182+
boolean preferLeader,
183+
RoutingHint.Builder hintBuilder,
184+
Predicate<String> excludedEndpoints) {
143185
recipeCache.applySchemaGeneration(hintBuilder);
144186
TargetRange target = recipeCache.mutationToTargetRange(mutation);
145187
if (target == null) {
@@ -150,20 +192,23 @@ private ChannelEndpoint routeMutation(
150192
preferLeader,
151193
KeyRangeCache.RangeMode.COVERING_SPLIT,
152194
DirectedReadOptions.getDefaultInstance(),
153-
hintBuilder);
195+
hintBuilder,
196+
excludedEndpoints);
154197
}
155198

156199
private ChannelEndpoint fillRoutingHint(
157200
boolean preferLeader,
158201
KeyRangeCache.RangeMode rangeMode,
159202
DirectedReadOptions directedReadOptions,
160-
RoutingHint.Builder hintBuilder) {
203+
RoutingHint.Builder hintBuilder,
204+
Predicate<String> excludedEndpoints) {
161205
long id = databaseId.get();
162206
if (id == 0) {
163207
return null;
164208
}
165209
hintBuilder.setDatabaseId(id);
166-
return rangeCache.fillRoutingHint(preferLeader, rangeMode, directedReadOptions, hintBuilder);
210+
return rangeCache.fillRoutingHint(
211+
preferLeader, rangeMode, directedReadOptions, hintBuilder, excludedEndpoints);
167212
}
168213

169214
private static boolean preferLeader(TransactionSelector selector) {

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2059,6 +2059,13 @@ public ApiFuture<CommitResponse> commitAsync(
20592059
CommitRequest request, @Nullable Map<Option, ?> options) {
20602060
GrpcCallContext context =
20612061
newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true);
2062+
// Signal grpc-gcp to unbind the affinity key after this call completes.
2063+
// Commit is a terminal RPC — no more RPCs will use this transaction's affinity key.
2064+
if (this.isGrpcGcpExtensionEnabled) {
2065+
context =
2066+
context.withCallOptions(
2067+
context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
2068+
}
20622069
return spannerStub.commitCallable().futureCall(request, context);
20632070
}
20642071

@@ -2078,6 +2085,13 @@ public ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Opt
20782085
GrpcCallContext context =
20792086
newCallContext(
20802087
options, request.getSession(), request, SpannerGrpc.getRollbackMethod(), true);
2088+
// Signal grpc-gcp to unbind the affinity key after this call completes.
2089+
// Rollback is a terminal RPC — no more RPCs will use this transaction's affinity key.
2090+
if (this.isGrpcGcpExtensionEnabled) {
2091+
context =
2092+
context.withCallOptions(
2093+
context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
2094+
}
20812095
return spannerStub.rollbackCallable().futureCall(request, context);
20822096
}
20832097

@@ -2278,6 +2292,13 @@ <ReqT, RespT> GrpcCallContext newCallContext(
22782292
context =
22792293
context.withCallOptions(
22802294
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
2295+
// Check if the caller wants to unbind the affinity key after this call completes.
2296+
Boolean unbind = Option.UNBIND_CHANNEL_HINT.get(options);
2297+
if (Boolean.TRUE.equals(unbind)) {
2298+
context =
2299+
context.withCallOptions(
2300+
context.getCallOptions().withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true));
2301+
}
22812302
} else {
22822303
// Set channel affinity in GAX.
22832304
context = context.withChannelAffinity(affinity.intValue());

0 commit comments

Comments
 (0)