Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public class GcpManagedChannel extends ManagedChannel {
private final AtomicLong totalReadinessTime = new AtomicLong();
private final AtomicLong readinessTimeOccurrences = new AtomicLong();
private final AtomicInteger totalActiveStreams = new AtomicInteger();
private final AtomicInteger leastBusyPickOffset = new AtomicInteger();
private AtomicInteger minActiveStreams = new AtomicInteger();
private AtomicInteger maxActiveStreams = new AtomicInteger();
private AtomicInteger minTotalActiveStreams = new AtomicInteger();
Expand Down Expand Up @@ -1759,12 +1760,19 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {

// Pick the least busy channel and the least busy ready and not overloaded channel (this could
// be the same channel or different or no channel).
ChannelRef channelCandidate = channelRefs.get(0);
// Iteration starts at a rotating offset so that ties don't always break to channelRefs[0]:
// activeStreamsCount is incremented later in GcpClientCall.start(), so a burst of concurrent
// first-time getChannelRef(key) calls all observe equal counts and would otherwise all bind to
// index 0, funnelling traffic onto one channel.
final int size = channelRefs.size();
final int offset = Math.floorMod(leastBusyPickOffset.getAndIncrement(), size);
ChannelRef channelCandidate = channelRefs.get(offset);
int minStreams = channelCandidate.getActiveStreamsCount();
ChannelRef readyCandidate = null;
int readyMinStreams = Integer.MAX_VALUE;

for (ChannelRef channelRef : channelRefs) {
for (int i = 0; i < size; i++) {
ChannelRef channelRef = channelRefs.get((offset + i) % size);
int cnt = channelRef.getActiveStreamsCount();
if (cnt < minStreams) {
minStreams = cnt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,23 @@ public void testGetChannelRefPickUpSmallest() {
assertEquals(6, gcpChannel.getChannelRef(null).getAffinityCount());
}

@Test
public void testNewKeyBindingsSpreadAcrossIdleChannels() {
// Regression: when all activeStreamsCounts are equal (e.g., a burst of new
// affinity keys arriving before any GcpClientCall.start() runs), bindings
// should spread across channels rather than collapsing onto index 0.
resetGcpChannel();
final int numChannels = 8;
for (int i = 0; i < numChannels; i++) {
gcpChannel.channelRefs.add(gcpChannel.new ChannelRef(builder.build(), i, 0));
}
Set<ChannelRef> picked = new HashSet<>();
for (int i = 0; i < 64; i++) {
picked.add(gcpChannel.getChannelRef("k" + i));
}
assertThat(picked).hasSize(numChannels);
}

private void assertFallbacksMetric(
FakeMetricRegistry fakeRegistry, long successes, long failures) {
MetricsRecord record = fakeRegistry.pollRecord();
Expand Down