Skip to content

Commit f5f273b

Browse files
authored
feat(spanner): add shared endpoint cooldowns for location-aware rerouting (#12845)
## Summary This PR improves Java Spanner's location-aware bypass routing when routed replicas are overloaded or unavailable, and extends score-based replica selection The client now: - avoids recently overloaded routed endpoints using shared cooldowns - records RESOURCE_EXHAUSTED / UNAVAILABLE as EWMA error penalties - uses EWMA-based selection for both preferLeader=false and strong preferLeader=true read/query routing when operation_uid is available It also keeps the location-aware read path lock-free via immutable group snapshots. ## What changed - Added shared channel-level cooldown tracking for routed endpoints that return RESOURCE_EXHAUSTED / UNAVAILABLE, while still keeping request-scoped exclusions for same-logical-request retries. - Updated bypass retry behavior so eligible reads/queries can reroute to another replica instead of immediately returning to the same failed endpoint. - Recorded RESOURCE_EXHAUSTED / UNAVAILABLE as EWMA error penalties for routed replicas, so unhealthy endpoints are deprioritized even after the immediate retry/cooldown window. - Extended score-based routing to strong preferLeader=true read/query traffic when operation_uid is present, using leader preference as a bias instead of a hard override. - Kept preferLeader=true behavior unchanged for paths without operation_uid such as mutation/commit routing. - Refactored KeyRangeCache group state to immutable snapshots and removed per-group synchronization from the routing hot path.
1 parent decbfcf commit f5f273b

29 files changed

Lines changed: 4530 additions & 382 deletions

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,22 @@ public interface ChannelEndpoint {
7171
* @return the managed channel for this server
7272
*/
7373
ManagedChannel getChannel();
74+
75+
/**
76+
* Records that an application RPC started on this endpoint.
77+
*
78+
* <p>This is used for request-load-aware routing decisions. Implementations must keep the count
79+
* scoped to this endpoint instance so evicted or recreated endpoints do not share inflight state.
80+
*/
81+
void incrementActiveRequests();
82+
83+
/**
84+
* Records that an application RPC finished on this endpoint.
85+
*
86+
* <p>Implementations must not allow the count to go negative.
87+
*/
88+
void decrementActiveRequests();
89+
90+
/** Returns the number of currently active application RPCs on this endpoint. */
91+
int getActiveRequestCount();
7492
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ public ChannelFinder(
8282
ChannelEndpointCache endpointCache,
8383
@Nullable EndpointLifecycleManager lifecycleManager,
8484
@Nullable String finderKey) {
85-
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
85+
this.rangeCache =
86+
new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager, finderKey);
8687
this.lifecycleManager = lifecycleManager;
8788
this.finderKey = finderKey;
8889
}
@@ -91,6 +92,11 @@ void useDeterministicRandom() {
9192
rangeCache.useDeterministicRandom();
9293
}
9394

95+
@Nullable
96+
String finderKey() {
97+
return finderKey;
98+
}
99+
94100
private static ExecutorService createCacheUpdatePool() {
95101
ThreadPoolExecutor executor =
96102
new ThreadPoolExecutor(
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Ticker;
21+
import com.google.common.cache.Cache;
22+
import com.google.common.cache.CacheBuilder;
23+
import java.time.Duration;
24+
import java.util.Objects;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.TimeUnit;
27+
28+
/** Shared process-local latency scores for routed Spanner endpoints. */
29+
final class EndpointLatencyRegistry {
30+
private static final String GLOBAL_SCOPE = "__global__";
31+
32+
static final Duration DEFAULT_ERROR_PENALTY = Duration.ofSeconds(10);
33+
static final Duration DEFAULT_RTT = Duration.ofMillis(10);
34+
static final double DEFAULT_PENALTY_VALUE = 1_000_000.0;
35+
@VisibleForTesting static final Duration TRACKER_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(10);
36+
@VisibleForTesting static final long MAX_TRACKERS = 100_000L;
37+
38+
private static volatile Cache<TrackerKey, LatencyTracker> TRACKERS =
39+
newTrackerCache(Ticker.systemTicker());
40+
41+
private EndpointLatencyRegistry() {}
42+
43+
static boolean hasScore(
44+
@javax.annotation.Nullable String databaseScope,
45+
long operationUid,
46+
boolean preferLeader,
47+
String endpointLabelOrAddress) {
48+
TrackerKey trackerKey =
49+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
50+
return trackerKey != null && TRACKERS.getIfPresent(trackerKey) != null;
51+
}
52+
53+
static double getSelectionCost(
54+
@javax.annotation.Nullable String databaseScope,
55+
long operationUid,
56+
boolean preferLeader,
57+
String endpointLabelOrAddress) {
58+
return getSelectionCost(
59+
databaseScope, operationUid, preferLeader, null, endpointLabelOrAddress);
60+
}
61+
62+
static double getSelectionCost(
63+
@javax.annotation.Nullable String databaseScope,
64+
long operationUid,
65+
boolean preferLeader,
66+
@javax.annotation.Nullable ChannelEndpoint endpoint,
67+
String endpointLabelOrAddress) {
68+
TrackerKey trackerKey =
69+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
70+
if (trackerKey == null) {
71+
return Double.MAX_VALUE;
72+
}
73+
double activeRequests = endpoint == null ? 0.0 : endpoint.getActiveRequestCount();
74+
LatencyTracker tracker = TRACKERS.getIfPresent(trackerKey);
75+
if (tracker != null) {
76+
return tracker.getScore() * (activeRequests + 1.0);
77+
}
78+
if (activeRequests > 0.0) {
79+
return DEFAULT_PENALTY_VALUE + activeRequests;
80+
}
81+
return defaultRttMicros() * (activeRequests + 1.0);
82+
}
83+
84+
static void recordLatency(
85+
@javax.annotation.Nullable String databaseScope,
86+
long operationUid,
87+
boolean preferLeader,
88+
String endpointLabelOrAddress,
89+
Duration latency) {
90+
TrackerKey trackerKey =
91+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
92+
if (trackerKey == null || latency == null) {
93+
return;
94+
}
95+
getOrCreateTracker(trackerKey).update(latency);
96+
}
97+
98+
static void recordError(
99+
@javax.annotation.Nullable String databaseScope,
100+
long operationUid,
101+
boolean preferLeader,
102+
String endpointLabelOrAddress) {
103+
recordError(
104+
databaseScope, operationUid, preferLeader, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
105+
}
106+
107+
static void recordError(
108+
@javax.annotation.Nullable String databaseScope,
109+
long operationUid,
110+
boolean preferLeader,
111+
String endpointLabelOrAddress,
112+
Duration penalty) {
113+
TrackerKey trackerKey =
114+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
115+
if (trackerKey == null || penalty == null) {
116+
return;
117+
}
118+
getOrCreateTracker(trackerKey).recordError(penalty);
119+
}
120+
121+
@VisibleForTesting
122+
static void clear() {
123+
TRACKERS.invalidateAll();
124+
}
125+
126+
@VisibleForTesting
127+
static void useTrackerTicker(Ticker ticker) {
128+
TRACKERS = newTrackerCache(ticker);
129+
}
130+
131+
@VisibleForTesting
132+
static String normalizeAddress(String endpointLabelOrAddress) {
133+
if (endpointLabelOrAddress == null || endpointLabelOrAddress.isEmpty()) {
134+
return null;
135+
}
136+
return endpointLabelOrAddress;
137+
}
138+
139+
@VisibleForTesting
140+
static TrackerKey trackerKey(
141+
@javax.annotation.Nullable String databaseScope,
142+
long operationUid,
143+
String endpointLabelOrAddress) {
144+
return trackerKey(databaseScope, operationUid, false, endpointLabelOrAddress);
145+
}
146+
147+
@VisibleForTesting
148+
static TrackerKey trackerKey(
149+
@javax.annotation.Nullable String databaseScope,
150+
long operationUid,
151+
boolean preferLeader,
152+
String endpointLabelOrAddress) {
153+
String address = normalizeAddress(endpointLabelOrAddress);
154+
if (operationUid <= 0 || address == null) {
155+
return null;
156+
}
157+
return new TrackerKey(normalizeScope(databaseScope), operationUid, preferLeader, address);
158+
}
159+
160+
private static long defaultRttMicros() {
161+
return DEFAULT_RTT.toNanos() / 1_000L;
162+
}
163+
164+
private static String normalizeScope(@javax.annotation.Nullable String databaseScope) {
165+
return (databaseScope == null || databaseScope.isEmpty()) ? GLOBAL_SCOPE : databaseScope;
166+
}
167+
168+
private static LatencyTracker getOrCreateTracker(TrackerKey trackerKey) {
169+
try {
170+
return TRACKERS.get(trackerKey, EwmaLatencyTracker::new);
171+
} catch (ExecutionException e) {
172+
throw new IllegalStateException("Failed to create latency tracker", e);
173+
}
174+
}
175+
176+
private static Cache<TrackerKey, LatencyTracker> newTrackerCache(Ticker ticker) {
177+
return CacheBuilder.newBuilder()
178+
.maximumSize(MAX_TRACKERS)
179+
.expireAfterAccess(TRACKER_EXPIRE_AFTER_ACCESS.toNanos(), TimeUnit.NANOSECONDS)
180+
.ticker(ticker)
181+
.build();
182+
}
183+
184+
@VisibleForTesting
185+
static final class TrackerKey {
186+
private final String databaseScope;
187+
private final long operationUid;
188+
private final boolean preferLeader;
189+
private final String address;
190+
191+
private TrackerKey(
192+
String databaseScope, long operationUid, boolean preferLeader, String address) {
193+
this.databaseScope = databaseScope;
194+
this.operationUid = operationUid;
195+
this.preferLeader = preferLeader;
196+
this.address = address;
197+
}
198+
199+
@Override
200+
public boolean equals(Object other) {
201+
if (this == other) {
202+
return true;
203+
}
204+
if (!(other instanceof TrackerKey)) {
205+
return false;
206+
}
207+
TrackerKey that = (TrackerKey) other;
208+
return operationUid == that.operationUid
209+
&& preferLeader == that.preferLeader
210+
&& Objects.equals(databaseScope, that.databaseScope)
211+
&& Objects.equals(address, that.address);
212+
}
213+
214+
@Override
215+
public int hashCode() {
216+
return Objects.hash(databaseScope, operationUid, preferLeader, address);
217+
}
218+
219+
@Override
220+
public String toString() {
221+
return databaseScope + ":" + operationUid + ":" + preferLeader + "@" + address;
222+
}
223+
}
224+
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ class EndpointLifecycleManager {
7070
private static final long EVICTION_CHECK_INTERVAL_SECONDS = 300;
7171

7272
/**
73-
* Maximum consecutive TRANSIENT_FAILURE probes before evicting an endpoint. Gives the channel
74-
* time to recover from transient network issues before we tear it down and recreate.
73+
* Maximum observed TRANSIENT_FAILURE probes before evicting an endpoint. The counter resets only
74+
* after the channel reaches READY, so CONNECTING/IDLE oscillation does not hide a persistently
75+
* unhealthy endpoint.
7576
*/
7677
private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;
7778

@@ -493,7 +494,8 @@ private void stopProbing(String address) {
493494
* <p>All exceptions are caught to prevent {@link ScheduledExecutorService} from cancelling future
494495
* runs of this task.
495496
*/
496-
private void probe(String address) {
497+
@VisibleForTesting
498+
void probe(String address) {
497499
try {
498500
if (isShutdown.get()) {
499501
return;
@@ -530,25 +532,24 @@ private void probe(String address) {
530532
logger.log(
531533
Level.FINE, "Probe for {0}: channel IDLE, requesting connection (warmup)", address);
532534
channel.getState(true);
533-
state.consecutiveTransientFailures = 0;
534535
break;
535536

536537
case CONNECTING:
537-
state.consecutiveTransientFailures = 0;
538538
break;
539539

540540
case TRANSIENT_FAILURE:
541541
state.consecutiveTransientFailures++;
542542
logger.log(
543543
Level.FINE,
544-
"Probe for {0}: channel in TRANSIENT_FAILURE ({1}/{2})",
544+
"Probe for {0}: channel in TRANSIENT_FAILURE ({1}/{2} observed failures since last"
545+
+ " READY)",
545546
new Object[] {
546547
address, state.consecutiveTransientFailures, MAX_TRANSIENT_FAILURE_COUNT
547548
});
548549
if (state.consecutiveTransientFailures >= MAX_TRANSIENT_FAILURE_COUNT) {
549550
logger.log(
550551
Level.FINE,
551-
"Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
552+
"Evicting endpoint {0}: {1} TRANSIENT_FAILURE probes without reaching READY",
552553
new Object[] {address, state.consecutiveTransientFailures});
553554
evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE);
554555
}

0 commit comments

Comments
 (0)