Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,23 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.Group;
import com.google.spanner.v1.Mutation;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RoutingHint;
import com.google.spanner.v1.Tablet;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import javax.annotation.Nullable;

Expand All @@ -54,6 +52,7 @@
@InternalApi
public final class ChannelFinder {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
private static final int CACHE_UPDATE_DRAIN_BATCH_SIZE = 64;
private static final int MAX_CACHE_UPDATE_THREADS =
Math.max(2, Runtime.getRuntime().availableProcessors());
private static final ExecutorService CACHE_UPDATE_POOL = createCacheUpdatePool();
Expand All @@ -62,8 +61,11 @@ public final class ChannelFinder {
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
private final AtomicReference<CacheUpdate> pendingUpdate = new AtomicReference<>();
private volatile java.util.concurrent.CountDownLatch drainingLatch;
private final ConcurrentLinkedQueue<PendingCacheUpdate> pendingUpdates =
new ConcurrentLinkedQueue<>();
private final AtomicBoolean drainScheduled = new AtomicBoolean();
private volatile java.util.concurrent.CountDownLatch drainingLatch =
new java.util.concurrent.CountDownLatch(0);
@Nullable private final EndpointLifecycleManager lifecycleManager;
@Nullable private final String finderKey;

Expand Down Expand Up @@ -105,46 +107,43 @@ private static ExecutorService createCacheUpdatePool() {
return executor;
}

/** Immutable queue entry wrapping one {@link CacheUpdate} awaiting asynchronous application. */
private static final class PendingCacheUpdate {
  // The update to apply; enqueued only by updateAsync() after shouldProcessUpdate() passes.
  private final CacheUpdate update;

  private PendingCacheUpdate(CacheUpdate update) {
    this.update = update;
  }
}

/**
 * Returns whether {@code update} carries any cache content worth applying: at least one
 * group, at least one range, or a non-empty set of key recipes.
 */
private boolean isMaterialUpdate(CacheUpdate update) {
  if (update.getGroupCount() > 0 || update.getRangeCount() > 0) {
    return true;
  }
  return update.hasKeyRecipes() && update.getKeyRecipes().getRecipeCount() > 0;
}

/**
 * Returns whether {@code update} must be enqueued: either it carries material cache content,
 * or it names a (non-zero) database id different from the one currently cached — the latter
 * forces a cache reset even when the update body is otherwise empty.
 */
private boolean shouldProcessUpdate(CacheUpdate update) {
  if (isMaterialUpdate(update)) {
    return true;
  }
  long incomingId = update.getDatabaseId();
  boolean switchesDatabase = incomingId != 0 && incomingId != databaseId.get();
  return switchesDatabase;
}

/**
 * Synchronously applies {@code update} to the recipe and range caches and then publishes the
 * resulting active-address snapshot to the endpoint lifecycle manager (if configured).
 *
 * <p>The cache mutation and the address snapshot happen under a single {@code updateLock}
 * acquisition so the published set is consistent with the applied update; the lifecycle
 * hand-off itself runs outside the lock.
 *
 * <p>NOTE(review): this span of the diff view interleaved the pre-change body (inline address
 * collection under the lock) with the post-change body; only the coherent post-change
 * implementation is kept here.
 */
public void update(CacheUpdate update) {
  Set<String> currentAddresses;
  synchronized (updateLock) {
    applyUpdateLocked(update);
    // Null when no lifecycle manager / finder key is configured — publish is then a no-op.
    currentAddresses = snapshotActiveAddressesLocked();
  }
  publishLifecycleUpdate(currentAddresses);
}

public void updateAsync(CacheUpdate update) {
// Replace any pending update atomically. Each CacheUpdate contains the full current state,
// so intermediate updates can be safely dropped to prevent unbounded queue growth.
if (pendingUpdate.getAndSet(update) == null) {
// No previous pending update means no drain task is scheduled yet — submit one.
if (!shouldProcessUpdate(update)) {
return;
}
pendingUpdates.add(new PendingCacheUpdate(update));
if (drainScheduled.compareAndSet(false, true)) {
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
drainingLatch = latch;
CACHE_UPDATE_POOL.execute(
Expand All @@ -159,27 +158,89 @@ public void updateAsync(CacheUpdate update) {
}

private void drainPendingUpdate() {
CacheUpdate toApply;
while ((toApply = pendingUpdate.getAndSet(null)) != null) {
update(toApply);
List<PendingCacheUpdate> batch = new ArrayList<>(CACHE_UPDATE_DRAIN_BATCH_SIZE);
while (true) {
drainBatch(batch);
if (!batch.isEmpty()) {
applyBatch(batch);
batch.clear();
}
drainScheduled.set(false);
if (pendingUpdates.isEmpty() || !drainScheduled.compareAndSet(false, true)) {
return;
}
}
}

/**
 * Moves at most {@code CACHE_UPDATE_DRAIN_BATCH_SIZE} entries from {@code pendingUpdates}
 * into {@code batch}, stopping early once the queue runs dry.
 */
private void drainBatch(List<PendingCacheUpdate> batch) {
  while (batch.size() < CACHE_UPDATE_DRAIN_BATCH_SIZE) {
    PendingCacheUpdate next = pendingUpdates.poll();
    if (next == null) {
      return;
    }
    batch.add(next);
  }
}

/**
 * Applies every update in {@code batch} under one {@code updateLock} acquisition, snapshots
 * the resulting active addresses while still holding the lock, and publishes the snapshot to
 * the lifecycle manager after releasing it.
 */
private void applyBatch(List<PendingCacheUpdate> batch) {
  final Set<String> addressSnapshot;
  synchronized (updateLock) {
    for (PendingCacheUpdate queued : batch) {
      applyUpdateLocked(queued.update);
    }
    addressSnapshot = snapshotActiveAddressesLocked();
  }
  publishLifecycleUpdate(addressSnapshot);
}

/**
 * Applies one update to the caches; caller must hold {@code updateLock}.
 *
 * <p>A non-zero database id that differs from the cached one triggers a full cache reset
 * (skipping the clear when no database was cached yet) before recording the new id.
 */
private void applyUpdateLocked(CacheUpdate update) {
  long incomingId = update.getDatabaseId();
  long cachedId = databaseId.get();
  if (incomingId != 0 && incomingId != cachedId) {
    if (cachedId != 0) {
      // Switching databases: stale recipes and ranges must not leak across databases.
      recipeCache.clear();
      rangeCache.clear();
    }
    databaseId.set(incomingId);
  }
  if (update.hasKeyRecipes()) {
    recipeCache.addRecipes(update.getKeyRecipes());
  }
  rangeCache.addRanges(update);
}

/**
 * Snapshots the range cache's active addresses; caller must hold {@code updateLock}.
 * Returns null when lifecycle publication is disabled (no manager or no finder key).
 */
@Nullable
private Set<String> snapshotActiveAddressesLocked() {
  boolean lifecycleEnabled = lifecycleManager != null && finderKey != null;
  return lifecycleEnabled ? rangeCache.getActiveAddresses() : null;
}

/**
 * Hands the address snapshot to the lifecycle manager asynchronously. A null snapshot means
 * lifecycle tracking is disabled and the call is a no-op.
 */
private void publishLifecycleUpdate(@Nullable Set<String> currentAddresses) {
  if (currentAddresses != null) {
    lifecycleManager.updateActiveAddressesAsync(finderKey, currentAddresses);
  }
}

/**
* Test-only hook used by {@link KeyAwareChannel#awaitPendingCacheUpdates()} to wait until the
* async cache update worker has finished applying the latest pending update.
*/
@VisibleForTesting
void awaitPendingUpdates() throws InterruptedException {
// Spin until no pending update remains.
long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(5);
while (pendingUpdate.get() != null && System.nanoTime() < deadline) {
Thread.sleep(1);
}
// Wait for the drain task to fully complete (including the update() call).
java.util.concurrent.CountDownLatch latch = drainingLatch;
if (latch != null) {
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
while (System.nanoTime() < deadline) {
java.util.concurrent.CountDownLatch latch = drainingLatch;
if (latch != null) {
long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
break;
}
latch.await(remainingNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
}
if (pendingUpdates.isEmpty() && !drainScheduled.get()) {
return;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -103,6 +105,11 @@ static final class EndpointState {
private final ChannelEndpointCache endpointCache;
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
private final Map<String, Long> finderGenerations = new ConcurrentHashMap<>();
private final Map<String, PendingActiveAddressUpdate> pendingActiveAddressUpdates =
new ConcurrentHashMap<>();
private final Set<String> queuedFinderKeys = ConcurrentHashMap.newKeySet();
private final ConcurrentLinkedQueue<String> queuedFinders = new ConcurrentLinkedQueue<>();

/**
* Active addresses reported by each ChannelFinder, keyed by database id.
Expand All @@ -118,15 +125,27 @@ static final class EndpointState {

private final Object activeAddressLock = new Object();

private final ExecutorService activeAddressUpdateExecutor;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final AtomicBoolean activeAddressDrainScheduled = new AtomicBoolean(false);
private final long probeIntervalSeconds;
private final Duration idleEvictionDuration;
private final Clock clock;
private final String defaultEndpointAddress;

private ScheduledFuture<?> evictionFuture;

/** Coalesced active-address snapshot for one finder, stamped with that finder's generation. */
private static final class PendingActiveAddressUpdate {
  // Defensive copy of the finder's active server addresses at enqueue time.
  private final Set<String> activeAddresses;
  // Finder generation at enqueue time; a mismatch at drain time means the finder was
  // unregistered in between and this snapshot must be discarded.
  private final long generation;

  private PendingActiveAddressUpdate(Set<String> activeAddresses, long generation) {
    this.activeAddresses = activeAddresses;
    this.generation = generation;
  }
}

EndpointLifecycleManager(ChannelEndpointCache endpointCache) {
this(
endpointCache,
Expand All @@ -146,6 +165,13 @@ static final class EndpointState {
this.idleEvictionDuration = idleEvictionDuration;
this.clock = clock;
this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
this.activeAddressUpdateExecutor =
Executors.newSingleThreadExecutor(
r -> {
Thread t = new Thread(r, "spanner-active-address-update");
t.setDaemon(true);
return t;
});
this.scheduler =
Executors.newScheduledThreadPool(
2,
Expand Down Expand Up @@ -213,6 +239,59 @@ private void clearTransientFailureEvictionMarker(String address) {
}
}

/**
 * Enqueues active-address reconciliation on a dedicated worker so cache-map updates do not block
 * on endpoint lifecycle bookkeeping.
 *
 * <p>Updates are coalesced per finder: only the most recent snapshot for a given key is kept
 * (the map {@code put} overwrites any not-yet-drained predecessor), and each key appears at
 * most once in the FIFO drain queue. No-op after shutdown or for a missing/empty finder key.
 */
void updateActiveAddressesAsync(String finderKey, Set<String> activeAddresses) {
  if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
    return;
  }
  synchronized (activeAddressLock) {
    // Stamp the snapshot with the finder's current generation so a concurrent
    // unregisterFinder() (which bumps the generation) invalidates this update at drain time.
    long generation = finderGenerations.getOrDefault(finderKey, 0L);
    pendingActiveAddressUpdates.put(
        finderKey, new PendingActiveAddressUpdate(new HashSet<>(activeAddresses), generation));
    // queuedFinderKeys deduplicates the queue: enqueue the key only if not already queued.
    if (queuedFinderKeys.add(finderKey)) {
      queuedFinders.add(finderKey);
    }
  }
  // Outside the lock: ensures at most one drain task is scheduled at a time.
  scheduleActiveAddressDrain();
}

/**
 * Schedules a drain task on the single-threaded worker, unless one is already pending or
 * running (the {@code activeAddressDrainScheduled} flag acts as the single-drain claim).
 */
private void scheduleActiveAddressDrain() {
  boolean claimed = activeAddressDrainScheduled.compareAndSet(false, true);
  if (claimed) {
    activeAddressUpdateExecutor.execute(this::drainPendingActiveAddressUpdates);
  }
}

/**
 * Drain loop run on the dedicated worker: processes queued finder keys one at a time,
 * applying the latest coalesced snapshot for each, until the queue is observed empty.
 */
private void drainPendingActiveAddressUpdates() {
  while (true) {
    String finderKey = queuedFinders.poll();
    if (finderKey == null) {
      // Queue looks empty: release the "scheduled" flag, then re-check. If a producer
      // enqueued in between, try to re-claim the flag and keep draining; if the re-claim
      // fails, that producer's own drain task now owns the queue and we can exit.
      activeAddressDrainScheduled.set(false);
      if (queuedFinders.isEmpty() || !activeAddressDrainScheduled.compareAndSet(false, true)) {
        return;
      }
      continue;
    }

    // Remove the dedup marker before taking the snapshot so a producer arriving now can
    // re-enqueue the key (with a newer snapshot) instead of being silently dropped.
    queuedFinderKeys.remove(finderKey);
    PendingActiveAddressUpdate pendingUpdate = pendingActiveAddressUpdates.remove(finderKey);
    if (pendingUpdate == null) {
      // Snapshot already consumed (or cleared by unregisterFinder) — nothing to do.
      continue;
    }

    synchronized (activeAddressLock) {
      // Discard snapshots from a finder generation that has since been unregistered.
      long currentGeneration = finderGenerations.getOrDefault(finderKey, 0L);
      if (currentGeneration != pendingUpdate.generation) {
        continue;
      }
    }
    updateActiveAddresses(finderKey, pendingUpdate.activeAddresses);
  }
}

/**
* Records that real (non-probe) traffic was routed to an endpoint. This refreshes the idle
* eviction timer for this endpoint.
Expand Down Expand Up @@ -295,6 +374,9 @@ void unregisterFinder(String finderKey) {
return;
}
synchronized (activeAddressLock) {
finderGenerations.merge(finderKey, 1L, Long::sum);
pendingActiveAddressUpdates.remove(finderKey);
queuedFinderKeys.remove(finderKey);
if (activeAddressesPerFinder.remove(finderKey) == null) {
return;
}
Expand Down Expand Up @@ -588,6 +670,7 @@ void shutdown() {
}

logger.log(Level.FINE, "Shutting down endpoint lifecycle manager");
activeAddressUpdateExecutor.shutdownNow();

if (evictionFuture != null) {
evictionFuture.cancel(false);
Expand All @@ -602,6 +685,9 @@ void shutdown() {
}
endpoints.clear();
transientFailureEvictedAddresses.clear();
pendingActiveAddressUpdates.clear();
queuedFinderKeys.clear();
queuedFinders.clear();

scheduler.shutdown();
try {
Expand Down
Loading
Loading