Skip to content

Commit 05fc150

Browse files
committed
feat: Implement TCP metrics collection and pipeline MetricRecorder (Address PR comments)
1 parent fc08423 commit 05fc150

36 files changed

+1170
-58
lines changed

api/src/main/java/io/grpc/ForwardingServerBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,12 @@ public Server build() {
201201
return delegate().build();
202202
}
203203

204+
@Override
205+
public T addMetricSink(MetricSink metricSink) {
206+
delegate().addMetricSink(metricSink);
207+
return thisT();
208+
}
209+
204210
@Override
205211
public String toString() {
206212
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2026 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.List;
23+
24+
/**
25+
* TCP Metrics defined to be shared across transport implementations.
26+
*/
27+
@Internal
28+
public final class InternalTcpMetrics {
29+
30+
private InternalTcpMetrics() {}
31+
32+
private static final List<String> OPTIONAL_LABELS = Arrays.asList(
33+
"network.local.address",
34+
"network.local.port",
35+
"network.peer.address",
36+
"network.peer.port");
37+
38+
public static final DoubleHistogramMetricInstrument MIN_RTT_INSTRUMENT =
39+
MetricInstrumentRegistry.getDefaultRegistry()
40+
.registerDoubleHistogram(
41+
"grpc.tcp.min_rtt",
42+
"Minimum round-trip time of a TCP connection",
43+
"s",
44+
getMinRttBuckets(),
45+
Collections.emptyList(),
46+
OPTIONAL_LABELS,
47+
false);
48+
49+
public static final LongCounterMetricInstrument CONNECTIONS_CREATED_INSTRUMENT =
50+
MetricInstrumentRegistry
51+
.getDefaultRegistry()
52+
.registerLongCounter(
53+
"grpc.tcp.connections_created",
54+
"The total number of TCP connections established.",
55+
"{connection}",
56+
Collections.emptyList(),
57+
OPTIONAL_LABELS,
58+
false);
59+
60+
public static final LongUpDownCounterMetricInstrument CONNECTION_COUNT_INSTRUMENT =
61+
MetricInstrumentRegistry
62+
.getDefaultRegistry()
63+
.registerLongUpDownCounter(
64+
"grpc.tcp.connection_count",
65+
"The current number of active TCP connections.",
66+
"{connection}",
67+
Collections.emptyList(),
68+
OPTIONAL_LABELS,
69+
false
70+
);
71+
72+
public static final LongCounterMetricInstrument PACKETS_RETRANSMITTED_INSTRUMENT =
73+
MetricInstrumentRegistry
74+
.getDefaultRegistry()
75+
.registerLongCounter(
76+
"grpc.tcp.packets_retransmitted",
77+
"The total number of packets retransmitted for all TCP connections.",
78+
"{packet}",
79+
Collections.emptyList(),
80+
OPTIONAL_LABELS,
81+
false
82+
);
83+
84+
public static final LongCounterMetricInstrument RECURRING_RETRANSMITS_INSTRUMENT =
85+
MetricInstrumentRegistry
86+
.getDefaultRegistry()
87+
.registerLongCounter(
88+
"grpc.tcp.recurring_retransmits",
89+
"The total number of times the retransmit timer popped for all TCP"
90+
+ " connections.",
91+
"{timeout}",
92+
Collections.emptyList(),
93+
OPTIONAL_LABELS,
94+
false
95+
);
96+
97+
private static List<Double> getMinRttBuckets() {
98+
List<Double> buckets = new ArrayList<>(100);
99+
for (int i = 1; i <= 100; i++) {
100+
buckets.add(1e-6 * Math.pow(2.0, i * 0.24));
101+
}
102+
return Collections.unmodifiableList(buckets);
103+
}
104+
}

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ public static final class Args {
355355
@Nullable private final ChannelLogger channelLogger;
356356
@Nullable private final Executor executor;
357357
@Nullable private final String overrideAuthority;
358-
@Nullable private final MetricRecorder metricRecorder;
358+
private final MetricRecorder metricRecorder;
359359
@Nullable private final NameResolverRegistry nameResolverRegistry;
360360
@Nullable private final IdentityHashMap<Key<?>, Object> customArgs;
361361

@@ -497,7 +497,6 @@ public String getOverrideAuthority() {
497497
/**
498498
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
499499
*/
500-
@Nullable
501500
public MetricRecorder getMetricRecorder() {
502501
return metricRecorder;
503502
}

api/src/main/java/io/grpc/ServerBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,16 @@ public T setBinaryLog(BinaryLog binaryLog) {
435435
*/
436436
public abstract Server build();
437437

438+
/**
439+
* Adds a metric sink to the server.
440+
*
441+
* @param metricSink the metric sink to add.
442+
* @return this
443+
*/
444+
public T addMetricSink(MetricSink metricSink) {
445+
throw new UnsupportedOperationException();
446+
}
447+
438448
/**
439449
* Returns the correctly typed version of the builder.
440450
*/

core/src/main/java/io/grpc/internal/ClientTransportFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.ChannelCredentials;
2525
import io.grpc.ChannelLogger;
2626
import io.grpc.HttpConnectProxiedSocketAddress;
27+
import io.grpc.MetricRecorder;
2728
import java.io.Closeable;
2829
import java.net.SocketAddress;
2930
import java.util.Collection;
@@ -91,6 +92,8 @@ final class ClientTransportOptions {
9192
private Attributes eagAttributes = Attributes.EMPTY;
9293
@Nullable private String userAgent;
9394
@Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr;
95+
private MetricRecorder metricRecorder = new MetricRecorder() {
96+
};
9497

9598
public ChannelLogger getChannelLogger() {
9699
return channelLogger;
@@ -101,6 +104,15 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) {
101104
return this;
102105
}
103106

107+
public MetricRecorder getMetricRecorder() {
108+
return metricRecorder;
109+
}
110+
111+
public ClientTransportOptions setMetricRecorder(MetricRecorder metricRecorder) {
112+
this.metricRecorder = Preconditions.checkNotNull(metricRecorder, "metricRecorder");
113+
return this;
114+
}
115+
104116
public String getAuthority() {
105117
return authority;
106118
}

core/src/main/java/io/grpc/internal/InternalServer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public interface InternalServer {
5858
/**
5959
* Returns the first listen socket stats of this server. May return {@code null}.
6060
*/
61-
@Nullable InternalInstrumented<SocketStats> getListenSocketStats();
61+
@Nullable
62+
InternalInstrumented<SocketStats> getListenSocketStats();
6263

6364
/**
6465
* Returns a list of listening socket addresses. May change after {@link #start(ServerListener)}
@@ -69,6 +70,7 @@ public interface InternalServer {
6970
/**
7071
* Returns a list of listen socket stats of this server. May return {@code null}.
7172
*/
72-
@Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
73+
@Nullable
74+
List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
7375

7476
}

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
8080
private final InternalChannelz channelz;
8181
private final CallTracer callsTracer;
8282
private final ChannelTracer channelTracer;
83+
private final MetricRecorder metricRecorder;
8384
private final ChannelLogger channelLogger;
8485
private final boolean reconnectDisabled;
8586

@@ -191,6 +192,7 @@ protected void handleNotInUse() {
191192
this.scheduledExecutor = scheduledExecutor;
192193
this.connectingTimer = stopwatchSupplier.get();
193194
this.syncContext = syncContext;
195+
this.metricRecorder = metricRecorder;
194196
this.callback = callback;
195197
this.channelz = channelz;
196198
this.callsTracer = callsTracer;
@@ -265,6 +267,7 @@ private void startNewTransport() {
265267
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
266268
.setEagAttributes(currentEagAttributes)
267269
.setUserAgent(userAgent)
270+
.setMetricRecorder(metricRecorder)
268271
.setHttpConnectProxiedSocketAddress(proxiedAddr);
269272
TransportLogger transportLogger = new TransportLogger();
270273
// In case the transport logs in the constructor, use the subchannel logId

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
143143
InternalServer transportServer,
144144
Context rootContext) {
145145
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
146+
146147
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
147148
this.fallbackRegistry =
148149
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");

core/src/main/java/io/grpc/internal/ServerImplBuilder.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import io.grpc.HandlerRegistry;
3232
import io.grpc.InternalChannelz;
3333
import io.grpc.InternalConfiguratorRegistry;
34+
import io.grpc.MetricInstrumentRegistry;
35+
import io.grpc.MetricRecorder;
36+
import io.grpc.MetricSink;
3437
import io.grpc.Server;
3538
import io.grpc.ServerBuilder;
3639
import io.grpc.ServerCallExecutorSupplier;
@@ -80,6 +83,7 @@ public static ServerBuilder<?> forPort(int port) {
8083
final List<ServerTransportFilter> transportFilters = new ArrayList<>();
8184
final List<ServerInterceptor> interceptors = new ArrayList<>();
8285
private final List<ServerStreamTracer.Factory> streamTracerFactories = new ArrayList<>();
86+
final List<MetricSink> metricSinks = new ArrayList<>();
8387
private final ClientTransportServersBuilder clientTransportServersBuilder;
8488
HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
8589
ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
@@ -104,7 +108,8 @@ public static ServerBuilder<?> forPort(int port) {
104108
*/
105109
public interface ClientTransportServersBuilder {
106110
InternalServer buildClientTransportServers(
107-
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
111+
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
112+
MetricRecorder metricRecorder);
108113
}
109114

110115
/**
@@ -157,6 +162,15 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) {
157162
return this;
158163
}
159164

165+
/**
166+
* Adds a MetricSink to the server.
167+
*/
168+
@Override
169+
public ServerImplBuilder addMetricSink(MetricSink metricSink) {
170+
metricSinks.add(checkNotNull(metricSink, "metricSink"));
171+
return this;
172+
}
173+
160174
@Override
161175
public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
162176
streamTracerFactories.add(checkNotNull(factory, "factory"));
@@ -241,8 +255,11 @@ public void setDeadlineTicker(Deadline.Ticker ticker) {
241255

242256
@Override
243257
public Server build() {
258+
MetricRecorder metricRecorder = new MetricRecorderImpl(metricSinks,
259+
MetricInstrumentRegistry.getDefaultRegistry());
244260
return new ServerImpl(this,
245-
clientTransportServersBuilder.buildClientTransportServers(getTracerFactories()),
261+
clientTransportServersBuilder.buildClientTransportServers(
262+
getTracerFactories(), metricRecorder),
246263
Context.ROOT);
247264
}
248265

core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020
import static org.junit.Assert.assertEquals;
21+
import static org.mockito.Mockito.mock;
2122

2223
import io.grpc.InternalConfigurator;
2324
import io.grpc.InternalConfiguratorRegistry;
2425
import io.grpc.Metadata;
26+
import io.grpc.MetricRecorder;
27+
import io.grpc.MetricSink;
2528
import io.grpc.ServerBuilder;
2629
import io.grpc.ServerCall;
2730
import io.grpc.ServerCallHandler;
@@ -73,7 +76,8 @@ public void setUp() throws Exception {
7376
new ClientTransportServersBuilder() {
7477
@Override
7578
public InternalServer buildClientTransportServers(
76-
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
79+
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
80+
MetricRecorder metricRecorder) {
7781
throw new UnsupportedOperationException();
7882
}
7983
});
@@ -128,6 +132,13 @@ public void getTracerFactories_disableBoth() {
128132
assertThat(factories).containsExactly(DUMMY_USER_TRACER);
129133
}
130134

135+
@Test
136+
public void addMetricSink_addsToSinks() {
137+
MetricSink mockSink = mock(MetricSink.class);
138+
builder.addMetricSink(mockSink);
139+
assertThat(builder.metricSinks).containsExactly(mockSink);
140+
}
141+
131142
@Test
132143
public void getTracerFactories_callsGet() throws Exception {
133144
Class<?> runnable = classLoader.loadClass(StaticTestingClassLoaderCallsGet.class.getName());
@@ -139,7 +150,7 @@ public static final class StaticTestingClassLoaderCallsGet implements Runnable {
139150
public void run() {
140151
ServerImplBuilder builder =
141152
new ServerImplBuilder(
142-
streamTracerFactories -> {
153+
(streamTracerFactories, metricRecorder) -> {
143154
throw new UnsupportedOperationException();
144155
});
145156
assertThat(builder.getTracerFactories()).hasSize(2);
@@ -169,7 +180,7 @@ public void configureServerBuilder(ServerBuilder<?> builder) {
169180
}));
170181
ServerImplBuilder builder =
171182
new ServerImplBuilder(
172-
streamTracerFactories -> {
183+
(streamTracerFactories, metricRecorder) -> {
173184
throw new UnsupportedOperationException();
174185
});
175186
assertThat(builder.getTracerFactories()).containsExactly(DUMMY_USER_TRACER);
@@ -192,7 +203,7 @@ public void run() {
192203
InternalConfiguratorRegistry.setConfigurators(Collections.emptyList());
193204
ServerImplBuilder builder =
194205
new ServerImplBuilder(
195-
streamTracerFactories -> {
206+
(streamTracerFactories, metricRecorder) -> {
196207
throw new UnsupportedOperationException();
197208
});
198209
assertThat(builder.getTracerFactories()).isEmpty();

0 commit comments

Comments
 (0)