Skip to content

Commit 26c365b

Browse files
committed
fix getState()
1 parent ecb86fd commit 26c365b

File tree

3 files changed

+19
-35
lines changed

3 files changed

+19
-35
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717
package com.google.cloud.spanner.spi.v1;
1818

1919
import com.google.api.core.InternalApi;
20-
import com.google.api.gax.grpc.GrpcTransportChannel;
2120
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
2221
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
23-
import com.google.api.gax.rpc.TransportChannelProvider;
2422
import com.google.cloud.spanner.ErrorCode;
2523
import com.google.cloud.spanner.SpannerExceptionFactory;
2624
import com.google.common.annotations.VisibleForTesting;
@@ -92,13 +90,14 @@ public ChannelEndpoint get(String address) {
9290
try {
9391
// Create a new provider with the same config but different endpoint.
9492
// This is thread-safe as withEndpoint() returns a new provider instance.
95-
TransportChannelProvider newProvider = createProviderWithAuthorityOverride(addr);
93+
InstantiatingGrpcChannelProvider newProvider =
94+
createProviderWithAuthorityOverride(addr);
9695
GrpcChannelEndpoint endpoint = new GrpcChannelEndpoint(addr, newProvider);
97-
logger.log(Level.INFO, "Location-aware endpoint created for address: {0}", addr);
96+
logger.log(Level.FINE, "Location-aware endpoint created for address: {0}", addr);
9897
return endpoint;
9998
} catch (IOException e) {
10099
logger.log(
101-
Level.WARNING, "Failed to create location-aware endpoint for address: " + addr, e);
100+
Level.FINE, "Failed to create location-aware endpoint for address: " + addr, e);
102101
throw SpannerExceptionFactory.newSpannerException(
103102
ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e);
104103
}
@@ -111,7 +110,7 @@ public ChannelEndpoint getIfPresent(String address) {
111110
return servers.get(address);
112111
}
113112

114-
private TransportChannelProvider createProviderWithAuthorityOverride(String address) {
113+
private InstantiatingGrpcChannelProvider createProviderWithAuthorityOverride(String address) {
115114
InstantiatingGrpcChannelProvider endpointProvider =
116115
(InstantiatingGrpcChannelProvider) baseProvider.withEndpoint(address);
117116
if (Objects.equals(defaultAuthority, address)) {
@@ -191,15 +190,19 @@ static class GrpcChannelEndpoint implements ChannelEndpoint {
191190
* @param provider the channel provider (must be a gRPC provider)
192191
* @throws IOException if the channel cannot be created
193192
*/
194-
GrpcChannelEndpoint(String address, TransportChannelProvider provider) throws IOException {
193+
GrpcChannelEndpoint(String address, InstantiatingGrpcChannelProvider provider)
194+
throws IOException {
195195
this.address = address;
196-
TransportChannelProvider readyProvider = provider;
196+
// Build a raw ManagedChannel directly instead of going through getTransportChannel(),
197+
// which wraps the channel in a ChannelPool that does not support getState().
198+
// Location-aware routing needs getState() to check channel connectivity.
199+
InstantiatingGrpcChannelProvider readyProvider = provider;
197200
if (provider.needsHeaders()) {
198-
readyProvider = provider.withHeaders(java.util.Collections.emptyMap());
201+
readyProvider =
202+
(InstantiatingGrpcChannelProvider)
203+
provider.withHeaders(java.util.Collections.emptyMap());
199204
}
200-
GrpcTransportChannel transportChannel =
201-
(GrpcTransportChannel) readyProvider.getTransportChannel();
202-
this.channel = (ManagedChannel) transportChannel.getChannel();
205+
this.channel = readyProvider.createDecoratedChannelBuilder().build();
203206
}
204207

205208
/**
@@ -240,7 +243,7 @@ public boolean isHealthy() {
240243
return ready;
241244
} catch (UnsupportedOperationException e) {
242245
logger.log(
243-
Level.WARNING,
246+
Level.FINE,
244247
"getState(false) unsupported for location-aware endpoint {0}, treating as not ready",
245248
address);
246249
return false;

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,10 @@ private ChannelFinder getOrCreateChannelFinder(String databaseId) {
159159
}
160160

161161
/** Records real traffic to the selected endpoint for idle eviction tracking. */
162-
private void onRequestRouted(
163-
@Nullable String session, @Nullable ChannelEndpoint selectedEndpoint) {
162+
private void onRequestRouted(@Nullable ChannelEndpoint selectedEndpoint) {
164163
if (lifecycleManager == null) {
165164
return;
166165
}
167-
// Record real traffic for idle eviction tracking.
168166
if (selectedEndpoint != null && !defaultEndpointAddress.equals(selectedEndpoint.getAddress())) {
169167
lifecycleManager.recordRealTraffic(selectedEndpoint.getAddress());
170168
}
@@ -545,8 +543,7 @@ public void sendMessage(RequestT message) {
545543
this.channelFinder = finder;
546544

547545
// Record real traffic for idle eviction tracking.
548-
String session = extractSessionFromMessage(message);
549-
parentChannel.onRequestRouted(session, endpoint);
546+
parentChannel.onRequestRouted(endpoint);
550547

551548
recordRouteSelectionTrace(
552549
methodDescriptor,
@@ -677,22 +674,6 @@ private void drainPendingRequests() {
677674
}
678675
}
679676

680-
@Nullable
681-
private static String extractSessionFromMessage(Object message) {
682-
if (message instanceof ReadRequest) {
683-
return ((ReadRequest) message).getSession();
684-
} else if (message instanceof ExecuteSqlRequest) {
685-
return ((ExecuteSqlRequest) message).getSession();
686-
} else if (message instanceof BeginTransactionRequest) {
687-
return ((BeginTransactionRequest) message).getSession();
688-
} else if (message instanceof CommitRequest) {
689-
return ((CommitRequest) message).getSession();
690-
} else if (message instanceof RollbackRequest) {
691-
return ((RollbackRequest) message).getSession();
692-
}
693-
return null;
694-
}
695-
696677
void maybeRecordAffinity(ByteString transactionId) {
697678
parentChannel.recordAffinity(transactionId, selectedEndpoint, allowDefaultAffinity);
698679
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyRangeCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@
3131
import java.util.Comparator;
3232
import java.util.HashMap;
3333
import java.util.HashSet;
34-
import java.util.Set;
3534
import java.util.List;
3635
import java.util.Map;
3736
import java.util.NavigableMap;
3837
import java.util.Objects;
38+
import java.util.Set;
3939
import java.util.TreeMap;
4040
import java.util.concurrent.ThreadLocalRandom;
4141
import java.util.concurrent.atomic.AtomicLong;

0 commit comments

Comments
 (0)