5858import java .util .ArrayList ;
5959import java .util .Collection ;
6060import java .util .Collections ;
61+ import java .util .HashMap ;
62+ import java .util .Iterator ;
63+ import java .util .LinkedHashMap ;
6164import java .util .List ;
65+ import java .util .Map ;
6266import java .util .concurrent .ScheduledExecutorService ;
6367import java .util .concurrent .TimeUnit ;
6468import javax .annotation .Nullable ;
69+ import javax .annotation .concurrent .GuardedBy ;
6570import javax .annotation .concurrent .ThreadSafe ;
6671
6772/**
@@ -126,7 +131,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
126131 private ManagedClientTransport shutdownDueToUpdateTransport ;
127132
128133 /**
129- * All transports that are not terminated. At the very least the value of {@link #activeTransport }
134+ * All transports that are not terminated. At the very least the value of {@link #activeTransports }
130135 * will be present, but previously used transports that still have streams or are stopping may
131136 * also be present.
132137 */
@@ -153,10 +158,11 @@ protected void handleNotInUse() {
153158 private ConnectionClientTransport pendingTransport ;
154159
155160 /**
156- * The transport for new outgoing requests. Non-null only in READY state.
161+ * The transports for new outgoing requests and their open stream counts. Only in READY state transports will be
162+ * present in this map.
157163 */
158164 @ Nullable
159- private volatile ManagedClientTransport activeTransport ;
165+ private final LinkedHashMap < ClientTransport , Integer > activeTransports = new LinkedHashMap <>() ;
160166
161167 private volatile ConnectivityStateInfo state = ConnectivityStateInfo .forNonError (IDLE );
162168
@@ -209,9 +215,12 @@ ChannelLogger getChannelLogger() {
209215
210216 @ Override
211217 public ClientTransport obtainActiveTransport () {
212- ClientTransport savedTransport = activeTransport ;
213- if (savedTransport != null ) {
214- return savedTransport ;
218+ synchronized (this ) {
219+ ClientTransport activeTransport = getTransport ();
220+ if (activeTransport != null ) {
221+ activeTransports .put (activeTransport , activeTransports .get (activeTransport ) + 1 );
222+ return activeTransport ;
223+ }
215224 }
216225 syncContext .execute (new Runnable () {
217226 @ Override
@@ -230,8 +239,16 @@ public void run() {
230239 * Returns a READY transport if there is any, without trying to connect.
231240 */
232241 @ Nullable
233- ClientTransport getTransport () {
234- return activeTransport ;
242+ synchronized ClientTransport getTransport () {
243+ Iterator <Map .Entry <ClientTransport , Integer >> iterator = activeTransports .entrySet ().iterator ();
244+ while (iterator .hasNext ()) {
245+ Map .Entry <ClientTransport , Integer > transportAndStreamCount = iterator .next ();
246+ // TODO: get the stream limit via settings
247+ if (transportAndStreamCount .getValue () < 0 ) {
248+ return transportAndStreamCount .getKey ();
249+ }
250+ }
251+ return null ;
235252 }
236253
237254 /**
@@ -384,8 +401,8 @@ public void run() {
384401 if (!addressIndex .seekTo (previousAddress )) {
385402 // Forced to drop the connection
386403 if (state .getState () == READY ) {
387- savedTransport = activeTransport ;
388- activeTransport = null ;
404+ savedTransport = activeTransports ;
405+ activeTransports = null ;
389406 addressIndex .reset ();
390407 gotoNonErrorState (IDLE );
391408 } else {
@@ -441,9 +458,9 @@ public void run() {
441458 return ;
442459 }
443460 shutdownReason = reason ;
444- savedActiveTransport = activeTransport ;
461+ savedActiveTransport = activeTransports ;
445462 savedPendingTransport = pendingTransport ;
446- activeTransport = null ;
463+ activeTransports = null ;
447464 pendingTransport = null ;
448465 gotoNonErrorState (SHUTDOWN );
449466 addressIndex .reset ();
@@ -594,11 +611,11 @@ public void run() {
594611 reconnectPolicy = null ;
595612 if (shutdownReason != null ) {
596613 // activeTransport should have already been set to null by shutdown(). We keep it null.
597- Preconditions .checkState (activeTransport == null ,
614+ Preconditions .checkState (activeTransports == null ,
598615 "Unexpected non-null activeTransport" );
599616 transport .shutdown (shutdownReason );
600617 } else if (pendingTransport == transport ) {
601- activeTransport = transport ;
618+ activeTransports = transport ;
602619 pendingTransport = null ;
603620 connectedAddressAttributes = addressIndex .getCurrentEagAttributes ();
604621 gotoNonErrorState (READY );
@@ -630,8 +647,8 @@ public void run() {
630647 if (state .getState () == SHUTDOWN ) {
631648 return ;
632649 }
633- if (activeTransport == transport ) {
634- activeTransport = null ;
650+ if (activeTransports == transport ) {
651+ activeTransports = null ;
635652 addressIndex .reset ();
636653 gotoNonErrorState (IDLE );
637654 subchannelMetrics .recordDisconnection (/* target= */ target ,
0 commit comments