Skip to content

Commit c4e4496

Browse files
committed
chore(spanner): track latency when using KeyAwareChannel
Track latency for streaming SQL and streaming reads that return only one PartialResultSet. The LatencyTracker decides which RPCs are eligible for tracking, which allows us to include or exclude more RPCs in the future.
1 parent c29b99f commit c4e4496

File tree

6 files changed

+168
-13
lines changed

6 files changed

+168
-13
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelEndpoint.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,13 @@ public interface ChannelEndpoint {
7171
* @return the managed channel for this server
7272
*/
7373
ManagedChannel getChannel();
74+
75+
/**
76+
* Returns the latency tracker for this endpoint, or null if not supported.
77+
*
78+
* @return the latency tracker or null
79+
*/
80+
default LatencyTracker getLatencyTracker() {
81+
return null;
82+
}
7483
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EwmaLatencyTracker.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.api.core.BetaApi;
2020
import com.google.api.core.InternalApi;
2121
import com.google.common.base.Preconditions;
22+
import com.google.spanner.v1.PartialResultSet;
23+
import io.grpc.MethodDescriptor;
2224
import java.time.Duration;
2325
import java.util.concurrent.TimeUnit;
2426
import javax.annotation.concurrent.GuardedBy;
@@ -67,8 +69,7 @@ public double getScore() {
6769
}
6870
}
6971

70-
@Override
71-
public void update(Duration latency) {
72+
void update(Duration latency) {
7273
long latencyMicros;
7374
try {
7475
latencyMicros = TimeUnit.MICROSECONDS.convert(latency.toNanos(), TimeUnit.NANOSECONDS);
@@ -92,4 +93,21 @@ public void recordError(Duration penalty) {
9293
// Treat the error as a sample with high latency (penalty)
9394
update(penalty);
9495
}
96+
97+
@Override
98+
public boolean isEligible(MethodDescriptor<?, ?> methodDescriptor) {
99+
String methodName = methodDescriptor.getFullMethodName();
100+
return KeyAwareChannel.STREAMING_READ_METHOD.equals(methodName)
101+
|| KeyAwareChannel.STREAMING_SQL_METHOD.equals(methodName);
102+
}
103+
104+
@Override
105+
public void maybeUpdate(Object message, Duration latency) {
106+
if (message instanceof PartialResultSet) {
107+
PartialResultSet response = (PartialResultSet) message;
108+
if (response.getLast()) {
109+
update(latency);
110+
}
111+
}
112+
}
95113
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelEndpointCache.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ public GrpcChannelEndpointCache(InstantiatingGrpcChannelProvider channelProvider
6767
throws IOException {
6868
this.baseProvider = channelProvider;
6969
String defaultEndpoint = channelProvider.getEndpoint();
70-
this.defaultEndpoint = new GrpcChannelEndpoint(defaultEndpoint, channelProvider);
70+
this.defaultEndpoint =
71+
new GrpcChannelEndpoint(defaultEndpoint, channelProvider, new EwmaLatencyTracker());
7172
this.defaultAuthority = this.defaultEndpoint.getChannel().authority();
7273
this.servers.put(defaultEndpoint, this.defaultEndpoint);
7374
}
@@ -92,7 +93,8 @@ public ChannelEndpoint get(String address) {
9293
// This is thread-safe as withEndpoint() returns a new provider instance.
9394
InstantiatingGrpcChannelProvider newProvider =
9495
createProviderWithAuthorityOverride(addr);
95-
GrpcChannelEndpoint endpoint = new GrpcChannelEndpoint(addr, newProvider);
96+
GrpcChannelEndpoint endpoint =
97+
new GrpcChannelEndpoint(addr, newProvider, new EwmaLatencyTracker());
9698
logger.log(Level.FINE, "Location-aware endpoint created for address: {0}", addr);
9799
return endpoint;
98100
} catch (IOException e) {
@@ -178,10 +180,10 @@ private void shutdownChannel(GrpcChannelEndpoint server, boolean awaitTerminatio
178180
}
179181
}
180182

181-
/** gRPC implementation of {@link ChannelEndpoint}. */
182183
static class GrpcChannelEndpoint implements ChannelEndpoint {
183184
private final String address;
184185
private final ManagedChannel channel;
186+
private final LatencyTracker latencyTracker;
185187

186188
/**
187189
* Creates a server from a channel provider.
@@ -190,7 +192,8 @@ static class GrpcChannelEndpoint implements ChannelEndpoint {
190192
* @param provider the channel provider (must be a gRPC provider)
191193
* @throws IOException if the channel cannot be created
192194
*/
193-
GrpcChannelEndpoint(String address, InstantiatingGrpcChannelProvider provider)
195+
GrpcChannelEndpoint(
196+
String address, InstantiatingGrpcChannelProvider provider, LatencyTracker latencyTracker)
194197
throws IOException {
195198
this.address = address;
196199
// Build a raw ManagedChannel directly instead of going through getTransportChannel(),
@@ -203,6 +206,7 @@ static class GrpcChannelEndpoint implements ChannelEndpoint {
203206
provider.withHeaders(java.util.Collections.emptyMap());
204207
}
205208
this.channel = readyProvider.createDecoratedChannelBuilder().build();
209+
this.latencyTracker = latencyTracker;
206210
}
207211

208212
/**
@@ -212,9 +216,10 @@ static class GrpcChannelEndpoint implements ChannelEndpoint {
212216
* @param channel the managed channel
213217
*/
214218
@VisibleForTesting
215-
GrpcChannelEndpoint(String address, ManagedChannel channel) {
219+
GrpcChannelEndpoint(String address, ManagedChannel channel, LatencyTracker latencyTracker) {
216220
this.address = address;
217221
this.channel = channel;
222+
this.latencyTracker = latencyTracker;
218223
}
219224

220225
@Override
@@ -267,5 +272,10 @@ public boolean isTransientFailure() {
267272
public ManagedChannel getChannel() {
268273
return channel;
269274
}
275+
276+
@Override
277+
public LatencyTracker getLatencyTracker() {
278+
return latencyTracker;
279+
}
270280
}
271281
}

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.io.IOException;
4747
import java.lang.ref.ReferenceQueue;
4848
import java.lang.ref.SoftReference;
49+
import java.time.Duration;
4950
import java.util.HashSet;
5051
import java.util.Map;
5152
import java.util.Set;
@@ -72,9 +73,8 @@ final class KeyAwareChannel extends ManagedChannel {
7273
private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L;
7374
private static final long MAX_TRACKED_EXCLUDED_LOGICAL_REQUESTS = 100_000L;
7475
private static final long EXCLUDED_LOGICAL_REQUEST_TTL_MINUTES = 10L;
75-
private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead";
76-
private static final String STREAMING_SQL_METHOD =
77-
"google.spanner.v1.Spanner/ExecuteStreamingSql";
76+
static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead";
77+
static final String STREAMING_SQL_METHOD = "google.spanner.v1.Spanner/ExecuteStreamingSql";
7878
private static final String UNARY_SQL_METHOD = "google.spanner.v1.Spanner/ExecuteSql";
7979
private static final String BEGIN_TRANSACTION_METHOD =
8080
"google.spanner.v1.Spanner/BeginTransaction";
@@ -462,6 +462,7 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
462462
private boolean isReadOnlyBegin;
463463
private boolean readOnlyIsStrong;
464464
private final Object lock = new Object();
465+
volatile long startTimeNanos;
465466

466467
KeyAwareClientCall(
467468
KeyAwareChannel parentChannel,
@@ -610,6 +611,7 @@ public void sendMessage(RequestT message) {
610611
}
611612
delegate.start(responseListener, headers);
612613
drainPendingRequests();
614+
startTimeNanos = System.nanoTime();
613615
delegate.sendMessage(message);
614616
if (pendingHalfClose) {
615617
delegate.halfClose();
@@ -810,6 +812,7 @@ private RoutingDecision(@Nullable ChannelFinder finder, @Nullable ChannelEndpoin
810812
static final class KeyAwareClientCallListener<ResponseT>
811813
extends SimpleForwardingClientCallListener<ResponseT> {
812814
private final KeyAwareClientCall<?, ResponseT> call;
815+
private boolean firstMessageReceived = false;
813816

814817
KeyAwareClientCallListener(
815818
ClientCall.Listener<ResponseT> responseListener, KeyAwareClientCall<?, ResponseT> call) {
@@ -819,6 +822,18 @@ static final class KeyAwareClientCallListener<ResponseT>
819822

820823
@Override
821824
public void onMessage(ResponseT message) {
825+
if (!firstMessageReceived) {
826+
firstMessageReceived = true;
827+
// call.selectedEndpoint will in real usage never be null when we reach this
828+
// point.
829+
if (call.selectedEndpoint != null) {
830+
LatencyTracker tracker = call.selectedEndpoint.getLatencyTracker();
831+
if (tracker != null && tracker.isEligible(call.methodDescriptor)) {
832+
Duration latency = Duration.ofNanos(System.nanoTime() - call.startTimeNanos);
833+
tracker.maybeUpdate(message, latency);
834+
}
835+
}
836+
}
822837
ByteString transactionId = null;
823838
if (message instanceof PartialResultSet) {
824839
PartialResultSet response = (PartialResultSet) message;

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LatencyTracker.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.core.BetaApi;
2020
import com.google.api.core.InternalApi;
21+
import io.grpc.MethodDescriptor;
2122
import java.time.Duration;
2223

2324
/**
@@ -38,16 +39,25 @@ public interface LatencyTracker {
3839
double getScore();
3940

4041
/**
41-
* Updates the latency score with a new observation.
42+
* Potentially updates the latency score based on the response message.
4243
*
43-
* @param latency the observed latency.
44+
* @param message the response message.
45+
* @param latency the measured latency.
4446
*/
45-
void update(Duration latency);
47+
void maybeUpdate(Object message, Duration latency);
4648

4749
/**
4850
* Records an error and applies a latency penalty.
4951
*
5052
* @param penalty the penalty to apply.
5153
*/
5254
void recordError(Duration penalty);
55+
56+
/**
57+
* Returns whether a call with the given method descriptor is eligible for latency measurement.
58+
*
59+
* @param methodDescriptor the method descriptor of the call.
60+
* @return true if eligible, false otherwise.
61+
*/
62+
boolean isEligible(MethodDescriptor<?, ?> methodDescriptor);
5363
}

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,93 @@ public void resultSetCacheUpdateRoutesSubsequentRequest() throws Exception {
286286
assertThat(harness.endpointCache.callCountForAddress("routed:1234")).isEqualTo(1);
287287
}
288288

289+
@Test
290+
public void callTracksLatencyOnMessage() throws Exception {
291+
TestHarness harness = createHarness();
292+
ExecuteSqlRequest request = ExecuteSqlRequest.newBuilder().setSession(SESSION).build();
293+
294+
ClientCall<ExecuteSqlRequest, PartialResultSet> call =
295+
harness.channel.newCall(SpannerGrpc.getExecuteStreamingSqlMethod(), CallOptions.DEFAULT);
296+
CapturingListener<PartialResultSet> listener = new CapturingListener<>();
297+
call.start(listener, new Metadata());
298+
call.sendMessage(request);
299+
300+
@SuppressWarnings("unchecked")
301+
RecordingClientCall<ExecuteSqlRequest, PartialResultSet> delegate =
302+
(RecordingClientCall<ExecuteSqlRequest, PartialResultSet>)
303+
harness.defaultManagedChannel.latestCall();
304+
305+
FakeEndpoint defaultEndpoint = harness.endpointCache.defaultEndpoint;
306+
LatencyTracker tracker = defaultEndpoint.getLatencyTracker();
307+
308+
double initialScore = tracker.getScore();
309+
310+
// Emit a message with last=true to trigger onMessage and latency update.
311+
delegate.emitOnMessage(PartialResultSet.newBuilder().setLast(true).build());
312+
313+
// Verify that the score has been updated (it should not be equal to the initial score).
314+
double newScore = tracker.getScore();
315+
assertThat(newScore).isNotEqualTo(initialScore);
316+
}
317+
318+
@Test
319+
public void callDoesNotTrackLatencyForNonEligibleRpc() throws Exception {
320+
TestHarness harness = createHarness();
321+
ExecuteSqlRequest request = ExecuteSqlRequest.newBuilder().setSession(SESSION).build();
322+
323+
ClientCall<ExecuteSqlRequest, ResultSet> call =
324+
harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT);
325+
CapturingListener<ResultSet> listener = new CapturingListener<>();
326+
call.start(listener, new Metadata());
327+
call.sendMessage(request);
328+
329+
@SuppressWarnings("unchecked")
330+
RecordingClientCall<ExecuteSqlRequest, ResultSet> delegate =
331+
(RecordingClientCall<ExecuteSqlRequest, ResultSet>)
332+
harness.defaultManagedChannel.latestCall();
333+
334+
FakeEndpoint defaultEndpoint = harness.endpointCache.defaultEndpoint;
335+
LatencyTracker tracker = defaultEndpoint.getLatencyTracker();
336+
337+
double initialScore = tracker.getScore();
338+
339+
// Emit a message.
340+
delegate.emitOnMessage(ResultSet.newBuilder().build());
341+
342+
// Verify that the score has not been updated.
343+
double newScore = tracker.getScore();
344+
assertThat(newScore).isEqualTo(initialScore);
345+
}
346+
347+
@Test
348+
public void callDoesNotTrackLatencyForNonLastPartialResultSet() throws Exception {
349+
TestHarness harness = createHarness();
350+
ExecuteSqlRequest request = ExecuteSqlRequest.newBuilder().setSession(SESSION).build();
351+
352+
ClientCall<ExecuteSqlRequest, PartialResultSet> call =
353+
harness.channel.newCall(SpannerGrpc.getExecuteStreamingSqlMethod(), CallOptions.DEFAULT);
354+
CapturingListener<PartialResultSet> listener = new CapturingListener<>();
355+
call.start(listener, new Metadata());
356+
call.sendMessage(request);
357+
358+
@SuppressWarnings("unchecked")
359+
RecordingClientCall<ExecuteSqlRequest, PartialResultSet> delegate =
360+
(RecordingClientCall<ExecuteSqlRequest, PartialResultSet>)
361+
harness.defaultManagedChannel.latestCall();
362+
363+
FakeEndpoint defaultEndpoint = harness.endpointCache.defaultEndpoint;
364+
LatencyTracker tracker = defaultEndpoint.getLatencyTracker();
365+
366+
double initialScore = tracker.getScore();
367+
368+
// Emit a message with last=false.
369+
delegate.emitOnMessage(PartialResultSet.newBuilder().setLast(false).build());
370+
371+
// Verify that the score has not been updated.
372+
double newScore = tracker.getScore();
373+
assertThat(newScore).isEqualTo(initialScore);
374+
}
375+
289376
@Test
290377
public void beginTransactionWithMutationKeyAddsRoutingHint() throws Exception {
291378
TestHarness harness = createHarness();
@@ -1350,12 +1437,18 @@ int callCountForAddress(String address) {
13501437
private static final class FakeEndpoint implements ChannelEndpoint {
13511438
private final String address;
13521439
private final FakeManagedChannel channel;
1440+
private final LatencyTracker latencyTracker = new EwmaLatencyTracker();
13531441

13541442
private FakeEndpoint(String address) {
13551443
this.address = address;
13561444
this.channel = new FakeManagedChannel(address);
13571445
}
13581446

1447+
@Override
1448+
public LatencyTracker getLatencyTracker() {
1449+
return latencyTracker;
1450+
}
1451+
13591452
@Override
13601453
public String getAddress() {
13611454
return address;

0 commit comments

Comments
 (0)