Skip to content

Commit ca9edb9

Browse files
authored
fix(spanner): enforce READY-only location aware routing and add endpoint lifecycle management (#12678)
2 parents 7b31e5d + 7deda1d commit ca9edb9

File tree

13 files changed

+1795
-109
lines changed

13 files changed

+1795
-109
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,27 @@ public interface ChannelEndpoint {
4141
String getAddress();
4242

4343
/**
44-
* Returns whether this server is ready to accept RPCs.
44+
* Returns whether this server's channel is in {@code READY} state and can accept location-aware
45+
* RPCs.
4546
*
46-
* <p>A server is considered unhealthy if:
47+
* <p>Only endpoints in {@code READY} state are eligible for location-aware routing. Endpoints in
48+
* {@code IDLE}, {@code CONNECTING}, {@code TRANSIENT_FAILURE}, or {@code SHUTDOWN} are not
49+
* considered healthy for location-aware routing purposes.
4750
*
48-
* <ul>
49-
* <li>The underlying channel is shutdown or terminated
50-
* <li>The channel is in a transient failure state
51-
* </ul>
52-
*
53-
* @return true if the server is healthy and ready to accept RPCs
51+
* @return true if the channel is in READY state
5452
*/
5553
boolean isHealthy();
5654

55+
/**
56+
* Returns whether this server's channel is in {@code TRANSIENT_FAILURE} state.
57+
*
58+
* <p>When an endpoint is in transient failure, it should be reported as a skipped tablet in
59+
* routing hints so the server can refresh the client cache.
60+
*
61+
* @return true if the channel is in TRANSIENT_FAILURE state
62+
*/
63+
boolean isTransientFailure();
64+
5765
/**
5866
* Returns the gRPC channel for making RPCs to this server.
5967
*

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpointCache.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@ public interface ChannelEndpointCache {
5454
*/
5555
ChannelEndpoint get(String address);
5656

57+
/**
58+
* Returns a cached channel for the given address without creating it.
59+
*
60+
* <p>Unlike {@link #get(String)}, this method does not create a new endpoint if one does not
61+
* already exist in the cache. This is used by location-aware routing to avoid foreground endpoint
62+
* creation on the request path.
63+
*
64+
* @param address the server address in "host:port" format
65+
* @return the cached channel instance, or null if no endpoint exists for this address
66+
*/
67+
@javax.annotation.Nullable
68+
ChannelEndpoint getIfPresent(String address);
69+
5770
/**
5871
* Evicts a server connection from the cache and gracefully shuts down its channel.
5972
*

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,22 @@
2222
import com.google.spanner.v1.CommitRequest;
2323
import com.google.spanner.v1.DirectedReadOptions;
2424
import com.google.spanner.v1.ExecuteSqlRequest;
25+
import com.google.spanner.v1.Group;
2526
import com.google.spanner.v1.Mutation;
2627
import com.google.spanner.v1.ReadRequest;
2728
import com.google.spanner.v1.RoutingHint;
29+
import com.google.spanner.v1.Tablet;
2830
import com.google.spanner.v1.TransactionOptions;
2931
import com.google.spanner.v1.TransactionSelector;
3032
import java.util.ArrayList;
33+
import java.util.HashSet;
3134
import java.util.List;
3235
import java.util.Objects;
36+
import java.util.Set;
3337
import java.util.concurrent.ThreadLocalRandom;
3438
import java.util.concurrent.atomic.AtomicLong;
3539
import java.util.function.Predicate;
40+
import javax.annotation.Nullable;
3641

3742
/**
3843
* Finds a server for a request using location-aware routing metadata.
@@ -47,9 +52,25 @@ public final class ChannelFinder {
4752
private final AtomicLong databaseId = new AtomicLong();
4853
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
4954
private final KeyRangeCache rangeCache;
55+
@Nullable private final EndpointLifecycleManager lifecycleManager;
56+
@Nullable private final String finderKey;
5057

5158
public ChannelFinder(ChannelEndpointCache endpointCache) {
52-
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache));
59+
this(endpointCache, null, null);
60+
}
61+
62+
public ChannelFinder(
63+
ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) {
64+
this(endpointCache, lifecycleManager, null);
65+
}
66+
67+
ChannelFinder(
68+
ChannelEndpointCache endpointCache,
69+
@Nullable EndpointLifecycleManager lifecycleManager,
70+
@Nullable String finderKey) {
71+
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
72+
this.lifecycleManager = lifecycleManager;
73+
this.finderKey = finderKey;
5374
}
5475

5576
void useDeterministicRandom() {
@@ -70,6 +91,24 @@ public void update(CacheUpdate update) {
7091
recipeCache.addRecipes(update.getKeyRecipes());
7192
}
7293
rangeCache.addRanges(update);
94+
95+
// Notify the lifecycle manager about server addresses so it can create endpoints
96+
// in the background and start probing, and evict stale endpoints atomically.
97+
if (lifecycleManager != null && finderKey != null) {
98+
Set<String> currentAddresses = new HashSet<>();
99+
for (Group group : update.getGroupList()) {
100+
for (Tablet tablet : group.getTabletsList()) {
101+
String addr = tablet.getServerAddress();
102+
if (!addr.isEmpty()) {
103+
currentAddresses.add(addr);
104+
}
105+
}
106+
}
107+
// Also include addresses from existing cached tablets not in this update.
108+
currentAddresses.addAll(rangeCache.getActiveAddresses());
109+
// Atomically ensure endpoints exist and evict stale ones.
110+
lifecycleManager.updateActiveAddresses(finderKey, currentAddresses);
111+
}
73112
}
74113
}
75114

0 commit comments

Comments
 (0)