Skip to content

Commit 2990c28

Browse files
committed
tcp: suggested changes
1 parent 2b44889 commit 2990c28

File tree

8 files changed

+208
-47
lines changed

8 files changed

+208
-47
lines changed

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ private void onRstStreamRead(int streamId, long errorCode) {
486486

487487
@Override
488488
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
489+
tcpMetrics.recordTcpInfo(ctx.channel());
489490
logger.fine("Network channel being closed by the application.");
490491
if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
491492
lifecycleManager.notifyShutdown(

netty/src/main/java/io/grpc/netty/NettyServer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static io.netty.channel.ChannelOption.ALLOCATOR;
2222
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
2323

24+
import com.google.common.annotations.VisibleForTesting;
2425
import com.google.common.base.MoreObjects;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.util.concurrent.ListenableFuture;
@@ -68,6 +69,7 @@
6869
import java.util.concurrent.Callable;
6970
import java.util.logging.Level;
7071
import java.util.logging.Logger;
72+
import javax.annotation.Nullable;
7173

7274
/**
7375
* Netty-based server implementation.
@@ -138,7 +140,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
138140
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
139141
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
140142
int maxRstCount, long maxRstPeriodNanos,
141-
Attributes eagAttributes, InternalChannelz channelz) {
143+
Attributes eagAttributes, InternalChannelz channelz,
144+
@Nullable MetricRecorder metricRecorder) {
142145
this.addresses = checkNotNull(addresses, "addresses");
143146
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
144147
checkNotNull(channelOptions, "channelOptions");
@@ -174,6 +177,13 @@ class NettyServer implements InternalServer, InternalWithLogId {
174177
this.channelz = Preconditions.checkNotNull(channelz);
175178
this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
176179
String.valueOf(addresses));
180+
this.metricRecorder = metricRecorder;
181+
}
182+
183+
@VisibleForTesting
184+
@Nullable
185+
MetricRecorder getMetricRecorder() {
186+
return metricRecorder;
177187
}
178188

179189
@Override

netty/src/main/java/io/grpc/netty/NettyServerBuilder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.grpc.ExperimentalApi;
3333
import io.grpc.ForwardingServerBuilder;
3434
import io.grpc.Internal;
35+
import io.grpc.MetricRecorder;
3536
import io.grpc.ServerBuilder;
3637
import io.grpc.ServerCredentials;
3738
import io.grpc.ServerStreamTracer;
@@ -60,6 +61,7 @@
6061
import java.util.List;
6162
import java.util.Map;
6263
import java.util.concurrent.TimeUnit;
64+
import javax.annotation.Nullable;
6365
import javax.net.ssl.SSLException;
6466

6567
/**
@@ -116,6 +118,7 @@ public final class NettyServerBuilder extends ForwardingServerBuilder<NettyServe
116118
private int maxRstCount;
117119
private long maxRstPeriodNanos;
118120
private Attributes eagAttributes = Attributes.EMPTY;
121+
@Nullable private MetricRecorder metricRecorder;
119122

120123
/**
121124
* Creates a server builder that will bind to the given port.
@@ -703,6 +706,20 @@ void eagAttributes(Attributes eagAttributes) {
703706
this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
704707
}
705708

709+
/**
710+
* Sets the {@link MetricRecorder} to use for recording metrics.
711+
*
712+
* @param metricRecorder the metric recorder to use
713+
* @return this
714+
* @since 1.81.0
715+
*/
716+
@CanIgnoreReturnValue
717+
public NettyServerBuilder setMetricRecorder(
718+
@Nullable MetricRecorder metricRecorder) {
719+
this.metricRecorder = metricRecorder;
720+
return this;
721+
}
722+
706723
NettyServer buildTransportServers(
707724
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
708725
assertEventLoopsAndChannelType();
@@ -737,7 +754,8 @@ NettyServer buildTransportServers(
737754
maxRstCount,
738755
maxRstPeriodNanos,
739756
eagAttributes,
740-
this.serverImplBuilder.getChannelz());
757+
this.serverImplBuilder.getChannelz(),
758+
metricRecorder);
741759
}
742760

743761
@VisibleForTesting

netty/src/main/java/io/grpc/netty/TcpMetrics.java

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,18 @@
2323
import io.grpc.MetricInstrumentRegistry;
2424
import io.grpc.MetricRecorder;
2525
import io.netty.channel.Channel;
26+
import java.lang.reflect.Constructor;
2627
import java.lang.reflect.Method;
2728
import java.util.Arrays;
2829
import java.util.Collections;
30+
import java.util.HashMap;
2931
import java.util.List;
32+
import java.util.Map;
33+
import javax.annotation.Nullable;
3034

35+
/**
36+
* Utility for collecting TCP metrics from Netty channels.
37+
*/
3138
final class TcpMetrics {
3239

3340
private static final Metrics DEFAULT_METRICS;
@@ -51,9 +58,9 @@ static Metrics getDefaultMetrics() {
5158
static final class Metrics {
5259
final LongCounterMetricInstrument connectionsCreated;
5360
final LongUpDownCounterMetricInstrument connectionCount;
54-
final LongCounterMetricInstrument packetsRetransmitted;
55-
final LongCounterMetricInstrument recurringRetransmits;
56-
final DoubleHistogramMetricInstrument minRtt;
61+
@Nullable final LongCounterMetricInstrument packetsRetransmitted;
62+
@Nullable final LongCounterMetricInstrument recurringRetransmits;
63+
@Nullable final DoubleHistogramMetricInstrument minRtt;
5764

5865
Metrics(MetricInstrumentRegistry registry, boolean epollAvailable) {
5966
List<String> requiredLabels = Collections.singletonList("grpc.target");
@@ -171,12 +178,32 @@ private static DoubleHistogramMetricInstrument safelyRegisterDoubleHistogram(
171178
}
172179
}
173180

181+
private static final class ChannelReflectionAccessor {
182+
final Method tcpInfoMethod;
183+
final Constructor<?> tcpInfoConstructor;
184+
final Method totalRetransMethod;
185+
final Method retransmitsMethod;
186+
final Method rttMethod;
187+
188+
ChannelReflectionAccessor(
189+
Method tcpInfoMethod, Constructor<?> tcpInfoConstructor, Method totalRetransMethod,
190+
Method retransmitsMethod, Method rttMethod) {
191+
this.tcpInfoMethod = tcpInfoMethod;
192+
this.tcpInfoConstructor = tcpInfoConstructor;
193+
this.totalRetransMethod = totalRetransMethod;
194+
this.retransmitsMethod = retransmitsMethod;
195+
this.rttMethod = rttMethod;
196+
}
197+
}
198+
174199
static final class Tracker {
175200
private final MetricRecorder metricRecorder;
176201
private final String target;
177202
private final Metrics metrics;
178203
private final String epollSocketChannelClassName;
179204
private final String epollTcpInfoClassName;
205+
private volatile ChannelReflectionAccessor channelReflectionAccessor;
206+
private long lastTotalRetrans;
180207

181208
Tracker(MetricRecorder metricRecorder, String target) {
182209
this(metricRecorder, target, DEFAULT_METRICS);
@@ -197,6 +224,9 @@ static final class Tracker {
197224
this.epollTcpInfoClassName = epollTcpInfoClassName;
198225
}
199226

227+
private static final Map<String, ChannelReflectionAccessor> accessorCache =
228+
new HashMap<>();
229+
200230
private static final long RECORD_INTERVAL_MILLIS;
201231

202232
static {
@@ -261,43 +291,65 @@ void channelInactive(Channel channel) {
261291
metricRecorder.addLongUpDownCounter(metrics.connectionCount, -1,
262292
Collections.singletonList(target), labelValues);
263293
// Final collection on close
264-
recordTcpInfo(channel);
294+
recordTcpInfo(channel, true);
265295
}
266296
}
267297

268-
private void recordTcpInfo(Channel channel) {
298+
void recordTcpInfo(Channel channel) {
299+
recordTcpInfo(channel, false);
300+
}
301+
302+
void recordTcpInfo(Channel channel, boolean isClosed) {
269303
if (metricRecorder == null || target == null) {
270304
return;
271305
}
272306
java.util.List<String> labelValues = getLabelValues(channel);
273307
try {
274-
if (channel.getClass().getName().equals(epollSocketChannelClassName)) {
275-
Class<?> tcpInfoClass = Class.forName(epollTcpInfoClassName);
276-
Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", tcpInfoClass);
277-
Object info = tcpInfoClass.getDeclaredConstructor().newInstance();
278-
tcpInfoMethod.invoke(channel, info);
279-
280-
Method totalRetransMethod = tcpInfoClass.getMethod("totalRetrans");
281-
Method retransmitsMethod = tcpInfoClass.getMethod("retransmits");
282-
Method rttMethod = tcpInfoClass.getMethod("rtt");
283-
284-
long totalRetrans = (Long) totalRetransMethod.invoke(info);
285-
int retransmits = (Integer) retransmitsMethod.invoke(info);
286-
long rtt = (Long) rttMethod.invoke(info);
287-
288-
if (metrics.packetsRetransmitted != null) {
289-
metricRecorder.addLongCounter(metrics.packetsRetransmitted, totalRetrans,
290-
Collections.singletonList(target), labelValues);
308+
if (channelReflectionAccessor == null) {
309+
if (!channel.getClass().getName().equals(epollSocketChannelClassName)) {
310+
return;
291311
}
292-
if (metrics.recurringRetransmits != null) {
293-
metricRecorder.addLongCounter(metrics.recurringRetransmits, retransmits,
294-
Collections.singletonList(target), labelValues);
312+
synchronized (accessorCache) {
313+
channelReflectionAccessor = accessorCache.get(epollTcpInfoClassName);
314+
if (channelReflectionAccessor == null) {
315+
Class<?> tcpInfoClass = Class.forName(epollTcpInfoClassName);
316+
Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", tcpInfoClass);
317+
Constructor<?> tcpInfoConstructor = tcpInfoClass.getDeclaredConstructor();
318+
Method totalRetransMethod = tcpInfoClass.getMethod("totalRetrans");
319+
Method retransmitsMethod = tcpInfoClass.getMethod("retransmits");
320+
Method rttMethod = tcpInfoClass.getMethod("rtt");
321+
322+
channelReflectionAccessor = new ChannelReflectionAccessor(
323+
tcpInfoMethod, tcpInfoConstructor, totalRetransMethod, retransmitsMethod,
324+
rttMethod);
325+
accessorCache.put(epollTcpInfoClassName, channelReflectionAccessor);
326+
}
295327
}
296-
if (metrics.minRtt != null) {
297-
metricRecorder.recordDoubleHistogram(metrics.minRtt,
298-
rtt / 1000000.0, // Convert microseconds to seconds
328+
}
329+
330+
Object info = channelReflectionAccessor.tcpInfoConstructor.newInstance();
331+
channelReflectionAccessor.tcpInfoMethod.invoke(channel, info);
332+
333+
long totalRetrans = (Long) channelReflectionAccessor.totalRetransMethod.invoke(info);
334+
int retransmits = (Integer) channelReflectionAccessor.retransmitsMethod.invoke(info);
335+
long rtt = (Long) channelReflectionAccessor.rttMethod.invoke(info);
336+
337+
if (metrics.packetsRetransmitted != null) {
338+
long delta = totalRetrans - lastTotalRetrans;
339+
if (delta > 0) {
340+
metricRecorder.addLongCounter(metrics.packetsRetransmitted, delta,
299341
Collections.singletonList(target), labelValues);
300342
}
343+
lastTotalRetrans = totalRetrans;
344+
}
345+
if (isClosed && metrics.recurringRetransmits != null) {
346+
metricRecorder.addLongCounter(metrics.recurringRetransmits, retransmits,
347+
Collections.singletonList(target), labelValues);
348+
}
349+
if (metrics.minRtt != null) {
350+
metricRecorder.recordDoubleHistogram(metrics.minRtt,
351+
rtt / 1000000.0, // Convert microseconds to seconds
352+
Collections.singletonList(target), labelValues);
301353
}
302354
} catch (Throwable t) {
303355
// Epoll not available or error getting tcp_info, just ignore.

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1198,7 +1198,8 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize,
11981198
MAX_RST_COUNT_DISABLED,
11991199
0,
12001200
Attributes.EMPTY,
1201-
channelz);
1201+
channelz,
1202+
null);
12021203
server.start(serverListener);
12031204
address = TestUtils.testServerAddress((InetSocketAddress) server.getListenSocketAddress());
12041205
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());

netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,4 +189,15 @@ public void useNioTransport_shouldNotThrow() {
189189

190190
builder.assertEventLoopsAndChannelType();
191191
}
192+
193+
@Test
194+
public void setMetricRecorder_propagatedToServer() throws Exception {
195+
io.grpc.MetricRecorder recorder = mock(io.grpc.MetricRecorder.class);
196+
builder.setMetricRecorder(recorder);
197+
198+
NettyServer server = builder.buildTransportServers(
199+
ImmutableList.<ServerStreamTracer.Factory>of());
200+
201+
assertThat(server.getMetricRecorder()).isSameInstanceAs(recorder);
202+
}
192203
}

netty/src/test/java/io/grpc/netty/NettyServerTest.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ class NoHandlerProtocolNegotiator implements ProtocolNegotiator {
161161
0,
162162
0, // ignore
163163
Attributes.EMPTY,
164-
channelz);
164+
channelz,
165+
null);
165166
final SettableFuture<Void> serverShutdownCalled = SettableFuture.create();
166167
ns.start(new ServerListener() {
167168
@Override
@@ -218,7 +219,8 @@ public void multiPortStartStopGet() throws Exception {
218219
0,
219220
0, // ignore
220221
Attributes.EMPTY,
221-
channelz);
222+
channelz,
223+
null);
222224
final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
223225
ns.start(new ServerListener() {
224226
@Override
@@ -298,7 +300,8 @@ public void multiPortConnections() throws Exception {
298300
0,
299301
0, // ignore
300302
Attributes.EMPTY,
301-
channelz);
303+
channelz,
304+
null);
302305
final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
303306
ns.start(new ServerListener() {
304307
@Override
@@ -366,7 +369,8 @@ public void getPort_notStarted() {
366369
0,
367370
0, // ignore
368371
Attributes.EMPTY,
369-
channelz);
372+
channelz,
373+
null);
370374

371375
assertThat(ns.getListenSocketAddress()).isEqualTo(addr);
372376
assertThat(ns.getListenSocketAddresses()).isEqualTo(addresses);
@@ -447,7 +451,8 @@ class TestProtocolNegotiator implements ProtocolNegotiator {
447451
0,
448452
0, // ignore
449453
eagAttributes,
450-
channelz);
454+
channelz,
455+
null);
451456
ns.start(new ServerListener() {
452457
@Override
453458
public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -501,7 +506,8 @@ public void channelzListenSocket() throws Exception {
501506
0,
502507
0, // ignore
503508
Attributes.EMPTY,
504-
channelz);
509+
channelz,
510+
null);
505511
final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
506512
ns.start(new ServerListener() {
507513
@Override
@@ -649,7 +655,8 @@ private NettyServer getServer(List<SocketAddress> addr, EventLoopGroup ev) {
649655
0,
650656
0, // ignore
651657
Attributes.EMPTY,
652-
channelz);
658+
channelz,
659+
null);
653660
}
654661

655662
private static class NoopServerTransportListener implements ServerTransportListener {

0 commit comments

Comments
 (0)