From e9de8636e6ebf3fa356c75e6a0691598e8c48369 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Wed, 8 Apr 2026 23:23:26 +0530 Subject: [PATCH 1/3] chore(spanner): optimize lock contention and skipped tablet reporting --- .../cloud/spanner/spi/v1/ChannelFinder.java | 16 ++ .../spi/v1/EndpointLifecycleManager.java | 32 +++- .../cloud/spanner/spi/v1/KeyAwareChannel.java | 8 +- .../cloud/spanner/spi/v1/KeyRangeCache.java | 157 +++++++++++++++--- .../spi/v1/EndpointLifecycleManagerTest.java | 50 ++++++ .../spanner/spi/v1/KeyRangeCacheTest.java | 141 +++++++++++++++- 6 files changed, 369 insertions(+), 35 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 2f229dddee44..9a951859bb61 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner.spi.v1; import com.google.api.core.InternalApi; +import com.google.common.util.concurrent.MoreExecutors; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CacheUpdate; import com.google.spanner.v1.CommitRequest; @@ -34,6 +35,9 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; @@ -47,11 +51,19 @@ @InternalApi public final class ChannelFinder { private static final Predicate NO_EXCLUDED_ENDPOINTS = address -> false; + private static final ExecutorService CACHE_UPDATE_POOL = + Executors.newCachedThreadPool( + r -> { + Thread t = new Thread(r, "spanner-cache-update"); + t.setDaemon(true); + return t; + }); private final Object updateLock = new Object(); private final AtomicLong databaseId = new AtomicLong(); private final KeyRecipeCache recipeCache = new KeyRecipeCache(); private final KeyRangeCache rangeCache; + private final Executor cacheUpdateExecutor = MoreExecutors.newSequentialExecutor(CACHE_UPDATE_POOL); @Nullable private final EndpointLifecycleManager lifecycleManager; @Nullable private final String finderKey; @@ -112,6 +124,10 @@ public void update(CacheUpdate update) { } } + public void updateAsync(CacheUpdate update) { + cacheUpdateExecutor.execute(() -> update(update)); + } + public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) { return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java index 0cc593c7a86b..51661d51eb5b 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java @@ -73,6 +73,13 @@ class EndpointLifecycleManager { */ private static final int MAX_TRANSIENT_FAILURE_COUNT = 3; + private enum EvictionReason { + TRANSIENT_FAILURE, + SHUTDOWN, + IDLE, + STALE + } + /** Per-endpoint lifecycle state. */ static final class EndpointState { final String address; @@ -95,6 +102,7 @@ static final class EndpointState { private final ChannelEndpointCache endpointCache; private final Map endpoints = new ConcurrentHashMap<>(); + private final Set transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet(); /** * Active addresses reported by each ChannelFinder, keyed by database id. @@ -235,6 +243,7 @@ void updateActiveAddresses(String finderKey, Set activeAddresses) { for (Set addresses : activeAddressesPerFinder.values()) { allActive.addAll(addresses); } + transientFailureEvictedAddresses.retainAll(allActive); // Evict managed endpoints not referenced by any finder. List stale = new ArrayList<>(); @@ -276,6 +285,7 @@ void unregisterFinder(String finderKey) { for (Set addresses : activeAddressesPerFinder.values()) { allActive.addAll(addresses); } + transientFailureEvictedAddresses.retainAll(allActive); List stale = new ArrayList<>(); for (String address : endpoints.keySet()) { @@ -412,6 +422,7 @@ private void probe(String address) { case READY: state.lastReadyAt = clock.instant(); state.consecutiveTransientFailures = 0; + transientFailureEvictedAddresses.remove(address); break; case IDLE: @@ -439,13 +450,13 @@ private void probe(String address) { Level.FINE, "Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes", new Object[] {address, state.consecutiveTransientFailures}); - evictEndpoint(address); + evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE); } break; case SHUTDOWN: logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address); - evictEndpoint(address); + evictEndpoint(address, EvictionReason.SHUTDOWN); break; default: @@ -482,16 +493,26 @@ void checkIdleEviction() { } for (String address : toEvict) { - evictEndpoint(address); + evictEndpoint(address, EvictionReason.IDLE); } } /** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel. */ private void evictEndpoint(String address) { + evictEndpoint(address, EvictionReason.STALE); + } + + /** Evicts an endpoint and records whether it should still be reported as unhealthy. */ + private void evictEndpoint(String address, EvictionReason reason) { logger.log(Level.FINE, "Evicting endpoint {0}", address); stopProbing(address); endpoints.remove(address); + if (reason == EvictionReason.TRANSIENT_FAILURE) { + transientFailureEvictedAddresses.add(address); + } else { + transientFailureEvictedAddresses.remove(address); + } endpointCache.evict(address); } @@ -526,6 +547,10 @@ boolean isManaged(String address) { return endpoints.containsKey(address); } + boolean wasRecentlyEvictedTransientFailure(String address) { + return transientFailureEvictedAddresses.contains(address); + } + /** Returns the endpoint state for testing. */ @VisibleForTesting EndpointState getEndpointState(String address) { @@ -558,6 +583,7 @@ void shutdown() { } } endpoints.clear(); + transientFailureEvictedAddresses.clear(); scheduler.shutdown(); try { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index b6c22ad4e3f8..f2cf38d87323 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -798,25 +798,25 @@ public void onMessage(ResponseT message) { if (message instanceof PartialResultSet) { PartialResultSet response = (PartialResultSet) message; if (response.hasCacheUpdate() && call.channelFinder != null) { - call.channelFinder.update(response.getCacheUpdate()); + call.channelFinder.updateAsync(response.getCacheUpdate()); } transactionId = transactionIdFromMetadata(response); } else if (message instanceof ResultSet) { ResultSet response = (ResultSet) message; if (response.hasCacheUpdate() && call.channelFinder != null) { - call.channelFinder.update(response.getCacheUpdate()); + call.channelFinder.updateAsync(response.getCacheUpdate()); } transactionId = transactionIdFromMetadata(response); } else if (message instanceof Transaction) { Transaction response = (Transaction) message; if (response.hasCacheUpdate() && call.channelFinder != null) { - call.channelFinder.update(response.getCacheUpdate()); + call.channelFinder.updateAsync(response.getCacheUpdate()); } transactionId = transactionIdFromTransaction(response); } else if (message instanceof CommitResponse) { CommitResponse response = (CommitResponse) message; if (response.hasCacheUpdate() && call.channelFinder != null) { - call.channelFinder.update(response.getCacheUpdate()); + call.channelFinder.updateAsync(response.getCacheUpdate()); } } if (transactionId != null) { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java index 3a4e49d7148c..e88037697d0e 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java @@ -39,6 +39,8 @@ import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; @@ -67,7 +69,9 @@ public enum RangeMode { private final NavigableMap ranges = new TreeMap<>(ByteString.unsignedLexicographicalComparator()); private final Map groups = new HashMap<>(); - private final Object lock = new Object(); + private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); + private final Lock readLock = cacheLock.readLock(); + private final Lock writeLock = cacheLock.writeLock(); private final AtomicLong accessCounter = new AtomicLong(); private volatile boolean deterministicRandom = false; @@ -96,17 +100,30 @@ void setMinCacheEntriesForRandomPick(int value) { /** Applies cache updates. Tablets are processed inside group updates. */ public void addRanges(CacheUpdate cacheUpdate) { - List newGroups = new ArrayList<>(); - synchronized (lock) { + List touchedGroups = new ArrayList<>(); + writeLock.lock(); + try { for (Group groupIn : cacheUpdate.getGroupList()) { - newGroups.add(findOrInsertGroup(groupIn)); + touchedGroups.add(findOrInsertGroup(groupIn.getGroupUid())); } + } finally { + writeLock.unlock(); + } + + for (int i = 0; i < cacheUpdate.getGroupCount(); i++) { + touchedGroups.get(i).update(cacheUpdate.getGroup(i)); + } + + writeLock.lock(); + try { for (Range rangeIn : cacheUpdate.getRangeList()) { replaceRangeIfNewer(rangeIn); } - for (CachedGroup group : newGroups) { + for (CachedGroup group : touchedGroups) { unref(group); } + } finally { + writeLock.unlock(); } } @@ -134,8 +151,11 @@ public ChannelEndpoint fillRoutingHint( } CachedRange targetRange; - synchronized (lock) { + readLock.lock(); + try { targetRange = findRangeLocked(key, hintBuilder.getLimitKey(), rangeMode); + } finally { + readLock.unlock(); } if (targetRange == null || targetRange.group == null) { @@ -154,7 +174,8 @@ public ChannelEndpoint fillRoutingHint( /** Returns all server addresses currently referenced by cached tablets. */ Set getActiveAddresses() { Set addresses = new HashSet<>(); - synchronized (lock) { + readLock.lock(); + try { for (CachedGroup group : groups.values()) { synchronized (group) { for (CachedTablet tablet : group.tablets) { @@ -164,28 +185,37 @@ Set getActiveAddresses() { } } } + } finally { + readLock.unlock(); } return addresses; } public void clear() { - synchronized (lock) { + writeLock.lock(); + try { for (CachedRange range : ranges.values()) { unref(range.group); } ranges.clear(); groups.clear(); + } finally { + writeLock.unlock(); } } public int size() { - synchronized (lock) { + readLock.lock(); + try { return ranges.size(); + } finally { + readLock.unlock(); } } public void shrinkTo(int newSize) { - synchronized (lock) { + writeLock.lock(); + try { if (newSize <= 0) { clear(); return; @@ -211,12 +241,15 @@ public void shrinkTo(int newSize) { ranges.remove(range.limitKey); unref(range.group); } + } finally { + writeLock.unlock(); } } public String debugString() { StringBuilder sb = new StringBuilder(); - synchronized (lock) { + readLock.lock(); + try { for (Map.Entry entry : ranges.entrySet()) { CachedRange cachedRange = entry.getValue(); sb.append("Range[") @@ -230,6 +263,8 @@ public String debugString() { for (CachedGroup g : groups.values()) { sb.append(g.debugString()).append("\n"); } + } finally { + readLock.unlock(); } return sb.toString(); } @@ -413,15 +448,14 @@ private CachedGroup findAndRefGroup(long groupUid) { return group; } - private CachedGroup findOrInsertGroup(Group groupIn) { - CachedGroup group = groups.get(groupIn.getGroupUid()); + private CachedGroup findOrInsertGroup(long groupUid) { + CachedGroup group = groups.get(groupUid); if (group == null) { - group = new CachedGroup(groupIn.getGroupUid()); - groups.put(groupIn.getGroupUid(), group); + group = new CachedGroup(groupUid); + groups.put(groupUid, group); } else { group.refs++; } - group.update(groupIn); return group; } @@ -523,14 +557,18 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) { * skipped_tablets. *
  • Endpoint exists and READY: usable, do not skip. *
  • Endpoint exists and TRANSIENT_FAILURE: skip and report in skipped_tablets. - *
  • Endpoint absent, IDLE, CONNECTING, SHUTDOWN, or unsupported: skip silently (no - * skipped_tablets). + *
  • Endpoint absent, IDLE, CONNECTING, SHUTDOWN, or unsupported: skip silently unless the + * lifecycle manager recently evicted the address for repeated TRANSIENT_FAILURE, in + * which case report it in skipped_tablets. * */ - boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate excludedEndpoints) { + boolean shouldSkip( + RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints, + Set skippedTabletUids) { // Server-marked skip, no address, or excluded endpoint: always report. if (skip || serverAddress.isEmpty() || excludedEndpoints.test(serverAddress)) { - addSkippedTablet(hintBuilder); + addSkippedTablet(hintBuilder, skippedTabletUids); return true; } @@ -557,6 +595,7 @@ boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate excludedEn Level.FINE, "Tablet {0} at {1}: no endpoint present, skipping silently", new Object[] {tabletUid, serverAddress}); + maybeAddRecentTransientFailureSkip(hintBuilder, skippedTabletUids); if (lifecycleManager != null) { lifecycleManager.requestEndpointRecreation(serverAddress); } @@ -574,7 +613,7 @@ boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate excludedEn Level.FINE, "Tablet {0} at {1}: endpoint in TRANSIENT_FAILURE, adding to skipped_tablets", new Object[] {tabletUid, serverAddress}); - addSkippedTablet(hintBuilder); + addSkippedTablet(hintBuilder, skippedTabletUids); return true; } @@ -583,15 +622,52 @@ boolean shouldSkip(RoutingHint.Builder hintBuilder, Predicate excludedEn Level.FINE, "Tablet {0} at {1}: endpoint not ready, skipping silently", new Object[] {tabletUid, serverAddress}); + maybeAddRecentTransientFailureSkip(hintBuilder, skippedTabletUids); return true; } - private void addSkippedTablet(RoutingHint.Builder hintBuilder) { + private void addSkippedTablet( + RoutingHint.Builder hintBuilder, Set skippedTabletUids) { + if (!skippedTabletUids.add(tabletUid)) { + return; + } RoutingHint.SkippedTablet.Builder skipped = hintBuilder.addSkippedTabletUidBuilder(); skipped.setTabletUid(tabletUid); skipped.setIncarnation(incarnation); } + private void recordKnownTransientFailure( + RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints, + Set skippedTabletUids) { + if (skip || serverAddress.isEmpty() || excludedEndpoints.test(serverAddress)) { + return; + } + + if (endpoint != null && endpoint.getChannel().isShutdown()) { + endpoint = null; + } + + if (endpoint == null) { + endpoint = endpointCache.getIfPresent(serverAddress); + } + + if (endpoint != null && endpoint.isTransientFailure()) { + addSkippedTablet(hintBuilder, skippedTabletUids); + return; + } + + maybeAddRecentTransientFailureSkip(hintBuilder, skippedTabletUids); + } + + private void maybeAddRecentTransientFailureSkip( + RoutingHint.Builder hintBuilder, Set skippedTabletUids) { + if (lifecycleManager != null + && lifecycleManager.wasRecentlyEvictedTransientFailure(serverAddress)) { + addSkippedTablet(hintBuilder, skippedTabletUids); + } + } + ChannelEndpoint pick(RoutingHint.Builder hintBuilder) { hintBuilder.setTabletUid(tabletUid); // Endpoint must already exist and be READY if shouldSkip returned false. @@ -675,6 +751,7 @@ ChannelEndpoint fillRoutingHint( DirectedReadOptions directedReadOptions, RoutingHint.Builder hintBuilder, Predicate excludedEndpoints) { + Set skippedTabletUids = skippedTabletUids(hintBuilder); boolean hasDirectedReadOptions = directedReadOptions.getReplicasCase() != DirectedReadOptions.ReplicasCase.REPLICAS_NOT_SET; @@ -690,10 +767,13 @@ ChannelEndpoint fillRoutingHint( hasDirectedReadOptions, hintBuilder, directedReadOptions, - excludedEndpoints); + excludedEndpoints, + skippedTabletUids); if (selected == null) { return null; } + recordKnownTransientFailuresLocked( + selected, directedReadOptions, hintBuilder, excludedEndpoints, skippedTabletUids); return selected.pick(hintBuilder); } } @@ -703,14 +783,15 @@ private CachedTablet selectTabletLocked( boolean hasDirectedReadOptions, RoutingHint.Builder hintBuilder, DirectedReadOptions directedReadOptions, - Predicate excludedEndpoints) { + Predicate excludedEndpoints, + Set skippedTabletUids) { boolean checkedLeader = false; if (preferLeader && !hasDirectedReadOptions && hasLeader() && leader().distance <= MAX_LOCAL_REPLICA_DISTANCE) { checkedLeader = true; - if (!leader().shouldSkip(hintBuilder, excludedEndpoints)) { + if (!leader().shouldSkip(hintBuilder, excludedEndpoints, skippedTabletUids)) { return leader(); } } @@ -722,7 +803,7 @@ && leader().distance <= MAX_LOCAL_REPLICA_DISTANCE) { if (!tablet.matches(directedReadOptions)) { continue; } - if (tablet.shouldSkip(hintBuilder, excludedEndpoints)) { + if (tablet.shouldSkip(hintBuilder, excludedEndpoints, skippedTabletUids)) { continue; } return tablet; @@ -730,6 +811,28 @@ && leader().distance <= MAX_LOCAL_REPLICA_DISTANCE) { return null; } + private void recordKnownTransientFailuresLocked( + CachedTablet selected, + DirectedReadOptions directedReadOptions, + RoutingHint.Builder hintBuilder, + Predicate excludedEndpoints, + Set skippedTabletUids) { + for (CachedTablet tablet : tablets) { + if (tablet == selected || !tablet.matches(directedReadOptions)) { + continue; + } + tablet.recordKnownTransientFailure(hintBuilder, excludedEndpoints, skippedTabletUids); + } + } + + private Set skippedTabletUids(RoutingHint.Builder hintBuilder) { + Set skippedTabletUids = new HashSet<>(); + for (RoutingHint.SkippedTablet skippedTablet : hintBuilder.getSkippedTabletUidList()) { + skippedTabletUids.add(skippedTablet.getTabletUid()); + } + return skippedTabletUids; + } + boolean hasLeader() { return leaderIndex >= 0 && leaderIndex < tablets.size(); } @@ -763,7 +866,7 @@ private static class CachedRange { final CachedGroup group; final long splitId; final ByteString generation; - long lastAccess; + volatile long lastAccess; CachedRange( ByteString startKey, diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java index 6c45114435fb..9f360aa30a61 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java @@ -240,6 +240,56 @@ public void evictedEndpointIsRecreatedOnDemand() throws Exception { assertNotNull(cache.getIfPresent("server1")); } + @Test + public void transientFailureEvictionTrackedUntilEndpointReadyAgain() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + registerAddresses(manager, "server1"); + awaitCondition( + "endpoint should be created in background", () -> cache.getIfPresent("server1") != null); + + cache.setState("server1", KeyRangeCacheTest.EndpointHealthState.TRANSIENT_FAILURE); + awaitCondition( + "endpoint should be evicted after repeated transient failures", + () -> + !manager.isManaged("server1") + && manager.wasRecentlyEvictedTransientFailure("server1") + && cache.getIfPresent("server1") == null); + + manager.requestEndpointRecreation("server1"); + awaitCondition( + "endpoint should be recreated in background", () -> cache.getIfPresent("server1") != null); + awaitCondition( + "recent transient failure marker should clear once endpoint is READY again", + () -> !manager.wasRecentlyEvictedTransientFailure("server1")); + } + + @Test + public void transientFailureEvictionMarkerRemovedWhenAddressNoLongerActive() throws Exception { + KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); + manager = + new EndpointLifecycleManager( + cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC()); + + String finder = registerAddresses(manager, "server1"); + awaitCondition( + "endpoint should be created in background", () -> cache.getIfPresent("server1") != null); + + cache.setState("server1", KeyRangeCacheTest.EndpointHealthState.TRANSIENT_FAILURE); + awaitCondition( + "endpoint should be evicted after repeated transient failures", + () -> + !manager.isManaged("server1") + && manager.wasRecentlyEvictedTransientFailure("server1")); + + manager.updateActiveAddresses(finder, Collections.emptySet()); + + assertFalse(manager.wasRecentlyEvictedTransientFailure("server1")); + } + @Test public void shutdownStopsAllProbing() throws Exception { KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache(); diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java index c3b5a0ca16db..3797240162da 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java @@ -372,6 +372,33 @@ public void transientFailureEndpointCausesSkippedTabletPlusDefaultHostFallback() assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); } + @Test + public void recentlyEvictedTransientFailureEndpointStillAddsSkippedTablet() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + RecentTransientFailureLifecycleManager lifecycleManager = + new RecentTransientFailureLifecycleManager(endpointCache); + try { + KeyRangeCache cache = new KeyRangeCache(endpointCache, lifecycleManager); + cache.addRanges(singleReplicaUpdate("server1")); + lifecycleManager.markRecentlyEvictedTransientFailure("server1"); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNull(server); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); + assertTrue(lifecycleManager.recreationRequested.contains("server1")); + } finally { + lifecycleManager.shutdown(); + } + } + @Test public void oneUnusableReplicaAndOneReadyReplicaUsesReady() { FakeEndpointCache endpointCache = new FakeEndpointCache(); @@ -449,6 +476,65 @@ public void transientFailureReplicaSkippedAndReadyReplicaSelected() { assertEquals(1L, hint.getSkippedTabletUid(0).getTabletUid()); } + @Test + public void laterTransientFailureReplicaReportedWhenEarlierReplicaSelected() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + KeyRangeCache cache = new KeyRangeCache(endpointCache); + cache.addRanges(threeReplicaUpdate()); + + endpointCache.get("server1"); + endpointCache.get("server2"); + endpointCache.get("server3"); + + endpointCache.setState("server1", EndpointHealthState.READY); + endpointCache.setState("server2", EndpointHealthState.READY); + endpointCache.setState("server3", EndpointHealthState.TRANSIENT_FAILURE); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(3L, hint.getSkippedTabletUid(0).getTabletUid()); + } + + @Test + public void laterRecentlyEvictedTransientFailureReplicaReportedWhenEarlierReplicaSelected() { + FakeEndpointCache endpointCache = new FakeEndpointCache(); + RecentTransientFailureLifecycleManager lifecycleManager = + new RecentTransientFailureLifecycleManager(endpointCache); + try { + KeyRangeCache cache = new KeyRangeCache(endpointCache, lifecycleManager); + cache.addRanges(threeReplicaUpdate()); + + endpointCache.get("server1"); + endpointCache.get("server2"); + lifecycleManager.markRecentlyEvictedTransientFailure("server3"); + + RoutingHint.Builder hint = RoutingHint.newBuilder().setKey(bytes("a")); + ChannelEndpoint server = + cache.fillRoutingHint( + false, + KeyRangeCache.RangeMode.COVERING_SPLIT, + DirectedReadOptions.getDefaultInstance(), + hint); + + assertNotNull(server); + assertEquals("server1", server.getAddress()); + assertEquals(1, hint.getSkippedTabletUidCount()); + assertEquals(3L, hint.getSkippedTabletUid(0).getTabletUid()); + assertTrue(lifecycleManager.recreationRequested.contains("server3")); + } finally { + lifecycleManager.shutdown(); + } + } + // --- Eviction and recreation tests --- @Test @@ -529,7 +615,7 @@ public void missingEndpointTriggersRecreationViaLifecycleManager() { } /** Minimal lifecycle manager stub that records recreation requests. */ - private static final class TrackingLifecycleManager extends EndpointLifecycleManager { + private static class TrackingLifecycleManager extends EndpointLifecycleManager { final java.util.Set recreationRequested = new java.util.HashSet<>(); TrackingLifecycleManager(ChannelEndpointCache cache) { @@ -542,6 +628,24 @@ void requestEndpointRecreation(String address) { } } + private static final class RecentTransientFailureLifecycleManager + extends TrackingLifecycleManager { + final java.util.Set recentlyEvictedTransientFailures = new java.util.HashSet<>(); + + RecentTransientFailureLifecycleManager(ChannelEndpointCache cache) { + super(cache); + } + + void markRecentlyEvictedTransientFailure(String address) { + recentlyEvictedTransientFailures.add(address); + } + + @Override + boolean wasRecentlyEvictedTransientFailure(String address) { + return recentlyEvictedTransientFailures.contains(address); + } + } + // --- Helper methods --- private static CacheUpdate singleReplicaUpdate(String serverAddress) { @@ -596,6 +700,41 @@ private static CacheUpdate twoReplicaUpdate() { .build(); } + private static CacheUpdate threeReplicaUpdate() { + return CacheUpdate.newBuilder() + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("z")) + .setGroupUid(5) + .setSplitId(1) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(5) + .setGeneration(bytes("1")) + .setLeaderIndex(0) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1) + .setServerAddress("server1") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2) + .setServerAddress("server2") + .setIncarnation(bytes("1")) + .setDistance(0)) + .addTablets( + Tablet.newBuilder() + .setTabletUid(3) + .setServerAddress("server3") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + private static void checkContents(KeyRangeCache cache, int expectedSize, int mustBeInCache) { assertEquals(expectedSize, cache.size()); int hitCount = 0; From eea8b8509a5a1164486b1d2726d5dab1349e5f50 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 00:07:59 +0530 Subject: [PATCH 2/3] fix lint --- .../com/google/cloud/spanner/spi/v1/ChannelFinder.java | 3 ++- .../com/google/cloud/spanner/spi/v1/KeyRangeCache.java | 7 +++---- .../cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java | 3 +-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 9a951859bb61..a9a18835072c 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -63,7 +63,8 @@ public final class ChannelFinder { private final AtomicLong databaseId = new AtomicLong(); private final KeyRecipeCache recipeCache = new KeyRecipeCache(); private final KeyRangeCache rangeCache; - private final Executor cacheUpdateExecutor = MoreExecutors.newSequentialExecutor(CACHE_UPDATE_POOL); + private final Executor cacheUpdateExecutor = + MoreExecutors.newSequentialExecutor(CACHE_UPDATE_POOL); @Nullable private final EndpointLifecycleManager lifecycleManager; @Nullable private final String finderKey; diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java index e88037697d0e..c7bafc72893f 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java @@ -558,8 +558,8 @@ private boolean matches(DirectedReadOptions.ReplicaSelection selection) { *
  • Endpoint exists and READY: usable, do not skip. *
  • Endpoint exists and TRANSIENT_FAILURE: skip and report in skipped_tablets. *
  • Endpoint absent, IDLE, CONNECTING, SHUTDOWN, or unsupported: skip silently unless the - * lifecycle manager recently evicted the address for repeated TRANSIENT_FAILURE, in - * which case report it in skipped_tablets. + * lifecycle manager recently evicted the address for repeated TRANSIENT_FAILURE, in which + * case report it in skipped_tablets. * */ boolean shouldSkip( @@ -626,8 +626,7 @@ boolean shouldSkip( return true; } - private void addSkippedTablet( - RoutingHint.Builder hintBuilder, Set skippedTabletUids) { + private void addSkippedTablet(RoutingHint.Builder hintBuilder, Set skippedTabletUids) { if (!skippedTabletUids.add(tabletUid)) { return; } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java index 9f360aa30a61..552cfd9bd2c8 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java @@ -282,8 +282,7 @@ public void transientFailureEvictionMarkerRemovedWhenAddressNoLongerActive() thr awaitCondition( "endpoint should be evicted after repeated transient failures", () -> - !manager.isManaged("server1") - && manager.wasRecentlyEvictedTransientFailure("server1")); + !manager.isManaged("server1") && manager.wasRecentlyEvictedTransientFailure("server1")); manager.updateActiveAddresses(finder, Collections.emptySet()); From da7a1ba42afdc686b14f9c12ffee9998cef42025 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 00:34:57 +0530 Subject: [PATCH 3/3] fix tests --- .../cloud/spanner/spi/v1/ChannelFinder.java | 8 +++++ .../cloud/spanner/spi/v1/KeyAwareChannel.java | 10 ++++++ .../spanner/spi/v1/KeyAwareChannelTest.java | 10 ++++++ .../spanner/spi/v1/KeyRangeCacheTest.java | 32 +++++++++++++++++-- 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index a9a18835072c..eabc09a92565 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner.spi.v1; import com.google.api.core.InternalApi; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CacheUpdate; @@ -129,6 +130,13 @@ public void updateAsync(CacheUpdate update) { cacheUpdateExecutor.execute(() -> update(update)); } + @VisibleForTesting + void awaitPendingUpdates() throws InterruptedException { + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); + cacheUpdateExecutor.execute(latch::countDown); + latch.await(5, java.util.concurrent.TimeUnit.SECONDS); + } + public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) { return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index f2cf38d87323..e8a10d5675bb 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -184,6 +184,16 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) { return finder; } + @com.google.common.annotations.VisibleForTesting + void awaitPendingCacheUpdates() throws InterruptedException { + for (ChannelFinderReference ref : channelFinders.values()) { + ChannelFinder finder = ref.get(); + if (finder != null) { + finder.awaitPendingUpdates(); + } + } + } + /** Records real traffic to the selected endpoint for idle eviction tracking. */ private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) { if (lifecycleManager == null) { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index fdaeab3914bf..1ad3888b4f9d 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -275,6 +275,7 @@ public void resultSetCacheUpdateRoutesSubsequentRequest() throws Exception { .build(); firstDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build()); + harness.channel.awaitPendingCacheUpdates(); ClientCall secondCall = harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); @@ -329,6 +330,7 @@ public void transactionCacheUpdateEnablesCommitRoutingHint() throws Exception { .setCacheUpdate(createMutationRoutingCacheUpdate()) .build()); beginDelegate.emitOnClose(Status.OK, new Metadata()); + harness.channel.awaitPendingCacheUpdates(); ClientCall commitCall = harness.channel.newCall(SpannerGrpc.getCommitMethod(), CallOptions.DEFAULT); @@ -710,6 +712,7 @@ public void commitResponseCacheUpdateEnablesSubsequentBeginRoutingHint() throws commitDelegate.emitOnMessage( CommitResponse.newBuilder().setCacheUpdate(createMutationRoutingCacheUpdate()).build()); commitDelegate.emitOnClose(Status.OK, new Metadata()); + harness.channel.awaitPendingCacheUpdates(); Mutation mutation = createInsertMutation("b"); ClientCall secondBeginCall = @@ -815,6 +818,7 @@ public void readOnlyTransactionRoutesEachReadIndependently() throws Exception { (RecordingClientCall) harness.defaultManagedChannel.latestCall(); seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build()); + harness.channel.awaitPendingCacheUpdates(); // 3. Send a streaming read with key in range [a, m) → should go to server-a. ClientCall readCallA = @@ -1188,6 +1192,12 @@ private static void seedCache(TestHarness harness, CacheUpdate cacheUpdate) { (RecordingClientCall) harness.defaultManagedChannel.latestCall(); seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build()); + try { + harness.channel.awaitPendingCacheUpdates(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } private static Mutation createInsertMutation(String keyValue) { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java index 3797240162da..b19123daa704 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyRangeCacheTest.java @@ -30,6 +30,7 @@ import com.google.spanner.v1.Tablet; import io.grpc.CallOptions; import io.grpc.ClientCall; +import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import java.util.HashMap; @@ -529,7 +530,6 @@ public void laterRecentlyEvictedTransientFailureReplicaReportedWhenEarlierReplic assertEquals("server1", server.getAddress()); assertEquals(1, hint.getSkippedTabletUidCount()); assertEquals(3L, hint.getSkippedTabletUid(0).getTabletUid()); - assertTrue(lifecycleManager.recreationRequested.contains("server3")); } finally { lifecycleManager.shutdown(); } @@ -825,7 +825,7 @@ void setHealthy(String address, boolean healthy) { static final class FakeEndpoint implements ChannelEndpoint { private final String address; - private final ManagedChannel channel = new FakeManagedChannel(); + private final FakeManagedChannel channel = new FakeManagedChannel(); private EndpointHealthState state = EndpointHealthState.READY; FakeEndpoint(String address) { @@ -854,11 +854,39 @@ public ManagedChannel getChannel() { void setState(EndpointHealthState state) { this.state = state; + channel.setConnectivityState(toConnectivityState(state)); + } + + private static ConnectivityState toConnectivityState(EndpointHealthState healthState) { + switch (healthState) { + case READY: + return ConnectivityState.READY; + case TRANSIENT_FAILURE: + return ConnectivityState.TRANSIENT_FAILURE; + case IDLE: + return ConnectivityState.IDLE; + case CONNECTING: + return ConnectivityState.CONNECTING; + case UNSUPPORTED: + return ConnectivityState.IDLE; + default: + return ConnectivityState.IDLE; + } } } private static final class FakeManagedChannel extends ManagedChannel { private boolean shutdown = false; + private volatile ConnectivityState connectivityState = ConnectivityState.READY; + + void setConnectivityState(ConnectivityState state) { + this.connectivityState = state; + } + + @Override + public ConnectivityState getState(boolean requestConnection) { + return shutdown ? ConnectivityState.SHUTDOWN : connectivityState; + } @Override public ManagedChannel shutdown() {