-
Notifications
You must be signed in to change notification settings - Fork 0
testing grpclb changes #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
bd07668
5dab3a7
8f0a011
edd38c5
b4eb6ae
6e7ebd3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,13 +37,16 @@ | |
| import io.grpc.ConnectivityStateInfo; | ||
| import io.grpc.Context; | ||
| import io.grpc.EquivalentAddressGroup; | ||
| import io.grpc.LoadBalancer.CreateSubchannelArgs; | ||
| import io.grpc.LoadBalancer; | ||
| import io.grpc.LoadBalancer.FixedResultPicker; | ||
| import io.grpc.LoadBalancer.Helper; | ||
| import io.grpc.LoadBalancer.PickResult; | ||
| import io.grpc.LoadBalancer.PickSubchannelArgs; | ||
| import io.grpc.LoadBalancer.ResolvedAddresses; | ||
| import io.grpc.LoadBalancer.Subchannel; | ||
| import io.grpc.LoadBalancer.SubchannelPicker; | ||
| import io.grpc.LoadBalancer.SubchannelStateListener; | ||
| import io.grpc.LoadBalancerProvider; | ||
| import io.grpc.LoadBalancerRegistry; | ||
| import io.grpc.ManagedChannel; | ||
| import io.grpc.Metadata; | ||
| import io.grpc.Status; | ||
|
|
@@ -62,6 +65,7 @@ | |
| import io.grpc.lb.v1.Server; | ||
| import io.grpc.lb.v1.ServerList; | ||
| import io.grpc.stub.StreamObserver; | ||
| import io.grpc.util.ForwardingLoadBalancerHelper; | ||
| import java.net.InetAddress; | ||
| import java.net.InetSocketAddress; | ||
| import java.net.UnknownHostException; | ||
|
|
@@ -119,7 +123,7 @@ final class GrpclbState { | |
| @VisibleForTesting | ||
| static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { | ||
| @Override | ||
| public PickResult picked(Metadata headers) { | ||
| public PickResult picked(PickSubchannelArgs args) { | ||
| return PickResult.withNoResult(); | ||
| } | ||
|
|
||
|
|
@@ -187,6 +191,15 @@ enum Mode { | |
| new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY)); | ||
| private boolean requestConnectionPending; | ||
|
|
||
| // Child LoadBalancer and state for PICK_FIRST mode delegation. | ||
| private final LoadBalancerProvider pickFirstLbProvider; | ||
| @Nullable | ||
| private LoadBalancer pickFirstLb; | ||
| private ConnectivityState pickFirstLbState = CONNECTING; | ||
| private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult()); | ||
| @Nullable | ||
| private GrpclbClientLoadRecorder currentPickFirstLoadRecorder; | ||
|
|
||
| GrpclbState( | ||
| GrpclbConfig config, | ||
| Helper helper, | ||
|
|
@@ -212,6 +225,8 @@ public void onSubchannelState( | |
| } else { | ||
| this.subchannelPool = null; | ||
| } | ||
| this.pickFirstLbProvider = | ||
| LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first"); | ||
| this.time = checkNotNull(time, "time provider"); | ||
| this.stopwatch = checkNotNull(stopwatch, "stopwatch"); | ||
| this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); | ||
|
|
@@ -309,6 +324,12 @@ void handleAddresses( | |
|
|
||
| void requestConnection() { | ||
| requestConnectionPending = true; | ||
| // For PICK_FIRST mode with delegation, forward to the child LB. | ||
| if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) { | ||
| pickFirstLb.requestConnection(); | ||
| requestConnectionPending = false; | ||
| return; | ||
| } | ||
| for (RoundRobinEntry entry : currentPicker.pickList) { | ||
| if (entry instanceof IdleSubchannelEntry) { | ||
| ((IdleSubchannelEntry) entry).subchannel.requestConnection(); | ||
|
|
@@ -323,15 +344,23 @@ private void maybeUseFallbackBackends() { | |
| } | ||
| // Balancer RPC should have either been broken or timed out. | ||
| checkState(fallbackReason != null, "no reason to fallback"); | ||
| for (Subchannel subchannel : subchannels.values()) { | ||
| ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); | ||
| if (stateInfo.getState() == READY) { | ||
| // For PICK_FIRST mode with delegation, check the child LB's state. | ||
| if (config.getMode() == Mode.PICK_FIRST) { | ||
| if (pickFirstLb != null && pickFirstLbState == READY) { | ||
| return; | ||
| } | ||
| // If we do have balancer-provided backends, use one of its error in the error message if | ||
| // fail to fallback. | ||
| if (stateInfo.getState() == TRANSIENT_FAILURE) { | ||
| fallbackReason = stateInfo.getStatus(); | ||
| // For PICK_FIRST, we don't have individual subchannel states to use as fallback reason. | ||
| } else { | ||
| for (Subchannel subchannel : subchannels.values()) { | ||
| ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); | ||
| if (stateInfo.getState() == READY) { | ||
| return; | ||
| } | ||
| // If we do have balancer-provided backends, use one of its error in the error message if | ||
| // fail to fallback. | ||
| if (stateInfo.getState() == TRANSIENT_FAILURE) { | ||
| fallbackReason = stateInfo.getStatus(); | ||
| } | ||
| } | ||
| } | ||
| // Fallback conditions met | ||
|
|
@@ -438,9 +467,10 @@ void shutdown() { | |
| subchannelPool.clear(); | ||
| break; | ||
| case PICK_FIRST: | ||
| if (!subchannels.isEmpty()) { | ||
| checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels); | ||
| subchannels.values().iterator().next().shutdown(); | ||
| // Shutdown the child pick_first LB which manages its own subchannels. | ||
| if (pickFirstLb != null) { | ||
| pickFirstLb.shutdown(); | ||
| pickFirstLb = null; | ||
| } | ||
| break; | ||
| default: | ||
|
|
@@ -517,22 +547,17 @@ private void updateServerList( | |
| subchannels = Collections.unmodifiableMap(newSubchannelMap); | ||
| break; | ||
| case PICK_FIRST: | ||
| checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels); | ||
| final Subchannel subchannel; | ||
| // Delegate to child pick_first LB for address management. | ||
| // Shutdown existing child LB if addresses become empty. | ||
| if (newBackendAddrList.isEmpty()) { | ||
| if (subchannels.size() == 1) { | ||
| subchannel = subchannels.values().iterator().next(); | ||
| subchannel.shutdown(); | ||
| subchannels = Collections.emptyMap(); | ||
| if (pickFirstLb != null) { | ||
| pickFirstLb.shutdown(); | ||
| pickFirstLb = null; | ||
| } | ||
| break; | ||
| } | ||
| List<EquivalentAddressGroup> eagList = new ArrayList<>(); | ||
| // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to | ||
| // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on | ||
| // headers. | ||
| // | ||
| // The PICK_FIRST code path doesn't cache Subchannels. | ||
| // Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve. | ||
| for (BackendAddressGroup bag : newBackendAddrList) { | ||
| EquivalentAddressGroup origEag = bag.getAddresses(); | ||
| Attributes eagAttrs = origEag.getAttributes(); | ||
|
|
@@ -542,30 +567,24 @@ private void updateServerList( | |
| } | ||
| eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); | ||
| } | ||
| if (subchannels.isEmpty()) { | ||
| subchannel = | ||
| helper.createSubchannel( | ||
| CreateSubchannelArgs.newBuilder() | ||
| .setAddresses(eagList) | ||
| .setAttributes(createSubchannelAttrs()) | ||
| .build()); | ||
| subchannel.start(new SubchannelStateListener() { | ||
| @Override | ||
| public void onSubchannelState(ConnectivityStateInfo newState) { | ||
| handleSubchannelState(subchannel, newState); | ||
| } | ||
| }); | ||
| if (requestConnectionPending) { | ||
| subchannel.requestConnection(); | ||
| requestConnectionPending = false; | ||
| } | ||
| } else { | ||
| subchannel = subchannels.values().iterator().next(); | ||
| subchannel.updateAddresses(eagList); | ||
| // Always shutdown and recreate the child LB when addresses change to avoid | ||
| // calling Subchannel.updateAddresses(). This ensures we use the new dualstack- | ||
| // compatible path where the child LB creates fresh subchannels. | ||
| if (pickFirstLb != null) { | ||
| pickFirstLb.shutdown(); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment here states that the child load balancer is recreated "when addresses change", but the current implementation recreates it on every server list update. This happens even if the list of backends is identical to the previous one, which can lead to unnecessary connection churn and performance degradation if the remote balancer sends frequent but identical server lists. To align with the stated intent and optimize performance, I suggest caching the list of You could implement this by:
|
||
| pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper()); | ||
| // Pass addresses to child LB. | ||
| pickFirstLb.acceptResolvedAddresses( | ||
| ResolvedAddresses.newBuilder() | ||
| .setAddresses(eagList) | ||
| .build()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There seems to be a potential issue here where stale state from a previous child load balancer could be used. When It would be safer to reset the child LB's state immediately after creating a new instance. pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
// Reset the child LB state, since we created a new one.
pickFirstLbState = CONNECTING;
pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
// Pass addresses to child LB.
pickFirstLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eagList)
.build()); |
||
| if (requestConnectionPending) { | ||
| pickFirstLb.requestConnection(); | ||
| requestConnectionPending = false; | ||
| } | ||
| subchannels = Collections.singletonMap(eagList, subchannel); | ||
| newBackendList.add( | ||
| new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder))); | ||
| // Store the load recorder for token attachment. | ||
| currentPickFirstLoadRecorder = loadRecorder; | ||
| break; | ||
| default: | ||
| throw new AssertionError("Missing case for " + config.getMode()); | ||
|
|
@@ -842,7 +861,11 @@ private void cleanUp() { | |
| private void maybeUpdatePicker() { | ||
| List<RoundRobinEntry> pickList; | ||
| ConnectivityState state; | ||
| if (backendList.isEmpty()) { | ||
| // For PICK_FIRST mode with delegation, check if child LB exists instead of backendList. | ||
| boolean hasBackends = config.getMode() == Mode.PICK_FIRST | ||
| ? pickFirstLb != null | ||
| : !backendList.isEmpty(); | ||
| if (!hasBackends) { | ||
| // Note balancer (is working) may enforce using fallback backends, and that fallback may | ||
| // fail. So we should check if currently in fallback first. | ||
| if (usingFallbackBackends) { | ||
|
|
@@ -894,26 +917,12 @@ private void maybeUpdatePicker() { | |
| } | ||
| break; | ||
| case PICK_FIRST: { | ||
| checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); | ||
| BackendEntry onlyEntry = backendList.get(0); | ||
| ConnectivityStateInfo stateInfo = | ||
| onlyEntry.subchannel.getAttributes().get(STATE_INFO).get(); | ||
| state = stateInfo.getState(); | ||
| switch (state) { | ||
| case READY: | ||
| pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry); | ||
| break; | ||
| case TRANSIENT_FAILURE: | ||
| pickList = | ||
| Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus())); | ||
| break; | ||
| case CONNECTING: | ||
| pickList = Collections.singletonList(BUFFER_ENTRY); | ||
| break; | ||
| default: | ||
| pickList = Collections.<RoundRobinEntry>singletonList( | ||
| new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); | ||
| } | ||
| // Use child LB's state and picker. Wrap the picker for token attachment. | ||
| state = pickFirstLbState; | ||
| TokenAttachingTracerFactory tracerFactory = | ||
| new TokenAttachingTracerFactory(currentPickFirstLoadRecorder); | ||
| pickList = Collections.<RoundRobinEntry>singletonList( | ||
| new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory)); | ||
| break; | ||
| } | ||
| default: | ||
|
|
@@ -983,7 +992,7 @@ public boolean equals(Object other) { | |
|
|
||
| @VisibleForTesting | ||
| interface RoundRobinEntry { | ||
| PickResult picked(Metadata headers); | ||
| PickResult picked(PickSubchannelArgs args); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -1024,7 +1033,8 @@ static final class BackendEntry implements RoundRobinEntry { | |
| } | ||
|
|
||
| @Override | ||
| public PickResult picked(Metadata headers) { | ||
| public PickResult picked(PickSubchannelArgs args) { | ||
| Metadata headers = args.getHeaders(); | ||
| headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); | ||
| if (token != null) { | ||
| headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); | ||
|
|
@@ -1065,7 +1075,7 @@ static final class IdleSubchannelEntry implements RoundRobinEntry { | |
| } | ||
|
|
||
| @Override | ||
| public PickResult picked(Metadata headers) { | ||
| public PickResult picked(PickSubchannelArgs args) { | ||
| if (connectionRequested.compareAndSet(false, true)) { | ||
| syncContext.execute(new Runnable() { | ||
| @Override | ||
|
|
@@ -1108,7 +1118,7 @@ static final class ErrorEntry implements RoundRobinEntry { | |
| } | ||
|
|
||
| @Override | ||
| public PickResult picked(Metadata headers) { | ||
| public PickResult picked(PickSubchannelArgs args) { | ||
| return result; | ||
| } | ||
|
|
||
|
|
@@ -1132,6 +1142,53 @@ public String toString() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Entry that wraps a child LB's picker for PICK_FIRST mode delegation. | ||
| * Attaches TokenAttachingTracerFactory to the pick result for token propagation. | ||
| */ | ||
| @VisibleForTesting | ||
| static final class ChildLbPickerEntry implements RoundRobinEntry { | ||
| private final SubchannelPicker childPicker; | ||
| private final TokenAttachingTracerFactory tracerFactory; | ||
|
|
||
| ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) { | ||
| this.childPicker = checkNotNull(childPicker, "childPicker"); | ||
| this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory"); | ||
| } | ||
|
|
||
| @Override | ||
| public PickResult picked(PickSubchannelArgs args) { | ||
| PickResult childResult = childPicker.pickSubchannel(args); | ||
| if (childResult.getSubchannel() == null) { | ||
| // No subchannel (e.g., buffer, error), return as-is. | ||
| return childResult; | ||
| } | ||
| // Wrap the pick result to attach tokens via the tracer factory. | ||
| return PickResult.withSubchannel( | ||
| childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride()); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(childPicker, tracerFactory); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object other) { | ||
| if (!(other instanceof ChildLbPickerEntry)) { | ||
| return false; | ||
| } | ||
| ChildLbPickerEntry that = (ChildLbPickerEntry) other; | ||
| return Objects.equal(childPicker, that.childPicker) | ||
| && Objects.equal(tracerFactory, that.tracerFactory); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "ChildLbPickerEntry(" + childPicker + ")"; | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| static final class RoundRobinPicker extends SubchannelPicker { | ||
| @VisibleForTesting | ||
|
|
@@ -1174,7 +1231,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { | |
| if (pickIndex == pickList.size()) { | ||
| pickIndex = 0; | ||
| } | ||
| return pick.picked(args.getHeaders()); | ||
| return pick.picked(args); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1189,4 +1246,28 @@ public String toString() { | |
| return MoreObjects.toStringHelper(RoundRobinPicker.class).toString(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState() | ||
| * to store state and trigger the grpclb picker update with drops and token attachment. | ||
| */ | ||
| private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper { | ||
|
|
||
| @Override | ||
| protected Helper delegate() { | ||
| return helper; | ||
| } | ||
|
|
||
| @Override | ||
| public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { | ||
| pickFirstLbState = newState; | ||
| pickFirstLbPicker = newPicker; | ||
| // Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN. | ||
| if (newState == TRANSIENT_FAILURE || newState == IDLE) { | ||
| helper.refreshNameResolution(); | ||
| } | ||
| maybeUseFallbackBackends(); | ||
| maybeUpdatePicker(); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For improved robustness, it's a good practice to ensure that the
pickFirstLbProvideris not null.LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first")is annotated with@Nullableand can returnnullif the "pick_first" provider is not found in the registry. ANullPointerExceptionwould occur later inupdateServerListwhenpickFirstLbProvider.newLoadBalancer()is called.Adding a
checkNotNullwill cause a clean and early failure during initialization if the provider is missing, which is preferable to a deferredNullPointerException.