Skip to content

Commit 2598291

Browse files
committed
retry unavailable errors on different replica
1 parent ea55529 commit 2598291

3 files changed

Lines changed: 166 additions & 5 deletions

File tree

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -95,9 +95,9 @@ final class KeyAwareChannel extends ManagedChannel {
9595
// Bounded to prevent unbounded growth if application code does not close read-only transactions.
9696
private final Cache<ByteString, Boolean> readOnlyTxPreferLeader =
9797
CacheBuilder.newBuilder().maximumSize(MAX_TRACKED_READ_ONLY_TRANSACTIONS).build();
98-
// If a routed endpoint returns RESOURCE_EXHAUSTED, the next retry attempt of that same logical
99-
// request should avoid that endpoint once so other requests are unaffected. Bound and age out
100-
// entries in case a caller gives up and never issues a retry.
98+
// If a routed endpoint returns RESOURCE_EXHAUSTED or UNAVAILABLE, the next retry attempt of
99+
// that same logical request should avoid that endpoint once so other requests are unaffected.
100+
// Bound and age out entries in case a caller gives up and never issues a retry.
101101
private final Cache<String, Set<String>> excludedEndpointsForLogicalRequest =
102102
CacheBuilder.newBuilder()
103103
.maximumSize(MAX_TRACKED_EXCLUDED_LOGICAL_REQUESTS)
@@ -364,6 +364,11 @@ private void maybeExcludeEndpointOnNextCall(
364364
});
365365
}
366366

367+
private static boolean shouldExcludeEndpointOnRetry(io.grpc.Status.Code statusCode) {
368+
return statusCode == io.grpc.Status.Code.RESOURCE_EXHAUSTED
369+
|| statusCode == io.grpc.Status.Code.UNAVAILABLE;
370+
}
371+
367372
private Predicate<String> consumeExcludedEndpointsForCurrentCall(
368373
@Nullable String logicalRequestKey) {
369374
Predicate<String> requestScopedExcluded = address -> false;
@@ -898,7 +903,7 @@ public void onMessage(ResponseT message) {
898903

899904
@Override
900905
public void onClose(io.grpc.Status status, Metadata trailers) {
901-
if (status.getCode() == io.grpc.Status.Code.RESOURCE_EXHAUSTED) {
906+
if (shouldExcludeEndpointOnRetry(status.getCode())) {
902907
call.parentChannel.maybeExcludeEndpointOnNextCall(
903908
call.selectedEndpoint, call.logicalRequestKey);
904909
}

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/LocationAwareSharedBackendReplicaHarnessTest.java

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,156 @@ public void singleUseReadCooldownSkipsReplicaOnNextRequestForBypassTraffic() thr
248248
}
249249
}
250250

251+
@Test
252+
public void singleUseReadReroutesOnUnavailableForBypassTraffic() throws Exception {
253+
try (SharedBackendReplicaHarness harness = SharedBackendReplicaHarness.create(2);
254+
Spanner spanner = createSpanner(harness)) {
255+
configureBackend(harness, singleRowReadResultSet("b"));
256+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(PROJECT, INSTANCE, DATABASE));
257+
258+
seedLocationMetadata(client);
259+
waitForReplicaRoutedRead(client, harness, 0);
260+
harness.clearRequests();
261+
262+
harness
263+
.replicas
264+
.get(0)
265+
.putMethodErrors(
266+
SharedBackendReplicaHarness.METHOD_STREAMING_READ, unavailable("isolated-replica"));
267+
268+
try (ResultSet resultSet =
269+
client
270+
.singleUse()
271+
.read(
272+
TABLE,
273+
KeySet.singleKey(Key.of("b")),
274+
Arrays.asList("k"),
275+
Options.directedRead(DIRECTED_READ_OPTIONS))) {
276+
assertTrue(resultSet.next());
277+
}
278+
279+
assertEquals(
280+
1,
281+
harness
282+
.replicas
283+
.get(0)
284+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
285+
.size());
286+
assertEquals(
287+
1,
288+
harness
289+
.replicas
290+
.get(1)
291+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
292+
.size());
293+
assertEquals(
294+
0,
295+
harness
296+
.defaultReplica
297+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
298+
.size());
299+
ReadRequest replicaARequest =
300+
(ReadRequest)
301+
harness
302+
.replicas
303+
.get(0)
304+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
305+
.get(0);
306+
assertTrue(replicaARequest.getResumeToken().isEmpty());
307+
assertRetriedOnSameLogicalRequest(
308+
harness
309+
.replicas
310+
.get(0)
311+
.getRequestIds(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
312+
.get(0),
313+
harness
314+
.replicas
315+
.get(1)
316+
.getRequestIds(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
317+
.get(0));
318+
}
319+
}
320+
321+
@Test
322+
public void singleUseReadCooldownSkipsUnavailableReplicaOnNextRequestForBypassTraffic()
323+
throws Exception {
324+
try (SharedBackendReplicaHarness harness = SharedBackendReplicaHarness.create(2);
325+
Spanner spanner = createSpanner(harness)) {
326+
configureBackend(harness, singleRowReadResultSet("b"));
327+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(PROJECT, INSTANCE, DATABASE));
328+
329+
seedLocationMetadata(client);
330+
waitForReplicaRoutedRead(client, harness, 0);
331+
harness.clearRequests();
332+
333+
harness
334+
.replicas
335+
.get(0)
336+
.putMethodErrors(
337+
SharedBackendReplicaHarness.METHOD_STREAMING_READ, unavailable("isolated-replica"));
338+
339+
try (ResultSet firstRead =
340+
client
341+
.singleUse()
342+
.read(
343+
TABLE,
344+
KeySet.singleKey(Key.of("b")),
345+
Arrays.asList("k"),
346+
Options.directedRead(DIRECTED_READ_OPTIONS))) {
347+
assertTrue(firstRead.next());
348+
}
349+
350+
try (ResultSet secondRead =
351+
client
352+
.singleUse()
353+
.read(
354+
TABLE,
355+
KeySet.singleKey(Key.of("b")),
356+
Arrays.asList("k"),
357+
Options.directedRead(DIRECTED_READ_OPTIONS))) {
358+
assertTrue(secondRead.next());
359+
}
360+
361+
assertEquals(
362+
1,
363+
harness
364+
.replicas
365+
.get(0)
366+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
367+
.size());
368+
assertEquals(
369+
2,
370+
harness
371+
.replicas
372+
.get(1)
373+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
374+
.size());
375+
assertEquals(
376+
0,
377+
harness
378+
.defaultReplica
379+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
380+
.size());
381+
List<AbstractMessage> replicaBRequests =
382+
harness.replicas.get(1).getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ);
383+
for (AbstractMessage request : replicaBRequests) {
384+
assertTrue(((ReadRequest) request).getResumeToken().isEmpty());
385+
}
386+
List<String> replicaBRequestIds =
387+
harness.replicas.get(1).getRequestIds(SharedBackendReplicaHarness.METHOD_STREAMING_READ);
388+
assertRetriedOnSameLogicalRequest(
389+
harness
390+
.replicas
391+
.get(0)
392+
.getRequestIds(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
393+
.get(0),
394+
replicaBRequestIds.get(0));
395+
assertNotEquals(
396+
XGoogSpannerRequestId.of(replicaBRequestIds.get(0)).getLogicalRequestKey(),
397+
XGoogSpannerRequestId.of(replicaBRequestIds.get(1)).getLogicalRequestKey());
398+
}
399+
}
400+
251401
@Test
252402
public void singleUseReadMidStreamRecvFailureWithoutRetryInfoRetriesForBypassTraffic()
253403
throws Exception {
@@ -486,6 +636,10 @@ private static StatusRuntimeException resourceExhausted(String description) {
486636
return Status.RESOURCE_EXHAUSTED.withDescription(description).asRuntimeException();
487637
}
488638

639+
private static StatusRuntimeException unavailable(String description) {
640+
return Status.UNAVAILABLE.withDescription(description).asRuntimeException();
641+
}
642+
489643
private static void assertRetriedOnSameLogicalRequest(
490644
String firstRequestId, String secondRequestId) {
491645
XGoogSpannerRequestId first = XGoogSpannerRequestId.of(firstRequestId);

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,9 @@ private PartialResultSetsIterator(
208208
ByteString resumeToken) {
209209
this.resultSet = resultSet;
210210
this.currentRow = parseResumeToken(resumeToken);
211-
this.hasNext = currentRow < resultSet.getRowsCount();
211+
this.hasNext =
212+
currentRow < resultSet.getRowsCount()
213+
|| (currentRow == 0 && resultSet.getRowsCount() == 0);
212214
this.setPrecommitToken = setPrecommitToken;
213215
this.transactionId = transactionId;
214216
}

0 commit comments

Comments
 (0)