6868import java .util .concurrent .ExecutorService ;
6969import java .util .concurrent .Executors ;
7070import java .util .concurrent .RejectedExecutionException ;
71- import java .util .concurrent .ScheduledExecutorService ;
71+ import java .util .concurrent .ScheduledFuture ;
72+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
73+ import java .util .concurrent .ThreadLocalRandom ;
7274import java .util .concurrent .TimeUnit ;
7375import java .util .concurrent .atomic .AtomicInteger ;
7476import java .util .concurrent .atomic .AtomicLong ;
@@ -122,6 +124,8 @@ public class GcpManagedChannel extends ManagedChannel {
122124 private Duration scaleDownInterval = Duration .ZERO ;
123125 private boolean isDynamicScalingEnabled = false ;
124126 private int maxConcurrentStreamsLowWatermark = DEFAULT_MAX_STREAM ;
127+ private GcpManagedChannelOptions .ChannelPickStrategy channelPickStrategy =
128+ GcpManagedChannelOptions .ChannelPickStrategy .POWER_OF_TWO ;
125129 private Duration affinityKeyLifetime = Duration .ZERO ;
126130
127131 @ VisibleForTesting final Map <String , AffinityConfig > methodToAffinity = new HashMap <>();
@@ -179,8 +183,12 @@ public class GcpManagedChannel extends ManagedChannel {
179183 private final String metricPoolIndex =
180184 String .format ("pool-%d" , channelPoolIndex .incrementAndGet ());
181185 private final Map <String , Long > cumulativeMetricValues = new ConcurrentHashMap <>();
182- private final ScheduledExecutorService backgroundService =
183- Executors .newSingleThreadScheduledExecutor (GcpThreadFactory .newThreadFactory ("gcp-mc-bg-%d" ));
186+ private static final ScheduledThreadPoolExecutor SHARED_BACKGROUND_SERVICE =
187+ createSharedBackgroundService ();
188+
189+ private ScheduledFuture <?> cleanupTask ;
190+ private ScheduledFuture <?> scaleDownTask ;
191+ private ScheduledFuture <?> logMetricsTask ;
184192
185193 // Metrics counters.
186194 private final AtomicInteger readyChannels = new AtomicInteger ();
@@ -223,6 +231,17 @@ public class GcpManagedChannel extends ManagedChannel {
223231 private AtomicLong scaleUpCount = new AtomicLong ();
224232 private AtomicLong scaleDownCount = new AtomicLong ();
225233
234+ private static ScheduledThreadPoolExecutor createSharedBackgroundService () {
235+ ScheduledThreadPoolExecutor executor =
236+ new ScheduledThreadPoolExecutor (
237+ Math .max (2 , Math .min (4 , Runtime .getRuntime ().availableProcessors () / 2 )),
238+ GcpThreadFactory .newThreadFactory ("gcp-mc-bg-%d" ));
239+ executor .setRemoveOnCancelPolicy (true );
240+ executor .setExecuteExistingDelayedTasksAfterShutdownPolicy (false );
241+ executor .setContinueExistingPeriodicTasksAfterShutdownPolicy (false );
242+ return executor ;
243+ }
244+
226245 /**
227246 * Constructor for GcpManagedChannel.
228247 *
@@ -396,6 +415,7 @@ private void initOptions() {
396415 scaleDownInterval = poolOptions .getScaleDownInterval ();
397416 isDynamicScalingEnabled =
398417 minRpcPerChannel > 0 && maxRpcPerChannel > 0 && !scaleDownInterval .isZero ();
418+ channelPickStrategy = poolOptions .getChannelPickStrategy ();
399419 }
400420 initMetrics ();
401421 }
@@ -404,27 +424,30 @@ private synchronized void initCleanupTask(Duration cleanupInterval) {
404424 if (cleanupInterval .isZero ()) {
405425 return ;
406426 }
407- backgroundService .scheduleAtFixedRate (
408- this ::cleanupAffinityKeys ,
409- cleanupInterval .toMillis (),
410- cleanupInterval .toMillis (),
411- MILLISECONDS );
427+ cleanupTask =
428+ SHARED_BACKGROUND_SERVICE .scheduleAtFixedRate (
429+ this ::cleanupAffinityKeys ,
430+ cleanupInterval .toMillis (),
431+ cleanupInterval .toMillis (),
432+ MILLISECONDS );
412433 }
413434
414435 private synchronized void initScaleDownChecker (Duration scaleDownInterval ) {
415436 if (!isDynamicScalingEnabled || scaleDownInterval .isZero ()) {
416437 return ;
417438 }
418439
419- backgroundService .scheduleAtFixedRate (
420- this ::checkScaleDown ,
421- scaleDownInterval .toMillis (),
422- scaleDownInterval .toMillis (),
423- MILLISECONDS );
440+ scaleDownTask =
441+ SHARED_BACKGROUND_SERVICE .scheduleAtFixedRate (
442+ this ::checkScaleDown ,
443+ scaleDownInterval .toMillis (),
444+ scaleDownInterval .toMillis (),
445+ MILLISECONDS );
424446 }
425447
426448 private synchronized void initLogMetrics () {
427- backgroundService .scheduleAtFixedRate (this ::logMetrics , 60 , 60 , SECONDS );
449+ logMetricsTask =
450+ SHARED_BACKGROUND_SERVICE .scheduleAtFixedRate (this ::logMetrics , 60 , 60 , SECONDS );
428451 }
429452
430453 private void logMetricsOptions () {
@@ -1757,8 +1780,52 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {
17571780 return first ;
17581781 }
17591782
1760- // Pick the least busy channel and the least busy ready and not overloaded channel (this could
1761- // be the same channel or different or no channel).
1783+ if (!fallbackEnabled ) {
1784+ return pickLeastBusyNoFallback ();
1785+ }
1786+
1787+ return pickLeastBusyWithFallback (forFallback );
1788+ }
1789+
1790+ /**
1791+ * Non-fallback channel selection. Uses the configured {@link
1792+ * GcpManagedChannelOptions.ChannelPickStrategy}.
1793+ */
1794+ private ChannelRef pickLeastBusyNoFallback () {
1795+ ChannelRef channelCandidate ;
1796+ int minStreams ;
1797+
1798+ if (channelPickStrategy == GcpManagedChannelOptions .ChannelPickStrategy .POWER_OF_TWO ) {
1799+ channelCandidate = pickPowerOfTwo ();
1800+ minStreams = channelCandidate .getActiveStreamsCount ();
1801+ } else {
1802+ channelCandidate = channelRefs .get (0 );
1803+ minStreams = channelCandidate .getActiveStreamsCount ();
1804+ for (ChannelRef channelRef : channelRefs ) {
1805+ int cnt = channelRef .getActiveStreamsCount ();
1806+ if (cnt < minStreams ) {
1807+ minStreams = cnt ;
1808+ channelCandidate = channelRef ;
1809+ }
1810+ }
1811+ }
1812+
1813+ if (shouldScaleUp (minStreams )) {
1814+ ChannelRef newChannel = tryCreateNewChannel ();
1815+ if (newChannel != null ) {
1816+ scaleUpCount .incrementAndGet ();
1817+ return newChannel ;
1818+ }
1819+ }
1820+ return channelCandidate ;
1821+ }
1822+
1823+ /**
1824+ * Fallback-enabled channel selection. Always uses a full linear scan because the fallback logic
1825+ * needs to filter channels by readiness state and max stream limits.
1826+ */
1827+ private ChannelRef pickLeastBusyWithFallback (boolean forFallback ) {
1828+ // Full scan required: readyCandidate must be filtered by fallbackMap and DEFAULT_MAX_STREAM.
17621829 ChannelRef channelCandidate = channelRefs .get (0 );
17631830 int minStreams = channelCandidate .getActiveStreamsCount ();
17641831 ChannelRef readyCandidate = null ;
@@ -1778,17 +1845,6 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {
17781845 }
17791846 }
17801847
1781- if (!fallbackEnabled ) {
1782- if (shouldScaleUp (minStreams )) {
1783- ChannelRef newChannel = tryCreateNewChannel ();
1784- if (newChannel != null ) {
1785- scaleUpCount .incrementAndGet ();
1786- return newChannel ;
1787- }
1788- }
1789- return channelCandidate ;
1790- }
1791-
17921848 if (shouldScaleUp (readyMinStreams )) {
17931849 ChannelRef newChannel = tryCreateNewChannel ();
17941850 if (newChannel != null ) {
@@ -1825,6 +1881,41 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {
18251881 return channelCandidate ;
18261882 }
18271883
1884+ /**
1885+ * Power-of-two random choices: pick two channels at random and return the less busy one. On tie,
1886+ * prefer the channel with more recent activity (warmer) to preserve connection warmth under low
1887+ * traffic.
1888+ */
1889+ private ChannelRef pickPowerOfTwo () {
1890+ int size = channelRefs .size ();
1891+ if (size == 1 ) {
1892+ return channelRefs .get (0 );
1893+ }
1894+
1895+ ThreadLocalRandom random = ThreadLocalRandom .current ();
1896+ int i = random .nextInt (size );
1897+ int j = random .nextInt (size - 1 );
1898+ if (j >= i ) {
1899+ j ++;
1900+ }
1901+
1902+ ChannelRef a = channelRefs .get (i );
1903+ ChannelRef b = channelRefs .get (j );
1904+
1905+ int aStreams = a .getActiveStreamsCount ();
1906+ int bStreams = b .getActiveStreamsCount ();
1907+
1908+ if (aStreams < bStreams ) {
1909+ return a ;
1910+ }
1911+ if (bStreams < aStreams ) {
1912+ return b ;
1913+ }
1914+
1915+ // Tie: prefer the warmer channel (more recent activity) to preserve connection warmth.
1916+ return a .lastResponseNanos >= b .lastResponseNanos ? a : b ;
1917+ }
1918+
18281919 @ Override
18291920 public String authority () {
18301921 if (!channelRefs .isEmpty ()) {
@@ -1882,6 +1973,21 @@ private String keyFromOptsCtx(CallOptions callOptions) {
18821973 return key ;
18831974 }
18841975
1976+ private void cancelBackgroundTasks () {
1977+ if (cleanupTask != null ) {
1978+ cleanupTask .cancel (false );
1979+ cleanupTask = null ;
1980+ }
1981+ if (scaleDownTask != null ) {
1982+ scaleDownTask .cancel (false );
1983+ scaleDownTask = null ;
1984+ }
1985+ if (logMetricsTask != null ) {
1986+ logMetricsTask .cancel (false );
1987+ logMetricsTask = null ;
1988+ }
1989+ }
1990+
18851991 @ Override
18861992 public ManagedChannel shutdownNow () {
18871993 logger .finer (log ("Shutdown now started." ));
@@ -1895,9 +2001,7 @@ public ManagedChannel shutdownNow() {
18952001 channelRef .getChannel ().shutdownNow ();
18962002 }
18972003 }
1898- if (backgroundService != null && !backgroundService .isTerminated ()) {
1899- backgroundService .shutdownNow ();
1900- }
2004+ cancelBackgroundTasks ();
19012005 if (!stateNotificationExecutor .isTerminated ()) {
19022006 stateNotificationExecutor .shutdownNow ();
19032007 }
@@ -1913,9 +2017,7 @@ public ManagedChannel shutdown() {
19132017 for (ChannelRef channelRef : removedChannelRefs ) {
19142018 channelRef .getChannel ().shutdown ();
19152019 }
1916- if (backgroundService != null ) {
1917- backgroundService .shutdown ();
1918- }
2020+ cancelBackgroundTasks ();
19192021 stateNotificationExecutor .shutdown ();
19202022 return this ;
19212023 }
@@ -1936,10 +2038,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
19362038 channelRef .getChannel ().awaitTermination (awaitTimeNanos , NANOSECONDS );
19372039 }
19382040 long awaitTimeNanos = endTimeNanos - System .nanoTime ();
1939- if (backgroundService != null && awaitTimeNanos > 0 ) {
1940- //noinspection ResultOfMethodCallIgnored
1941- backgroundService .awaitTermination (awaitTimeNanos , NANOSECONDS );
1942- }
19432041 awaitTimeNanos = endTimeNanos - System .nanoTime ();
19442042 if (awaitTimeNanos > 0 ) {
19452043 // noinspection ResultOfMethodCallIgnored
@@ -1957,10 +2055,10 @@ public boolean isShutdown() {
19572055 return false ;
19582056 }
19592057 }
1960- if ( backgroundService != null && ! backgroundService . isShutdown ()) {
1961- return false ;
1962- }
1963- return stateNotificationExecutor .isShutdown ();
2058+ return cleanupTask == null
2059+ && scaleDownTask == null
2060+ && logMetricsTask == null
2061+ && stateNotificationExecutor .isShutdown ();
19642062 }
19652063
19662064 @ Override
@@ -1972,10 +2070,10 @@ public boolean isTerminated() {
19722070 return false ;
19732071 }
19742072 }
1975- if ( backgroundService != null && ! backgroundService . isTerminated ()) {
1976- return false ;
1977- }
1978- return stateNotificationExecutor .isTerminated ();
2073+ return cleanupTask == null
2074+ && scaleDownTask == null
2075+ && logMetricsTask == null
2076+ && stateNotificationExecutor .isTerminated ();
19792077 }
19802078
19812079 /** Get the current connectivity state of the channel pool. */
0 commit comments