Skip to content

Commit c8079ee

Browse files
authored
xds: Move backend service label plumbing to CDS (#12882)
Addresses #12431.

This moves backend service metric label plumbing from `xds_cluster_impl` to `cds`, matching the A75/A89 aggregate cluster behavior.

Previously, `xds_cluster_impl` added the `grpc.lb.backend_service` pick-details label and propagated `NameResolver.ATTR_BACKEND_SERVICE` to its child policy. That works for simple clusters, but it is the wrong layer after A75 because aggregate clusters need the leaf CDS cluster name, not the aggregate root cluster name.

This change makes CDS responsible for backend service plumbing:
- non-aggregate CDS adds the `grpc.lb.backend_service` pick-details label using the CDS cluster name
- non-aggregate CDS propagates `NameResolver.ATTR_BACKEND_SERVICE` to its child policy, so WRR can consume the backend service context
- aggregate root CDS does not add the backend service attribute or pick-details label
- leaf CDS instances under an aggregate cluster add their own leaf cluster name
- `xds_cluster_impl` no longer owns backend service pick-details label or child attribute propagation

The existing `InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE` path is left unchanged for endpoint/subchannel metrics.

Tests cover:
- non-aggregate CDS propagating `NameResolver.ATTR_BACKEND_SERVICE`
- non-aggregate CDS adding `grpc.lb.backend_service`
- aggregate root CDS not adding the root cluster name
- aggregate leaf CDS instances propagating leaf cluster names
- `xds_cluster_impl` no longer adding backend service labels or attributes
- picker tests using a non-null `PickDetailsConsumer`, matching the real `PickSubchannelArgs` contract
1 parent 53ebe2d commit c8079ee

5 files changed

Lines changed: 177 additions & 17 deletions

File tree

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@
2525
import com.google.common.primitives.UnsignedInts;
2626
import com.google.errorprone.annotations.CheckReturnValue;
2727
import io.grpc.Attributes;
28+
import io.grpc.ConnectivityState;
2829
import io.grpc.EquivalentAddressGroup;
2930
import io.grpc.HttpConnectProxiedSocketAddress;
3031
import io.grpc.InternalEquivalentAddressGroup;
3132
import io.grpc.InternalLogId;
3233
import io.grpc.LoadBalancer;
3334
import io.grpc.LoadBalancerProvider;
3435
import io.grpc.LoadBalancerRegistry;
36+
import io.grpc.NameResolver;
3537
import io.grpc.Status;
3638
import io.grpc.StatusOr;
3739
import io.grpc.internal.GrpcUtil;
40+
import io.grpc.util.ForwardingLoadBalancerHelper;
3841
import io.grpc.util.GracefulSwitchLoadBalancer;
3942
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
4043
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
@@ -83,15 +86,17 @@ final class CdsLoadBalancer2 extends LoadBalancer {
8386
private final Helper helper;
8487
private final LoadBalancerRegistry lbRegistry;
8588
private final ClusterState clusterState = new ClusterState();
89+
private final CdsLbHelper cdsLbHelper = new CdsLbHelper();
8690
private GracefulSwitchLoadBalancer delegate;
91+
private boolean addBackendServicePickDetailsLabel;
8792
// Following fields are effectively final.
8893
private String clusterName;
8994
private Subscription clusterSubscription;
9095

9196
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
9297
this.helper = checkNotNull(helper, "helper");
9398
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
94-
this.delegate = new GracefulSwitchLoadBalancer(helper);
99+
this.delegate = new GracefulSwitchLoadBalancer(cdsLbHelper);
95100
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
96101
logger.log(XdsLogLevel.INFO, "Created");
97102
}
@@ -126,6 +131,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
126131
XdsClusterConfig clusterConfig = clusterConfigOr.getValue();
127132

128133
if (clusterConfig.getChildren() instanceof EndpointConfig) {
134+
addBackendServicePickDetailsLabel = true;
129135
StatusOr<EdsUpdate> edsUpdate = getEdsUpdate(xdsConfig, clusterName);
130136
StatusOr<ClusterResolutionResult> statusOrResult = clusterState.edsUpdateToResult(
131137
clusterName,
@@ -156,8 +162,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
156162
resolvedAddresses.toBuilder()
157163
.setLoadBalancingPolicyConfig(gracefulConfig)
158164
.setAddresses(Collections.unmodifiableList(addresses))
165+
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
166+
.set(NameResolver.ATTR_BACKEND_SERVICE, clusterName)
167+
.build())
159168
.build());
160169
} else if (clusterConfig.getChildren() instanceof AggregateConfig) {
170+
addBackendServicePickDetailsLabel = false;
161171
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
162172
List<String> leafClusters = ((AggregateConfig) clusterConfig.getChildren()).getLeafNames();
163173
for (String childCluster: leafClusters) {
@@ -196,7 +206,8 @@ public void handleNameResolutionError(Status error) {
196206
public void shutdown() {
197207
logger.log(XdsLogLevel.INFO, "Shutdown");
198208
delegate.shutdown();
199-
delegate = new GracefulSwitchLoadBalancer(helper);
209+
delegate = new GracefulSwitchLoadBalancer(cdsLbHelper);
210+
addBackendServicePickDetailsLabel = false;
200211
if (clusterSubscription != null) {
201212
clusterSubscription.close();
202213
clusterSubscription = null;
@@ -206,6 +217,7 @@ public void shutdown() {
206217
@CheckReturnValue // don't forget to return up the stack after the fail call
207218
private Status fail(Status error) {
208219
delegate.shutdown();
220+
addBackendServicePickDetailsLabel = false;
209221
helper.updateBalancingState(
210222
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
211223
return Status.OK; // XdsNameResolver isn't a polling NR, so this value doesn't matter
@@ -215,6 +227,38 @@ private String errorPrefix() {
215227
return "CdsLb for " + clusterName + ": ";
216228
}
217229

230+
/**
 * Helper given to the child {@code GracefulSwitchLoadBalancer} instead of the outer
 * {@code helper}. {@link #delegate()} returns the outer helper; the only behavior added here is
 * in {@link #updateBalancingState}: while {@code addBackendServicePickDetailsLabel} is true, the
 * child's picker is wrapped in a {@code BackendServiceMetricLabelSubchannelPicker} carrying this
 * balancer's {@code clusterName} before being passed on.
 */
private final class CdsLbHelper extends ForwardingLoadBalancerHelper {
231+
@Override
232+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
233+
// Inner (non-static) class: the enclosing balancer's flag and clusterName are read each time
// the child publishes a picker, so the decision reflects the flag's value at that moment.
if (addBackendServicePickDetailsLabel) {
234+
newPicker = new BackendServiceMetricLabelSubchannelPicker(newPicker, clusterName);
235+
}
236+
delegate().updateBalancingState(newState, newPicker);
237+
}
238+
239+
@Override
240+
protected Helper delegate() {
241+
return helper;
242+
}
243+
}
244+
245+
/**
 * Picker decorator that reports the {@code grpc.lb.backend_service} optional label, with the
 * configured backend service name, to the pick's {@code PickDetailsConsumer} and then returns
 * whatever the wrapped picker returns.
 */
private static final class BackendServiceMetricLabelSubchannelPicker extends SubchannelPicker {
246+
private final SubchannelPicker delegate;
247+
private final String backendService;
248+
249+
private BackendServiceMetricLabelSubchannelPicker(
250+
SubchannelPicker delegate, String backendService) {
251+
this.delegate = checkNotNull(delegate, "delegate");
252+
this.backendService = checkNotNull(backendService, "backendService");
253+
}
254+
255+
@Override
256+
public PickResult pickSubchannel(PickSubchannelArgs args) {
257+
// The label is added on every pick, before delegating, regardless of the delegate's result.
// NOTE(review): assumes args.getPickDetailsConsumer() never returns null — confirm against the
// PickSubchannelArgs contract.
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", backendService);
258+
return delegate.pickSubchannel(args);
259+
}
260+
}
261+
218262
/**
219263
* The number of bits assigned to the fractional part of fixed-point values. We normalize weights
220264
* to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 ==

xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import io.grpc.InternalLogId;
3434
import io.grpc.LoadBalancer;
3535
import io.grpc.Metadata;
36-
import io.grpc.NameResolver;
3736
import io.grpc.Status;
3837
import io.grpc.internal.ForwardingClientStreamTracer;
3938
import io.grpc.internal.GrpcUtil;
@@ -154,9 +153,6 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
154153

155154
return childSwitchLb.acceptResolvedAddresses(
156155
resolvedAddresses.toBuilder()
157-
.setAttributes(attributes.toBuilder()
158-
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
159-
.build())
160156
.setLoadBalancingPolicyConfig(config.childConfig)
161157
.build());
162158
}
@@ -409,7 +405,6 @@ private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
409405
public PickResult pickSubchannel(PickSubchannelArgs args) {
410406
args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
411407
.accept(filterMetadata);
412-
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
413408
for (DropOverload dropOverload : dropPolicies) {
414409
int rand = random.nextInt(1_000_000);
415410
if (rand < dropOverload.dropsPerMillion()) {

xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import io.grpc.ConnectivityState;
6262
import io.grpc.LoadBalancer;
6363
import io.grpc.LoadBalancer.Helper;
64+
import io.grpc.LoadBalancer.PickDetailsConsumer;
6465
import io.grpc.LoadBalancer.PickResult;
6566
import io.grpc.LoadBalancer.PickSubchannelArgs;
6667
import io.grpc.LoadBalancer.ResolvedAddresses;
@@ -296,6 +297,24 @@ public void nonAggregateCluster_resourceUpdate() {
296297
assertThat(childLbConfig.maxConcurrentRequests).isEqualTo(200L);
297298
}
298299

300+
/**
 * With a single EDS cluster configured, checks that the only child balancer receives
 * {@code NameResolver.ATTR_BACKEND_SERVICE} equal to {@code CLUSTER}, and that the picker
 * published to the outer helper adds the {@code grpc.lb.backend_service} label with that name.
 */
@Test
301+
public void nonAggregateCluster_addsBackendServiceAttributeAndPickDetailsLabel() {
302+
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(CLUSTER, EDS_CLUSTER));
303+
startXdsDepManager();
304+
305+
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
306+
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
307+
// Have the fake child publish a READY picker so the picker reaching the outer helper can be
// captured and exercised.
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);
308+
309+
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
310+
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
311+
PickResult result =
312+
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));
313+
314+
assertThat(result.getStatus().isOk()).isTrue();
315+
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
316+
}
317+
299318
@Test
300319
public void nonAggregateCluster_resourceRevoked() {
301320
lbRegistry.register(new PriorityLoadBalancerProvider());
@@ -429,6 +448,73 @@ public void discoverAggregateCluster_createsPriorityLbPolicy() {
429448
.isEqualTo("cds_experimental");
430449
}
431450

451+
/**
 * With {@code CLUSTER} configured as an aggregate cluster over one EDS cluster, checks that the
 * only child balancer does not receive {@code NameResolver.ATTR_BACKEND_SERVICE} and that the
 * picker published to the outer helper never adds a {@code grpc.lb.backend_service} label.
 * NOTE(review): presumably the single fake child here stands in for the root's priority policy
 * (no real priority provider is registered in this method) — confirm against the test setup.
 */
@Test
452+
public void aggregateCluster_doesNotAddBackendServiceAttributeAndPickDetailsLabelFromRoot() {
453+
String cluster1 = "cluster-01.googleapis.com";
454+
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(
455+
// CLUSTER (aggr.) -> [cluster1 (EDS)]
456+
CLUSTER, Cluster.newBuilder()
457+
.setName(CLUSTER)
458+
.setClusterType(Cluster.CustomClusterType.newBuilder()
459+
.setName("envoy.clusters.aggregate")
460+
.setTypedConfig(Any.pack(ClusterConfig.newBuilder()
461+
.addClusters(cluster1)
462+
.build())))
463+
.build(),
464+
cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build()));
465+
startXdsDepManager();
466+
467+
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
468+
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isNull();
469+
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);
470+
471+
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
472+
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
473+
PickResult result =
474+
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));
475+
476+
assertThat(result.getStatus().isOk()).isTrue();
477+
// No label at all may be reported under this key, whatever the value.
verify(detailsConsumer, never()).addOptionalLabel(eq("grpc.lb.backend_service"), any());
478+
}
479+
480+
/**
 * Same aggregate-over-one-EDS-cluster configuration as the root test, but with the real priority
 * and CDS providers registered and the balancer under test recreated through
 * {@code CdsLoadBalancerProvider}. Checks that the only fake child balancer is configured for the
 * leaf cluster, receives {@code NameResolver.ATTR_BACKEND_SERVICE} equal to the leaf name, and
 * that the picker reaching the outer helper reports the leaf name, never the root name.
 */
@Test
481+
public void aggregateCluster_leafAddsBackendServicePickDetailsLabel() {
482+
lbRegistry.register(new PriorityLoadBalancerProvider());
483+
CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry);
484+
lbRegistry.register(cdsLoadBalancerProvider);
485+
loadBalancer = (CdsLoadBalancer2) cdsLoadBalancerProvider.newLoadBalancer(helper);
486+
487+
String cluster1 = "cluster-01.googleapis.com";
488+
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of(
489+
// CLUSTER (aggr.) -> [cluster1 (EDS)]
490+
CLUSTER, Cluster.newBuilder()
491+
.setName(CLUSTER)
492+
.setClusterType(Cluster.CustomClusterType.newBuilder()
493+
.setName("envoy.clusters.aggregate")
494+
.setTypedConfig(Any.pack(ClusterConfig.newBuilder()
495+
.addClusters(cluster1)
496+
.build())))
497+
.build(),
498+
cluster1, EDS_CLUSTER.toBuilder().setName(cluster1).build()));
499+
startXdsDepManager();
500+
501+
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
502+
ClusterImplConfig clusterImplConfig = (ClusterImplConfig) childBalancer.config;
503+
assertThat(clusterImplConfig.cluster).isEqualTo(cluster1);
504+
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
505+
.isEqualTo(cluster1);
506+
childBalancer.deliverSubchannelState(PickResult.withNoResult(), ConnectivityState.READY);
507+
508+
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
509+
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
510+
PickResult result =
511+
pickerCaptor.getValue().pickSubchannel(newPickSubchannelArgs(detailsConsumer));
512+
513+
assertThat(result.getStatus().isOk()).isTrue();
514+
verify(detailsConsumer).addOptionalLabel("grpc.lb.backend_service", cluster1);
515+
// The aggregate root's name must not be reported as the backend service.
verify(detailsConsumer, never()).addOptionalLabel("grpc.lb.backend_service", CLUSTER);
516+
}
517+
432518
@Test
433519
// Both priorities will get tried using real priority LB policy.
434520
public void discoverAggregateCluster_testChildCdsLbPolicyParsing() {
@@ -462,12 +548,16 @@ public void discoverAggregateCluster_testChildCdsLbPolicyParsing() {
462548
.isEqualTo("cluster-01.googleapis.com");
463549
assertThat(cluster1ImplConfig.edsServiceName)
464550
.isEqualTo("backend-service-1.googleapis.com");
551+
assertThat(childBalancers.get(0).attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
552+
.isEqualTo(cluster1);
465553
ClusterImplConfig cluster2ImplConfig =
466554
(ClusterImplConfig) childBalancers.get(1).config;
467555
assertThat(cluster2ImplConfig.cluster)
468556
.isEqualTo("cluster-02.googleapis.com");
469557
assertThat(cluster2ImplConfig.edsServiceName)
470558
.isEqualTo("backend-service-1.googleapis.com");
559+
assertThat(childBalancers.get(1).attributes.get(NameResolver.ATTR_BACKEND_SERVICE))
560+
.isEqualTo(cluster2);
471561
}
472562

473563
@Test
@@ -577,7 +667,9 @@ public void unknownLbProvider() {
577667
startXdsDepManager();
578668
verify(helper).updateBalancingState(
579669
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
580-
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
670+
PickResult result =
671+
pickerCaptor.getValue().pickSubchannel(
672+
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
581673
Status actualStatus = result.getStatus();
582674
assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
583675
assertThat(actualStatus.getDescription()).contains("Invalid LoadBalancingPolicy");
@@ -605,7 +697,9 @@ public void invalidLbConfig() {
605697
startXdsDepManager();
606698
verify(helper).updateBalancingState(
607699
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
608-
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
700+
PickResult result =
701+
pickerCaptor.getValue().pickSubchannel(
702+
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
609703
Status actualStatus = result.getStatus();
610704
assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
611705
assertThat(actualStatus.getDescription()).contains("Invalid 'minRingSize'");
@@ -639,12 +733,19 @@ private void startXdsDepManager(final CdsConfig cdsConfig) {
639733
}
640734

641735
private static void assertPickerStatus(SubchannelPicker picker, Status expectedStatus) {
642-
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
736+
PickResult result = picker.pickSubchannel(
737+
newPickSubchannelArgs(mock(PickDetailsConsumer.class)));
643738
Status actualStatus = result.getStatus();
644739
assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
645740
assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
646741
}
647742

743+
/**
 * Builds a mocked {@code PickSubchannelArgs} whose {@code getPickDetailsConsumer()} returns the
 * given consumer, so pickers that report pick details can be invoked without a null consumer.
 * All other accessors keep the mock's default behavior.
 */
private static PickSubchannelArgs newPickSubchannelArgs(PickDetailsConsumer pickDetailsConsumer) {
744+
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
745+
when(args.getPickDetailsConsumer()).thenReturn(pickDetailsConsumer);
746+
return args;
747+
}
748+
648749
private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
649750
private final String policyName;
650751
private final LoadBalancerProvider configParsingDelegate;
@@ -660,7 +761,7 @@ private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
660761

661762
@Override
662763
public LoadBalancer newLoadBalancer(Helper helper) {
663-
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName);
764+
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper);
664765
childBalancers.add(balancer);
665766
return balancer;
666767
}
@@ -692,17 +793,21 @@ public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig(
692793

693794
private final class FakeLoadBalancer extends LoadBalancer {
694795
private final String name;
796+
private final Helper helper;
695797
private Object config;
798+
private Attributes attributes;
696799
private Status upstreamError;
697800
private boolean shutdown;
698801

699-
FakeLoadBalancer(String name) {
802+
FakeLoadBalancer(String name, Helper helper) {
700803
this.name = name;
804+
this.helper = helper;
701805
}
702806

703807
/**
 * Records the load balancing policy config and the attributes of the received update so tests
 * can inspect what the parent policy passed down; always reports success.
 */
@Override
704808
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
705809
config = resolvedAddresses.getLoadBalancingPolicyConfig();
810+
attributes = resolvedAddresses.getAttributes();
706811
return Status.OK;
707812
}
708813

@@ -716,5 +821,15 @@ public void shutdown() {
716821
shutdown = true;
717822
childBalancers.remove(this);
718823
}
824+
825+
/**
 * Publishes the given connectivity state through this fake balancer's helper, together with a
 * picker that returns {@code result} for every pick regardless of the pick arguments.
 */
void deliverSubchannelState(final PickResult result, ConnectivityState state) {
826+
SubchannelPicker picker = new SubchannelPicker() {
827+
@Override
828+
public PickResult pickSubchannel(PickSubchannelArgs args) {
829+
return result;
830+
}
831+
};
832+
helper.updateBalancingState(state, picker);
833+
}
719834
}
720835
}

0 commit comments

Comments
 (0)