2929import java .util .concurrent .ConcurrentHashMap ;
3030import software .amazon .awssdk .annotations .SdkProtectedApi ;
3131import software .amazon .awssdk .crt .CrtResource ;
32- import software .amazon .awssdk .crt .http .HttpClientConnectionManager ;
32+ import software .amazon .awssdk .crt .http .Http2StreamManagerOptions ;
3333import software .amazon .awssdk .crt .http .HttpClientConnectionManagerOptions ;
3434import software .amazon .awssdk .crt .http .HttpMonitoringOptions ;
3535import software .amazon .awssdk .crt .http .HttpProxyOptions ;
36+ import software .amazon .awssdk .crt .http .HttpStreamManager ;
37+ import software .amazon .awssdk .crt .http .HttpStreamManagerOptions ;
38+ import software .amazon .awssdk .crt .http .HttpVersion ;
3639import software .amazon .awssdk .crt .io .ClientBootstrap ;
3740import software .amazon .awssdk .crt .io .SocketOptions ;
3841import software .amazon .awssdk .crt .io .TlsContext ;
@@ -58,46 +61,48 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
5861 private static final long DEFAULT_STREAM_WINDOW_SIZE = 16L * 1024L * 1024L ; // 16 MB
5962
6063 protected final long readBufferSize ;
61- private final Map <URI , HttpClientConnectionManager > connectionPools = new ConcurrentHashMap <>();
64+ protected final Protocol protocol ;
65+ private final Map <URI , HttpStreamManager > connectionPools = new ConcurrentHashMap <>();
6266 private final LinkedList <CrtResource > ownedSubResources = new LinkedList <>();
6367 private final ClientBootstrap bootstrap ;
6468 private final SocketOptions socketOptions ;
6569 private final TlsContext tlsContext ;
6670 private final HttpProxyOptions proxyOptions ;
6771 private final HttpMonitoringOptions monitoringOptions ;
6872 private final long maxConnectionIdleInMilliseconds ;
69- private final int maxConnectionsPerEndpoint ;
73+ private final int maxStreamsPerEndpoint ;
7074 private final long connectionAcquisitionTimeout ;
75+ private final TlsContextOptions tlsContextOptions ;
7176 private boolean isClosed = false ;
7277
7378 AwsCrtHttpClientBase (AwsCrtClientBuilderBase builder , AttributeMap config ) {
74- if (config .get (PROTOCOL ) == Protocol .HTTP2 ) {
75- throw new UnsupportedOperationException ("HTTP/2 is not supported in AwsCrtHttpClient yet. Use "
76- + "NettyNioAsyncHttpClient instead." );
79+ ClientBootstrap clientBootstrap = new ClientBootstrap (null , null );
80+ SocketOptions clientSocketOptions = buildSocketOptions (builder .getTcpKeepAliveConfiguration (),
81+ config .get (SdkHttpConfigurationOption .CONNECTION_TIMEOUT ));
82+ TlsContextOptions clientTlsContextOptions =
83+ TlsContextOptions .createDefaultClient ()
84+ .withCipherPreference (resolveCipherPreference (builder .getPostQuantumTlsEnabled ()))
85+ .withVerifyPeer (!config .get (SdkHttpConfigurationOption .TRUST_ALL_CERTIFICATES ));
86+ this .protocol = config .get (PROTOCOL );
87+ if (protocol == Protocol .HTTP2 ) {
88+ clientTlsContextOptions = clientTlsContextOptions .withAlpnList ("h2" );
7789 }
7890
79- try (ClientBootstrap clientBootstrap = new ClientBootstrap (null , null );
80- SocketOptions clientSocketOptions = buildSocketOptions (builder .getTcpKeepAliveConfiguration (),
81- config .get (SdkHttpConfigurationOption .CONNECTION_TIMEOUT ));
82- TlsContextOptions clientTlsContextOptions =
83- TlsContextOptions .createDefaultClient ()
84- .withCipherPreference (resolveCipherPreference (builder .getPostQuantumTlsEnabled ()))
85- .withVerifyPeer (!config .get (SdkHttpConfigurationOption .TRUST_ALL_CERTIFICATES ));
86- TlsContext clientTlsContext = new TlsContext (clientTlsContextOptions )) {
87-
88- this .bootstrap = registerOwnedResource (clientBootstrap );
89- this .socketOptions = registerOwnedResource (clientSocketOptions );
90- this .tlsContext = registerOwnedResource (clientTlsContext );
91- this .readBufferSize = builder .getReadBufferSizeInBytes () == null ?
92- DEFAULT_STREAM_WINDOW_SIZE : builder .getReadBufferSizeInBytes ();
93- this .maxConnectionsPerEndpoint = config .get (SdkHttpConfigurationOption .MAX_CONNECTIONS );
94- this .monitoringOptions =
95- resolveHttpMonitoringOptions (builder .getConnectionHealthConfiguration ())
96- .orElseGet (() -> defaultConnectionHealthConfiguration (config ));
97- this .maxConnectionIdleInMilliseconds = config .get (SdkHttpConfigurationOption .CONNECTION_MAX_IDLE_TIMEOUT ).toMillis ();
98- this .connectionAcquisitionTimeout = config .get (SdkHttpConfigurationOption .CONNECTION_ACQUIRE_TIMEOUT ).toMillis ();
99- this .proxyOptions = resolveProxy (builder .getProxyConfiguration (), tlsContext ).orElse (null );
100- }
91+ this .tlsContextOptions = registerOwnedResource (clientTlsContextOptions );
92+ TlsContext clientTlsContext = new TlsContext (clientTlsContextOptions );
93+
94+ this .bootstrap = registerOwnedResource (clientBootstrap );
95+ this .socketOptions = registerOwnedResource (clientSocketOptions );
96+ this .tlsContext = registerOwnedResource (clientTlsContext );
97+ this .readBufferSize = builder .getReadBufferSizeInBytes () == null ?
98+ DEFAULT_STREAM_WINDOW_SIZE : builder .getReadBufferSizeInBytes ();
99+ this .maxStreamsPerEndpoint = config .get (SdkHttpConfigurationOption .MAX_CONNECTIONS );
100+ this .monitoringOptions =
101+ resolveHttpMonitoringOptions (builder .getConnectionHealthConfiguration ())
102+ .orElseGet (() -> defaultConnectionHealthConfiguration (config ));
103+ this .maxConnectionIdleInMilliseconds = config .get (SdkHttpConfigurationOption .CONNECTION_MAX_IDLE_TIMEOUT ).toMillis ();
104+ this .connectionAcquisitionTimeout = config .get (SdkHttpConfigurationOption .CONNECTION_ACQUIRE_TIMEOUT ).toMillis ();
105+ this .proxyOptions = resolveProxy (builder .getProxyConfiguration (), tlsContext ).orElse (null );
101106 }
102107
103108 /**
@@ -109,7 +114,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
109114 */
110115 private <T extends CrtResource > T registerOwnedResource (T subresource ) {
111116 if (subresource != null ) {
112- subresource .addRef ();
113117 ownedSubResources .push (subresource );
114118 }
115119 return subresource ;
@@ -119,23 +123,46 @@ String clientName() {
119123 return AWS_COMMON_RUNTIME ;
120124 }
121125
122- private HttpClientConnectionManager createConnectionPool (URI uri ) {
123- log .debug (() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint );
126+ private HttpStreamManager createConnectionPool (URI uri ) {
127+ log .debug (() ->
128+ String .format ("Creating ConnectionPool for: URI:%s, MaxConns: %d, MaxStreams: %d" ,
129+ uri , maxStreamsPerEndpoint , maxStreamsPerEndpoint ));
130+
131+ boolean isHttps = "https" .equalsIgnoreCase (uri .getScheme ());
132+ TlsContext poolTlsContext = isHttps ? tlsContext : null ;
124133
125- HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions ()
134+ HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions ()
126135 .withClientBootstrap (bootstrap )
127136 .withSocketOptions (socketOptions )
128- .withTlsContext (tlsContext )
137+ .withTlsContext (poolTlsContext )
129138 .withUri (uri )
130139 .withWindowSize (readBufferSize )
131- .withMaxConnections (maxConnectionsPerEndpoint )
140+ .withMaxConnections (maxStreamsPerEndpoint )
132141 .withManualWindowManagement (true )
133142 .withProxyOptions (proxyOptions )
134143 .withMonitoringOptions (monitoringOptions )
135144 .withMaxConnectionIdleInMilliseconds (maxConnectionIdleInMilliseconds )
136145 .withConnectionAcquisitionTimeoutInMilliseconds (connectionAcquisitionTimeout );
137146
138- return HttpClientConnectionManager .create (options );
147+ HttpStreamManagerOptions options = new HttpStreamManagerOptions ()
148+ .withHTTP1ConnectionManagerOptions (h1Options );
149+
150+ if (protocol == Protocol .HTTP2 ) {
151+ Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions ()
152+ .withMaxConcurrentStreams (maxStreamsPerEndpoint )
153+ .withConnectionManagerOptions (h1Options );
154+
155+ if (!isHttps ) {
156+ h2Options .withPriorKnowledge (true );
157+ }
158+
159+ options .withHTTP2StreamManagerOptions (h2Options );
160+ options .withExpectedProtocol (HttpVersion .HTTP_2 );
161+ } else {
162+ options .withExpectedProtocol (HttpVersion .HTTP_1_1 );
163+ }
164+
165+ return HttpStreamManager .create (options );
139166 }
140167
141168 /*
@@ -153,14 +180,13 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
153180 * existing pool. If we add all of execute() to the scope, we include, at minimum a JNI call to the native
154181 * pool implementation.
155182 */
156- HttpClientConnectionManager getOrCreateConnectionPool (URI uri ) {
183+ HttpStreamManager getOrCreateConnectionPool (URI uri ) {
157184 synchronized (this ) {
158185 if (isClosed ) {
159186 throw new IllegalStateException ("Client is closed. No more requests can be made with this client." );
160187 }
161188
162- HttpClientConnectionManager connPool = connectionPools .computeIfAbsent (uri , this ::createConnectionPool );
163- connPool .addRef ();
189+ HttpStreamManager connPool = connectionPools .computeIfAbsent (uri , this ::createConnectionPool );
164190 return connPool ;
165191 }
166192 }
0 commit comments