Skip to content

Commit 7c4bb2e

Browse files
committed
add EWMA support for stale_reads
1 parent 2463cbf commit 7c4bb2e

17 files changed

Lines changed: 1922 additions & 101 deletions
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import java.time.Duration;
21+
import java.util.Objects;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
/** Shared process-local latency scores for routed Spanner endpoints. */
26+
final class EndpointLatencyRegistry {
27+
28+
static final Duration DEFAULT_ERROR_PENALTY = Duration.ofSeconds(10);
29+
static final Duration DEFAULT_RTT = Duration.ofMillis(10);
30+
static final double DEFAULT_PENALTY_VALUE = 1_000_000.0;
31+
32+
private static final ConcurrentHashMap<TrackerKey, LatencyTracker> TRACKERS =
33+
new ConcurrentHashMap<>();
34+
private static final ConcurrentHashMap<String, AtomicInteger> INFLIGHT_REQUESTS =
35+
new ConcurrentHashMap<>();
36+
37+
private EndpointLatencyRegistry() {}
38+
39+
static boolean hasScore(long operationUid, String endpointLabelOrAddress) {
40+
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
41+
return trackerKey != null && TRACKERS.containsKey(trackerKey);
42+
}
43+
44+
static double getSelectionCost(long operationUid, String endpointLabelOrAddress) {
45+
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
46+
if (trackerKey == null) {
47+
return Double.MAX_VALUE;
48+
}
49+
double activeRequests = getInflight(endpointLabelOrAddress);
50+
LatencyTracker tracker = TRACKERS.get(trackerKey);
51+
if (tracker != null) {
52+
return tracker.getScore() * (activeRequests + 1.0);
53+
}
54+
if (activeRequests > 0.0) {
55+
return DEFAULT_PENALTY_VALUE + activeRequests;
56+
}
57+
return defaultRttMicros() * (activeRequests + 1.0);
58+
}
59+
60+
static void recordLatency(long operationUid, String endpointLabelOrAddress, Duration latency) {
61+
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
62+
if (trackerKey == null || latency == null) {
63+
return;
64+
}
65+
TRACKERS.computeIfAbsent(trackerKey, ignored -> new EwmaLatencyTracker()).update(latency);
66+
}
67+
68+
static void recordError(long operationUid, String endpointLabelOrAddress) {
69+
recordError(operationUid, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
70+
}
71+
72+
static void recordError(long operationUid, String endpointLabelOrAddress, Duration penalty) {
73+
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
74+
if (trackerKey == null || penalty == null) {
75+
return;
76+
}
77+
TRACKERS.computeIfAbsent(trackerKey, ignored -> new EwmaLatencyTracker()).recordError(penalty);
78+
}
79+
80+
static void beginRequest(String endpointLabelOrAddress) {
81+
String address = normalizeAddress(endpointLabelOrAddress);
82+
if (address == null) {
83+
return;
84+
}
85+
INFLIGHT_REQUESTS.computeIfAbsent(address, ignored -> new AtomicInteger()).incrementAndGet();
86+
}
87+
88+
static void finishRequest(String endpointLabelOrAddress) {
89+
String address = normalizeAddress(endpointLabelOrAddress);
90+
if (address == null) {
91+
return;
92+
}
93+
AtomicInteger counter = INFLIGHT_REQUESTS.get(address);
94+
if (counter == null) {
95+
return;
96+
}
97+
int updated = counter.decrementAndGet();
98+
if (updated <= 0) {
99+
INFLIGHT_REQUESTS.remove(address, counter);
100+
}
101+
}
102+
103+
static int getInflight(String endpointLabelOrAddress) {
104+
String address = normalizeAddress(endpointLabelOrAddress);
105+
if (address == null) {
106+
return 0;
107+
}
108+
AtomicInteger counter = INFLIGHT_REQUESTS.get(address);
109+
return counter == null ? 0 : Math.max(0, counter.get());
110+
}
111+
112+
@VisibleForTesting
113+
static void clear() {
114+
TRACKERS.clear();
115+
INFLIGHT_REQUESTS.clear();
116+
}
117+
118+
@VisibleForTesting
119+
static String normalizeAddress(String endpointLabelOrAddress) {
120+
if (endpointLabelOrAddress == null || endpointLabelOrAddress.isEmpty()) {
121+
return null;
122+
}
123+
return endpointLabelOrAddress;
124+
}
125+
126+
@VisibleForTesting
127+
static TrackerKey trackerKey(long operationUid, String endpointLabelOrAddress) {
128+
String address = normalizeAddress(endpointLabelOrAddress);
129+
if (operationUid <= 0 || address == null) {
130+
return null;
131+
}
132+
return new TrackerKey(operationUid, address);
133+
}
134+
135+
private static long defaultRttMicros() {
136+
return DEFAULT_RTT.toNanos() / 1_000L;
137+
}
138+
139+
@VisibleForTesting
140+
static final class TrackerKey {
141+
private final long operationUid;
142+
private final String address;
143+
144+
private TrackerKey(long operationUid, String address) {
145+
this.operationUid = operationUid;
146+
this.address = address;
147+
}
148+
149+
@Override
150+
public boolean equals(Object other) {
151+
if (this == other) {
152+
return true;
153+
}
154+
if (!(other instanceof TrackerKey)) {
155+
return false;
156+
}
157+
TrackerKey that = (TrackerKey) other;
158+
return operationUid == that.operationUid && Objects.equals(address, that.address);
159+
}
160+
161+
@Override
162+
public int hashCode() {
163+
return Objects.hash(operationUid, address);
164+
}
165+
166+
@Override
167+
public String toString() {
168+
return operationUid + "@" + address;
169+
}
170+
}
171+
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.time.Duration;
2525
import java.time.Instant;
2626
import java.util.ArrayList;
27+
import java.util.HashMap;
2728
import java.util.HashSet;
2829
import java.util.List;
2930
import java.util.Map;
@@ -70,8 +71,9 @@ class EndpointLifecycleManager {
7071
private static final long EVICTION_CHECK_INTERVAL_SECONDS = 300;
7172

7273
/**
73-
* Maximum consecutive TRANSIENT_FAILURE probes before evicting an endpoint. Gives the channel
74-
* time to recover from transient network issues before we tear it down and recreate.
74+
* Maximum observed TRANSIENT_FAILURE probes before evicting an endpoint. The counter resets only
75+
* after the channel reaches READY, so CONNECTING/IDLE oscillation does not hide a persistently
76+
* unhealthy endpoint.
7577
*/
7678
private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;
7779

@@ -104,6 +106,7 @@ static final class EndpointState {
104106

105107
private final ChannelEndpointCache endpointCache;
106108
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
109+
private final Set<String> evictedAddresses = ConcurrentHashMap.newKeySet();
107110
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
108111
private final Map<String, Long> finderGenerations = new ConcurrentHashMap<>();
109112
private final Map<String, PendingActiveAddressUpdate> pendingActiveAddressUpdates =
@@ -215,6 +218,7 @@ private boolean ensureEndpointExists(String address) {
215218
address,
216219
addr -> {
217220
logger.log(Level.FINE, "Creating endpoint state for address: {0}", addr);
221+
evictedAddresses.remove(addr);
218222
created[0] = true;
219223
return new EndpointState(addr, clock.instant());
220224
});
@@ -493,7 +497,8 @@ private void stopProbing(String address) {
493497
* <p>All exceptions are caught to prevent {@link ScheduledExecutorService} from cancelling future
494498
* runs of this task.
495499
*/
496-
private void probe(String address) {
500+
@VisibleForTesting
501+
void probe(String address) {
497502
try {
498503
if (isShutdown.get()) {
499504
return;
@@ -530,25 +535,24 @@ private void probe(String address) {
530535
logger.log(
531536
Level.FINE, "Probe for {0}: channel IDLE, requesting connection (warmup)", address);
532537
channel.getState(true);
533-
state.consecutiveTransientFailures = 0;
534538
break;
535539

536540
case CONNECTING:
537-
state.consecutiveTransientFailures = 0;
538541
break;
539542

540543
case TRANSIENT_FAILURE:
541544
state.consecutiveTransientFailures++;
542545
logger.log(
543546
Level.FINE,
544-
"Probe for {0}: channel in TRANSIENT_FAILURE ({1}/{2})",
547+
"Probe for {0}: channel in TRANSIENT_FAILURE ({1}/{2} observed failures since last"
548+
+ " READY)",
545549
new Object[] {
546550
address, state.consecutiveTransientFailures, MAX_TRANSIENT_FAILURE_COUNT
547551
});
548552
if (state.consecutiveTransientFailures >= MAX_TRANSIENT_FAILURE_COUNT) {
549553
logger.log(
550554
Level.FINE,
551-
"Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
555+
"Evicting endpoint {0}: {1} TRANSIENT_FAILURE probes without reaching READY",
552556
new Object[] {address, state.consecutiveTransientFailures});
553557
evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE);
554558
}
@@ -608,6 +612,7 @@ private void evictEndpoint(String address, EvictionReason reason) {
608612

609613
stopProbing(address);
610614
endpoints.remove(address);
615+
evictedAddresses.add(address);
611616
if (reason == EvictionReason.TRANSIENT_FAILURE) {
612617
markTransientFailureEvicted(address);
613618
} else {
@@ -636,6 +641,7 @@ void requestEndpointRecreation(String address) {
636641

637642
logger.log(Level.FINE, "Recreating previously evicted endpoint for address: {0}", address);
638643
EndpointState state = new EndpointState(address, clock.instant());
644+
evictedAddresses.remove(address);
639645
if (endpoints.putIfAbsent(address, state) == null) {
640646
// Schedule after putIfAbsent returns so the entry is visible to the scheduler thread.
641647
scheduler.submit(() -> createAndStartProbing(address));
@@ -663,6 +669,32 @@ int managedEndpointCount() {
663669
return endpoints.size();
664670
}
665671

672+
Map<String, Long> snapshotEndpointStateCounts() {
673+
Map<String, Long> counts = new HashMap<>();
674+
snapshotEndpointStates().values().forEach(state -> counts.merge(state, 1L, Long::sum));
675+
return counts;
676+
}
677+
678+
Map<String, String> snapshotEndpointStates() {
679+
Map<String, String> states = new HashMap<>();
680+
for (String address : endpoints.keySet()) {
681+
ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
682+
String stateName = "unknown";
683+
if (endpoint != null) {
684+
ConnectivityState state = endpoint.getChannel().getState(false);
685+
stateName =
686+
state == ConnectivityState.TRANSIENT_FAILURE
687+
? "transient_failure"
688+
: state.name().toLowerCase();
689+
}
690+
states.put(address, stateName);
691+
}
692+
for (String address : evictedAddresses) {
693+
states.putIfAbsent(address, "evicted");
694+
}
695+
return states;
696+
}
697+
666698
/** Shuts down the lifecycle manager and all probing. */
667699
void shutdown() {
668700
if (!isShutdown.compareAndSet(false, true)) {
@@ -684,6 +716,7 @@ void shutdown() {
684716
}
685717
}
686718
endpoints.clear();
719+
evictedAddresses.clear();
687720
transientFailureEvictedAddresses.clear();
688721
pendingActiveAddressUpdates.clear();
689722
queuedFinderKeys.clear();

0 commit comments

Comments
 (0)