diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java b/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java
index 5559656ab..4ba083308 100644
--- a/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java
+++ b/src/main/java/software/amazon/awssdk/crt/http/Http2ClientConnection.java
@@ -187,12 +187,49 @@ public void updateConnectionWindow(long incrementSize) {
@Override
public Http2Stream makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler)
throws CrtRuntimeException {
+ return makeRequest(request, streamHandler, false);
+ }
+
+ /**
+ * Schedules an HttpRequest on the Native EventLoop for this
+ * HttpClientConnection. The HTTP/1.1 request will be transformed to HTTP/2
+ * request under the hood.
+ *
+ * @param request The Request to make to the Server.
+ * @param streamHandler The Stream Handler to be called from the Native
+ * EventLoop
+ * @param useManualDataWrites When {@code true}, request body data is provided via
+ * {@link HttpStreamBase#writeData} instead of from the request's
+ * {@link HttpRequestBodyStream}.
+ *
+ *
By design, CRT supports setting both a body stream and enabling manual
+ * writes for HTTP/2, but this is not recommended. Body streams are intended
+ * for requests whose payload is available in full at the time of sending. If
+ * the stream does not signal end-of-stream promptly, the event loop will
+ * busy-wait (hot-loop) for more data, wasting CPU time. Manual writes avoid
+ * this by letting the caller control when data is sent; the event loop only
+ * processes the request when {@link HttpStreamBase#writeData} is called and is
+ * free to service other requests in the meantime.
+ *
+ *
When both a body stream and manual writes are enabled, the body stream is
+ * sent as the first DATA frame and the connection then waits asynchronously for
+ * subsequent {@code writeData()} calls. However, if the body stream has not
+ * signalled end-of-stream, the event loop will keep getting scheduled for
+ * requesting more data until it completes.
+ * @throws CrtRuntimeException if stream creation fails
+ * @return The Http2Stream that represents this Request/Response Pair. It can be
+ * closed at any time during the request/response, but must be closed by
+ * the user thread making this request when it's done.
+ */
+ public Http2Stream makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler,
+ boolean useManualDataWrites) throws CrtRuntimeException {
if (isNull()) {
throw new IllegalStateException("Http2ClientConnection has been closed, can't make requests on it.");
}
Http2Stream stream = http2ClientConnectionMakeRequest(getNativeHandle(), request.marshalForJni(),
- request.getBodyStream(), new HttpStreamResponseHandlerNativeAdapter(streamHandler));
+ request.getBodyStream(), new HttpStreamResponseHandlerNativeAdapter(streamHandler),
+ useManualDataWrites);
return stream;
}
@@ -204,7 +241,8 @@ public Http2Stream makeRequest(HttpRequestBase request, HttpStreamBaseResponseHa
******************************************************************************/
private static native Http2Stream http2ClientConnectionMakeRequest(long connectionBinding, byte[] marshalledRequest,
- HttpRequestBodyStream bodyStream, HttpStreamResponseHandlerNativeAdapter responseHandler)
+ HttpRequestBodyStream bodyStream, HttpStreamResponseHandlerNativeAdapter responseHandler,
+ boolean useManualDataWrites)
throws CrtRuntimeException;
private static native void http2ClientConnectionUpdateSettings(long connectionBinding,
diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java
index bf1fa4e8b..d52b07aeb 100644
--- a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java
+++ b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnection.java
@@ -41,22 +41,58 @@ protected HttpClientConnection(long connectionBinding) {
*/
public HttpStream makeRequest(HttpRequest request, HttpStreamResponseHandler streamHandler)
throws CrtRuntimeException {
+ return makeRequest(request, streamHandler, false);
+ }
+
+ /**
+ * Schedules an HttpRequest on the Native EventLoop for this HttpClientConnection specific to HTTP/1.1 connection.
+ *
+ * @param request The Request to make to the Server.
+ * @param streamHandler The Stream Handler to be called from the Native EventLoop
+ * @param useManualDataWrites When {@code true}, request body data is provided via
+ * {@link HttpStreamBase#writeData} instead of from the request's
+ * {@link HttpRequestBodyStream}.
+ *
+ *
By design, CRT does not support setting both a body stream and enabling
+ * manual writes for HTTP/1.1. Body streams are intended for requests whose
+ * payload is available in full at the time of sending. If the stream does not
+ * signal end-of-stream promptly, the event loop will busy-wait (hot-loop) for
+ * more data, wasting CPU time. Manual writes avoid this by letting the caller
+ * control when data is sent; the event loop only processes the request when
+ * {@link HttpStreamBase#writeData} is called and is free to service other
+ * requests in the meantime.
+ *
+ *
If the request was created with an {@link HttpRequestBodyStream} and this
+ * parameter is {@code true}, an {@link IllegalStateException} is thrown
+ * immediately.
+ * @throws CrtRuntimeException if stream creation fails
+ * @return The HttpStream that represents this Request/Response Pair. It can be closed at any time during the
+ * request/response, but must be closed by the user thread making this request when it's done.
+ */
+ public HttpStream makeRequest(HttpRequest request, HttpStreamResponseHandler streamHandler,
+ boolean useManualDataWrites) throws CrtRuntimeException, IllegalStateException {
if (isNull()) {
throw new IllegalStateException("HttpClientConnection has been closed, can't make requests on it.");
}
if (getVersion() == HttpVersion.HTTP_2) {
throw new IllegalArgumentException("HTTP/1 only method called on an HTTP/2 connection.");
}
+ if (useManualDataWrites && request.getBodyStream() != null) {
+ throw new IllegalStateException(
+ "Cannot use manual data writes with a body stream on an HTTP/1.1 request. "
+ + "Either remove the body stream or set useManualDataWrites to false.");
+ }
HttpStreamBase stream = httpClientConnectionMakeRequest(getNativeHandle(),
request.marshalForJni(),
request.getBodyStream(),
- new HttpStreamResponseHandlerNativeAdapter(streamHandler));
+ new HttpStreamResponseHandlerNativeAdapter(streamHandler),
+ useManualDataWrites);
return (HttpStream)stream;
}
/**
- * Schedules an HttpRequestBase on the Native EventLoop for this HttpClientConnection applies to both HTTP/2 and HTTP/1.1 connection.
+ * Schedules an HttpRequestBase on the Native EventLoop for this HttpClientConnection. Applies to both HTTP/2 and HTTP/1.1 connections.
*
* @param request The Request to make to the Server.
* @param streamHandler The Stream Handler to be called from the Native EventLoop
@@ -65,13 +101,30 @@ public HttpStream makeRequest(HttpRequest request, HttpStreamResponseHandler str
* request/response, but must be closed by the user thread making this request when it's done.
*/
public HttpStreamBase makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler) throws CrtRuntimeException {
+ return makeRequest(request, streamHandler, false);
+ }
+
+ /**
+ * Schedules an HttpRequestBase on the Native EventLoop for this HttpClientConnection. Applies to both HTTP/2 and HTTP/1.1 connections.
+ *
+ * @param request The Request to make to the Server.
+ * @param streamHandler The Stream Handler to be called from the Native EventLoop
+ * @param useManualDataWrites When true, request body data will be provided via
+ * {@link HttpStreamBase#writeData} instead of from the request's body stream.
+ * @throws CrtRuntimeException if stream creation fails
+ * @return The HttpStream that represents this Request/Response Pair. It can be closed at any time during the
+ * request/response, but must be closed by the user thread making this request when it's done.
+ */
+ public HttpStreamBase makeRequest(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler,
+ boolean useManualDataWrites) throws CrtRuntimeException {
if (isNull()) {
throw new IllegalStateException("HttpClientConnection has been closed, can't make requests on it.");
}
HttpStreamBase stream = httpClientConnectionMakeRequest(getNativeHandle(),
request.marshalForJni(),
request.getBodyStream(),
- new HttpStreamResponseHandlerNativeAdapter(streamHandler));
+ new HttpStreamResponseHandlerNativeAdapter(streamHandler),
+ useManualDataWrites);
return stream;
}
@@ -172,7 +225,8 @@ public static boolean isErrorRetryable(HttpException exception) {
private static native HttpStreamBase httpClientConnectionMakeRequest(long connectionBinding,
byte[] marshalledRequest,
HttpRequestBodyStream bodyStream,
- HttpStreamResponseHandlerNativeAdapter responseHandler) throws CrtRuntimeException;
+ HttpStreamResponseHandlerNativeAdapter responseHandler,
+ boolean useManualDataWrites) throws CrtRuntimeException;
private static native void httpClientConnectionShutdown(long connectionBinding) throws CrtRuntimeException;
private static native boolean httpClientConnectionIsOpen(long connectionBinding) throws CrtRuntimeException;
diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java
index 728664b34..4db16dd1c 100644
--- a/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java
+++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStream.java
@@ -40,7 +40,10 @@ public interface HttpStreamWriteChunkCompletionCallback {
* request stream.
* @param chunkCompletionCallback Invoked upon the data being flushed to the
* wire or an error occurring.
+ * @deprecated Use {@link HttpStreamBase#writeData(byte[], boolean, HttpStreamWriteDataCompletionCallback)} instead.
+ * writeData() works for both HTTP/1.1 and HTTP/2, whereas writeChunk() is HTTP/1.1 only.
*/
+ @Deprecated
public void writeChunk(final byte[] chunkData, boolean isFinalChunk,
final HttpStreamWriteChunkCompletionCallback chunkCompletionCallback) {
if (isNull()) {
@@ -71,7 +74,10 @@ public void writeChunk(final byte[] chunkData, boolean isFinalChunk,
* @param isFinalChunk if set to true, this will terminate the request stream.
* @return completable future which will complete upon the data being flushed to
* the wire or an error occurring.
+ * @deprecated Use {@link HttpStreamBase#writeData(byte[], boolean)} instead.
+ * writeData() works for both HTTP/1.1 and HTTP/2, whereas writeChunk() is HTTP/1.1 only.
*/
+ @Deprecated
public CompletableFuture writeChunk(final byte[] chunkData, boolean isFinalChunk) {
CompletableFuture completionFuture = new CompletableFuture<>();
diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java
index d7a2b0078..15b96633c 100644
--- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java
+++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java
@@ -5,7 +5,11 @@
package software.amazon.awssdk.crt.http;
+import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
+import software.amazon.awssdk.crt.CrtRuntimeException;
+
+import java.util.concurrent.CompletableFuture;
/**
* An base class represents a single Http Request/Response for both HTTP/1.1 and
@@ -112,6 +116,63 @@ public void cancel() {
}
}
+ /**
+ * Completion interface for writing data to an http stream.
+ */
+ public interface HttpStreamWriteDataCompletionCallback {
+ void onWriteDataCompleted(int errorCode);
+ }
+
+ /**
+ * Write data to an HTTP stream. Works for both HTTP/1.1 and HTTP/2.
+ * The stream must have been created with {@code useManualDataWrites = true}.
+ * You must call activate() before using this function.
+ *
+ * @param data data to send, or null to write zero bytes. Pass null with
+ * endStream=true to signal end-of-body without sending additional data.
+ * @param endStream if true, this is the last data to be sent on this stream.
+ * @param completionCallback invoked when the data has been flushed or an error occurs.
+ */
+ public void writeData(final byte[] data, boolean endStream,
+ final HttpStreamWriteDataCompletionCallback completionCallback) {
+ if (isNull()) {
+ throw new IllegalStateException("HttpStream has been closed.");
+ }
+ if (completionCallback == null) {
+ throw new IllegalArgumentException("You must supply a completionCallback");
+ }
+
+ int error = httpStreamBaseWriteData(getNativeHandle(), data, endStream, completionCallback);
+ if (error != 0) {
+ int lastError = CRT.awsLastError();
+ throw new CrtRuntimeException(lastError);
+ }
+ }
+
+ /**
+ * Write data to an HTTP stream. Works for both HTTP/1.1 and HTTP/2.
+ * The stream must have been created with {@code useManualDataWrites = true}.
+ * You must call activate() before using this function.
+ *
+ * @param data data to send, or null to write zero bytes. Pass null with
+ * endStream=true to signal end-of-body without sending additional data.
+ * @param endStream if true, this is the last data to be sent on this stream.
+ * @return completable future which completes when data is flushed or an error occurs.
+ */
+ public CompletableFuture writeData(final byte[] data, boolean endStream) {
+ CompletableFuture completionFuture = new CompletableFuture<>();
+
+ writeData(data, endStream, (errorCode) -> {
+ if (errorCode == 0) {
+ completionFuture.complete(null);
+ } else {
+ completionFuture.completeExceptionally(new CrtRuntimeException(errorCode));
+ }
+ });
+
+ return completionFuture;
+ }
+
/*******************************************************************************
* Native methods
******************************************************************************/
@@ -125,4 +186,7 @@ public void cancel() {
private static native int httpStreamBaseGetResponseStatusCode(long http_stream);
private static native void httpStreamBaseCancelDefaultError(long http_stream);
+
+ private static native int httpStreamBaseWriteData(long http_stream, byte[] data, boolean endStream,
+ HttpStreamWriteDataCompletionCallback completionCallback);
}
diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
index 0f818e1e2..8edc259bc 100644
--- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
+++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json
@@ -761,6 +761,17 @@
}
]
},
+ {
+ "name": "software.amazon.awssdk.crt.http.HttpStreamBase$HttpStreamWriteDataCompletionCallback",
+ "methods": [
+ {
+ "name": "onWriteDataCompleted",
+ "parameterTypes": [
+ "int"
+ ]
+ }
+ ]
+ },
{
"name": "software.amazon.awssdk.crt.http.HttpStreamMetrics",
"methods": [
diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c
index 2311a0159..b2bb63690 100644
--- a/src/native/http_request_response.c
+++ b/src/native/http_request_response.c
@@ -400,7 +400,8 @@ static jobject s_make_request_general(
jbyteArray marshalled_request,
jobject jni_http_request_body_stream,
jobject jni_http_response_callback_handler,
- enum aws_http_version version) {
+ enum aws_http_version version,
+ bool use_manual_data_writes) {
struct aws_http_connection_binding *connection_binding = (struct aws_http_connection_binding *)jni_connection;
struct aws_http_connection *native_conn = connection_binding->connection;
@@ -441,6 +442,7 @@ static jobject s_make_request_general(
.on_destroy = aws_java_http_stream_on_stream_destroy_fn,
.on_metrics = aws_java_http_stream_on_stream_metrics_fn,
.user_data = stream_binding,
+ .use_manual_data_writes = use_manual_data_writes,
};
stream_binding->native_stream = aws_http_connection_make_request(native_conn, &request_options);
@@ -478,7 +480,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_HttpClientConnect
jlong jni_connection,
jbyteArray marshalled_request,
jobject jni_http_request_body_stream,
- jobject jni_http_response_callback_handler) {
+ jobject jni_http_response_callback_handler,
+ jboolean jni_use_manual_data_writes) {
(void)jni_class;
aws_cache_jni_ids(env);
@@ -488,7 +491,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_HttpClientConnect
marshalled_request,
jni_http_request_body_stream,
jni_http_response_callback_handler,
- AWS_HTTP_VERSION_1_1);
+ AWS_HTTP_VERSION_1_1,
+ jni_use_manual_data_writes);
}
JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnection_http2ClientConnectionMakeRequest(
@@ -497,7 +501,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnec
jlong jni_connection,
jbyteArray marshalled_request,
jobject jni_http_request_body_stream,
- jobject jni_http_response_callback_handler) {
+ jobject jni_http_response_callback_handler,
+ jboolean jni_use_manual_data_writes) {
(void)jni_class;
aws_cache_jni_ids(env);
@@ -507,7 +512,8 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnec
marshalled_request,
jni_http_request_body_stream,
jni_http_response_callback_handler,
- AWS_HTTP_VERSION_2);
+ AWS_HTTP_VERSION_2,
+ jni_use_manual_data_writes);
}
struct http_stream_chunked_callback_data {
@@ -603,6 +609,90 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStream_httpStrea
return AWS_OP_SUCCESS;
}
+struct http_stream_write_data_callback_data {
+ struct http_stream_binding *stream_cb_data;
+ struct aws_byte_buf data_buf;
+ struct aws_input_stream *data_stream;
+ jobject completion_callback;
+};
+
+static void s_cleanup_write_data_callback_data(
+ JNIEnv *env,
+ struct http_stream_write_data_callback_data *callback_data) {
+ if (callback_data->data_stream) {
+ aws_input_stream_destroy(callback_data->data_stream);
+ }
+ aws_byte_buf_clean_up(&callback_data->data_buf);
+ (*env)->DeleteGlobalRef(env, callback_data->completion_callback);
+ aws_mem_release(aws_jni_get_allocator(), callback_data);
+}
+
+static void s_write_data_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
+ (void)stream;
+
+ struct http_stream_write_data_callback_data *callback_data = user_data;
+
+ /********** JNI ENV ACQUIRE **********/
+ struct aws_jvm_env_context jvm_env_context = aws_jni_acquire_thread_env(callback_data->stream_cb_data->jvm);
+ JNIEnv *env = jvm_env_context.env;
+ if (env == NULL) {
+ return;
+ }
+
+ (*env)->CallVoidMethod(
+ env, callback_data->completion_callback, http_stream_write_data_completion_properties.callback, error_code);
+ aws_jni_check_and_clear_exception(env);
+
+ JavaVM *jvm = callback_data->stream_cb_data->jvm;
+ s_cleanup_write_data_callback_data(env, callback_data);
+ aws_jni_release_thread_env(jvm, &jvm_env_context);
+ /********** JNI ENV RELEASE **********/
+}
+
+JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseWriteData(
+ JNIEnv *env,
+ jclass jni_class,
+ jlong jni_cb_data,
+ jbyteArray data,
+ jboolean end_stream,
+ jobject completion_callback) {
+ (void)jni_class;
+ aws_cache_jni_ids(env);
+
+ struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data;
+ struct aws_http_stream *stream = cb_data->native_stream;
+
+ struct http_stream_write_data_callback_data *callback_data =
+ aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct http_stream_write_data_callback_data));
+
+ callback_data->stream_cb_data = cb_data;
+ callback_data->completion_callback = (*env)->NewGlobalRef(env, completion_callback);
+
+ struct aws_http_stream_write_data_options options = {
+ .data = NULL,
+ .end_stream = end_stream,
+ .on_complete = s_write_data_complete,
+ .user_data = callback_data,
+ };
+
+ if (data != NULL) {
+ struct aws_byte_cursor data_cur = aws_jni_byte_cursor_from_jbyteArray_acquire(env, data);
+ aws_byte_buf_init_copy_from_cursor(&callback_data->data_buf, aws_jni_get_allocator(), data_cur);
+ aws_jni_byte_cursor_from_jbyteArray_release(env, data, data_cur);
+
+ data_cur = aws_byte_cursor_from_buf(&callback_data->data_buf);
+ callback_data->data_stream = aws_input_stream_new_from_cursor(aws_jni_get_allocator(), &data_cur);
+ options.data = callback_data->data_stream;
+ }
+
+ if (aws_http_stream_write_data(stream, &options)) {
+ s_cleanup_write_data_callback_data(env, callback_data);
+ return AWS_OP_ERR;
+ }
+
+ return AWS_OP_SUCCESS;
+}
+
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseActivate(
JNIEnv *env,
jclass jni_class,
diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c
index 71be0085b..765b6a983 100644
--- a/src/native/java_class_ids.c
+++ b/src/native/java_class_ids.c
@@ -542,6 +542,18 @@ static void s_cache_http_stream_write_chunk_completion_properties(JNIEnv *env) {
AWS_FATAL_ASSERT(http_stream_write_chunk_completion_properties.callback);
}
+struct java_http_stream_write_data_completion_properties http_stream_write_data_completion_properties;
+
+static void s_cache_http_stream_write_data_completion_properties(JNIEnv *env) {
+ jclass cls =
+ (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpStreamBase$HttpStreamWriteDataCompletionCallback");
+ AWS_FATAL_ASSERT(cls);
+
+ http_stream_write_data_completion_properties.callback =
+ (*env)->GetMethodID(env, cls, "onWriteDataCompleted", "(I)V");
+ AWS_FATAL_ASSERT(http_stream_write_data_completion_properties.callback);
+}
+
struct java_http_stream_metrics_properties http_stream_metrics_properties;
static void s_cache_http_stream_metrics_properties(JNIEnv *env) {
@@ -2700,6 +2712,7 @@ static void s_cache_java_class_ids(void *user_data) {
s_cache_http2_stream(env);
s_cache_http_stream_response_handler_native_adapter(env);
s_cache_http_stream_write_chunk_completion_properties(env);
+ s_cache_http_stream_write_data_completion_properties(env);
s_cache_http_stream_metrics_properties(env);
s_cache_event_stream_server_listener_properties(env);
s_cache_event_stream_server_listener_handler_properties(env);
diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h
index f613e1c6b..d1da8565e 100644
--- a/src/native/java_class_ids.h
+++ b/src/native/java_class_ids.h
@@ -246,6 +246,12 @@ struct java_http_stream_write_chunk_completion_properties {
};
extern struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties;
+/* HttpStreamWriteDataCompletionCallback */
+struct java_http_stream_write_data_completion_properties {
+ jmethodID callback;
+};
+extern struct java_http_stream_write_data_completion_properties http_stream_write_data_completion_properties;
+
/* HtppStreamMetrics */
struct java_http_stream_metrics_properties {
jclass http_stream_metrics_class;
diff --git a/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java b/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java
new file mode 100644
index 000000000..f9af855c0
--- /dev/null
+++ b/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java
@@ -0,0 +1,329 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+package software.amazon.awssdk.crt.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import software.amazon.awssdk.crt.CRT;
+import software.amazon.awssdk.crt.CrtResource;
+import software.amazon.awssdk.crt.http.*;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class WriteDataTest extends HttpRequestResponseFixture {
+ private final static String HOST = "https://localhost";
+ private final static int H1_TLS_PORT = 8082;
+ private final static int H2_TLS_PORT = 3443;
+
+ @Test
+ public void testHttp2WriteData() throws Exception {
+ skipIfAndroid();
+ skipIfLocalhostUnavailable();
+
+ URI uri = new URI(HOST + ":" + H2_TLS_PORT);
+ byte[] payload = "hello from writeData".getBytes(StandardCharsets.UTF_8);
+
+ HttpHeader[] headers = new HttpHeader[]{
+ new HttpHeader(":method", "PUT"),
+ new HttpHeader(":path", "/echo"),
+ new HttpHeader(":scheme", "https"),
+ new HttpHeader(":authority", uri.getHost()),
+ new HttpHeader("content-length", Integer.toString(payload.length)),
+ };
+ Http2Request request = new Http2Request(headers, null);
+
+ CompletableFuture reqCompleted = new CompletableFuture<>();
+ TestHttpResponse response = new TestHttpResponse();
+
+ CompletableFuture shutdownComplete;
+ try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_2)) {
+ shutdownComplete = connPool.getShutdownCompleteFuture();
+ try (Http2ClientConnection conn = (Http2ClientConnection) connPool.acquireConnection()
+ .get(60, TimeUnit.SECONDS)) {
+
+ HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() {
+ @Override
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
+ HttpHeader[] nextHeaders) {
+ response.statusCode = responseStatusCode;
+ response.headers.addAll(Arrays.asList(nextHeaders));
+ }
+
+ @Override
+ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
+ response.bodyBuffer.put(bodyBytesIn);
+ return bodyBytesIn.length;
+ }
+
+ @Override
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ response.onCompleteErrorCode = errorCode;
+ reqCompleted.complete(null);
+ }
+ };
+
+ try (Http2Stream stream = conn.makeRequest(request, streamHandler, true)) {
+ stream.activate();
+ stream.writeData(payload, true).get(5, TimeUnit.SECONDS);
+ reqCompleted.get(60, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode);
+ Assert.assertEquals(200, response.statusCode);
+ // /echo returns JSON: {"body": "", "bytes": }
+ String body = response.getBody();
+ Assert.assertTrue("Response should contain sent body: " + body,
+ body.contains("\"body\": \"hello from writeData\""));
+ Assert.assertTrue("Response should contain byte count: " + body,
+ body.contains("\"bytes\": " + payload.length));
+
+ shutdownComplete.get(60, TimeUnit.SECONDS);
+ CrtResource.waitForNoResources();
+ }
+
+ @Test
+ public void testHttp2WriteDataEndStreamOnly() throws Exception {
+ skipIfAndroid();
+ skipIfLocalhostUnavailable();
+
+ URI uri = new URI(HOST + ":" + H2_TLS_PORT);
+
+ HttpHeader[] headers = new HttpHeader[]{
+ new HttpHeader(":method", "GET"),
+ new HttpHeader(":path", "/echo"),
+ new HttpHeader(":scheme", "https"),
+ new HttpHeader(":authority", uri.getHost()),
+ };
+ Http2Request request = new Http2Request(headers, null);
+
+ CompletableFuture reqCompleted = new CompletableFuture<>();
+ TestHttpResponse response = new TestHttpResponse();
+
+ CompletableFuture shutdownComplete;
+ try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_2)) {
+ shutdownComplete = connPool.getShutdownCompleteFuture();
+ try (Http2ClientConnection conn = (Http2ClientConnection) connPool.acquireConnection()
+ .get(60, TimeUnit.SECONDS)) {
+
+ HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() {
+ @Override
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
+ HttpHeader[] nextHeaders) {
+ response.statusCode = responseStatusCode;
+ response.headers.addAll(Arrays.asList(nextHeaders));
+ }
+
+ @Override
+ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
+ response.bodyBuffer.put(bodyBytesIn);
+ return bodyBytesIn.length;
+ }
+
+ @Override
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ response.onCompleteErrorCode = errorCode;
+ reqCompleted.complete(null);
+ }
+ };
+
+ // Use manual writes but send null data with endStream=true (zero-byte body)
+ try (Http2Stream stream = conn.makeRequest(request, streamHandler, true)) {
+ stream.activate();
+ stream.writeData(null, true).get(5, TimeUnit.SECONDS);
+ reqCompleted.get(60, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode);
+ Assert.assertEquals(200, response.statusCode);
+ // /echo returns JSON: {"body": "", "bytes": 0}
+ String body = response.getBody();
+ Assert.assertTrue("Response should contain zero bytes: " + body,
+ body.contains("\"bytes\": 0"));
+
+ shutdownComplete.get(60, TimeUnit.SECONDS);
+ CrtResource.waitForNoResources();
+ }
+
+ @Test
+ public void testHttp1WriteData() throws Exception {
+ skipIfAndroid();
+ skipIfLocalhostUnavailable();
+
+ URI uri = new URI(HOST + ":" + H1_TLS_PORT);
+ byte[] payload = "hello from writeData h1".getBytes(StandardCharsets.UTF_8);
+
+ HttpHeader[] headers = new HttpHeader[]{
+ new HttpHeader("Host", uri.getHost()),
+ new HttpHeader("Content-Length", Integer.toString(payload.length)),
+ };
+ HttpRequest request = new HttpRequest("PUT", "/echo", headers, null);
+
+ CompletableFuture reqCompleted = new CompletableFuture<>();
+ TestHttpResponse response = new TestHttpResponse();
+
+ CompletableFuture shutdownComplete;
+ try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_1_1)) {
+ shutdownComplete = connPool.getShutdownCompleteFuture();
+ try (HttpClientConnection conn = connPool.acquireConnection().get(60, TimeUnit.SECONDS)) {
+
+ HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() {
+ @Override
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
+ HttpHeader[] nextHeaders) {
+ response.statusCode = responseStatusCode;
+ response.headers.addAll(Arrays.asList(nextHeaders));
+ }
+
+ @Override
+ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
+ response.bodyBuffer.put(bodyBytesIn);
+ return bodyBytesIn.length;
+ }
+
+ @Override
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ response.onCompleteErrorCode = errorCode;
+ reqCompleted.complete(null);
+ }
+ };
+
+ // Use the unified makeRequest with useManualDataWrites=true
+ try (HttpStreamBase stream = conn.makeRequest(request, streamHandler, true)) {
+ stream.activate();
+ stream.writeData(payload, true).get(5, TimeUnit.SECONDS);
+ reqCompleted.get(60, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode);
+ Assert.assertEquals(200, response.statusCode);
+ // H1 /echo returns JSON: {"data": ""}
+ String body = response.getBody();
+ Assert.assertTrue("Response should contain sent data: " + body,
+ body.contains("\"data\": \"hello from writeData h1\""));
+
+ shutdownComplete.get(60, TimeUnit.SECONDS);
+ CrtResource.waitForNoResources();
+ }
+
+ @Test
+ public void testHttp1WriteDataEndStreamOnly() throws Exception {
+ skipIfAndroid();
+ skipIfLocalhostUnavailable();
+
+ URI uri = new URI(HOST + ":" + H1_TLS_PORT);
+
+ HttpHeader[] headers = new HttpHeader[]{
+ new HttpHeader("Host", uri.getHost()),
+ new HttpHeader("Content-Length", "0"),
+ };
+ HttpRequest request = new HttpRequest("GET", "/echo", headers, null);
+
+ CompletableFuture reqCompleted = new CompletableFuture<>();
+ TestHttpResponse response = new TestHttpResponse();
+
+ CompletableFuture shutdownComplete;
+ try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_1_1)) {
+ shutdownComplete = connPool.getShutdownCompleteFuture();
+ try (HttpClientConnection conn = connPool.acquireConnection().get(60, TimeUnit.SECONDS)) {
+
+ HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() {
+ @Override
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
+ HttpHeader[] nextHeaders) {
+ response.statusCode = responseStatusCode;
+ response.headers.addAll(Arrays.asList(nextHeaders));
+ }
+
+ @Override
+ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
+ response.bodyBuffer.put(bodyBytesIn);
+ return bodyBytesIn.length;
+ }
+
+ @Override
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ response.onCompleteErrorCode = errorCode;
+ reqCompleted.complete(null);
+ }
+ };
+
+ try (HttpStreamBase stream = conn.makeRequest(request, streamHandler, true)) {
+ stream.activate();
+ stream.writeData(null, true).get(5, TimeUnit.SECONDS);
+ reqCompleted.get(60, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode);
+ Assert.assertEquals(200, response.statusCode);
+ // H1 /echo returns JSON: {"data": ""}
+ String body = response.getBody();
+ Assert.assertTrue("Response should contain empty data: " + body,
+ body.contains("\"data\": \"\""));
+
+ shutdownComplete.get(60, TimeUnit.SECONDS);
+ CrtResource.waitForNoResources();
+ }
+
+ /**
+ * Tests that makeRequest throws IllegalStateException when called with both
+ * a body stream and useManualDataWrites=true on an HTTP/1.1 connection.
+ */
+ @Test
+ public void testHttp1MakeRequestWithBodyStreamAndManualWrites() throws Exception {
+ skipIfAndroid();
+ skipIfLocalhostUnavailable();
+
+ URI uri = new URI(HOST + ":" + H1_TLS_PORT);
+
+ HttpRequestBodyStream bodyStream = new HttpRequestBodyStream() {
+ @Override
+ public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
+ return true;
+ }
+ };
+
+ HttpHeader[] headers = new HttpHeader[]{
+ new HttpHeader("Host", uri.getHost()),
+ };
+ // Create request WITH body stream AND useManualDataWrites=true
+ HttpRequest request = new HttpRequest("PUT", "/echo", headers, bodyStream);
+
+ try (HttpClientConnectionManager connPool = createConnectionPoolManager(uri, HttpVersion.HTTP_1_1)) {
+ try (HttpClientConnection conn = connPool.acquireConnection().get(60, TimeUnit.SECONDS)) {
+
+ HttpStreamResponseHandler streamHandler = new HttpStreamResponseHandler() {
+ @Override
+ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
+ HttpHeader[] nextHeaders) {}
+ @Override
+ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { return bodyBytesIn.length; }
+ @Override
+ public void onResponseComplete(HttpStream stream, int errorCode) {}
+ };
+
+ try {
+ conn.makeRequest(request, streamHandler, true);
+ Assert.fail("Expected IllegalStateException from makeRequest");
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("manual data writes"));
+ }
+ }
+ }
+ }
+}