Skip to content

Commit be795b5

Browse files
authored
chore(spanner): optimize lock contention and skipped tablet reporting (#12719)
## Summary

- **Replace `synchronized` with `ReadWriteLock` in `KeyRangeCache`** to reduce lock contention on the hot path. Read operations (`findServer`, `getActiveAddresses`, `size`, `debugString`) now take a read lock, allowing concurrent lookups. Group updates are performed outside the write lock to minimize critical section duration.
- **Move cache updates to an async executor** in `ChannelFinder` via `updateAsync()`, using a sequential executor backed by a shared daemon thread pool. This prevents `CacheUpdate` processing from blocking the gRPC response listener thread.
- **Report skipped tablets for recently evicted TRANSIENT_FAILURE endpoints.** Previously, when an endpoint was evicted after repeated TRANSIENT_FAILURE probes and not yet recreated, its tablets were silently skipped — the server never learned the client considered them unhealthy. Now `EndpointLifecycleManager` tracks addresses evicted for TRANSIENT_FAILURE and `KeyRangeCache` includes their tablet UIDs in `skipped_tablets`, giving the server better routing visibility.
- **Deduplicate skipped tablet UIDs** to avoid reporting the same tablet multiple times when it appears across replicas.
- **Report known TRANSIENT_FAILURE replicas** even when an earlier healthy replica was already selected, so the server gets a complete picture of unhealthy tablets for the group.
- Mark `CachedRange.lastAccess` as `volatile` for safe cross-thread reads.
1 parent 57baaea commit be795b5

File tree

7 files changed

+503
-42
lines changed

7 files changed

+503
-42
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.google.cloud.spanner.spi.v1;
1818

1919
import com.google.api.core.InternalApi;
20+
import com.google.common.annotations.VisibleForTesting;
21+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2022
import com.google.spanner.v1.BeginTransactionRequest;
2123
import com.google.spanner.v1.CacheUpdate;
2224
import com.google.spanner.v1.CommitRequest;
@@ -34,8 +36,13 @@
3436
import java.util.List;
3537
import java.util.Objects;
3638
import java.util.Set;
39+
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.LinkedBlockingQueue;
3741
import java.util.concurrent.ThreadLocalRandom;
42+
import java.util.concurrent.ThreadPoolExecutor;
43+
import java.util.concurrent.TimeUnit;
3844
import java.util.concurrent.atomic.AtomicLong;
45+
import java.util.concurrent.atomic.AtomicReference;
3946
import java.util.function.Predicate;
4047
import javax.annotation.Nullable;
4148

@@ -47,11 +54,16 @@
4754
@InternalApi
4855
public final class ChannelFinder {
4956
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
57+
private static final int MAX_CACHE_UPDATE_THREADS =
58+
Math.max(2, Runtime.getRuntime().availableProcessors());
59+
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();
5060

5161
private final Object updateLock = new Object();
5262
private final AtomicLong databaseId = new AtomicLong();
5363
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
5464
private final KeyRangeCache rangeCache;
65+
private final AtomicReference<CacheUpdate> pendingUpdate = new AtomicReference<>();
66+
private volatile java.util.concurrent.CountDownLatch drainingLatch;
5567
@Nullable private final EndpointLifecycleManager lifecycleManager;
5668
@Nullable private final String finderKey;
5769

@@ -77,6 +89,22 @@ void useDeterministicRandom() {
7789
rangeCache.useDeterministicRandom();
7890
}
7991

92+
private static ExecutorService createCacheUpdatePool() {
93+
ThreadPoolExecutor executor =
94+
new ThreadPoolExecutor(
95+
MAX_CACHE_UPDATE_THREADS,
96+
MAX_CACHE_UPDATE_THREADS,
97+
30L,
98+
TimeUnit.SECONDS,
99+
new LinkedBlockingQueue<>(),
100+
new ThreadFactoryBuilder()
101+
.setDaemon(true)
102+
.setNameFormat("spanner-cache-update-%d")
103+
.build());
104+
executor.allowCoreThreadTimeOut(true);
105+
return executor;
106+
}
107+
80108
public void update(CacheUpdate update) {
81109
synchronized (updateLock) {
82110
long currentId = databaseId.get();
@@ -112,6 +140,49 @@ public void update(CacheUpdate update) {
112140
}
113141
}
114142

143+
public void updateAsync(CacheUpdate update) {
144+
// Replace any pending update atomically. Each CacheUpdate contains the full current state,
145+
// so intermediate updates can be safely dropped to prevent unbounded queue growth.
146+
if (pendingUpdate.getAndSet(update) == null) {
147+
// No previous pending update means no drain task is scheduled yet — submit one.
148+
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
149+
drainingLatch = latch;
150+
CACHE_UPDATE_POOL.execute(
151+
() -> {
152+
try {
153+
drainPendingUpdate();
154+
} finally {
155+
latch.countDown();
156+
}
157+
});
158+
}
159+
}
160+
161+
private void drainPendingUpdate() {
162+
CacheUpdate toApply;
163+
while ((toApply = pendingUpdate.getAndSet(null)) != null) {
164+
update(toApply);
165+
}
166+
}
167+
168+
/**
169+
* Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
170+
* async cache update worker has finished applying the latest pending update.
171+
*/
172+
@VisibleForTesting
173+
void awaitPendingUpdates() throws InterruptedException {
174+
// Spin until no pending update remains.
175+
long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(5);
176+
while (pendingUpdate.get() != null && System.nanoTime() < deadline) {
177+
Thread.sleep(1);
178+
}
179+
// Wait for the drain task to fully complete (including the update() call).
180+
java.util.concurrent.CountDownLatch latch = drainingLatch;
181+
if (latch != null) {
182+
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
183+
}
184+
}
185+
115186
public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
116187
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
117188
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ class EndpointLifecycleManager {
7373
*/
7474
private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;
7575

76+
private enum EvictionReason {
77+
TRANSIENT_FAILURE,
78+
SHUTDOWN,
79+
IDLE,
80+
STALE
81+
}
82+
7683
/** Per-endpoint lifecycle state. */
7784
static final class EndpointState {
7885
final String address;
@@ -95,6 +102,7 @@ static final class EndpointState {
95102

96103
private final ChannelEndpointCache endpointCache;
97104
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
105+
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
98106

99107
/**
100108
* Active addresses reported by each ChannelFinder, keyed by database id.
@@ -103,8 +111,8 @@ static final class EndpointState {
103111
* stable database-id key instead of a strong ChannelFinder reference. KeyAwareChannel unregisters
104112
* stale entries when a finder is cleared.
105113
*
106-
* <p>All reads and writes to this map, and stale-endpoint eviction based on it, are synchronized
107-
* on {@link #activeAddressLock}.
114+
* <p>All reads and writes to this map, and all updates to {@link
115+
* #transientFailureEvictedAddresses}, are synchronized on {@link #activeAddressLock}.
108116
*/
109117
private final Map<String, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();
110118

@@ -187,6 +195,24 @@ private boolean ensureEndpointExists(String address) {
187195
return created[0];
188196
}
189197

198+
private void retainTransientFailureEvictionMarkers(Set<String> activeAddresses) {
199+
synchronized (activeAddressLock) {
200+
transientFailureEvictedAddresses.retainAll(activeAddresses);
201+
}
202+
}
203+
204+
private void markTransientFailureEvicted(String address) {
205+
synchronized (activeAddressLock) {
206+
transientFailureEvictedAddresses.add(address);
207+
}
208+
}
209+
210+
private void clearTransientFailureEvictionMarker(String address) {
211+
synchronized (activeAddressLock) {
212+
transientFailureEvictedAddresses.remove(address);
213+
}
214+
}
215+
190216
/**
191217
* Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle
192218
* eviction timer for this endpoint.
@@ -235,6 +261,7 @@ void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
235261
for (Set<String> addresses : activeAddressesPerFinder.values()) {
236262
allActive.addAll(addresses);
237263
}
264+
retainTransientFailureEvictionMarkers(allActive);
238265

239266
// Evict managed endpoints not referenced by any finder.
240267
List<String> stale = new ArrayList<>();
@@ -276,6 +303,7 @@ void unregisterFinder(String finderKey) {
276303
for (Set<String> addresses : activeAddressesPerFinder.values()) {
277304
allActive.addAll(addresses);
278305
}
306+
retainTransientFailureEvictionMarkers(allActive);
279307

280308
List<String> stale = new ArrayList<>();
281309
for (String address : endpoints.keySet()) {
@@ -412,6 +440,7 @@ private void probe(String address) {
412440
case READY:
413441
state.lastReadyAt = clock.instant();
414442
state.consecutiveTransientFailures = 0;
443+
clearTransientFailureEvictionMarker(address);
415444
break;
416445

417446
case IDLE:
@@ -439,13 +468,13 @@ private void probe(String address) {
439468
Level.FINE,
440469
"Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
441470
new Object[] {address, state.consecutiveTransientFailures});
442-
evictEndpoint(address);
471+
evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE);
443472
}
444473
break;
445474

446475
case SHUTDOWN:
447476
logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address);
448-
evictEndpoint(address);
477+
evictEndpoint(address, EvictionReason.SHUTDOWN);
449478
break;
450479

451480
default:
@@ -482,16 +511,26 @@ void checkIdleEviction() {
482511
}
483512

484513
for (String address : toEvict) {
485-
evictEndpoint(address);
514+
evictEndpoint(address, EvictionReason.IDLE);
486515
}
487516
}
488517

489518
/** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel. */
490519
private void evictEndpoint(String address) {
520+
evictEndpoint(address, EvictionReason.STALE);
521+
}
522+
523+
/** Evicts an endpoint and records whether it should still be reported as unhealthy. */
524+
private void evictEndpoint(String address, EvictionReason reason) {
491525
logger.log(Level.FINE, "Evicting endpoint {0}", address);
492526

493527
stopProbing(address);
494528
endpoints.remove(address);
529+
if (reason == EvictionReason.TRANSIENT_FAILURE) {
530+
markTransientFailureEvicted(address);
531+
} else {
532+
clearTransientFailureEvictionMarker(address);
533+
}
495534
endpointCache.evict(address);
496535
}
497536

@@ -526,6 +565,10 @@ boolean isManaged(String address) {
526565
return endpoints.containsKey(address);
527566
}
528567

568+
boolean wasRecentlyEvictedTransientFailure(String address) {
569+
return transientFailureEvictedAddresses.contains(address);
570+
}
571+
529572
/** Returns the endpoint state for testing. */
530573
@VisibleForTesting
531574
EndpointState getEndpointState(String address) {
@@ -558,6 +601,7 @@ void shutdown() {
558601
}
559602
}
560603
endpoints.clear();
604+
transientFailureEvictedAddresses.clear();
561605

562606
scheduler.shutdown();
563607
try {

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,16 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) {
184184
return finder;
185185
}
186186

187+
@com.google.common.annotations.VisibleForTesting
188+
void awaitPendingCacheUpdates() throws InterruptedException {
189+
for (ChannelFinderReference ref : channelFinders.values()) {
190+
ChannelFinder finder = ref.get();
191+
if (finder != null) {
192+
finder.awaitPendingUpdates();
193+
}
194+
}
195+
}
196+
187197
/** Records real traffic to the selected endpoint for idle eviction tracking. */
188198
private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) {
189199
if (lifecycleManager == null) {
@@ -798,25 +808,25 @@ public void onMessage(ResponseT message) {
798808
if (message instanceof PartialResultSet) {
799809
PartialResultSet response = (PartialResultSet) message;
800810
if (response.hasCacheUpdate() && call.channelFinder != null) {
801-
call.channelFinder.update(response.getCacheUpdate());
811+
call.channelFinder.updateAsync(response.getCacheUpdate());
802812
}
803813
transactionId = transactionIdFromMetadata(response);
804814
} else if (message instanceof ResultSet) {
805815
ResultSet response = (ResultSet) message;
806816
if (response.hasCacheUpdate() && call.channelFinder != null) {
807-
call.channelFinder.update(response.getCacheUpdate());
817+
call.channelFinder.updateAsync(response.getCacheUpdate());
808818
}
809819
transactionId = transactionIdFromMetadata(response);
810820
} else if (message instanceof Transaction) {
811821
Transaction response = (Transaction) message;
812822
if (response.hasCacheUpdate() && call.channelFinder != null) {
813-
call.channelFinder.update(response.getCacheUpdate());
823+
call.channelFinder.updateAsync(response.getCacheUpdate());
814824
}
815825
transactionId = transactionIdFromTransaction(response);
816826
} else if (message instanceof CommitResponse) {
817827
CommitResponse response = (CommitResponse) message;
818828
if (response.hasCacheUpdate() && call.channelFinder != null) {
819-
call.channelFinder.update(response.getCacheUpdate());
829+
call.channelFinder.updateAsync(response.getCacheUpdate());
820830
}
821831
}
822832
if (transactionId != null) {

0 commit comments

Comments
 (0)