diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java b/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java index 5559656ab..4ba083308 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java +++ b/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java @@ -187,12 +187,49 @@ public void updateConnectionWindow(long incrementSize) { @Override public Http2Stream makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler) throws CrtRuntimeException { + return makeRequest(request, streamHandler, false); + } + + /** + * Schedules an HttpRequest on the Native EventLoop for this + * HttpClientConnection. The HTTP/1.1 request will be transformed to HTTP/2 + * request under the hood. + * + * @param request The Request to make to the Server. + * @param streamHandler The Stream Handler to be called from the Native + * EventLoop + * @param useManualDataWrites When {@code true}, request body data is provided via + * {@link HttpStreamBase#writeData} instead of from the request's + * {@link HttpRequestBodyStream}. + * + *

By design, CRT supports setting both a body stream and enabling manual + * writes for HTTP/2, but this is not recommended. Body streams are intended + * for requests whose payload is available in full at the time of sending. If + * the stream does not signal end-of-stream promptly, the event loop will + * busy-wait (hot-loop) for more data, wasting CPU time. Manual writes avoid + * this by letting the caller control when data is sent; the event loop only + * processes the request when {@link HttpStreamBase#writeData} is called and is + * free to service other requests in the meantime. + * + *

When both a body stream and manual writes are enabled, the body stream is + * sent as the first DATA frame and the connection then waits asynchronously for + * subsequent {@code writeData()} calls. However, if the body stream has not + * signalled end-of-stream, the event loop will keep getting scheduled for + * requesting more data until it completes. + * @throws CrtRuntimeException if stream creation fails + * @return The Http2Stream that represents this Request/Response Pair. It can be + * closed at any time during the request/response, but must be closed by + * the user thread making this request when it's done. + */ + public Http2Stream makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler, + boolean useManualDataWrites) throws CrtRuntimeException { if (isNull()) { throw new IllegalStateException("Http2ClientConnection has been closed, can't make requests on it."); } Http2Stream stream = http2ClientConnectionMakeRequest(getNativeHandle(), request.marshalForJni(), - request.getBodyStream(), new HttpStreamResponseHandlerNativeAdapter(streamHandler)); + request.getBodyStream(), new HttpStreamResponseHandlerNativeAdapter(streamHandler), + useManualDataWrites); return stream; } @@ -204,7 +241,8 @@ public Http2Stream makeRequest(HttpRequestBase request, HttpStreamBaseResponseHa ******************************************************************************/ private static native Http2Stream http2ClientConnectionMakeRequest(long connectionBinding, byte[] marshalledRequest, - HttpRequestBodyStream bodyStream, HttpStreamResponseHandlerNativeAdapter responseHandler) + HttpRequestBodyStream bodyStream, HttpStreamResponseHandlerNativeAdapter responseHandler, + boolean useManualDataWrites) throws CrtRuntimeException; private static native void http2ClientConnectionUpdateSettings(long connectionBinding, diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java index bf1fa4e8b..d52b07aeb 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java @@ -41,22 +41,58 @@ protected HttpClientConnection(long connectionBinding) { */ public HttpStream makeRequest(HttpRequest request, HttpStreamResponseHandler streamHandler) throws CrtRuntimeException { + return makeRequest(request, streamHandler, false); + } + + /** + * Schedules an HttpRequest on the Native EventLoop for this HttpClientConnection specific to HTTP/1.1 connection. + * + * @param request The Request to make to the Server. + * @param streamHandler The Stream Handler to be called from the Native EventLoop + * @param useManualDataWrites When {@code true}, request body data is provided via + * {@link HttpStreamBase#writeData} instead of from the request's + * {@link HttpRequestBodyStream}. + * + *

By design, CRT does not support setting both a body stream and enabling + * manual writes for HTTP/1.1. Body streams are intended for requests whose + * payload is available in full at the time of sending. If the stream does not + * signal end-of-stream promptly, the event loop will busy-wait (hot-loop) for + * more data, wasting CPU time. Manual writes avoid this by letting the caller + * control when data is sent; the event loop only processes the request when + * {@link HttpStreamBase#writeData} is called and is free to service other + * requests in the meantime. + * + *

If the request was created with an {@link HttpRequestBodyStream} and this + * parameter is {@code true}, an {@link IllegalStateException} is thrown + * immediately. + * @throws CrtRuntimeException if stream creation fails + * @return The HttpStream that represents this Request/Response Pair. It can be closed at any time during the + * request/response, but must be closed by the user thread making this request when it's done. + */ + public HttpStream makeRequest(HttpRequest request, HttpStreamResponseHandler streamHandler, + boolean useManualDataWrites) throws CrtRuntimeException, IllegalStateException { if (isNull()) { throw new IllegalStateException("HttpClientConnection has been closed, can't make requests on it."); } if (getVersion() == HttpVersion.HTTP_2) { throw new IllegalArgumentException("HTTP/1 only method called on an HTTP/2 connection."); } + if (useManualDataWrites && request.getBodyStream() != null) { + throw new IllegalStateException( + "Cannot use manual data writes with a body stream on an HTTP/1.1 request. " + + "Either remove the body stream or set useManualDataWrites to false."); + } HttpStreamBase stream = httpClientConnectionMakeRequest(getNativeHandle(), request.marshalForJni(), request.getBodyStream(), - new HttpStreamResponseHandlerNativeAdapter(streamHandler)); + new HttpStreamResponseHandlerNativeAdapter(streamHandler), + useManualDataWrites); return (HttpStream)stream; } /** - * Schedules an HttpRequestBase on the Native EventLoop for this HttpClientConnection applies to both HTTP/2 and HTTP/1.1 connection. + * Schedules an HttpRequestBase on the Native EventLoop for this HttpClientConnection. Applies to both HTTP/2 and HTTP/1.1 connections. * * @param request The Request to make to the Server. * @param streamHandler The Stream Handler to be called from the Native EventLoop @@ -65,13 +101,30 @@ public HttpStream makeRequest(HttpRequest request, HttpStreamResponseHandler str * request/response, but must be closed by the user thread making this request when it's done. */ public HttpStreamBase makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler) throws CrtRuntimeException { + return makeRequest(request, streamHandler, false); + } + + /** + * Schedules an HttpRequestBase on the Native EventLoop for this HttpClientConnection. Applies to both HTTP/2 and HTTP/1.1 connections. + * + * @param request The Request to make to the Server. + * @param streamHandler The Stream Handler to be called from the Native EventLoop + * @param useManualDataWrites When true, request body data will be provided via + * {@link HttpStreamBase#writeData} instead of from the request's body stream. + * @throws CrtRuntimeException if stream creation fails + * @return The HttpStream that represents this Request/Response Pair. It can be closed at any time during the + * request/response, but must be closed by the user thread making this request when it's done. + */ + public HttpStreamBase makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler, + boolean useManualDataWrites) throws CrtRuntimeException { if (isNull()) { throw new IllegalStateException("HttpClientConnection has been closed, can't make requests on it."); } HttpStreamBase stream = httpClientConnectionMakeRequest(getNativeHandle(), request.marshalForJni(), request.getBodyStream(), - new HttpStreamResponseHandlerNativeAdapter(streamHandler)); + new HttpStreamResponseHandlerNativeAdapter(streamHandler), + useManualDataWrites); return stream; } @@ -172,7 +225,8 @@ public static boolean isErrorRetryable(HttpException exception) { private static native HttpStreamBase httpClientConnectionMakeRequest(long connectionBinding, byte[] marshalledRequest, HttpRequestBodyStream bodyStream, - HttpStreamResponseHandlerNativeAdapter responseHandler) throws CrtRuntimeException; + HttpStreamResponseHandlerNativeAdapter responseHandler, + boolean useManualDataWrites) throws CrtRuntimeException; private static native void httpClientConnectionShutdown(long connectionBinding) throws CrtRuntimeException; private static native boolean httpClientConnectionIsOpen(long connectionBinding) throws CrtRuntimeException; diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java index 728664b34..4db16dd1c 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java @@ -40,7 +40,10 @@ public interface HttpStreamWriteChunkCompletionCallback { * request stream. * @param chunkCompletionCallback Invoked upon the data being flushed to the * wire or an error occurring. + * @deprecated Use {@link HttpStreamBase#writeData(byte[], boolean, HttpStreamWriteDataCompletionCallback)} instead. + * writeData() works for both HTTP/1.1 and HTTP/2, whereas writeChunk() is HTTP/1.1 only. */ + @Deprecated public void writeChunk(final byte[] chunkData, boolean isFinalChunk, final HttpStreamWriteChunkCompletionCallback chunkCompletionCallback) { if (isNull()) { @@ -71,7 +74,10 @@ public void writeChunk(final byte[] chunkData, boolean isFinalChunk, * @param isFinalChunk if set to true, this will terminate the request stream. * @return completable future which will complete upon the data being flushed to * the wire or an error occurring. + * @deprecated Use {@link HttpStreamBase#writeData(byte[], boolean)} instead. + * writeData() works for both HTTP/1.1 and HTTP/2, whereas writeChunk() is HTTP/1.1 only. */ + @Deprecated public CompletableFuture writeChunk(final byte[] chunkData, boolean isFinalChunk) { CompletableFuture completionFuture = new CompletableFuture<>(); diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java index d7a2b0078..15b96633c 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java @@ -5,7 +5,11 @@ package software.amazon.awssdk.crt.http; +import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; + +import java.util.concurrent.CompletableFuture; /** * An base class represents a single Http Request/Response for both HTTP/1.1 and @@ -112,6 +116,63 @@ public void cancel() { } } + /** + * Completion interface for writing data to an http stream. + */ + public interface HttpStreamWriteDataCompletionCallback { + void onWriteDataCompleted(int errorCode); + } + + /** + * Write data to an HTTP stream. Works for both HTTP/1.1 and HTTP/2. + * The stream must have been created with {@code useManualDataWrites = true}. + * You must call activate() before using this function. + * + * @param data data to send, or null to write zero bytes. Pass null with + * endStream=true to signal end-of-body without sending additional data. + * @param endStream if true, this is the last data to be sent on this stream. + * @param completionCallback invoked when the data has been flushed or an error occurs. + */ + public void writeData(final byte[] data, boolean endStream, + final HttpStreamWriteDataCompletionCallback completionCallback) { + if (isNull()) { + throw new IllegalStateException("HttpStream has been closed."); + } + if (completionCallback == null) { + throw new IllegalArgumentException("You must supply a completionCallback"); + } + + int error = httpStreamBaseWriteData(getNativeHandle(), data, endStream, completionCallback); + if (error != 0) { + int lastError = CRT.awsLastError(); + throw new CrtRuntimeException(lastError); + } + } + + /** + * Write data to an HTTP stream. Works for both HTTP/1.1 and HTTP/2. + * The stream must have been created with {@code useManualDataWrites = true}. + * You must call activate() before using this function. + * + * @param data data to send, or null to write zero bytes. Pass null with + * endStream=true to signal end-of-body without sending additional data. + * @param endStream if true, this is the last data to be sent on this stream. + * @return completable future which completes when data is flushed or an error occurs. + */ + public CompletableFuture writeData(final byte[] data, boolean endStream) { + CompletableFuture completionFuture = new CompletableFuture<>(); + + writeData(data, endStream, (errorCode) -> { + if (errorCode == 0) { + completionFuture.complete(null); + } else { + completionFuture.completeExceptionally(new CrtRuntimeException(errorCode)); + } + }); + + return completionFuture; + } + /******************************************************************************* * Native methods ******************************************************************************/ @@ -125,4 +186,7 @@ public void cancel() { private static native int httpStreamBaseGetResponseStatusCode(long http_stream); private static native void httpStreamBaseCancelDefaultError(long http_stream); + + private static native int httpStreamBaseWriteData(long http_stream, byte[] data, boolean endStream, + HttpStreamWriteDataCompletionCallback completionCallback); } diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index 0f818e1e2..8edc259bc 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -761,6 +761,17 @@ } ] }, + { + "name": "software.amazon.awssdk.crt.http.HttpStreamBase$HttpStreamWriteDataCompletionCallback", + "methods": [ + { + "name": "onWriteDataCompleted", + "parameterTypes": [ + "int" + ] + } + ] + }, { "name": "software.amazon.awssdk.crt.http.HttpStreamMetrics", "methods": [ diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c index 2311a0159..b2bb63690 100644 --- a/src/native/http_request_response.c +++ b/src/native/http_request_response.c @@ -400,7 +400,8 @@ static jobject s_make_request_general( jbyteArray marshalled_request, jobject jni_http_request_body_stream, jobject jni_http_response_callback_handler, - enum aws_http_version version) { + enum aws_http_version version, + bool use_manual_data_writes) { struct aws_http_connection_binding *connection_binding = (struct aws_http_connection_binding *)jni_connection; struct aws_http_connection *native_conn = connection_binding->connection; @@ -441,6 +442,7 @@ static jobject s_make_request_general( .on_destroy = aws_java_http_stream_on_stream_destroy_fn, .on_metrics = aws_java_http_stream_on_stream_metrics_fn, .user_data = stream_binding, + .use_manual_data_writes = use_manual_data_writes, }; stream_binding->native_stream = aws_http_connection_make_request(native_conn, &request_options); @@ -478,7 +480,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_HttpClientConnect jlong jni_connection, jbyteArray marshalled_request, jobject jni_http_request_body_stream, - jobject jni_http_response_callback_handler) { + jobject jni_http_response_callback_handler, + jboolean jni_use_manual_data_writes) { (void)jni_class; aws_cache_jni_ids(env); @@ -488,7 +491,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_HttpClientConnect marshalled_request, jni_http_request_body_stream, jni_http_response_callback_handler, - AWS_HTTP_VERSION_1_1); + AWS_HTTP_VERSION_1_1, + jni_use_manual_data_writes); } JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnection_http2ClientConnectionMakeRequest( @@ -497,7 +501,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnec jlong jni_connection, jbyteArray marshalled_request, jobject jni_http_request_body_stream, - jobject jni_http_response_callback_handler) { + jobject jni_http_response_callback_handler, + jboolean jni_use_manual_data_writes) { (void)jni_class; aws_cache_jni_ids(env); @@ -507,7 +512,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnec marshalled_request, jni_http_request_body_stream, jni_http_response_callback_handler, - AWS_HTTP_VERSION_2); + AWS_HTTP_VERSION_2, + jni_use_manual_data_writes); } struct http_stream_chunked_callback_data { @@ -603,6 +609,90 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStream_httpStrea return AWS_OP_SUCCESS; } +struct http_stream_write_data_callback_data { + struct http_stream_binding *stream_cb_data; + struct aws_byte_buf data_buf; + struct aws_input_stream *data_stream; + jobject completion_callback; +}; + +static void s_cleanup_write_data_callback_data( + JNIEnv *env, + struct http_stream_write_data_callback_data *callback_data) { + if (callback_data->data_stream) { + aws_input_stream_destroy(callback_data->data_stream); + } + aws_byte_buf_clean_up(&callback_data->data_buf); + (*env)->DeleteGlobalRef(env, callback_data->completion_callback); + aws_mem_release(aws_jni_get_allocator(), callback_data); +} + +static void s_write_data_complete(struct aws_http_stream *stream, int error_code, void *user_data) { + (void)stream; + + struct http_stream_write_data_callback_data *callback_data = user_data; + + /********** JNI ENV ACQUIRE **********/ + struct aws_jvm_env_context jvm_env_context = aws_jni_acquire_thread_env(callback_data->stream_cb_data->jvm); + JNIEnv *env = jvm_env_context.env; + if (env == NULL) { + return; + } + + (*env)->CallVoidMethod( + env, callback_data->completion_callback, http_stream_write_data_completion_properties.callback, error_code); + aws_jni_check_and_clear_exception(env); + + JavaVM *jvm = callback_data->stream_cb_data->jvm; + s_cleanup_write_data_callback_data(env, callback_data); + aws_jni_release_thread_env(jvm, &jvm_env_context); + /********** JNI ENV RELEASE **********/ +} + +JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseWriteData( + JNIEnv *env, + jclass jni_class, + jlong jni_cb_data, + jbyteArray data, + jboolean end_stream, + jobject completion_callback) { + (void)jni_class; + aws_cache_jni_ids(env); + + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; + struct aws_http_stream *stream = cb_data->native_stream; + + struct http_stream_write_data_callback_data *callback_data = + aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct http_stream_write_data_callback_data)); + + callback_data->stream_cb_data = cb_data; + callback_data->completion_callback = (*env)->NewGlobalRef(env, completion_callback); + + struct aws_http_stream_write_data_options options = { + .data = NULL, + .end_stream = end_stream, + .on_complete = s_write_data_complete, + .user_data = callback_data, + }; + + if (data != NULL) { + struct aws_byte_cursor data_cur = aws_jni_byte_cursor_from_jbyteArray_acquire(env, data); + aws_byte_buf_init_copy_from_cursor(&callback_data->data_buf, aws_jni_get_allocator(), data_cur); + aws_jni_byte_cursor_from_jbyteArray_release(env, data, data_cur); + + data_cur = aws_byte_cursor_from_buf(&callback_data->data_buf); + callback_data->data_stream = aws_input_stream_new_from_cursor(aws_jni_get_allocator(), &data_cur); + options.data = callback_data->data_stream; + } + + if (aws_http_stream_write_data(stream, &options)) { + s_cleanup_write_data_callback_data(env, callback_data); + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} + JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseActivate( JNIEnv *env, jclass jni_class, diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 71be0085b..765b6a983 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -542,6 +542,18 @@ static void s_cache_http_stream_write_chunk_completion_properties(JNIEnv *env) { AWS_FATAL_ASSERT(http_stream_write_chunk_completion_properties.callback); } +struct java_http_stream_write_data_completion_properties http_stream_write_data_completion_properties; + +static void s_cache_http_stream_write_data_completion_properties(JNIEnv *env) { + jclass cls = + (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpStreamBase$HttpStreamWriteDataCompletionCallback"); + AWS_FATAL_ASSERT(cls); + + http_stream_write_data_completion_properties.callback = + (*env)->GetMethodID(env, cls, "onWriteDataCompleted", "(I)V"); + AWS_FATAL_ASSERT(http_stream_write_data_completion_properties.callback); +} + struct java_http_stream_metrics_properties http_stream_metrics_properties; static void s_cache_http_stream_metrics_properties(JNIEnv *env) { @@ -2700,6 +2712,7 @@ static void s_cache_java_class_ids(void *user_data) { s_cache_http2_stream(env); s_cache_http_stream_response_handler_native_adapter(env); s_cache_http_stream_write_chunk_completion_properties(env); + s_cache_http_stream_write_data_completion_properties(env); s_cache_http_stream_metrics_properties(env); s_cache_event_stream_server_listener_properties(env); s_cache_event_stream_server_listener_handler_properties(env); diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index f613e1c6b..d1da8565e 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -246,6 +246,12 @@ struct java_http_stream_write_chunk_completion_properties { }; extern struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties; +/* HttpStreamWriteDataCompletionCallback */ +struct java_http_stream_write_data_completion_properties { + jmethodID callback; +}; +extern struct java_http_stream_write_data_completion_properties http_stream_write_data_completion_properties; + /* HtppStreamMetrics */ struct java_http_stream_metrics_properties { jclass http_stream_metrics_class; diff --git a/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java b/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java new file mode 100644 index 000000000..f9af855c0 --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java @@ -0,0 +1,329 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.test; + +import org.junit.Assert; +import org.junit.Test; + +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.http.*; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class WriteDataTest extends HttpRequestResponseFixture { + private final static String HOST = "https://localhost"; + private final static int H1_TLS_PORT = 8082; + private final static int H2_TLS_PORT = 3443; + + @Test + public void testHttp2WriteData() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H2_TLS_PORT); + byte[] payload = "hello from writeData".getBytes(StandardCharsets.UTF_8); + + HttpHeader[] headers = new HttpHeader[]{ + new HttpHeader(":method", "PUT"), + new HttpHeader(":path", "/echo"), + new HttpHeader(":scheme", "https"), + new HttpHeader(":authority", uri.getHost()), + new HttpHeader("content-length", Integer.toString(payload.length)), + }; + Http2Request request = new Http2Request(headers, null); + + CompletableFuture reqCompleted = new CompletableFuture<>(); + TestHttpResponse response = new TestHttpResponse(); + + CompletableFuture shutdownComplete; + try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_2)) { + shutdownComplete = connPool.getShutdownCompleteFuture(); + try (Http2ClientConnection conn = (Http2ClientConnection) connPool.acquireConnection() + .get(60, TimeUnit.SECONDS)) { + + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + response.bodyBuffer.put(bodyBytesIn); + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + }; + + try (Http2Stream stream = conn.makeRequest(request, streamHandler, true)) { + stream.activate(); + stream.writeData(payload, true).get(5, TimeUnit.SECONDS); + reqCompleted.get(60, TimeUnit.SECONDS); + } + } + } + + Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode); + Assert.assertEquals(200, response.statusCode); + // /echo returns JSON: {"body": "", "bytes": } + String body = response.getBody(); + Assert.assertTrue("Response should contain sent body: " + body, + body.contains("\"body\": \"hello from writeData\"")); + Assert.assertTrue("Response should contain byte count: " + body, + body.contains("\"bytes\": " + payload.length)); + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.waitForNoResources(); + } + + @Test + public void testHttp2WriteDataEndStreamOnly() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H2_TLS_PORT); + + HttpHeader[] headers = new HttpHeader[]{ + new HttpHeader(":method", "GET"), + new HttpHeader(":path", "/echo"), + new HttpHeader(":scheme", "https"), + new HttpHeader(":authority", uri.getHost()), + }; + Http2Request request = new Http2Request(headers, null); + + CompletableFuture reqCompleted = new CompletableFuture<>(); + TestHttpResponse response = new TestHttpResponse(); + + CompletableFuture shutdownComplete; + try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_2)) { + shutdownComplete = connPool.getShutdownCompleteFuture(); + try (Http2ClientConnection conn = (Http2ClientConnection) connPool.acquireConnection() + .get(60, TimeUnit.SECONDS)) { + + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + response.bodyBuffer.put(bodyBytesIn); + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + }; + + // Use manual writes but send null data with endStream=true (zero-byte body) + try (Http2Stream stream = conn.makeRequest(request, streamHandler, true)) { + stream.activate(); + stream.writeData(null, true).get(5, TimeUnit.SECONDS); + reqCompleted.get(60, TimeUnit.SECONDS); + } + } + } + + Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode); + Assert.assertEquals(200, response.statusCode); + // /echo returns JSON: {"body": "", "bytes": 0} + String body = response.getBody(); + Assert.assertTrue("Response should contain zero bytes: " + body, + body.contains("\"bytes\": 0")); + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.waitForNoResources(); + } + + @Test + public void testHttp1WriteData() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H1_TLS_PORT); + byte[] payload = "hello from writeData h1".getBytes(StandardCharsets.UTF_8); + + HttpHeader[] headers = new HttpHeader[]{ + new HttpHeader("Host", uri.getHost()), + new HttpHeader("Content-Length", Integer.toString(payload.length)), + }; + HttpRequest request = new HttpRequest("PUT", "/echo", headers, null); + + CompletableFuture reqCompleted = new CompletableFuture<>(); + TestHttpResponse response = new TestHttpResponse(); + + CompletableFuture shutdownComplete; + try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_1_1)) { + shutdownComplete = connPool.getShutdownCompleteFuture(); + try (HttpClientConnection conn = connPool.acquireConnection().get(60, TimeUnit.SECONDS)) { + + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + response.bodyBuffer.put(bodyBytesIn); + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + }; + + // Use the unified makeRequest with useManualDataWrites=true + try (HttpStreamBase stream = conn.makeRequest(request, streamHandler, true)) { + stream.activate(); + stream.writeData(payload, true).get(5, TimeUnit.SECONDS); + reqCompleted.get(60, TimeUnit.SECONDS); + } + } + } + + Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode); + Assert.assertEquals(200, response.statusCode); + // H1 /echo returns JSON: {"data": ""} + String body = response.getBody(); + Assert.assertTrue("Response should contain sent data: " + body, + body.contains("\"data\": \"hello from writeData h1\"")); + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.waitForNoResources(); + } + + @Test + public void testHttp1WriteDataEndStreamOnly() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H1_TLS_PORT); + + HttpHeader[] headers = new HttpHeader[]{ + new HttpHeader("Host", uri.getHost()), + new HttpHeader("Content-Length", "0"), + }; + HttpRequest request = new HttpRequest("GET", "/echo", headers, null); + + CompletableFuture reqCompleted = new CompletableFuture<>(); + TestHttpResponse response = new TestHttpResponse(); + + CompletableFuture shutdownComplete; + try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_1_1)) { + shutdownComplete = connPool.getShutdownCompleteFuture(); + try (HttpClientConnection conn = connPool.acquireConnection().get(60, TimeUnit.SECONDS)) { + + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + response.bodyBuffer.put(bodyBytesIn); + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + }; + + try (HttpStreamBase stream = conn.makeRequest(request, streamHandler, true)) { + stream.activate(); + stream.writeData(null, true).get(5, TimeUnit.SECONDS); + reqCompleted.get(60, TimeUnit.SECONDS); + } + } + } + + Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode); + Assert.assertEquals(200, response.statusCode); + // H1 /echo returns JSON: {"data": ""} + String body = response.getBody(); + Assert.assertTrue("Response should contain empty data: " + body, + body.contains("\"data\": \"\"")); + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.waitForNoResources(); + } + + /** + * Tests that makeRequest throws IllegalStateException when called with both + * a body stream and useManualDataWrites=true on an HTTP/1.1 connection. + */ + @Test + public void testHttp1MakeRequestWithBodyStreamAndManualWrites() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H1_TLS_PORT); + + HttpRequestBodyStream bodyStream = new HttpRequestBodyStream() { + @Override + public boolean sendRequestBody(ByteBuffer bodyBytesOut) { + return true; + } + }; + + HttpHeader[] headers = new HttpHeader[]{ + new HttpHeader("Host", uri.getHost()), + }; + // Create request WITH body stream AND useManualDataWrites=true + HttpRequest request = new HttpRequest("PUT", "/echo", headers, bodyStream); + + try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_1_1)) { + try (HttpClientConnection conn = connPool.acquireConnection().get(60, TimeUnit.SECONDS)) { + + HttpStreamResponseHandler streamHandler = new HttpStreamResponseHandler() { + @Override + public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) {} + @Override + public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { return bodyBytesIn.length; } + @Override + public void onResponseComplete(HttpStream stream, int errorCode) {} + }; + + try { + conn.makeRequest(request, streamHandler, true); + Assert.fail("Expected IllegalStateException from makeRequest"); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("manual data writes")); + } + } + } + } +}