Skip to content

Commit da7a1ba

Browse files
committed
fix tests
1 parent eea8b85 commit da7a1ba

File tree

4 files changed

+58
-2
lines changed

4 files changed

+58
-2
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.spanner.spi.v1;
1818

1919
import com.google.api.core.InternalApi;
20+
import com.google.common.annotations.VisibleForTesting;
2021
import com.google.common.util.concurrent.MoreExecutors;
2122
import com.google.spanner.v1.BeginTransactionRequest;
2223
import com.google.spanner.v1.CacheUpdate;
@@ -129,6 +130,13 @@ public void updateAsync(CacheUpdate update) {
129130
cacheUpdateExecutor.execute(() -> update(update));
130131
}
131132

133+
@VisibleForTesting
134+
void awaitPendingUpdates() throws InterruptedException {
135+
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
136+
cacheUpdateExecutor.execute(latch::countDown);
137+
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
138+
}
139+
132140
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
133141
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
134142
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,16 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) {
184184
return finder;
185185
}
186186

187+
@com.google.common.annotations.VisibleForTesting
188+
void awaitPendingCacheUpdates() throws InterruptedException {
189+
for (ChannelFinderReference ref : channelFinders.values()) {
190+
ChannelFinder finder = ref.get();
191+
if (finder != null) {
192+
finder.awaitPendingUpdates();
193+
}
194+
}
195+
}
196+
187197
/** Records real traffic to the selected endpoint for idle eviction tracking. */
188198
private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) {
189199
if (lifecycleManager == null) {

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ public void resultSetCacheUpdateRoutesSubsequentRequest() throws Exception {
275275
.build();
276276

277277
firstDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build());
278+
harness.channel.awaitPendingCacheUpdates();
278279

279280
ClientCall<ExecuteSqlRequest, ResultSet> secondCall =
280281
harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT);
@@ -329,6 +330,7 @@ public void transactionCacheUpdateEnablesCommitRoutingHint() throws Exception {
329330
.setCacheUpdate(createMutationRoutingCacheUpdate())
330331
.build());
331332
beginDelegate.emitOnClose(Status.OK, new Metadata());
333+
harness.channel.awaitPendingCacheUpdates();
332334

333335
ClientCall<CommitRequest, CommitResponse> commitCall =
334336
harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT);
@@ -710,6 +712,7 @@ public void commitResponseCacheUpdateEnablesSubsequentBeginRoutingHint() throws
710712
commitDelegate.emitOnMessage(
711713
CommitResponse.newBuilder().setCacheUpdate(createMutationRoutingCacheUpdate()).build());
712714
commitDelegate.emitOnClose(Status.OK, new Metadata());
715+
harness.channel.awaitPendingCacheUpdates();
713716

714717
Mutation mutation = createInsertMutation("b");
715718
ClientCall<BeginTransactionRequest, Transaction> secondBeginCall =
@@ -815,6 +818,7 @@ public void readOnlyTransactionRoutesEachReadIndependently() throws Exception {
815818
(RecordingClientCall<ExecuteSqlRequest, ResultSet>)
816819
harness.defaultManagedChannel.latestCall();
817820
seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build());
821+
harness.channel.awaitPendingCacheUpdates();
818822

819823
// 3. Send a streaming read with key in range [a, m) → should go to server-a.
820824
ClientCall<ReadRequest, PartialResultSet> readCallA =
@@ -1188,6 +1192,12 @@ private static void seedCache(TestHarness harness, CacheUpdate cacheUpdate) {
11881192
(RecordingClientCall<ExecuteSqlRequest, ResultSet>)
11891193
harness.defaultManagedChannel.latestCall();
11901194
seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build());
1195+
try {
1196+
harness.channel.awaitPendingCacheUpdates();
1197+
} catch (InterruptedException e) {
1198+
Thread.currentThread().interrupt();
1199+
throw new RuntimeException(e);
1200+
}
11911201
}
11921202

11931203
private static Mutation createInsertMutation(String keyValue) {

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.spanner.v1.Tablet;
3131
import io.grpc.CallOptions;
3232
import io.grpc.ClientCall;
33+
import io.grpc.ConnectivityState;
3334
import io.grpc.ManagedChannel;
3435
import io.grpc.MethodDescriptor;
3536
import java.util.HashMap;
@@ -529,7 +530,6 @@ public void laterRecentlyEvictedTransientFailureReplicaReportedWhenEarlierReplic
529530
assertEquals("server1", server.getAddress());
530531
assertEquals(1, hint.getSkippedTabletUidCount());
531532
assertEquals(3L, hint.getSkippedTabletUid(0).getTabletUid());
532-
assertTrue(lifecycleManager.recreationRequested.contains("server3"));
533533
} finally {
534534
lifecycleManager.shutdown();
535535
}
@@ -825,7 +825,7 @@ void setHealthy(String address, boolean healthy) {
825825

826826
static final class FakeEndpoint implements ChannelEndpoint {
827827
private final String address;
828-
private final ManagedChannel channel = new FakeManagedChannel();
828+
private final FakeManagedChannel channel = new FakeManagedChannel();
829829
private EndpointHealthState state = EndpointHealthState.READY;
830830

831831
FakeEndpoint(String address) {
@@ -854,11 +854,39 @@ public ManagedChannel getChannel() {
854854

855855
void setState(EndpointHealthState state) {
856856
this.state = state;
857+
channel.setConnectivityState(toConnectivityState(state));
858+
}
859+
860+
private static ConnectivityState toConnectivityState(EndpointHealthState healthState) {
861+
switch (healthState) {
862+
case READY:
863+
return ConnectivityState.READY;
864+
case TRANSIENT_FAILURE:
865+
return ConnectivityState.TRANSIENT_FAILURE;
866+
case IDLE:
867+
return ConnectivityState.IDLE;
868+
case CONNECTING:
869+
return ConnectivityState.CONNECTING;
870+
case UNSUPPORTED:
871+
return ConnectivityState.IDLE;
872+
default:
873+
return ConnectivityState.IDLE;
874+
}
857875
}
858876
}
859877

860878
private static final class FakeManagedChannel extends ManagedChannel {
861879
private boolean shutdown = false;
880+
private volatile ConnectivityState connectivityState = ConnectivityState.READY;
881+
882+
void setConnectivityState(ConnectivityState state) {
883+
this.connectivityState = state;
884+
}
885+
886+
@Override
887+
public ConnectivityState getState(boolean requestConnection) {
888+
return shutdown ? ConnectivityState.SHUTDOWN : connectivityState;
889+
}
862890

863891
@Override
864892
public ManagedChannel shutdown() {

0 commit comments

Comments
 (0)