Skip to content

Commit 85d55c6

Browse files
committed
fix keep alives
1 parent e94496f commit 85d55c6

3 files changed

Lines changed: 39 additions & 2 deletions

File tree

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.ManagedChannel;
2828
import io.grpc.ManagedChannelBuilder;
2929
import java.io.IOException;
30+
import java.time.Duration;
3031
import java.util.Map;
3132
import java.util.Objects;
3233
import java.util.concurrent.ConcurrentHashMap;
@@ -51,6 +52,9 @@ class GrpcChannelEndpointCache implements ChannelEndpointCache {
5152
/** Timeout for graceful channel shutdown. */
5253
private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
5354

55+
@VisibleForTesting static final Duration ROUTED_KEEPALIVE_TIME = Duration.ofSeconds(2);
56+
@VisibleForTesting static final Duration ROUTED_KEEPALIVE_TIMEOUT = Duration.ofSeconds(20);
57+
5458
private final InstantiatingGrpcChannelProvider baseProvider;
5559
private final Map<String, GrpcChannelEndpoint> servers = new ConcurrentHashMap<>();
5660
private final GrpcChannelEndpoint defaultEndpoint;
@@ -129,6 +133,8 @@ InstantiatingGrpcChannelProvider createProviderWithAuthorityOverride(String addr
129133
}
130134
Builder builder = endpointProvider.toBuilder();
131135
builder.setChannelPoolSettings(ChannelPoolSettings.staticallySized(1));
136+
builder.setKeepAliveTimeDuration(ROUTED_KEEPALIVE_TIME);
137+
builder.setKeepAliveTimeoutDuration(ROUTED_KEEPALIVE_TIMEOUT);
132138
builder.setKeepAliveWithoutCalls(Boolean.TRUE);
133139
final com.google.api.core.ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>
134140
baseConfigurator =

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/LocationAwareSharedBackendReplicaHarnessTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,7 @@ public void readWriteTransactionAbortedCommitUsesReadAffinityReplicaForBypassTra
507507
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(PROJECT, INSTANCE, DATABASE));
508508

509509
seedLocationMetadata(client);
510+
waitForReplicaRoutedStrongRead(client, harness, /* expectedReplicaIndex= */ 1);
510511
harness.clearRequests();
511512
AtomicInteger attempts = new AtomicInteger();
512513
AtomicInteger firstReplicaIndex = new AtomicInteger(-1);
@@ -655,6 +656,30 @@ private static int waitForReplicaRoutedRead(
655656
throw new AssertionError("Timed out waiting for location-aware read to route to replica");
656657
}
657658

659+
private static void waitForReplicaRoutedStrongRead(
660+
DatabaseClient client, SharedBackendReplicaHarness harness, int expectedReplicaIndex)
661+
throws InterruptedException {
662+
long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
663+
while (System.nanoTime() < deadlineNanos) {
664+
harness.clearRequests();
665+
try (ResultSet resultSet =
666+
client.singleUse().read(TABLE, KeySet.singleKey(Key.of("b")), Arrays.asList("k"))) {
667+
if (resultSet.next()) {
668+
if (!harness
669+
.replicas
670+
.get(expectedReplicaIndex)
671+
.getRequests(SharedBackendReplicaHarness.METHOD_STREAMING_READ)
672+
.isEmpty()) {
673+
return;
674+
}
675+
}
676+
}
677+
Thread.sleep(50L);
678+
}
679+
throw new AssertionError(
680+
"Timed out waiting for strong read to route to replica " + expectedReplicaIndex);
681+
}
682+
658683
private static int findReplicaWithRequest(SharedBackendReplicaHarness harness, String method) {
659684
for (int replicaIndex = 0; replicaIndex < harness.replicas.size(); replicaIndex++) {
660685
if (!harness.replicas.get(replicaIndex).getRequests(method).isEmpty()) {

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCacheTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,13 @@ public void routedChannelsUseSingleUnderlyingChannel() throws Exception {
9898
}
9999

100100
@Test
101-
public void routedChannelsEnableKeepAliveWithoutCallsOnlyForEndpointProvider() throws Exception {
101+
public void routedChannelsOverrideKeepAliveSettingsOnlyForEndpointProvider() throws Exception {
102102
InstantiatingGrpcChannelProvider provider =
103103
InstantiatingGrpcChannelProvider.newBuilder()
104104
.setEndpoint(DEFAULT_ENDPOINT)
105105
.setPoolSize(4)
106106
.setKeepAliveTimeDuration(java.time.Duration.ofSeconds(120))
107+
.setKeepAliveTimeoutDuration(java.time.Duration.ofSeconds(60))
107108
.setKeepAliveWithoutCalls(Boolean.FALSE)
108109
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
109110
.build();
@@ -113,9 +114,14 @@ public void routedChannelsEnableKeepAliveWithoutCallsOnlyForEndpointProvider() t
113114
cache.createProviderWithAuthorityOverride(ROUTED_ENDPOINT_A);
114115

115116
assertThat(provider.getKeepAliveWithoutCalls()).isFalse();
117+
assertThat(provider.getKeepAliveTimeDuration()).isEqualTo(java.time.Duration.ofSeconds(120));
118+
assertThat(provider.getKeepAliveTimeoutDuration())
119+
.isEqualTo(java.time.Duration.ofSeconds(60));
116120
assertThat(routedProvider.getKeepAliveWithoutCalls()).isTrue();
117121
assertThat(routedProvider.getKeepAliveTimeDuration())
118-
.isEqualTo(provider.getKeepAliveTimeDuration());
122+
.isEqualTo(GrpcChannelEndpointCache.ROUTED_KEEPALIVE_TIME);
123+
assertThat(routedProvider.getKeepAliveTimeoutDuration())
124+
.isEqualTo(GrpcChannelEndpointCache.ROUTED_KEEPALIVE_TIMEOUT);
119125
} finally {
120126
cache.shutdown();
121127
}

0 commit comments

Comments
 (0)