Skip to content

Commit b74b453

Browse files
committed
add more test for R/W shape
1 parent da00277 commit b74b453

10 files changed

Lines changed: 371 additions & 62 deletions

File tree

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ public ChannelFinder(
8282
ChannelEndpointCache endpointCache,
8383
@Nullable EndpointLifecycleManager lifecycleManager,
8484
@Nullable String finderKey) {
85-
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
85+
this.rangeCache =
86+
new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager, finderKey);
8687
this.lifecycleManager = lifecycleManager;
8788
this.finderKey = finderKey;
8889
}
@@ -91,6 +92,11 @@ void useDeterministicRandom() {
9192
rangeCache.useDeterministicRandom();
9293
}
9394

95+
@Nullable
96+
String finderKey() {
97+
return finderKey;
98+
}
99+
94100
private static ExecutorService createCacheUpdatePool() {
95101
ThreadPoolExecutor executor =
96102
new ThreadPoolExecutor(

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLatencyRegistry.java

Lines changed: 81 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,37 +17,51 @@
1717
package com.google.cloud.spanner.spi.v1;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Ticker;
21+
import com.google.common.cache.Cache;
22+
import com.google.common.cache.CacheBuilder;
2023
import java.time.Duration;
2124
import java.util.Objects;
2225
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.TimeUnit;
2328
import java.util.concurrent.atomic.AtomicInteger;
2429

2530
/** Shared process-local latency scores for routed Spanner endpoints. */
2631
final class EndpointLatencyRegistry {
32+
private static final String GLOBAL_SCOPE = "__global__";
2733

2834
static final Duration DEFAULT_ERROR_PENALTY = Duration.ofSeconds(10);
2935
static final Duration DEFAULT_RTT = Duration.ofMillis(10);
3036
static final double DEFAULT_PENALTY_VALUE = 1_000_000.0;
37+
@VisibleForTesting static final Duration TRACKER_EXPIRE_AFTER_ACCESS = Duration.ofMinutes(10);
38+
@VisibleForTesting static final long MAX_TRACKERS = 100_000L;
3139

32-
private static final ConcurrentHashMap<TrackerKey, LatencyTracker> TRACKERS =
33-
new ConcurrentHashMap<>();
40+
private static volatile Cache<TrackerKey, LatencyTracker> TRACKERS =
41+
newTrackerCache(Ticker.systemTicker());
3442
private static final ConcurrentHashMap<String, AtomicInteger> INFLIGHT_REQUESTS =
3543
new ConcurrentHashMap<>();
3644

3745
private EndpointLatencyRegistry() {}
3846

39-
static boolean hasScore(long operationUid, String endpointLabelOrAddress) {
40-
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
41-
return trackerKey != null && TRACKERS.containsKey(trackerKey);
47+
static boolean hasScore(
48+
@javax.annotation.Nullable String databaseScope,
49+
long operationUid,
50+
String endpointLabelOrAddress) {
51+
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
52+
return trackerKey != null && TRACKERS.getIfPresent(trackerKey) != null;
4253
}
4354

44-
static double getSelectionCost(long operationUid, String endpointLabelOrAddress) {
45-
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
55+
static double getSelectionCost(
56+
@javax.annotation.Nullable String databaseScope,
57+
long operationUid,
58+
String endpointLabelOrAddress) {
59+
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
4660
if (trackerKey == null) {
4761
return Double.MAX_VALUE;
4862
}
4963
double activeRequests = getInflight(endpointLabelOrAddress);
50-
LatencyTracker tracker = TRACKERS.get(trackerKey);
64+
LatencyTracker tracker = TRACKERS.getIfPresent(trackerKey);
5165
if (tracker != null) {
5266
return tracker.getScore() * (activeRequests + 1.0);
5367
}
@@ -57,24 +71,35 @@ static double getSelectionCost(long operationUid, String endpointLabelOrAddress)
5771
return defaultRttMicros() * (activeRequests + 1.0);
5872
}
5973

60-
static void recordLatency(long operationUid, String endpointLabelOrAddress, Duration latency) {
61-
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
74+
static void recordLatency(
75+
@javax.annotation.Nullable String databaseScope,
76+
long operationUid,
77+
String endpointLabelOrAddress,
78+
Duration latency) {
79+
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
6280
if (trackerKey == null || latency == null) {
6381
return;
6482
}
65-
TRACKERS.computeIfAbsent(trackerKey, ignored -> new EwmaLatencyTracker()).update(latency);
83+
getOrCreateTracker(trackerKey).update(latency);
6684
}
6785

68-
static void recordError(long operationUid, String endpointLabelOrAddress) {
69-
recordError(operationUid, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
86+
static void recordError(
87+
@javax.annotation.Nullable String databaseScope,
88+
long operationUid,
89+
String endpointLabelOrAddress) {
90+
recordError(databaseScope, operationUid, endpointLabelOrAddress, DEFAULT_ERROR_PENALTY);
7091
}
7192

72-
static void recordError(long operationUid, String endpointLabelOrAddress, Duration penalty) {
73-
TrackerKey trackerKey = trackerKey(operationUid, endpointLabelOrAddress);
93+
static void recordError(
94+
@javax.annotation.Nullable String databaseScope,
95+
long operationUid,
96+
String endpointLabelOrAddress,
97+
Duration penalty) {
98+
TrackerKey trackerKey = trackerKey(databaseScope, operationUid, endpointLabelOrAddress);
7499
if (trackerKey == null || penalty == null) {
75100
return;
76101
}
77-
TRACKERS.computeIfAbsent(trackerKey, ignored -> new EwmaLatencyTracker()).recordError(penalty);
102+
getOrCreateTracker(trackerKey).recordError(penalty);
78103
}
79104

80105
static void beginRequest(String endpointLabelOrAddress) {
@@ -94,10 +119,7 @@ static void finishRequest(String endpointLabelOrAddress) {
94119
if (counter == null) {
95120
return;
96121
}
97-
int updated = counter.decrementAndGet();
98-
if (updated <= 0) {
99-
INFLIGHT_REQUESTS.remove(address, counter);
100-
}
122+
counter.updateAndGet(current -> current > 0 ? current - 1 : 0);
101123
}
102124

103125
static int getInflight(String endpointLabelOrAddress) {
@@ -111,10 +133,15 @@ static int getInflight(String endpointLabelOrAddress) {
111133

112134
@VisibleForTesting
113135
static void clear() {
114-
TRACKERS.clear();
136+
TRACKERS.invalidateAll();
115137
INFLIGHT_REQUESTS.clear();
116138
}
117139

140+
@VisibleForTesting
141+
static void useTrackerTicker(Ticker ticker) {
142+
TRACKERS = newTrackerCache(ticker);
143+
}
144+
118145
@VisibleForTesting
119146
static String normalizeAddress(String endpointLabelOrAddress) {
120147
if (endpointLabelOrAddress == null || endpointLabelOrAddress.isEmpty()) {
@@ -124,24 +151,49 @@ static String normalizeAddress(String endpointLabelOrAddress) {
124151
}
125152

126153
@VisibleForTesting
127-
static TrackerKey trackerKey(long operationUid, String endpointLabelOrAddress) {
154+
static TrackerKey trackerKey(
155+
@javax.annotation.Nullable String databaseScope,
156+
long operationUid,
157+
String endpointLabelOrAddress) {
128158
String address = normalizeAddress(endpointLabelOrAddress);
129159
if (operationUid <= 0 || address == null) {
130160
return null;
131161
}
132-
return new TrackerKey(operationUid, address);
162+
return new TrackerKey(normalizeScope(databaseScope), operationUid, address);
133163
}
134164

135165
private static long defaultRttMicros() {
136166
return DEFAULT_RTT.toNanos() / 1_000L;
137167
}
138168

169+
private static String normalizeScope(@javax.annotation.Nullable String databaseScope) {
170+
return (databaseScope == null || databaseScope.isEmpty()) ? GLOBAL_SCOPE : databaseScope;
171+
}
172+
173+
private static LatencyTracker getOrCreateTracker(TrackerKey trackerKey) {
174+
try {
175+
return TRACKERS.get(trackerKey, EwmaLatencyTracker::new);
176+
} catch (ExecutionException e) {
177+
throw new IllegalStateException("Failed to create latency tracker", e);
178+
}
179+
}
180+
181+
private static Cache<TrackerKey, LatencyTracker> newTrackerCache(Ticker ticker) {
182+
return CacheBuilder.newBuilder()
183+
.maximumSize(MAX_TRACKERS)
184+
.expireAfterAccess(TRACKER_EXPIRE_AFTER_ACCESS.toNanos(), TimeUnit.NANOSECONDS)
185+
.ticker(ticker)
186+
.build();
187+
}
188+
139189
@VisibleForTesting
140190
static final class TrackerKey {
191+
private final String databaseScope;
141192
private final long operationUid;
142193
private final String address;
143194

144-
private TrackerKey(long operationUid, String address) {
195+
private TrackerKey(String databaseScope, long operationUid, String address) {
196+
this.databaseScope = databaseScope;
145197
this.operationUid = operationUid;
146198
this.address = address;
147199
}
@@ -155,17 +207,19 @@ public boolean equals(Object other) {
155207
return false;
156208
}
157209
TrackerKey that = (TrackerKey) other;
158-
return operationUid == that.operationUid && Objects.equals(address, that.address);
210+
return operationUid == that.operationUid
211+
&& Objects.equals(databaseScope, that.databaseScope)
212+
&& Objects.equals(address, that.address);
159213
}
160214

161215
@Override
162216
public int hashCode() {
163-
return Objects.hash(operationUid, address);
217+
return Objects.hash(databaseScope, operationUid, address);
164218
}
165219

166220
@Override
167221
public String toString() {
168-
return operationUid + "@" + address;
222+
return databaseScope + ":" + operationUid + "@" + address;
169223
}
170224
}
171225
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,10 @@ private void recordFirstResponseLatency(
232232
}
233233
long latencyNanos = Math.max(0L, System.nanoTime() - startedAtNanos);
234234
EndpointLatencyRegistry.recordLatency(
235-
routingTarget.operationUid, routingTarget.targetEndpoint, Duration.ofNanos(latencyNanos));
235+
routingTarget.databaseScope,
236+
routingTarget.operationUid,
237+
routingTarget.targetEndpoint,
238+
Duration.ofNanos(latencyNanos));
236239
}
237240

238241
private Map<String, Float> parseServerTimingHeader(String serverTiming) {

0 commit comments

Comments
 (0)