diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
index 09f1dd0..ec6a988 100644
--- a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
+++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
@@ -195,6 +195,7 @@ public class GcpManagedChannel extends ManagedChannel {
   private final AtomicLong totalReadinessTime = new AtomicLong();
   private final AtomicLong readinessTimeOccurrences = new AtomicLong();
   private final AtomicInteger totalActiveStreams = new AtomicInteger();
+  private final AtomicInteger leastBusyPickOffset = new AtomicInteger();
   private AtomicInteger minActiveStreams = new AtomicInteger();
   private AtomicInteger maxActiveStreams = new AtomicInteger();
   private AtomicInteger minTotalActiveStreams = new AtomicInteger();
@@ -1759,12 +1760,20 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {
 
     // Pick the least busy channel and the least busy ready and not overloaded channel (this could
     // be the same channel or different or no channel).
-    ChannelRef channelCandidate = channelRefs.get(0);
+    // Iteration starts at a rotating offset so that ties don't always break to channelRefs[0]:
+    // activeStreamsCount is incremented later in GcpClientCall.start(), so a burst of concurrent
+    // first-time getChannelRef(key) calls all observe equal counts and would otherwise all bind to
+    // index 0, funnelling traffic onto one channel.
+    final int size = channelRefs.size();
+    // floorMod keeps the offset non-negative once the counter wraps past Integer.MAX_VALUE.
+    final int offset = Math.floorMod(leastBusyPickOffset.getAndIncrement(), size);
+    ChannelRef channelCandidate = channelRefs.get(offset);
     int minStreams = channelCandidate.getActiveStreamsCount();
     ChannelRef readyCandidate = null;
     int readyMinStreams = Integer.MAX_VALUE;
 
-    for (ChannelRef channelRef : channelRefs) {
+    for (int i = 0; i < size; i++) {
+      ChannelRef channelRef = channelRefs.get((offset + i) % size);
       int cnt = channelRef.getActiveStreamsCount();
       if (cnt < minStreams) {
         minStreams = cnt;
diff --git a/grpc-gcp/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java b/grpc-gcp/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java
index f3ccbdb..5280901 100644
--- a/grpc-gcp/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java
+++ b/grpc-gcp/src/test/java/com/google/cloud/grpc/GcpManagedChannelTest.java
@@ -315,6 +315,23 @@ public void testGetChannelRefPickUpSmallest() {
     assertEquals(6, gcpChannel.getChannelRef(null).getAffinityCount());
   }
 
+  @Test
+  public void testNewKeyBindingsSpreadAcrossIdleChannels() {
+    // Regression: when all activeStreamsCounts are equal (e.g., a burst of new
+    // affinity keys arriving before any GcpClientCall.start() runs), bindings
+    // should spread across channels rather than collapsing onto index 0.
+    resetGcpChannel();
+    final int numChannels = 8;
+    for (int i = 0; i < numChannels; i++) {
+      gcpChannel.channelRefs.add(gcpChannel.new ChannelRef(builder.build(), i, 0));
+    }
+    Set<GcpManagedChannel.ChannelRef> picked = new HashSet<>();
+    for (int i = 0; i < 64; i++) {
+      picked.add(gcpChannel.getChannelRef("k" + i));
+    }
+    assertThat(picked).hasSize(numChannels);
+  }
+
   private void assertFallbacksMetric(
       FakeMetricRegistry fakeRegistry, long successes, long failures) {
     MetricsRecord record = fakeRegistry.pollRecord();