7272import java .util .concurrent .ScheduledThreadPoolExecutor ;
7373import java .util .concurrent .ThreadLocalRandom ;
7474import java .util .concurrent .TimeUnit ;
75- import java .util .concurrent .atomic .AtomicBoolean ;
7675import java .util .concurrent .atomic .AtomicInteger ;
7776import java .util .concurrent .atomic .AtomicLong ;
78- import java .util .concurrent .atomic .AtomicReference ;
7977import java .util .function .Supplier ;
8078import java .util .logging .Level ;
8179import java .util .logging .Logger ;
@@ -115,12 +113,31 @@ public class GcpManagedChannel extends ManagedChannel {
115113
116114 /** Opaque sticky channel reference for callers that should not depend on {@link ChannelRef}. */
117115 public static final class ChannelAffinityRef {
118- private final AtomicReference <ChannelRef > channelRef = new AtomicReference <>();
119- private final AtomicBoolean useDifferentChannelOnNextCall = new AtomicBoolean ();
116+ private static final int USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK = 1 << 31 ;
117+ private static final int CHANNEL_ID_MASK = ~USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK ;
118+ private static final int NO_CHANNEL_ID = -1 ;
119+
120+ // Single allocation hot-path state:
121+ // * lower 31 bits: channel id + 1, or 0 when unset.
122+ // * high bit: use a different active channel on the next call.
123+ private final AtomicInteger state = new AtomicInteger ();
120124
121125 /** Forces the next RPC to prefer a different active channel if one is available. */
122126 public void useDifferentChannelOnNextCall () {
123- useDifferentChannelOnNextCall .set (true );
127+ state .getAndUpdate (value -> value | USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK );
128+ }
129+
130+ private static int channelIdFromState (int state ) {
131+ int encodedChannelId = state & CHANNEL_ID_MASK ;
132+ return encodedChannelId == 0 ? NO_CHANNEL_ID : encodedChannelId - 1 ;
133+ }
134+
135+ private static boolean useDifferentChannelOnNextCallFromState (int state ) {
136+ return (state & USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK ) != 0 ;
137+ }
138+
139+ private static int stateFromChannelId (int channelId ) {
140+ return (channelId + 1 ) & CHANNEL_ID_MASK ;
124141 }
125142 }
126143
@@ -157,6 +174,7 @@ public void useDifferentChannelOnNextCall() {
157174
158175 // The channel pool.
159176 @ VisibleForTesting final List <ChannelRef > channelRefs = new CopyOnWriteArrayList <>();
177+ private final Map <Integer , ChannelRef > channelIdToChannelRef = new ConcurrentHashMap <>();
160178 // A set of channels that we removed from the pool and wait for their RPCs to be completed before
161179 // we can shut them down.
162180 final Set <ChannelRef > removedChannelRefs = new HashSet <>();
@@ -369,6 +387,7 @@ private synchronized void checkScaleDown() {
369387 channelRef .getChannel ().shutdown ();
370388 // Remove channel from broken channels map.
371389 fallbackMap .remove (channelRef .getId ());
390+ channelIdToChannelRef .remove (channelRef .getId ());
372391 }
373392 }
374393
@@ -1702,8 +1721,14 @@ protected ChannelRef getChannelRef(@Nullable String key) {
17021721 protected ChannelRef getChannelRefByAffinityRef (ChannelAffinityRef affinityRef ) {
17031722 maybeDynamicUpscale ();
17041723 while (true ) {
1705- ChannelRef channelRef = affinityRef .channelRef .get ();
1706- boolean useDifferentChannel = affinityRef .useDifferentChannelOnNextCall .getAndSet (false );
1724+ int state = affinityRef .state .get ();
1725+ int channelId = ChannelAffinityRef .channelIdFromState (state );
1726+ boolean useDifferentChannel =
1727+ ChannelAffinityRef .useDifferentChannelOnNextCallFromState (state );
1728+ ChannelRef channelRef =
1729+ channelId == ChannelAffinityRef .NO_CHANNEL_ID
1730+ ? null
1731+ : channelIdToChannelRef .get (channelId );
17071732 if (!useDifferentChannel && channelRef != null && channelRef .isActive ()) {
17081733 return channelRef ;
17091734 }
@@ -1712,7 +1737,8 @@ protected ChannelRef getChannelRefByAffinityRef(ChannelAffinityRef affinityRef)
17121737 useDifferentChannel
17131738 ? pickLeastBusyChannelDifferentFrom (channelRef )
17141739 : pickLeastBusyChannel (/* forFallback= */ false );
1715- if (affinityRef .channelRef .compareAndSet (channelRef , selectedChannelRef )) {
1740+ if (affinityRef .state .compareAndSet (
1741+ state , ChannelAffinityRef .stateFromChannelId (selectedChannelRef .getId ()))) {
17161742 return selectedChannelRef ;
17171743 }
17181744 }
@@ -1751,6 +1777,7 @@ ChannelRef createNewChannel() {
17511777 ChannelRef chRef = reusedChannelRef .get ();
17521778 channelRefs .add (chRef );
17531779 removedChannelRefs .remove (chRef );
1780+ channelIdToChannelRef .put (chRef .getId (), chRef );
17541781 chRef .activate ();
17551782 logger .finer (log ("Channel %d reused." , chRef .getId ()));
17561783 incReadyChannels (false );
@@ -1760,6 +1787,7 @@ ChannelRef createNewChannel() {
17601787
17611788 ChannelRef channelRef = new ChannelRef (delegateChannelBuilder .build ());
17621789 channelRefs .add (channelRef );
1790+ channelIdToChannelRef .put (channelRef .getId (), channelRef );
17631791 logger .finer (log ("Channel %d created." , channelRef .getId ()));
17641792 maxChannels .accumulateAndGet (getNumberOfChannels (), Math ::max );
17651793 return channelRef ;
0 commit comments