Skip to content

Commit b8bf432

Browse files
authored
fix(spanner): preserve all async cache updates (#12740)
### Summary

This change fixes cache updates being missed after cache application moved off the request hot path. It replaces the lossy async updater with a lossless queue, skips truly empty async cache updates, moves endpoint lifecycle reconciliation off the cache-apply critical path, and adds cache-update telemetry to verify convergence and backlog behavior in production.

## Problem

After moving cache application off the request hot path, cache updates could either be lost or applied too slowly. In production this showed up as:

- cache update backlog growing continuously
- pending cache update objects increasing memory usage
- long delays in applying updates on the executor path
- logs showing many cache updates with little or no change in cache size

We also observed that a large number of responses carried a `cache_update` field that was present but effectively empty, which added queue pressure without changing routing state.

## Root Cause

There were a few separate issues:

- async cache update handling used a lossy pending-update model, so intermediate updates could be overwritten before being applied
- `databaseId=0` updates could clear an already initialized finder state even when they did not carry meaningful cache contents
- endpoint lifecycle reconciliation still ran in the cache update path, adding unnecessary work to cache application
- many empty cache updates were still being enqueued and drained even though they were no-ops

## What This PR Changes

- replace the lossy async cache updater with a lossless FIFO queue so all material cache updates are preserved
- prevent zero-database-id updates from clearing existing cache state
- skip truly empty async cache updates while still preserving:
  - all non-empty updates
  - database-id-only transitions that can invalidate old cache state
- batch queue draining to reduce scheduling and lock churn
- decouple lifecycle address reconciliation from the cache update critical section

## Result

Based on production logs after the fix:

- empty cache updates are being skipped in large numbers
- material pending cache updates are shrinking instead of growing
- cache apply time remains very small
- routing cache size remains stable and traffic converges away from the default endpoint quickly

This indicates the main issue was queue pressure from lossy handling and empty/no-op updates, not slow cache-map mutation itself.
1 parent 6b83871 commit b8bf432

File tree

3 files changed

+509
-51
lines changed

3 files changed

+509
-51
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 112 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,23 @@
2424
import com.google.spanner.v1.CommitRequest;
2525
import com.google.spanner.v1.DirectedReadOptions;
2626
import com.google.spanner.v1.ExecuteSqlRequest;
27-
import com.google.spanner.v1.Group;
2827
import com.google.spanner.v1.Mutation;
2928
import com.google.spanner.v1.ReadRequest;
3029
import com.google.spanner.v1.RoutingHint;
31-
import com.google.spanner.v1.Tablet;
3230
import com.google.spanner.v1.TransactionOptions;
3331
import com.google.spanner.v1.TransactionSelector;
3432
import java.util.ArrayList;
35-
import java.util.HashSet;
3633
import java.util.List;
3734
import java.util.Objects;
3835
import java.util.Set;
36+
import java.util.concurrent.ConcurrentLinkedQueue;
3937
import java.util.concurrent.ExecutorService;
4038
import java.util.concurrent.LinkedBlockingQueue;
4139
import java.util.concurrent.ThreadLocalRandom;
4240
import java.util.concurrent.ThreadPoolExecutor;
4341
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicBoolean;
4443
import java.util.concurrent.atomic.AtomicLong;
45-
import java.util.concurrent.atomic.AtomicReference;
4644
import java.util.function.Predicate;
4745
import javax.annotation.Nullable;
4846

@@ -54,6 +52,7 @@
5452
@InternalApi
5553
public final class ChannelFinder {
5654
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
55+
private static final int CACHE_UPDATE_DRAIN_BATCH_SIZE = 64;
5756
private static final int MAX_CACHE_UPDATE_THREADS =
5857
Math.max(2, Runtime.getRuntime().availableProcessors());
5958
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();
@@ -62,8 +61,11 @@ public final class ChannelFinder {
6261
private final AtomicLong databaseId = new AtomicLong();
6362
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
6463
private final KeyRangeCache rangeCache;
65-
private final AtomicReference<CacheUpdate> pendingUpdate = new AtomicReference<>();
66-
private volatile java.util.concurrent.CountDownLatch drainingLatch;
64+
private final ConcurrentLinkedQueue<PendingCacheUpdate> pendingUpdates =
65+
new ConcurrentLinkedQueue<>();
66+
private final AtomicBoolean drainScheduled = new AtomicBoolean();
67+
private volatile java.util.concurrent.CountDownLatch drainingLatch =
68+
new java.util.concurrent.CountDownLatch(0);
6769
@Nullable private final EndpointLifecycleManager lifecycleManager;
6870
@Nullable private final String finderKey;
6971

@@ -105,46 +107,43 @@ private static ExecutorService createCacheUpdatePool() {
105107
return executor;
106108
}
107109

110+
private static final class PendingCacheUpdate {
111+
private final CacheUpdate update;
112+
113+
private PendingCacheUpdate(CacheUpdate update) {
114+
this.update = update;
115+
}
116+
}
117+
118+
private boolean isMaterialUpdate(CacheUpdate update) {
119+
return update.getGroupCount() > 0
120+
|| update.getRangeCount() > 0
121+
|| (update.hasKeyRecipes() && update.getKeyRecipes().getRecipeCount() > 0);
122+
}
123+
124+
private boolean shouldProcessUpdate(CacheUpdate update) {
125+
if (isMaterialUpdate(update)) {
126+
return true;
127+
}
128+
long updateDatabaseId = update.getDatabaseId();
129+
return updateDatabaseId != 0 && databaseId.get() != updateDatabaseId;
130+
}
131+
108132
public void update(CacheUpdate update) {
133+
Set<String> currentAddresses;
109134
synchronized (updateLock) {
110-
long currentId = databaseId.get();
111-
if (currentId != update.getDatabaseId()) {
112-
if (currentId != 0) {
113-
recipeCache.clear();
114-
rangeCache.clear();
115-
}
116-
databaseId.set(update.getDatabaseId());
117-
}
118-
if (update.hasKeyRecipes()) {
119-
recipeCache.addRecipes(update.getKeyRecipes());
120-
}
121-
rangeCache.addRanges(update);
122-
123-
// Notify the lifecycle manager about server addresses so it can create endpoints
124-
// in the background and start probing, and evict stale endpoints atomically.
125-
if (lifecycleManager != null && finderKey != null) {
126-
Set<String> currentAddresses = new HashSet<>();
127-
for (Group group : update.getGroupList()) {
128-
for (Tablet tablet : group.getTabletsList()) {
129-
String addr = tablet.getServerAddress();
130-
if (!addr.isEmpty()) {
131-
currentAddresses.add(addr);
132-
}
133-
}
134-
}
135-
// Also include addresses from existing cached tablets not in this update.
136-
currentAddresses.addAll(rangeCache.getActiveAddresses());
137-
// Atomically ensure endpoints exist and evict stale ones.
138-
lifecycleManager.updateActiveAddresses(finderKey, currentAddresses);
139-
}
135+
applyUpdateLocked(update);
136+
currentAddresses = snapshotActiveAddressesLocked();
140137
}
138+
publishLifecycleUpdate(currentAddresses);
141139
}
142140

143141
public void updateAsync(CacheUpdate update) {
144-
// Replace any pending update atomically. Each CacheUpdate contains the full current state,
145-
// so intermediate updates can be safely dropped to prevent unbounded queue growth.
146-
if (pendingUpdate.getAndSet(update) == null) {
147-
// No previous pending update means no drain task is scheduled yet — submit one.
142+
if (!shouldProcessUpdate(update)) {
143+
return;
144+
}
145+
pendingUpdates.add(new PendingCacheUpdate(update));
146+
if (drainScheduled.compareAndSet(false, true)) {
148147
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
149148
drainingLatch = latch;
150149
CACHE_UPDATE_POOL.execute(
@@ -159,27 +158,89 @@ public void updateAsync(CacheUpdate update) {
159158
}
160159

161160
private void drainPendingUpdate() {
162-
CacheUpdate toApply;
163-
while ((toApply = pendingUpdate.getAndSet(null)) != null) {
164-
update(toApply);
161+
List<PendingCacheUpdate> batch = new ArrayList<>(CACHE_UPDATE_DRAIN_BATCH_SIZE);
162+
while (true) {
163+
drainBatch(batch);
164+
if (!batch.isEmpty()) {
165+
applyBatch(batch);
166+
batch.clear();
167+
}
168+
drainScheduled.set(false);
169+
if (pendingUpdates.isEmpty() || !drainScheduled.compareAndSet(false, true)) {
170+
return;
171+
}
172+
}
173+
}
174+
175+
private void drainBatch(List<PendingCacheUpdate> batch) {
176+
PendingCacheUpdate toApply;
177+
while (batch.size() < CACHE_UPDATE_DRAIN_BATCH_SIZE
178+
&& (toApply = pendingUpdates.poll()) != null) {
179+
batch.add(toApply);
165180
}
166181
}
167182

183+
private void applyBatch(List<PendingCacheUpdate> batch) {
184+
Set<String> currentAddresses;
185+
synchronized (updateLock) {
186+
for (PendingCacheUpdate pendingUpdate : batch) {
187+
applyUpdateLocked(pendingUpdate.update);
188+
}
189+
currentAddresses = snapshotActiveAddressesLocked();
190+
}
191+
publishLifecycleUpdate(currentAddresses);
192+
}
193+
194+
private void applyUpdateLocked(CacheUpdate update) {
195+
long currentId = databaseId.get();
196+
long updateDatabaseId = update.getDatabaseId();
197+
if (updateDatabaseId != 0 && currentId != updateDatabaseId) {
198+
if (currentId != 0) {
199+
recipeCache.clear();
200+
rangeCache.clear();
201+
}
202+
databaseId.set(updateDatabaseId);
203+
}
204+
if (update.hasKeyRecipes()) {
205+
recipeCache.addRecipes(update.getKeyRecipes());
206+
}
207+
rangeCache.addRanges(update);
208+
}
209+
210+
@Nullable
211+
private Set<String> snapshotActiveAddressesLocked() {
212+
if (lifecycleManager == null || finderKey == null) {
213+
return null;
214+
}
215+
return rangeCache.getActiveAddresses();
216+
}
217+
218+
private void publishLifecycleUpdate(@Nullable Set<String> currentAddresses) {
219+
if (currentAddresses == null) {
220+
return;
221+
}
222+
lifecycleManager.updateActiveAddressesAsync(finderKey, currentAddresses);
223+
}
224+
168225
/**
169226
* Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
170227
* async cache update worker has finished applying the latest pending update.
171228
*/
172229
@VisibleForTesting
173230
void awaitPendingUpdates() throws InterruptedException {
174-
// Spin until no pending update remains.
175231
long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(5);
176-
while (pendingUpdate.get() != null && System.nanoTime() < deadline) {
177-
Thread.sleep(1);
178-
}
179-
// Wait for the drain task to fully complete (including the update() call).
180-
java.util.concurrent.CountDownLatch latch = drainingLatch;
181-
if (latch != null) {
182-
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
232+
while (System.nanoTime() < deadline) {
233+
java.util.concurrent.CountDownLatch latch = drainingLatch;
234+
if (latch != null) {
235+
long remainingNanos = deadline - System.nanoTime();
236+
if (remainingNanos <= 0) {
237+
break;
238+
}
239+
latch.await(remainingNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
240+
}
241+
if (pendingUpdates.isEmpty() && !drainScheduled.get()) {
242+
return;
243+
}
183244
}
184245
}
185246

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Map;
3030
import java.util.Set;
3131
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentLinkedQueue;
33+
import java.util.concurrent.ExecutorService;
3234
import java.util.concurrent.Executors;
3335
import java.util.concurrent.ScheduledExecutorService;
3436
import java.util.concurrent.ScheduledFuture;
@@ -103,6 +105,11 @@ static final class EndpointState {
103105
private final ChannelEndpointCache endpointCache;
104106
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
105107
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
108+
private final Map<String, Long> finderGenerations = new ConcurrentHashMap<>();
109+
private final Map<String, PendingActiveAddressUpdate> pendingActiveAddressUpdates =
110+
new ConcurrentHashMap<>();
111+
private final Set<String> queuedFinderKeys = ConcurrentHashMap.newKeySet();
112+
private final ConcurrentLinkedQueue<String> queuedFinders = new ConcurrentLinkedQueue<>();
106113

107114
/**
108115
* Active addresses reported by each ChannelFinder, keyed by database id.
@@ -118,15 +125,27 @@ static final class EndpointState {
118125

119126
private final Object activeAddressLock = new Object();
120127

128+
private final ExecutorService activeAddressUpdateExecutor;
121129
private final ScheduledExecutorService scheduler;
122130
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
131+
private final AtomicBoolean activeAddressDrainScheduled = new AtomicBoolean(false);
123132
private final long probeIntervalSeconds;
124133
private final Duration idleEvictionDuration;
125134
private final Clock clock;
126135
private final String defaultEndpointAddress;
127136

128137
private ScheduledFuture<?> evictionFuture;
129138

139+
private static final class PendingActiveAddressUpdate {
140+
private final Set<String> activeAddresses;
141+
private final long generation;
142+
143+
private PendingActiveAddressUpdate(Set<String> activeAddresses, long generation) {
144+
this.activeAddresses = activeAddresses;
145+
this.generation = generation;
146+
}
147+
}
148+
130149
EndpointLifecycleManager(ChannelEndpointCache endpointCache) {
131150
this(
132151
endpointCache,
@@ -146,6 +165,13 @@ static final class EndpointState {
146165
this.idleEvictionDuration = idleEvictionDuration;
147166
this.clock = clock;
148167
this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
168+
this.activeAddressUpdateExecutor =
169+
Executors.newSingleThreadExecutor(
170+
r -> {
171+
Thread t = new Thread(r, "spanner-active-address-update");
172+
t.setDaemon(true);
173+
return t;
174+
});
149175
this.scheduler =
150176
Executors.newScheduledThreadPool(
151177
2,
@@ -213,6 +239,59 @@ private void clearTransientFailureEvictionMarker(String address) {
213239
}
214240
}
215241

242+
/**
243+
* Enqueues active-address reconciliation on a dedicated worker so cache-map updates do not block
244+
* on endpoint lifecycle bookkeeping.
245+
*/
246+
void updateActiveAddressesAsync(String finderKey, Set<String> activeAddresses) {
247+
if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
248+
return;
249+
}
250+
synchronized (activeAddressLock) {
251+
long generation = finderGenerations.getOrDefault(finderKey, 0L);
252+
pendingActiveAddressUpdates.put(
253+
finderKey, new PendingActiveAddressUpdate(new HashSet<>(activeAddresses), generation));
254+
if (queuedFinderKeys.add(finderKey)) {
255+
queuedFinders.add(finderKey);
256+
}
257+
}
258+
scheduleActiveAddressDrain();
259+
}
260+
261+
private void scheduleActiveAddressDrain() {
262+
if (!activeAddressDrainScheduled.compareAndSet(false, true)) {
263+
return;
264+
}
265+
activeAddressUpdateExecutor.execute(this::drainPendingActiveAddressUpdates);
266+
}
267+
268+
private void drainPendingActiveAddressUpdates() {
269+
while (true) {
270+
String finderKey = queuedFinders.poll();
271+
if (finderKey == null) {
272+
activeAddressDrainScheduled.set(false);
273+
if (queuedFinders.isEmpty() || !activeAddressDrainScheduled.compareAndSet(false, true)) {
274+
return;
275+
}
276+
continue;
277+
}
278+
279+
queuedFinderKeys.remove(finderKey);
280+
PendingActiveAddressUpdate pendingUpdate = pendingActiveAddressUpdates.remove(finderKey);
281+
if (pendingUpdate == null) {
282+
continue;
283+
}
284+
285+
synchronized (activeAddressLock) {
286+
long currentGeneration = finderGenerations.getOrDefault(finderKey, 0L);
287+
if (currentGeneration != pendingUpdate.generation) {
288+
continue;
289+
}
290+
}
291+
updateActiveAddresses(finderKey, pendingUpdate.activeAddresses);
292+
}
293+
}
294+
216295
/**
217296
* Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle
218297
* eviction timer for this endpoint.
@@ -295,6 +374,9 @@ void unregisterFinder(String finderKey) {
295374
return;
296375
}
297376
synchronized (activeAddressLock) {
377+
finderGenerations.merge(finderKey, 1L, Long::sum);
378+
pendingActiveAddressUpdates.remove(finderKey);
379+
queuedFinderKeys.remove(finderKey);
298380
if (activeAddressesPerFinder.remove(finderKey) == null) {
299381
return;
300382
}
@@ -588,6 +670,7 @@ void shutdown() {
588670
}
589671

590672
logger.log(Level.FINE, "Shutting down endpoint lifecycle manager");
673+
activeAddressUpdateExecutor.shutdownNow();
591674

592675
if (evictionFuture != null) {
593676
evictionFuture.cancel(false);
@@ -602,6 +685,9 @@ void shutdown() {
602685
}
603686
endpoints.clear();
604687
transientFailureEvictedAddresses.clear();
688+
pendingActiveAddressUpdates.clear();
689+
queuedFinderKeys.clear();
690+
queuedFinders.clear();
605691

606692
scheduler.shutdown();
607693
try {

0 commit comments

Comments
 (0)