Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.spanner.spi.v1;

import com.google.api.core.InternalApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CacheUpdate;
import com.google.spanner.v1.CommitRequest;
Expand All @@ -34,6 +36,9 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
Expand All @@ -47,11 +52,20 @@
@InternalApi
public final class ChannelFinder {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
private static final ExecutorService CACHE_UPDATE_POOL =
Executors.newCachedThreadPool(
r -> {
Thread t = new Thread(r, "spanner-cache-update");
t.setDaemon(true);
return t;
});
Comment on lines +55 to +61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using Executors.newCachedThreadPool() for a static shared pool can lead to unbounded thread creation if many ChannelFinder instances (e.g., in a multi-database environment) receive updates simultaneously. Consider using a ThreadPoolExecutor with a defined maximum pool size to prevent potential resource exhaustion.


private final Object updateLock = new Object();
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
private final Executor cacheUpdateExecutor =
MoreExecutors.newSequentialExecutor(CACHE_UPDATE_POOL);
@Nullable private final EndpointLifecycleManager lifecycleManager;
@Nullable private final String finderKey;

Expand Down Expand Up @@ -112,6 +126,17 @@ public void update(CacheUpdate update) {
}
}

public void updateAsync(CacheUpdate update) {
cacheUpdateExecutor.execute(() -> update(update));
}

@VisibleForTesting
void awaitPendingUpdates() throws InterruptedException {
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
cacheUpdateExecutor.execute(latch::countDown);
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ class EndpointLifecycleManager {
*/
private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;

private enum EvictionReason {
TRANSIENT_FAILURE,
SHUTDOWN,
IDLE,
STALE
}

/** Per-endpoint lifecycle state. */
static final class EndpointState {
final String address;
Expand All @@ -95,6 +102,7 @@ static final class EndpointState {

private final ChannelEndpointCache endpointCache;
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();

/**
* Active addresses reported by each ChannelFinder, keyed by database id.
Expand Down Expand Up @@ -235,6 +243,7 @@ void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
for (Set<String> addresses : activeAddressesPerFinder.values()) {
allActive.addAll(addresses);
}
transientFailureEvictedAddresses.retainAll(allActive);

// Evict managed endpoints not referenced by any finder.
List<String> stale = new ArrayList<>();
Expand Down Expand Up @@ -276,6 +285,7 @@ void unregisterFinder(String finderKey) {
for (Set<String> addresses : activeAddressesPerFinder.values()) {
allActive.addAll(addresses);
}
transientFailureEvictedAddresses.retainAll(allActive);

List<String> stale = new ArrayList<>();
for (String address : endpoints.keySet()) {
Expand Down Expand Up @@ -412,6 +422,7 @@ private void probe(String address) {
case READY:
state.lastReadyAt = clock.instant();
state.consecutiveTransientFailures = 0;
transientFailureEvictedAddresses.remove(address);
break;

case IDLE:
Expand Down Expand Up @@ -439,13 +450,13 @@ private void probe(String address) {
Level.FINE,
"Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
new Object[] {address, state.consecutiveTransientFailures});
evictEndpoint(address);
evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE);
}
break;

case SHUTDOWN:
logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address);
evictEndpoint(address);
evictEndpoint(address, EvictionReason.SHUTDOWN);
break;

default:
Expand Down Expand Up @@ -482,16 +493,26 @@ void checkIdleEviction() {
}

for (String address : toEvict) {
evictEndpoint(address);
evictEndpoint(address, EvictionReason.IDLE);
}
}

/** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel. */
private void evictEndpoint(String address) {
evictEndpoint(address, EvictionReason.STALE);
}

/** Evicts an endpoint and records whether it should still be reported as unhealthy. */
private void evictEndpoint(String address, EvictionReason reason) {
logger.log(Level.FINE, "Evicting endpoint {0}", address);

stopProbing(address);
endpoints.remove(address);
if (reason == EvictionReason.TRANSIENT_FAILURE) {
transientFailureEvictedAddresses.add(address);
} else {
transientFailureEvictedAddresses.remove(address);
}
endpointCache.evict(address);
}

Expand Down Expand Up @@ -526,6 +547,10 @@ boolean isManaged(String address) {
return endpoints.containsKey(address);
}

boolean wasRecentlyEvictedTransientFailure(String address) {
return transientFailureEvictedAddresses.contains(address);
}

/** Returns the endpoint state for testing. */
@VisibleForTesting
EndpointState getEndpointState(String address) {
Expand Down Expand Up @@ -558,6 +583,7 @@ void shutdown() {
}
}
endpoints.clear();
transientFailureEvictedAddresses.clear();

scheduler.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) {
return finder;
}

@com.google.common.annotations.VisibleForTesting
void awaitPendingCacheUpdates() throws InterruptedException {
for (ChannelFinderReference ref : channelFinders.values()) {
ChannelFinder finder = ref.get();
if (finder != null) {
finder.awaitPendingUpdates();
}
}
}

/** Records real traffic to the selected endpoint for idle eviction tracking. */
private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) {
if (lifecycleManager == null) {
Expand Down Expand Up @@ -798,25 +808,25 @@ public void onMessage(ResponseT message) {
if (message instanceof PartialResultSet) {
PartialResultSet response = (PartialResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof ResultSet) {
ResultSet response = (ResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof Transaction) {
Transaction response = (Transaction) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromTransaction(response);
} else if (message instanceof CommitResponse) {
CommitResponse response = (CommitResponse) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
}
if (transactionId != null) {
Expand Down
Loading
Loading