Skip to content

Commit 529a75e

Browse files
Merge branch 'grpc:master' into Issue_fixed_1537
2 parents 11adf6f + 25199e9 commit 529a75e

15 files changed

Lines changed: 451 additions & 74 deletions

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ public Status onResult2(ResolutionResult resolutionResult) {
275275
@Documented
276276
public @interface ResolutionResultAttr {}
277277

278+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11989")
279+
@ResolutionResultAttr
280+
public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
281+
Attributes.Key.create("io.grpc.NameResolver.ATTR_BACKEND_SERVICE");
282+
278283
/**
279284
* Information that a {@link Factory} uses to create a {@link NameResolver}.
280285
*

api/src/main/java/io/grpc/StatusOr.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ public Status getStatus() {
6666
return status == null ? Status.OK : status;
6767
}
6868

69+
/**
70+
* Note that StatusOr containing statuses, the equality comparision is delegated to
71+
* {@link Status#equals} which just does a reference equality check because equality on
72+
* Statuses is not well defined.
73+
* Instead, do comparison based on their Code with {@link Status#getCode}. The description and
74+
* cause of the Status are unlikely to be stable, and additional fields may be added to Status
75+
* in the future.
76+
*/
6977
@Override
7078
public boolean equals(Object other) {
7179
if (!(other instanceof StatusOr)) {

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.opentelemetry;
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
2021
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
2122
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
2223
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
@@ -70,7 +71,6 @@
7071
*/
7172
final class OpenTelemetryMetricsModule {
7273
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
73-
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
7474
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
7575
ImmutableSet.of(
7676
"grpc.client.attempt.started",
@@ -90,14 +90,16 @@ final class OpenTelemetryMetricsModule {
9090
private final OpenTelemetryMetricsResource resource;
9191
private final Supplier<Stopwatch> stopwatchSupplier;
9292
private final boolean localityEnabled;
93+
private final boolean backendServiceEnabled;
9394
private final ImmutableList<OpenTelemetryPlugin> plugins;
9495

9596
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
9697
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
9798
List<OpenTelemetryPlugin> plugins) {
9899
this.resource = checkNotNull(resource, "resource");
99100
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
100-
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
101+
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
102+
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
101103
this.plugins = ImmutableList.copyOf(plugins);
102104
}
103105

@@ -162,6 +164,7 @@ private static final class ClientTracer extends ClientStreamTracer {
162164
volatile long outboundWireSize;
163165
volatile long inboundWireSize;
164166
volatile String locality;
167+
volatile String backendService;
165168
long attemptNanos;
166169
Code statusCode;
167170

@@ -206,9 +209,12 @@ public void inboundWireSize(long bytes) {
206209

207210
@Override
208211
public void addOptionalLabel(String key, String value) {
209-
if (LOCALITY_LABEL_NAME.equals(key)) {
212+
if ("grpc.lb.locality".equals(key)) {
210213
locality = value;
211214
}
215+
if ("grpc.lb.backend_service".equals(key)) {
216+
backendService = value;
217+
}
212218
}
213219

214220
@Override
@@ -248,6 +254,13 @@ void recordFinishedAttempt() {
248254
}
249255
builder.put(LOCALITY_KEY, savedLocality);
250256
}
257+
if (module.backendServiceEnabled) {
258+
String savedBackendService = backendService;
259+
if (savedBackendService == null) {
260+
savedBackendService = "";
261+
}
262+
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
263+
}
251264
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
252265
plugin.addLabels(builder);
253266
}

opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public final class OpenTelemetryConstants {
3333
public static final AttributeKey<String> LOCALITY_KEY =
3434
AttributeKey.stringKey("grpc.lb.locality");
3535

36+
public static final AttributeKey<String> BACKEND_SERVICE_KEY =
37+
AttributeKey.stringKey("grpc.lb.backend_service");
38+
3639
public static final List<Double> LATENCY_BUCKETS =
3740
ImmutableList.of(
3841
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,

opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory;
5252
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
5353
import io.grpc.testing.GrpcServerRule;
54+
import io.opentelemetry.api.common.AttributeKey;
5455
import io.opentelemetry.api.metrics.Meter;
5556
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
5657
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@@ -1070,6 +1071,140 @@ public void clientLocalityMetrics_missing() {
10701071
point -> point.hasAttributes(clientAttributes))));
10711072
}
10721073

1074+
@Test
1075+
public void clientBackendServiceMetrics_present() {
1076+
String target = "target:///";
1077+
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
1078+
enabledMetricsMap, disableDefaultMetrics);
1079+
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
1080+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
1081+
emptyList());
1082+
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
1083+
new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());
1084+
1085+
ClientStreamTracer tracer =
1086+
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
1087+
tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
1088+
tracer.addOptionalLabel("grpc.lb.backend_service", "should-be-overwritten");
1089+
tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
1090+
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
1091+
tracer.streamClosed(Status.OK);
1092+
callAttemptsTracerFactory.callEnded(Status.OK);
1093+
1094+
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
1095+
TARGET_KEY, target,
1096+
METHOD_KEY, method.getFullMethodName());
1097+
1098+
io.opentelemetry.api.common.Attributes clientAttributes
1099+
= io.opentelemetry.api.common.Attributes.of(
1100+
TARGET_KEY, target,
1101+
METHOD_KEY, method.getFullMethodName(),
1102+
STATUS_KEY, Status.Code.OK.toString());
1103+
1104+
io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
1105+
= clientAttributes.toBuilder()
1106+
.put(AttributeKey.stringKey("grpc.lb.backend_service"), "the-moon")
1107+
.build();
1108+
1109+
assertThat(openTelemetryTesting.getMetrics())
1110+
.satisfiesExactlyInAnyOrder(
1111+
metric ->
1112+
assertThat(metric)
1113+
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
1114+
.hasLongSumSatisfying(
1115+
longSum -> longSum.hasPointsSatisfying(
1116+
point -> point.hasAttributes(attributes))),
1117+
metric ->
1118+
assertThat(metric)
1119+
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
1120+
.hasHistogramSatisfying(
1121+
histogram -> histogram.hasPointsSatisfying(
1122+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1123+
metric ->
1124+
assertThat(metric)
1125+
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
1126+
.hasHistogramSatisfying(
1127+
histogram -> histogram.hasPointsSatisfying(
1128+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1129+
metric ->
1130+
assertThat(metric)
1131+
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
1132+
.hasHistogramSatisfying(
1133+
histogram -> histogram.hasPointsSatisfying(
1134+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1135+
metric ->
1136+
assertThat(metric)
1137+
.hasName(CLIENT_CALL_DURATION)
1138+
.hasHistogramSatisfying(
1139+
histogram -> histogram.hasPointsSatisfying(
1140+
point -> point.hasAttributes(clientAttributes))));
1141+
}
1142+
1143+
@Test
1144+
public void clientBackendServiceMetrics_missing() {
1145+
String target = "target:///";
1146+
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
1147+
enabledMetricsMap, disableDefaultMetrics);
1148+
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
1149+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
1150+
emptyList());
1151+
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
1152+
new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());
1153+
1154+
ClientStreamTracer tracer =
1155+
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
1156+
tracer.streamClosed(Status.OK);
1157+
callAttemptsTracerFactory.callEnded(Status.OK);
1158+
1159+
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
1160+
TARGET_KEY, target,
1161+
METHOD_KEY, method.getFullMethodName());
1162+
1163+
io.opentelemetry.api.common.Attributes clientAttributes
1164+
= io.opentelemetry.api.common.Attributes.of(
1165+
TARGET_KEY, target,
1166+
METHOD_KEY, method.getFullMethodName(),
1167+
STATUS_KEY, Status.Code.OK.toString());
1168+
1169+
io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
1170+
= clientAttributes.toBuilder()
1171+
.put(AttributeKey.stringKey("grpc.lb.backend_service"), "")
1172+
.build();
1173+
1174+
assertThat(openTelemetryTesting.getMetrics())
1175+
.satisfiesExactlyInAnyOrder(
1176+
metric ->
1177+
assertThat(metric)
1178+
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
1179+
.hasLongSumSatisfying(
1180+
longSum -> longSum.hasPointsSatisfying(
1181+
point -> point.hasAttributes(attributes))),
1182+
metric ->
1183+
assertThat(metric)
1184+
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
1185+
.hasHistogramSatisfying(
1186+
histogram -> histogram.hasPointsSatisfying(
1187+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1188+
metric ->
1189+
assertThat(metric)
1190+
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
1191+
.hasHistogramSatisfying(
1192+
histogram -> histogram.hasPointsSatisfying(
1193+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1194+
metric ->
1195+
assertThat(metric)
1196+
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
1197+
.hasHistogramSatisfying(
1198+
histogram -> histogram.hasPointsSatisfying(
1199+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1200+
metric ->
1201+
assertThat(metric)
1202+
.hasName(CLIENT_CALL_DURATION)
1203+
.hasHistogramSatisfying(
1204+
histogram -> histogram.hasPointsSatisfying(
1205+
point -> point.hasAttributes(clientAttributes))));
1206+
}
1207+
10731208
@Test
10741209
public void serverBasicMetrics() {
10751210
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,

xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.grpc.InternalLogId;
3333
import io.grpc.LoadBalancer;
3434
import io.grpc.Metadata;
35+
import io.grpc.NameResolver;
3536
import io.grpc.Status;
3637
import io.grpc.internal.ForwardingClientStreamTracer;
3738
import io.grpc.internal.GrpcUtil;
@@ -150,7 +151,9 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
150151

151152
childSwitchLb.handleResolvedAddresses(
152153
resolvedAddresses.toBuilder()
153-
.setAttributes(attributes)
154+
.setAttributes(attributes.toBuilder()
155+
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
156+
.build())
154157
.setLoadBalancingPolicyConfig(config.childConfig)
155158
.build());
156159
return Status.OK;

xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,32 +102,44 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
102102
private final long infTime;
103103
private final Ticker ticker;
104104
private String locality = "";
105+
private String backendService = "";
105106
private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
106107

107108
// The metric instruments are only registered once and shared by all instances of this LB.
108109
static {
109110
MetricInstrumentRegistry metricInstrumentRegistry
110111
= MetricInstrumentRegistry.getDefaultRegistry();
111-
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
112+
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
113+
"grpc.lb.wrr.rr_fallback",
112114
"EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
113115
+ "with valid weight, which caused the WRR policy to fall back to RR behavior",
114-
"{update}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
116+
"{update}",
117+
Lists.newArrayList("grpc.target"),
118+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
115119
false);
116120
ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
117-
"grpc.lb.wrr.endpoint_weight_not_yet_usable", "EXPERIMENTAL. Number of endpoints "
118-
+ "from each scheduler update that don't yet have usable weight information",
119-
"{endpoint}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
121+
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
122+
"EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
123+
+ "weight information",
124+
"{endpoint}",
125+
Lists.newArrayList("grpc.target"),
126+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
120127
false);
121128
ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
122129
"grpc.lb.wrr.endpoint_weight_stale",
123130
"EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
124-
+ "older than the expiration period", "{endpoint}", Lists.newArrayList("grpc.target"),
125-
Lists.newArrayList("grpc.lb.locality"), false);
131+
+ "older than the expiration period",
132+
"{endpoint}",
133+
Lists.newArrayList("grpc.target"),
134+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
135+
false);
126136
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
127137
"grpc.lb.wrr.endpoint_weights",
128138
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
129-
"{weight}", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
130-
Lists.newArrayList("grpc.lb.locality"),
139+
"{weight}",
140+
Lists.newArrayList(),
141+
Lists.newArrayList("grpc.target"),
142+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
131143
false);
132144
}
133145

@@ -168,6 +180,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
168180
} else {
169181
this.locality = "";
170182
}
183+
String backendService
184+
= resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
185+
if (backendService != null) {
186+
this.backendService = backendService;
187+
} else {
188+
this.backendService = "";
189+
}
171190
config =
172191
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
173192

@@ -232,26 +251,27 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
232251
helper.getMetricRecorder()
233252
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
234253
ImmutableList.of(helper.getChannelTarget()),
235-
ImmutableList.of(locality));
254+
ImmutableList.of(locality, backendService));
236255
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
237256
}
238257

239258
if (staleEndpoints.get() > 0) {
240259
helper.getMetricRecorder()
241260
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
242261
ImmutableList.of(helper.getChannelTarget()),
243-
ImmutableList.of(locality));
262+
ImmutableList.of(locality, backendService));
244263
}
245264
if (notYetUsableEndpoints.get() > 0) {
246265
helper.getMetricRecorder()
247266
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
248-
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
267+
ImmutableList.of(helper.getChannelTarget()),
268+
ImmutableList.of(locality, backendService));
249269
}
250270
boolean weightsEffective = picker.updateWeight(newWeights);
251271
if (!weightsEffective) {
252272
helper.getMetricRecorder()
253273
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
254-
ImmutableList.of(locality));
274+
ImmutableList.of(locality, backendService));
255275
}
256276
}
257277

0 commit comments

Comments
 (0)