Skip to content

Commit 26286c0

Browse files
committed
remove per-request exclusion
1 parent 4912eac commit 26286c0

16 files changed

Lines changed: 744 additions & 522 deletions

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -71,4 +71,22 @@ public interface ChannelEndpoint {
7171
* @return the managed channel for this server
7272
*/
7373
ManagedChannel getChannel();
74+
75+
/**
76+
* Records that an application RPC started on this endpoint.
77+
*
78+
* <p>This is used for request-load-aware routing decisions. Implementations must keep the count
79+
* scoped to this endpoint instance so evicted or recreated endpoints do not share inflight state.
80+
*/
81+
void incrementActiveRequests();
82+
83+
/**
84+
* Records that an application RPC finished on this endpoint.
85+
*
86+
* <p>Implementations must not allow the count to go negative.
87+
*/
88+
void decrementActiveRequests();
89+
90+
/** Returns the number of currently active application RPCs on this endpoint. */
91+
int getActiveRequestCount();
7492
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLatencyRegistry.java

Lines changed: 88 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -22,10 +22,8 @@
2222
import com.google.common.cache.CacheBuilder;
2323
import java.time.Duration;
2424
import java.util.Objects;
25-
import java.util.concurrent.ConcurrentHashMap;
2625
import java.util.concurrent.ExecutionException;
2726
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.atomic.AtomicInteger;
2927

3028
/** Shared process-local latency scores for routed Spanner endpoints. */
3129
final class EndpointLatencyRegistry {
@@ -39,28 +37,62 @@ final class EndpointLatencyRegistry {
3937

4038
private static volatile Cache<TrackerKey, LatencyTracker> TRACKERS =
4139
newTrackerCache(Ticker.systemTicker());
42-
private static final ConcurrentHashMap<String, AtomicInteger> INFLIGHT_REQUESTS =
43-
new ConcurrentHashMap<>();
4440

4541
private EndpointLatencyRegistry() {}
4642

4743
static boolean hasScore(
4844
@javax.annotation.Nullable String databaseScope,
4945
long operationUid,
5046
String endpointLabelOrAddress) {
51-
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
47+
return hasScore(databaseScope, operationUid, false, endpointLabelOrAddress);
48+
}
49+
50+
static boolean hasScore(
51+
@javax.annotation.Nullable String databaseScope,
52+
long operationUid,
53+
boolean preferLeader,
54+
String endpointLabelOrAddress) {
55+
TrackerKey trackerKey =
56+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
5257
return trackerKey != null && TRACKERS.getIfPresent(trackerKey) != null;
5358
}
5459

5560
static double getSelectionCost(
5661
@javax.annotation.Nullable String databaseScope,
5762
long operationUid,
5863
String endpointLabelOrAddress) {
59-
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
64+
return getSelectionCost(databaseScope, operationUid, false, null, endpointLabelOrAddress);
65+
}
66+
67+
static double getSelectionCost(
68+
@javax.annotation.Nullable String databaseScope,
69+
long operationUid,
70+
boolean preferLeader,
71+
String endpointLabelOrAddress) {
72+
return getSelectionCost(
73+
databaseScope, operationUid, preferLeader, null, endpointLabelOrAddress);
74+
}
75+
76+
static double getSelectionCost(
77+
@javax.annotation.Nullable String databaseScope,
78+
long operationUid,
79+
@javax.annotation.Nullable ChannelEndpoint endpoint,
80+
String endpointLabelOrAddress) {
81+
return getSelectionCost(databaseScope, operationUid, false, endpoint, endpointLabelOrAddress);
82+
}
83+
84+
static double getSelectionCost(
85+
@javax.annotation.Nullable String databaseScope,
86+
long operationUid,
87+
boolean preferLeader,
88+
@javax.annotation.Nullable ChannelEndpoint endpoint,
89+
String endpointLabelOrAddress) {
90+
TrackerKey trackerKey =
91+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
6092
if (trackerKey == null) {
6193
return Double.MAX_VALUE;
6294
}
63-
double activeRequests = getInflight(endpointLabelOrAddress);
95+
double activeRequests = endpoint == null ? 0.0 : endpoint.getActiveRequestCount();
6496
LatencyTracker tracker = TRACKERS.getIfPresent(trackerKey);
6597
if (tracker != null) {
6698
return tracker.getScore() * (activeRequests + 1.0);
@@ -76,7 +108,17 @@ static void recordLatency(
76108
long operationUid,
77109
String endpointLabelOrAddress,
78110
Duration latency) {
79-
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
111+
recordLatency(databaseScope, operationUid, false, endpointLabelOrAddress, latency);
112+
}
113+
114+
static void recordLatency(
115+
@javax.annotation.Nullable String databaseScope,
116+
long operationUid,
117+
boolean preferLeader,
118+
String endpointLabelOrAddress,
119+
Duration latency) {
120+
TrackerKey trackerKey =
121+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
80122
if (trackerKey == null || latency == null) {
81123
return;
82124
}
@@ -87,54 +129,43 @@ static void recordError(
87129
@javax.annotation.Nullable String databaseScope,
88130
long operationUid,
89131
String endpointLabelOrAddress) {
90-
recordError(databaseScope, operationUid, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
132+
recordError(databaseScope, operationUid, false, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
91133
}
92134

93135
static void recordError(
94136
@javax.annotation.Nullable String databaseScope,
95137
long operationUid,
96-
String endpointLabelOrAddress,
97-
Duration penalty) {
98-
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
99-
if (trackerKey == null || penalty == null) {
100-
return;
101-
}
102-
getOrCreateTracker(trackerKey).recordError(penalty);
138+
boolean preferLeader,
139+
String endpointLabelOrAddress) {
140+
recordError(
141+
databaseScope, operationUid, preferLeader, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
103142
}
104143

105-
static void beginRequest(String endpointLabelOrAddress) {
106-
String address = normalizeAddress(endpointLabelOrAddress);
107-
if (address == null) {
108-
return;
109-
}
110-
INFLIGHT_REQUESTS.computeIfAbsent(address, ignored -> new AtomicInteger()).incrementAndGet();
144+
static void recordError(
145+
@javax.annotation.Nullable String databaseScope,
146+
long operationUid,
147+
String endpointLabelOrAddress,
148+
Duration penalty) {
149+
recordError(databaseScope, operationUid, false, endpointLabelOrAddress, penalty);
111150
}
112151

113-
static void finishRequest(String endpointLabelOrAddress) {
114-
String address = normalizeAddress(endpointLabelOrAddress);
115-
if (address == null) {
116-
return;
117-
}
118-
AtomicInteger counter = INFLIGHT_REQUESTS.get(address);
119-
if (counter == null) {
152+
static void recordError(
153+
@javax.annotation.Nullable String databaseScope,
154+
long operationUid,
155+
boolean preferLeader,
156+
String endpointLabelOrAddress,
157+
Duration penalty) {
158+
TrackerKey trackerKey =
159+
trackerKey(databaseScope, operationUid, preferLeader, endpointLabelOrAddress);
160+
if (trackerKey == null || penalty == null) {
120161
return;
121162
}
122-
counter.updateAndGet(current -> current > 0 ? current - 1 : 0);
123-
}
124-
125-
static int getInflight(String endpointLabelOrAddress) {
126-
String address = normalizeAddress(endpointLabelOrAddress);
127-
if (address == null) {
128-
return 0;
129-
}
130-
AtomicInteger counter = INFLIGHT_REQUESTS.get(address);
131-
return counter == null ? 0 : Math.max(0, counter.get());
163+
getOrCreateTracker(trackerKey).recordError(penalty);
132164
}
133165

134166
@VisibleForTesting
135167
static void clear() {
136168
TRACKERS.invalidateAll();
137-
INFLIGHT_REQUESTS.clear();
138169
}
139170

140171
@VisibleForTesting
@@ -155,11 +186,20 @@ static TrackerKey trackerKey(
155186
@javax.annotation.Nullable String databaseScope,
156187
long operationUid,
157188
String endpointLabelOrAddress) {
189+
return trackerKey(databaseScope, operationUid, false, endpointLabelOrAddress);
190+
}
191+
192+
@VisibleForTesting
193+
static TrackerKey trackerKey(
194+
@javax.annotation.Nullable String databaseScope,
195+
long operationUid,
196+
boolean preferLeader,
197+
String endpointLabelOrAddress) {
158198
String address = normalizeAddress(endpointLabelOrAddress);
159199
if (operationUid <= 0 || address == null) {
160200
return null;
161201
}
162-
return new TrackerKey(normalizeScope(databaseScope), operationUid, address);
202+
return new TrackerKey(normalizeScope(databaseScope), operationUid, preferLeader, address);
163203
}
164204

165205
private static long defaultRttMicros() {
@@ -190,11 +230,14 @@ private static Cache<TrackerKey, LatencyTracker> newTrackerCache(Ticker ticker)
190230
static final class TrackerKey {
191231
private final String databaseScope;
192232
private final long operationUid;
233+
private final boolean preferLeader;
193234
private final String address;
194235

195-
private TrackerKey(String databaseScope, long operationUid, String address) {
236+
private TrackerKey(
237+
String databaseScope, long operationUid, boolean preferLeader, String address) {
196238
this.databaseScope = databaseScope;
197239
this.operationUid = operationUid;
240+
this.preferLeader = preferLeader;
198241
this.address = address;
199242
}
200243

@@ -208,18 +251,19 @@ public boolean equals(Object other) {
208251
}
209252
TrackerKey that = (TrackerKey) other;
210253
return operationUid == that.operationUid
254+
&& preferLeader == that.preferLeader
211255
&& Objects.equals(databaseScope, that.databaseScope)
212256
&& Objects.equals(address, that.address);
213257
}
214258

215259
@Override
216260
public int hashCode() {
217-
return Objects.hash(databaseScope, operationUid, address);
261+
return Objects.hash(databaseScope, operationUid, preferLeader, address);
218262
}
219263

220264
@Override
221265
public String toString() {
222-
return databaseScope + ":" + operationUid + "@" + address;
266+
return databaseScope + ":" + operationUid + ":" + preferLeader + "@" + address;
223267
}
224268
}
225269
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 0 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import java.time.Duration;
2525
import java.time.Instant;
2626
import java.util.ArrayList;
27-
import java.util.HashMap;
2827
import java.util.HashSet;
2928
import java.util.List;
3029
import java.util.Map;
@@ -106,7 +105,6 @@ static final class EndpointState {
106105

107106
private final ChannelEndpointCache endpointCache;
108107
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
109-
private final Set<String> evictedAddresses = ConcurrentHashMap.newKeySet();
110108
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
111109
private final Map<String, Long> finderGenerations = new ConcurrentHashMap<>();
112110
private final Map<String, PendingActiveAddressUpdate> pendingActiveAddressUpdates =
@@ -218,7 +216,6 @@ private boolean ensureEndpointExists(String address) {
218216
address,
219217
addr -> {
220218
logger.log(Level.FINE, "Creating endpoint state for address: {0}", addr);
221-
evictedAddresses.remove(addr);
222219
created[0] = true;
223220
return new EndpointState(addr, clock.instant());
224221
});
@@ -612,7 +609,6 @@ private void evictEndpoint(String address, EvictionReason reason) {
612609

613610
stopProbing(address);
614611
endpoints.remove(address);
615-
evictedAddresses.add(address);
616612
if (reason == EvictionReason.TRANSIENT_FAILURE) {
617613
markTransientFailureEvicted(address);
618614
} else {
@@ -641,7 +637,6 @@ void requestEndpointRecreation(String address) {
641637

642638
logger.log(Level.FINE, "Recreating previously evicted endpoint for address: {0}", address);
643639
EndpointState state = new EndpointState(address, clock.instant());
644-
evictedAddresses.remove(address);
645640
if (endpoints.putIfAbsent(address, state) == null) {
646641
// Schedule after putIfAbsent returns so the entry is visible to the scheduler thread.
647642
scheduler.submit(() -> createAndStartProbing(address));
@@ -669,32 +664,6 @@ int managedEndpointCount() {
669664
return endpoints.size();
670665
}
671666

672-
Map<String, Long> snapshotEndpointStateCounts() {
673-
Map<String, Long> counts = new HashMap<>();
674-
snapshotEndpointStates().values().forEach(state -> counts.merge(state, 1L, Long::sum));
675-
return counts;
676-
}
677-
678-
Map<String, String> snapshotEndpointStates() {
679-
Map<String, String> states = new HashMap<>();
680-
for (String address : endpoints.keySet()) {
681-
ChannelEndpoint endpoint = endpointCache.getIfPresent(address);
682-
String stateName = "unknown";
683-
if (endpoint != null) {
684-
ConnectivityState state = endpoint.getChannel().getState(false);
685-
stateName =
686-
state == ConnectivityState.TRANSIENT_FAILURE
687-
? "transient_failure"
688-
: state.name().toLowerCase();
689-
}
690-
states.put(address, stateName);
691-
}
692-
for (String address : evictedAddresses) {
693-
states.putIfAbsent(address, "evicted");
694-
}
695-
return states;
696-
}
697-
698667
/** Shuts down the lifecycle manager and all probing. */
699668
void shutdown() {
700669
if (!isShutdown.compareAndSet(false, true)) {
@@ -716,7 +685,6 @@ void shutdown() {
716685
}
717686
}
718687
endpoints.clear();
719-
evictedAddresses.clear();
720688
transientFailureEvictedAddresses.clear();
721689
pendingActiveAddressUpdates.clear();
722690
queuedFinderKeys.clear();

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointOverloadCooldownTracker.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -141,8 +141,10 @@ private Duration cooldownForFailures(int failures) {
141141
}
142142
cooldown = cooldown.multipliedBy(2);
143143
}
144-
long bound = Math.max(1L, cooldown.toMillis() + 1L);
145-
return Duration.ofMillis(randomLong.applyAsLong(bound));
144+
long cooldownMillis = Math.max(1L, cooldown.toMillis());
145+
long floorMillis = Math.max(1L, cooldownMillis / 2L);
146+
long rangeSize = Math.max(1L, cooldownMillis - floorMillis + 1L);
147+
return Duration.ofMillis(floorMillis + randomLong.applyAsLong(rangeSize));
146148
}
147149

148150
@VisibleForTesting

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.ConcurrentHashMap;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
3637
import java.util.logging.Level;
3738
import java.util.logging.Logger;
3839
import javax.annotation.Nullable;
@@ -204,6 +205,7 @@ private void shutdownChannel(GrpcChannelEndpoint server, boolean awaitTerminatio
204205
static class GrpcChannelEndpoint implements ChannelEndpoint {
205206
private final String address;
206207
private final ManagedChannel channel;
208+
private final AtomicInteger activeRequests = new AtomicInteger();
207209

208210
/**
209211
* Creates a server from a channel provider.
@@ -289,5 +291,20 @@ public boolean isTransientFailure() {
289291
public ManagedChannel getChannel() {
290292
return channel;
291293
}
294+
295+
@Override
296+
public void incrementActiveRequests() {
297+
activeRequests.incrementAndGet();
298+
}
299+
300+
@Override
301+
public void decrementActiveRequests() {
302+
activeRequests.updateAndGet(current -> current > 0 ? current - 1 : 0);
303+
}
304+
305+
@Override
306+
public int getActiveRequestCount() {
307+
return Math.max(0, activeRequests.get());
308+
}
292309
}
293310
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -234,6 +234,7 @@ private void recordFirstResponseLatency(
234234
EndpointLatencyRegistry.recordLatency(
235235
routingTarget.databaseScope,
236236
routingTarget.operationUid,
237+
routingTarget.preferLeader,
237238
routingTarget.targetEndpoint,
238239
Duration.ofNanos(latencyNanos));
239240
}

0 commit comments

Comments (0)