Skip to content

Commit a1a7363

Browse files
committed
Prefer FixedResultPicker over custom Picker implementations
These other implementations pre-date FixedResultPicker, and are no longer needed. gRPC-LB's tests failed when using FixedResultPicker because it didn't transition to READY. This was because GrpclbState.maybeUpdatePicker() didn't consider the ConnectivityState when checking if anything had changed. The old PickFirstLoadBalancer.Picker didn't implement equals() so every update was considered different, ignoring the connectivity state. PickFirstLeafLoadBalancer wasn't impacted because it didn't pick a subchannel when in CONNECTING, and neither did PickFirstLoadBalancer after the first update. This is fixed by gRPC-LB checking the connectivity state. A follow-up will have PickFirstLoadBalancer no longer return the useless subchannel when picking during CONNECTING.
1 parent a9f73f4 commit a1a7363

11 files changed

Lines changed: 58 additions & 202 deletions

File tree

core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,15 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22-
import com.google.common.base.MoreObjects;
2322
import io.grpc.ChannelLogger.ChannelLogLevel;
2423
import io.grpc.ConnectivityState;
2524
import io.grpc.ConnectivityStateInfo;
2625
import io.grpc.LoadBalancer;
26+
import io.grpc.LoadBalancer.FixedResultPicker;
2727
import io.grpc.LoadBalancer.Helper;
2828
import io.grpc.LoadBalancer.PickResult;
29-
import io.grpc.LoadBalancer.PickSubchannelArgs;
3029
import io.grpc.LoadBalancer.ResolvedAddresses;
3130
import io.grpc.LoadBalancer.Subchannel;
32-
import io.grpc.LoadBalancer.SubchannelPicker;
3331
import io.grpc.LoadBalancerProvider;
3432
import io.grpc.LoadBalancerRegistry;
3533
import io.grpc.NameResolver.ConfigOrError;
@@ -110,7 +108,8 @@ Status tryAcceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
110108
defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy");
111109
} catch (PolicyException e) {
112110
Status s = Status.INTERNAL.withDescription(e.getMessage());
113-
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s));
111+
helper.updateBalancingState(
112+
ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(s)));
114113
delegate.shutdown();
115114
delegateProvider = null;
116115
delegate = new NoopLoadBalancer();
@@ -122,7 +121,8 @@ Status tryAcceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
122121

123122
if (delegateProvider == null
124123
|| !policySelection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) {
125-
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker());
124+
helper.updateBalancingState(
125+
ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
126126
delegate.shutdown();
127127
delegateProvider = policySelection.provider;
128128
LoadBalancer old = delegate;
@@ -236,30 +236,4 @@ private PolicyException(String msg) {
236236
super(msg);
237237
}
238238
}
239-
240-
private static final class EmptyPicker extends SubchannelPicker {
241-
242-
@Override
243-
public PickResult pickSubchannel(PickSubchannelArgs args) {
244-
return PickResult.withNoResult();
245-
}
246-
247-
@Override
248-
public String toString() {
249-
return MoreObjects.toStringHelper(EmptyPicker.class).toString();
250-
}
251-
}
252-
253-
private static final class FailingPicker extends SubchannelPicker {
254-
private final Status failure;
255-
256-
FailingPicker(Status failure) {
257-
this.failure = failure;
258-
}
259-
260-
@Override
261-
public PickResult pickSubchannel(PickSubchannelArgs args) {
262-
return PickResult.withError(failure);
263-
}
264-
}
265239
}

core/src/main/java/io/grpc/internal/OobChannel.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
import io.grpc.InternalLogId;
3939
import io.grpc.InternalWithLogId;
4040
import io.grpc.LoadBalancer;
41+
import io.grpc.LoadBalancer.FixedResultPicker;
4142
import io.grpc.LoadBalancer.PickResult;
42-
import io.grpc.LoadBalancer.PickSubchannelArgs;
4343
import io.grpc.LoadBalancer.Subchannel;
4444
import io.grpc.LoadBalancer.SubchannelPicker;
4545
import io.grpc.ManagedChannel;
@@ -182,23 +182,7 @@ public Object getInternalSubchannel() {
182182
}
183183
};
184184

185-
final class OobSubchannelPicker extends SubchannelPicker {
186-
final PickResult result = PickResult.withSubchannel(subchannelImpl);
187-
188-
@Override
189-
public PickResult pickSubchannel(PickSubchannelArgs args) {
190-
return result;
191-
}
192-
193-
@Override
194-
public String toString() {
195-
return MoreObjects.toStringHelper(OobSubchannelPicker.class)
196-
.add("result", result)
197-
.toString();
198-
}
199-
}
200-
201-
subchannelPicker = new OobSubchannelPicker();
185+
subchannelPicker = new FixedResultPicker(PickResult.withSubchannel(subchannelImpl));
202186
delayedTransport.reprocess(subchannelPicker);
203187
}
204188

@@ -270,23 +254,8 @@ void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
270254
delayedTransport.reprocess(subchannelPicker);
271255
break;
272256
case TRANSIENT_FAILURE:
273-
final class OobErrorPicker extends SubchannelPicker {
274-
final PickResult errorResult = PickResult.withError(newState.getStatus());
275-
276-
@Override
277-
public PickResult pickSubchannel(PickSubchannelArgs args) {
278-
return errorResult;
279-
}
280-
281-
@Override
282-
public String toString() {
283-
return MoreObjects.toStringHelper(OobErrorPicker.class)
284-
.add("errorResult", errorResult)
285-
.toString();
286-
}
287-
}
288-
289-
delayedTransport.reprocess(new OobErrorPicker());
257+
delayedTransport.reprocess(
258+
new FixedResultPicker(PickResult.withError(newState.getStatus())));
290259
break;
291260
default:
292261
// Do nothing

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
2525

2626
import com.google.common.annotations.VisibleForTesting;
27-
import com.google.common.base.MoreObjects;
2827
import com.google.common.collect.ImmutableList;
2928
import com.google.common.collect.Lists;
3029
import io.grpc.Attributes;
@@ -164,7 +163,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
164163
if (noOldAddrs) {
165164
// Make tests happy; they don't properly assume starting in CONNECTING
166165
rawConnectivityState = CONNECTING;
167-
updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
166+
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
168167
}
169168

170169
if (rawConnectivityState == READY) {
@@ -237,7 +236,7 @@ public void handleNameResolutionError(Status error) {
237236
subchannels.clear();
238237
addressIndex.updateGroups(ImmutableList.of());
239238
rawConnectivityState = TRANSIENT_FAILURE;
240-
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
239+
updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
241240
}
242241

243242
void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
@@ -290,7 +289,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
290289

291290
case CONNECTING:
292291
rawConnectivityState = CONNECTING;
293-
updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
292+
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
294293
break;
295294

296295
case READY:
@@ -322,7 +321,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
322321
if (isPassComplete()) {
323322
rawConnectivityState = TRANSIENT_FAILURE;
324323
updateBalancingState(TRANSIENT_FAILURE,
325-
new Picker(PickResult.withError(stateInfo.getStatus())));
324+
new FixedResultPicker(PickResult.withError(stateInfo.getStatus())));
326325

327326
// Refresh Name Resolution, but only when all 3 conditions are met
328327
// * We are at the end of addressIndex
@@ -385,11 +384,11 @@ private void updateHealthCheckedState(SubchannelData subchannelData) {
385384
updateBalancingState(READY,
386385
new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
387386
} else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
388-
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
387+
updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
389388
subchannelData.healthStateInfo.getStatus())));
390389
} else if (concludedState != TRANSIENT_FAILURE) {
391390
updateBalancingState(subchannelData.getHealthState(),
392-
new Picker(PickResult.withNoResult()));
391+
new FixedResultPicker(PickResult.withNoResult()));
393392
}
394393
}
395394

@@ -593,28 +592,6 @@ ConnectivityState getConcludedConnectivityState() {
593592
return this.concludedState;
594593
}
595594

596-
/**
597-
* No-op picker which doesn't add any custom picking logic. It just passes already known result
598-
* received in constructor.
599-
*/
600-
private static final class Picker extends SubchannelPicker {
601-
private final PickResult result;
602-
603-
Picker(PickResult result) {
604-
this.result = checkNotNull(result, "result");
605-
}
606-
607-
@Override
608-
public PickResult pickSubchannel(PickSubchannelArgs args) {
609-
return result;
610-
}
611-
612-
@Override
613-
public String toString() {
614-
return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
615-
}
616-
}
617-
618595
/**
619596
* Picker that requests connection during the first pick, and returns noResult.
620597
*/

core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static io.grpc.ConnectivityState.SHUTDOWN;
2323
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
2424

25-
import com.google.common.base.MoreObjects;
2625
import io.grpc.ConnectivityState;
2726
import io.grpc.ConnectivityStateInfo;
2827
import io.grpc.EquivalentAddressGroup;
@@ -87,7 +86,8 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
8786

8887
// The channel state does not get updated when doing name resolving today, so for the moment
8988
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
90-
updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
89+
updateBalancingState(
90+
CONNECTING, new FixedResultPicker(PickResult.withSubchannel(subchannel)));
9191
subchannel.requestConnection();
9292
} else {
9393
subchannel.updateAddresses(servers);
@@ -105,7 +105,7 @@ public void handleNameResolutionError(Status error) {
105105

106106
// NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine
107107
// for time being.
108-
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
108+
updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
109109
}
110110

111111
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
@@ -139,13 +139,13 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
139139
case CONNECTING:
140140
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
141141
// the current picker in-place. But ignoring the potential optimization is simpler.
142-
picker = new Picker(PickResult.withNoResult());
142+
picker = new FixedResultPicker(PickResult.withNoResult());
143143
break;
144144
case READY:
145-
picker = new Picker(PickResult.withSubchannel(subchannel));
145+
picker = new FixedResultPicker(PickResult.withSubchannel(subchannel));
146146
break;
147147
case TRANSIENT_FAILURE:
148-
picker = new Picker(PickResult.withError(stateInfo.getStatus()));
148+
picker = new FixedResultPicker(PickResult.withError(stateInfo.getStatus()));
149149
break;
150150
default:
151151
throw new IllegalArgumentException("Unsupported state:" + newState);
@@ -173,28 +173,6 @@ public void requestConnection() {
173173
}
174174
}
175175

176-
/**
177-
* No-op picker which doesn't add any custom picking logic. It just passes already known result
178-
* received in constructor.
179-
*/
180-
private static final class Picker extends SubchannelPicker {
181-
private final PickResult result;
182-
183-
Picker(PickResult result) {
184-
this.result = checkNotNull(result, "result");
185-
}
186-
187-
@Override
188-
public PickResult pickSubchannel(PickSubchannelArgs args) {
189-
return result;
190-
}
191-
192-
@Override
193-
public String toString() {
194-
return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
195-
}
196-
}
197-
198176
/** Picker that requests connection during the first pick, and returns noResult. */
199177
private final class RequestConnectionPicker extends SubchannelPicker {
200178
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);

examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
9292
});
9393
this.subchannel = subchannel;
9494

95-
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
95+
helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
9696
subchannel.requestConnection();
9797
} else {
9898
subchannel.updateAddresses(servers);
@@ -107,7 +107,8 @@ public void handleNameResolutionError(Status error) {
107107
subchannel.shutdown();
108108
subchannel = null;
109109
}
110-
helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
110+
helper.updateBalancingState(
111+
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
111112
}
112113

113114
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
@@ -125,13 +126,13 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
125126
picker = new RequestConnectionPicker();
126127
break;
127128
case CONNECTING:
128-
picker = new Picker(PickResult.withNoResult());
129+
picker = new FixedResultPicker(PickResult.withNoResult());
129130
break;
130131
case READY:
131-
picker = new Picker(PickResult.withSubchannel(subchannel));
132+
picker = new FixedResultPicker(PickResult.withSubchannel(subchannel));
132133
break;
133134
case TRANSIENT_FAILURE:
134-
picker = new Picker(PickResult.withError(stateInfo.getStatus()));
135+
picker = new FixedResultPicker(PickResult.withError(stateInfo.getStatus()));
135136
break;
136137
default:
137138
throw new IllegalArgumentException("Unsupported state:" + currentState);
@@ -154,29 +155,6 @@ public void requestConnection() {
154155
}
155156
}
156157

157-
/**
158-
* No-op picker which doesn't add any custom picking logic. It just passes already known result
159-
* received in constructor.
160-
*/
161-
private static final class Picker extends SubchannelPicker {
162-
163-
private final PickResult result;
164-
165-
Picker(PickResult result) {
166-
this.result = checkNotNull(result, "result");
167-
}
168-
169-
@Override
170-
public PickResult pickSubchannel(PickSubchannelArgs args) {
171-
return result;
172-
}
173-
174-
@Override
175-
public String toString() {
176-
return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
177-
}
178-
}
179-
180158
/**
181159
* Picker that requests connection during the first pick, and returns noResult.
182160
*/

grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ enum Mode {
187187
private List<DropEntry> dropList = Collections.emptyList();
188188
// Contains only non-drop, i.e., backends from the round-robin list from the balancer.
189189
private List<BackendEntry> backendList = Collections.emptyList();
190+
private ConnectivityState currentState = ConnectivityState.CONNECTING;
190191
private RoundRobinPicker currentPicker =
191192
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
192193
private boolean requestConnectionPending;
@@ -937,10 +938,12 @@ private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)
937938
// Discard the new picker if we are sure it won't make any difference, in order to save
938939
// re-processing pending streams, and avoid unnecessary resetting of the pointer in
939940
// RoundRobinPicker.
940-
if (picker.dropList.equals(currentPicker.dropList)
941+
if (state.equals(currentState)
942+
&& picker.dropList.equals(currentPicker.dropList)
941943
&& picker.pickList.equals(currentPicker.pickList)) {
942944
return;
943945
}
946+
currentState = state;
944947
currentPicker = picker;
945948
helper.updateBalancingState(state, picker);
946949
}

0 commit comments

Comments
 (0)