Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grpclb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':grpc-core'),
project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-util'),
libraries.guava,
libraries.protobuf.java,
libraries.protobuf.java.util
Expand Down
227 changes: 154 additions & 73 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -62,6 +65,7 @@
import io.grpc.lb.v1.Server;
import io.grpc.lb.v1.ServerList;
import io.grpc.stub.StreamObserver;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -119,7 +123,7 @@ final class GrpclbState {
@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
return PickResult.withNoResult();
}

Expand Down Expand Up @@ -187,6 +191,15 @@ enum Mode {
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
private boolean requestConnectionPending;

// Child LoadBalancer and state for PICK_FIRST mode delegation.
private final LoadBalancerProvider pickFirstLbProvider;
@Nullable
private LoadBalancer pickFirstLb;
private ConnectivityState pickFirstLbState = CONNECTING;
private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
@Nullable
private GrpclbClientLoadRecorder currentPickFirstLoadRecorder;

GrpclbState(
GrpclbConfig config,
Helper helper,
Expand All @@ -212,6 +225,8 @@ public void onSubchannelState(
} else {
this.subchannelPool = null;
}
this.pickFirstLbProvider =
LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

For improved robustness, it's a good practice to ensure that the pickFirstLbProvider is not null. LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first") is annotated with @Nullable and can return null if the "pick_first" provider is not found in the registry. A NullPointerException would occur later in updateServerList when pickFirstLbProvider.newLoadBalancer() is called.

Adding a checkNotNull will cause a clean and early failure during initialization if the provider is missing, which is preferable to a deferred NullPointerException.

Suggested change
this.pickFirstLbProvider =
LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first");
this.pickFirstLbProvider =
checkNotNull(
LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first"),
"pick_first balancer not available");

this.time = checkNotNull(time, "time provider");
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
Expand Down Expand Up @@ -309,6 +324,12 @@ void handleAddresses(

void requestConnection() {
requestConnectionPending = true;
// For PICK_FIRST mode with delegation, forward to the child LB.
if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) {
pickFirstLb.requestConnection();
requestConnectionPending = false;
return;
}
for (RoundRobinEntry entry : currentPicker.pickList) {
if (entry instanceof IdleSubchannelEntry) {
((IdleSubchannelEntry) entry).subchannel.requestConnection();
Expand All @@ -323,15 +344,23 @@ private void maybeUseFallbackBackends() {
}
// Balancer RPC should have either been broken or timed out.
checkState(fallbackReason != null, "no reason to fallback");
for (Subchannel subchannel : subchannels.values()) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
// For PICK_FIRST mode with delegation, check the child LB's state.
if (config.getMode() == Mode.PICK_FIRST) {
if (pickFirstLb != null && pickFirstLbState == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
// For PICK_FIRST, we don't have individual subchannel states to use as fallback reason.
} else {
for (Subchannel subchannel : subchannels.values()) {
ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
return;
}
// If we do have balancer-provided backends, use one of its error in the error message if
// fail to fallback.
if (stateInfo.getState() == TRANSIENT_FAILURE) {
fallbackReason = stateInfo.getStatus();
}
}
}
// Fallback conditions met
Expand Down Expand Up @@ -438,9 +467,10 @@ void shutdown() {
subchannelPool.clear();
break;
case PICK_FIRST:
if (!subchannels.isEmpty()) {
checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
subchannels.values().iterator().next().shutdown();
// Shutdown the child pick_first LB which manages its own subchannels.
if (pickFirstLb != null) {
pickFirstLb.shutdown();
pickFirstLb = null;
}
break;
default:
Expand Down Expand Up @@ -517,22 +547,17 @@ private void updateServerList(
subchannels = Collections.unmodifiableMap(newSubchannelMap);
break;
case PICK_FIRST:
checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels);
final Subchannel subchannel;
// Delegate to child pick_first LB for address management.
// Shutdown existing child LB if addresses become empty.
if (newBackendAddrList.isEmpty()) {
if (subchannels.size() == 1) {
subchannel = subchannels.values().iterator().next();
subchannel.shutdown();
subchannels = Collections.emptyMap();
if (pickFirstLb != null) {
pickFirstLb.shutdown();
pickFirstLb = null;
}
break;
}
List<EquivalentAddressGroup> eagList = new ArrayList<>();
// Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to
// attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on
// headers.
//
// The PICK_FIRST code path doesn't cache Subchannels.
// Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve.
for (BackendAddressGroup bag : newBackendAddrList) {
EquivalentAddressGroup origEag = bag.getAddresses();
Attributes eagAttrs = origEag.getAttributes();
Expand All @@ -542,30 +567,24 @@ private void updateServerList(
}
eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
}
if (subchannels.isEmpty()) {
subchannel =
helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(eagList)
.setAttributes(createSubchannelAttrs())
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
handleSubchannelState(subchannel, newState);
}
});
if (requestConnectionPending) {
subchannel.requestConnection();
requestConnectionPending = false;
}
} else {
subchannel = subchannels.values().iterator().next();
subchannel.updateAddresses(eagList);
// Always shutdown and recreate the child LB when addresses change to avoid
// calling Subchannel.updateAddresses(). This ensures we use the new dualstack-
// compatible path where the child LB creates fresh subchannels.
if (pickFirstLb != null) {
pickFirstLb.shutdown();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The comment here states that the child load balancer is recreated "when addresses change", but the current implementation recreates it on every server list update. This happens even if the list of backends is identical to the previous one, which can lead to unnecessary connection churn and performance degradation if the remote balancer sends frequent but identical server lists.

To align with the stated intent and optimize performance, I suggest caching the list of EquivalentAddressGroups and only recreating the child pick_first load balancer when this list actually changes.

You could implement this by:

  1. Adding a new field private List<EquivalentAddressGroup> pickFirstEagList; to GrpclbState.
  2. In updateServerList, for the PICK_FIRST case, before recreating the LB, check if eagList.equals(pickFirstEagList). If they are equal, you can skip recreating the LB. You would still need to update currentPickFirstLoadRecorder if it has changed.
  3. Update pickFirstEagList = eagList; when you do recreate the LB.
  4. Remember to nullify pickFirstEagList when the child LB is shut down (e.g., when the backend list becomes empty, or in the main shutdown() method).

pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
// Pass addresses to child LB.
pickFirstLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eagList)
.build());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There seems to be a potential issue here where stale state from a previous child load balancer could be used. When pickFirstLb is recreated, pickFirstLbState and pickFirstLbPicker are not reset. If maybeUpdatePicker() is called before the new child LB provides its state, it will use the state from the old, now-shutdown LB. This could lead to using a picker with shutdown subchannels, causing RPCs to fail.

It would be safer to reset the child LB's state immediately after creating a new instance.

        pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
        // Reset the child LB state, since we created a new one.
        pickFirstLbState = CONNECTING;
        pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
        // Pass addresses to child LB.
        pickFirstLb.acceptResolvedAddresses(
            ResolvedAddresses.newBuilder()
                .setAddresses(eagList)
                .build());

if (requestConnectionPending) {
pickFirstLb.requestConnection();
requestConnectionPending = false;
}
subchannels = Collections.singletonMap(eagList, subchannel);
newBackendList.add(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder)));
// Store the load recorder for token attachment.
currentPickFirstLoadRecorder = loadRecorder;
break;
default:
throw new AssertionError("Missing case for " + config.getMode());
Expand Down Expand Up @@ -842,7 +861,11 @@ private void cleanUp() {
private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList;
ConnectivityState state;
if (backendList.isEmpty()) {
// For PICK_FIRST mode with delegation, check if child LB exists instead of backendList.
boolean hasBackends = config.getMode() == Mode.PICK_FIRST
? pickFirstLb != null
: !backendList.isEmpty();
if (!hasBackends) {
// Note balancer (is working) may enforce using fallback backends, and that fallback may
// fail. So we should check if currently in fallback first.
if (usingFallbackBackends) {
Expand Down Expand Up @@ -894,26 +917,12 @@ private void maybeUpdatePicker() {
}
break;
case PICK_FIRST: {
checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList);
BackendEntry onlyEntry = backendList.get(0);
ConnectivityStateInfo stateInfo =
onlyEntry.subchannel.getAttributes().get(STATE_INFO).get();
state = stateInfo.getState();
switch (state) {
case READY:
pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry);
break;
case TRANSIENT_FAILURE:
pickList =
Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus()));
break;
case CONNECTING:
pickList = Collections.singletonList(BUFFER_ENTRY);
break;
default:
pickList = Collections.<RoundRobinEntry>singletonList(
new IdleSubchannelEntry(onlyEntry.subchannel, syncContext));
}
// Use child LB's state and picker. Wrap the picker for token attachment.
state = pickFirstLbState;
TokenAttachingTracerFactory tracerFactory =
new TokenAttachingTracerFactory(currentPickFirstLoadRecorder);
pickList = Collections.<RoundRobinEntry>singletonList(
new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory));
break;
}
default:
Expand Down Expand Up @@ -983,7 +992,7 @@ public boolean equals(Object other) {

@VisibleForTesting
interface RoundRobinEntry {
PickResult picked(Metadata headers);
PickResult picked(PickSubchannelArgs args);
}

@VisibleForTesting
Expand Down Expand Up @@ -1024,7 +1033,8 @@ static final class BackendEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
Metadata headers = args.getHeaders();
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
if (token != null) {
headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
Expand Down Expand Up @@ -1065,7 +1075,7 @@ static final class IdleSubchannelEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
if (connectionRequested.compareAndSet(false, true)) {
syncContext.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -1108,7 +1118,7 @@ static final class ErrorEntry implements RoundRobinEntry {
}

@Override
public PickResult picked(Metadata headers) {
public PickResult picked(PickSubchannelArgs args) {
return result;
}

Expand All @@ -1132,6 +1142,53 @@ public String toString() {
}
}

/**
* Entry that wraps a child LB's picker for PICK_FIRST mode delegation.
* Attaches TokenAttachingTracerFactory to the pick result for token propagation.
*/
@VisibleForTesting
static final class ChildLbPickerEntry implements RoundRobinEntry {
private final SubchannelPicker childPicker;
private final TokenAttachingTracerFactory tracerFactory;

ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) {
this.childPicker = checkNotNull(childPicker, "childPicker");
this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory");
}

@Override
public PickResult picked(PickSubchannelArgs args) {
PickResult childResult = childPicker.pickSubchannel(args);
if (childResult.getSubchannel() == null) {
// No subchannel (e.g., buffer, error), return as-is.
return childResult;
}
// Wrap the pick result to attach tokens via the tracer factory.
return PickResult.withSubchannel(
childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride());
}

@Override
public int hashCode() {
return Objects.hashCode(childPicker, tracerFactory);
}

@Override
public boolean equals(Object other) {
if (!(other instanceof ChildLbPickerEntry)) {
return false;
}
ChildLbPickerEntry that = (ChildLbPickerEntry) other;
return Objects.equal(childPicker, that.childPicker)
&& Objects.equal(tracerFactory, that.tracerFactory);
}

@Override
public String toString() {
return "ChildLbPickerEntry(" + childPicker + ")";
}
}

@VisibleForTesting
static final class RoundRobinPicker extends SubchannelPicker {
@VisibleForTesting
Expand Down Expand Up @@ -1174,7 +1231,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (pickIndex == pickList.size()) {
pickIndex = 0;
}
return pick.picked(args.getHeaders());
return pick.picked(args);
}
}

Expand All @@ -1189,4 +1246,28 @@ public String toString() {
return MoreObjects.toStringHelper(RoundRobinPicker.class).toString();
}
}

/**
* Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState()
* to store state and trigger the grpclb picker update with drops and token attachment.
*/
private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper {

@Override
protected Helper delegate() {
return helper;
}

@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
pickFirstLbState = newState;
pickFirstLbPicker = newPicker;
// Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN.
if (newState == TRANSIENT_FAILURE || newState == IDLE) {
helper.refreshNameResolution();
}
maybeUseFallbackBackends();
maybeUpdatePicker();
}
}
}
Loading
Loading