Skip to content

Commit 61e2f16

Browse files
committed
add safety and boundness on thread
1 parent c99d2af commit 61e2f16

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.core.InternalApi;
2020
import com.google.common.annotations.VisibleForTesting;
21+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2122
import com.google.spanner.v1.BeginTransactionRequest;
2223
import com.google.spanner.v1.CacheUpdate;
2324
import com.google.spanner.v1.CommitRequest;
@@ -36,8 +37,10 @@
3637
import java.util.Objects;
3738
import java.util.Set;
3839
import java.util.concurrent.ExecutorService;
39-
import java.util.concurrent.Executors;
40+
import java.util.concurrent.LinkedBlockingQueue;
4041
import java.util.concurrent.ThreadLocalRandom;
42+
import java.util.concurrent.ThreadPoolExecutor;
43+
import java.util.concurrent.TimeUnit;
4144
import java.util.concurrent.atomic.AtomicLong;
4245
import java.util.concurrent.atomic.AtomicReference;
4346
import java.util.function.Predicate;
@@ -51,13 +54,9 @@
5154
@InternalApi
5255
public final class ChannelFinder {
5356
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
54-
private static final ExecutorService CACHE_UPDATE_POOL =
55-
Executors.newCachedThreadPool(
56-
r -> {
57-
Thread t = new Thread(r, "spanner-cache-update");
58-
t.setDaemon(true);
59-
return t;
60-
});
57+
private static final int MAX_CACHE_UPDATE_THREADS =
58+
Math.max(2, Runtime.getRuntime().availableProcessors());
59+
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();
6160

6261
private final Object updateLock = new Object();
6362
private final AtomicLong databaseId = new AtomicLong();
@@ -90,6 +89,22 @@ void useDeterministicRandom() {
9089
rangeCache.useDeterministicRandom();
9190
}
9291

92+
private static ExecutorService createCacheUpdatePool() {
93+
ThreadPoolExecutor executor =
94+
new ThreadPoolExecutor(
95+
MAX_CACHE_UPDATE_THREADS,
96+
MAX_CACHE_UPDATE_THREADS,
97+
30L,
98+
TimeUnit.SECONDS,
99+
new LinkedBlockingQueue<>(),
100+
new ThreadFactoryBuilder()
101+
.setDaemon(true)
102+
.setNameFormat("spanner-cache-update-%d")
103+
.build());
104+
executor.allowCoreThreadTimeOut(true);
105+
return executor;
106+
}
107+
93108
public void update(CacheUpdate update) {
94109
synchronized (updateLock) {
95110
long currentId = databaseId.get();

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,24 @@ public void addRanges(CacheUpdate cacheUpdate) {
114114
touchedGroups.get(i).update(cacheUpdate.getGroup(i));
115115
}
116116

117-
writeLock.lock();
118117
try {
119-
for (Range rangeIn : cacheUpdate.getRangeList()) {
120-
replaceRangeIfNewer(rangeIn);
121-
}
122-
for (CachedGroup group : touchedGroups) {
123-
unref(group);
118+
writeLock.lock();
119+
try {
120+
for (Range rangeIn : cacheUpdate.getRangeList()) {
121+
replaceRangeIfNewer(rangeIn);
122+
}
123+
} finally {
124+
writeLock.unlock();
124125
}
125126
} finally {
126-
writeLock.unlock();
127+
writeLock.lock();
128+
try {
129+
for (int i = touchedGroups.size() - 1; i >= 0; i--) {
130+
unref(touchedGroups.get(i));
131+
}
132+
} finally {
133+
writeLock.unlock();
134+
}
127135
}
128136
}
129137

0 commit comments

Comments
 (0)