Skip to content

Commit 4082b2d

Browse files
committed
feat: Implement TCP metrics collection and pipeline MetricRecorder (Address PR comments)
1 parent fc08423 commit 4082b2d

33 files changed

+1106
-56
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.List;
23+
24+
/**
25+
* TCP Metrics defined to be shared across transport implementations.
26+
*/
27+
@Internal
28+
public final class InternalTcpMetrics {
29+
30+
private InternalTcpMetrics() {
31+
}
32+
33+
private static final List<String> OPTIONAL_LABELS = Arrays.asList(
34+
"network.local.address",
35+
"network.local.port",
36+
"network.peer.address",
37+
"network.peer.port");
38+
39+
public static final DoubleHistogramMetricInstrument MIN_RTT_INSTRUMENT =
40+
MetricInstrumentRegistry.getDefaultRegistry()
41+
.registerDoubleHistogram(
42+
"grpc.tcp.min_rtt",
43+
"Minimum round-trip time of a TCP connection",
44+
"s",
45+
getMinRttBuckets(),
46+
Collections.emptyList(),
47+
OPTIONAL_LABELS,
48+
false);
49+
50+
public static final LongCounterMetricInstrument CONNECTIONS_CREATED_INSTRUMENT =
51+
MetricInstrumentRegistry
52+
.getDefaultRegistry()
53+
.registerLongCounter(
54+
"grpc.tcp.connections_created",
55+
"The total number of TCP connections established.",
56+
"{connection}",
57+
Collections.emptyList(),
58+
OPTIONAL_LABELS,
59+
false);
60+
61+
public static final LongUpDownCounterMetricInstrument CONNECTION_COUNT_INSTRUMENT =
62+
MetricInstrumentRegistry
63+
.getDefaultRegistry()
64+
.registerLongUpDownCounter(
65+
"grpc.tcp.connection_count",
66+
"The current number of active TCP connections.",
67+
"{connection}",
68+
Collections.emptyList(),
69+
OPTIONAL_LABELS,
70+
false);
71+
72+
public static final LongCounterMetricInstrument PACKETS_RETRANSMITTED_INSTRUMENT =
73+
MetricInstrumentRegistry
74+
.getDefaultRegistry()
75+
.registerLongCounter(
76+
"grpc.tcp.packets_retransmitted",
77+
"The total number of packets retransmitted for all TCP connections.",
78+
"{packet}",
79+
Collections.emptyList(),
80+
OPTIONAL_LABELS,
81+
false);
82+
83+
public static final LongCounterMetricInstrument RECURRING_RETRANSMITS_INSTRUMENT =
84+
MetricInstrumentRegistry
85+
.getDefaultRegistry()
86+
.registerLongCounter(
87+
"grpc.tcp.recurring_retransmits",
88+
"The total number of times the retransmit timer popped for all TCP"
89+
+ " connections.",
90+
"{timeout}",
91+
Collections.emptyList(),
92+
OPTIONAL_LABELS,
93+
false);
94+
95+
private static List<Double> getMinRttBuckets() {
96+
List<Double> buckets = new ArrayList<>(100);
97+
for (int i = 1; i <= 100; i++) {
98+
buckets.add(1e-6 * Math.pow(2.0, i * 0.24));
99+
}
100+
return Collections.unmodifiableList(buckets);
101+
}
102+
}

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ public static final class Args {
355355
@Nullable private final ChannelLogger channelLogger;
356356
@Nullable private final Executor executor;
357357
@Nullable private final String overrideAuthority;
358-
@Nullable private final MetricRecorder metricRecorder;
358+
private final MetricRecorder metricRecorder;
359359
@Nullable private final NameResolverRegistry nameResolverRegistry;
360360
@Nullable private final IdentityHashMap<Key<?>, Object> customArgs;
361361

@@ -497,7 +497,6 @@ public String getOverrideAuthority() {
497497
/**
498498
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
499499
*/
500-
@Nullable
501500
public MetricRecorder getMetricRecorder() {
502501
return metricRecorder;
503502
}

core/src/main/java/io/grpc/internal/ClientTransportFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.ChannelCredentials;
2525
import io.grpc.ChannelLogger;
2626
import io.grpc.HttpConnectProxiedSocketAddress;
27+
import io.grpc.MetricRecorder;
2728
import java.io.Closeable;
2829
import java.net.SocketAddress;
2930
import java.util.Collection;
@@ -91,6 +92,7 @@ final class ClientTransportOptions {
9192
private Attributes eagAttributes = Attributes.EMPTY;
9293
@Nullable private String userAgent;
9394
@Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr;
95+
@Nullable private MetricRecorder metricRecorder;
9496

9597
public ChannelLogger getChannelLogger() {
9698
return channelLogger;
@@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) {
101103
return this;
102104
}
103105

106+
@Nullable
107+
public MetricRecorder getMetricRecorder() {
108+
return metricRecorder;
109+
}
110+
111+
public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) {
112+
this.metricRecorder = metricRecorder;
113+
return this;
114+
}
115+
104116
public String getAuthority() {
105117
return authority;
106118
}

core/src/main/java/io/grpc/internal/InternalServer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public interface InternalServer {
5858
/**
5959
* Returns the first listen socket stats of this server. May return {@code null}.
6060
*/
61-
@Nullable InternalInstrumented<SocketStats> getListenSocketStats();
61+
@Nullable
62+
InternalInstrumented<SocketStats> getListenSocketStats();
6263

6364
/**
6465
* Returns a list of listening socket addresses. May change after {@link #start(ServerListener)}
@@ -69,6 +70,7 @@ public interface InternalServer {
6970
/**
7071
* Returns a list of listen socket stats of this server. May return {@code null}.
7172
*/
72-
@Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
73+
@Nullable
74+
List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
7375

7476
}

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
8080
private final InternalChannelz channelz;
8181
private final CallTracer callsTracer;
8282
private final ChannelTracer channelTracer;
83+
private final MetricRecorder metricRecorder;
8384
private final ChannelLogger channelLogger;
8485
private final boolean reconnectDisabled;
8586

@@ -191,6 +192,7 @@ protected void handleNotInUse() {
191192
this.scheduledExecutor = scheduledExecutor;
192193
this.connectingTimer = stopwatchSupplier.get();
193194
this.syncContext = syncContext;
195+
this.metricRecorder = metricRecorder;
194196
this.callback = callback;
195197
this.channelz = channelz;
196198
this.callsTracer = callsTracer;
@@ -265,6 +267,7 @@ private void startNewTransport() {
265267
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
266268
.setEagAttributes(currentEagAttributes)
267269
.setUserAgent(userAgent)
270+
.setMetricRecorder(metricRecorder)
268271
.setHttpConnectProxiedSocketAddress(proxiedAddr);
269272
TransportLogger transportLogger = new TransportLogger();
270273
// In case the transport logs in the constructor, use the subchannel logId

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
143143
InternalServer transportServer,
144144
Context rootContext) {
145145
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
146+
146147
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
147148
this.fallbackRegistry =
148149
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");

core/src/main/java/io/grpc/internal/ServerImplBuilder.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import io.grpc.HandlerRegistry;
3232
import io.grpc.InternalChannelz;
3333
import io.grpc.InternalConfiguratorRegistry;
34+
import io.grpc.MetricInstrumentRegistry;
35+
import io.grpc.MetricRecorder;
36+
import io.grpc.MetricSink;
3437
import io.grpc.Server;
3538
import io.grpc.ServerBuilder;
3639
import io.grpc.ServerCallExecutorSupplier;
@@ -80,6 +83,7 @@ public static ServerBuilder<?> forPort(int port) {
8083
final List<ServerTransportFilter> transportFilters = new ArrayList<>();
8184
final List<ServerInterceptor> interceptors = new ArrayList<>();
8285
private final List<ServerStreamTracer.Factory> streamTracerFactories = new ArrayList<>();
86+
final List<MetricSink> metricSinks = new ArrayList<>();
8387
private final ClientTransportServersBuilder clientTransportServersBuilder;
8488
HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
8589
ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
@@ -104,7 +108,8 @@ public static ServerBuilder<?> forPort(int port) {
104108
*/
105109
public interface ClientTransportServersBuilder {
106110
InternalServer buildClientTransportServers(
107-
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
111+
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
112+
MetricRecorder metricRecorder);
108113
}
109114

110115
/**
@@ -157,6 +162,14 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) {
157162
return this;
158163
}
159164

165+
/**
166+
* Adds a MetricSink to the server.
167+
*/
168+
public ServerImplBuilder addMetricSink(MetricSink metricSink) {
169+
metricSinks.add(checkNotNull(metricSink, "metricSink"));
170+
return this;
171+
}
172+
160173
@Override
161174
public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
162175
streamTracerFactories.add(checkNotNull(factory, "factory"));
@@ -241,8 +254,11 @@ public void setDeadlineTicker(Deadline.Ticker ticker) {
241254

242255
@Override
243256
public Server build() {
257+
MetricRecorder metricRecorder = new MetricRecorderImpl(metricSinks,
258+
MetricInstrumentRegistry.getDefaultRegistry());
244259
return new ServerImpl(this,
245-
clientTransportServersBuilder.buildClientTransportServers(getTracerFactories()),
260+
clientTransportServersBuilder.buildClientTransportServers(
261+
getTracerFactories(), metricRecorder),
246262
Context.ROOT);
247263
}
248264

core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public void setUp() throws Exception {
7373
new ClientTransportServersBuilder() {
7474
@Override
7575
public InternalServer buildClientTransportServers(
76-
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
76+
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
77+
io.grpc.MetricRecorder metricRecorder) {
7778
throw new UnsupportedOperationException();
7879
}
7980
});
@@ -139,7 +140,7 @@ public static final class StaticTestingClassLoaderCallsGet implements Runnable {
139140
public void run() {
140141
ServerImplBuilder builder =
141142
new ServerImplBuilder(
142-
streamTracerFactories -> {
143+
(streamTracerFactories, metricRecorder) -> {
143144
throw new UnsupportedOperationException();
144145
});
145146
assertThat(builder.getTracerFactories()).hasSize(2);
@@ -169,7 +170,7 @@ public void configureServerBuilder(ServerBuilder<?> builder) {
169170
}));
170171
ServerImplBuilder builder =
171172
new ServerImplBuilder(
172-
streamTracerFactories -> {
173+
(streamTracerFactories, metricRecorder) -> {
173174
throw new UnsupportedOperationException();
174175
});
175176
assertThat(builder.getTracerFactories()).containsExactly(DUMMY_USER_TRACER);
@@ -192,7 +193,7 @@ public void run() {
192193
InternalConfiguratorRegistry.setConfigurators(Collections.emptyList());
193194
ServerImplBuilder builder =
194195
new ServerImplBuilder(
195-
streamTracerFactories -> {
196+
(streamTracerFactories, metricRecorder) -> {
196197
throw new UnsupportedOperationException();
197198
});
198199
assertThat(builder.getTracerFactories()).isEmpty();

core/src/test/java/io/grpc/internal/ServerImplTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ public void startUp() throws IOException {
206206
new ClientTransportServersBuilder() {
207207
@Override
208208
public InternalServer buildClientTransportServers(
209-
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
209+
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
210+
io.grpc.MetricRecorder metricRecorder) {
210211
throw new UnsupportedOperationException();
211212
}
212213
});

inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ private InProcessServerBuilder(SocketAddress listenAddress) {
120120
final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
121121
@Override
122122
public InternalServer buildClientTransportServers(
123-
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
123+
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
124+
io.grpc.MetricRecorder metricRecorder) {
124125
return buildTransportServers(streamTracerFactories);
125126
}
126127
}

0 commit comments

Comments
 (0)