diff --git a/.changes/next-release/feature-AWSSDKforJavav2-899dcc8.json b/.changes/next-release/feature-AWSSDKforJavav2-899dcc8.json new file mode 100644 index 000000000000..31711db7727f --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-899dcc8.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Added `AsyncRequestBody.fromInputStream(InputStream, Long)` overload that uses an SDK-managed thread pool, removing the need for users to provide their own ExecutorService." +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 0877557e35bd..36a2d7eb647c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -370,16 +370,17 @@ static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers } /** - * Creates an {@link AsyncRequestBody} from an {@link InputStream}. + * Creates an {@link AsyncRequestBody} from an {@link InputStream} with a customer-provided {@link ExecutorService}. * *
An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the - * non-blocking event loop threads owned by the SDK. + * non-blocking event loop threads owned by the SDK. Consider using {@link #fromInputStream(InputStream, Long)} instead, + * which lets the SDK manage threading internally. * *
Executor Guidance: The provided executor is used to run a blocking task that reads from the input stream and - * pushes data to the HTTP channel. If the executor has fewer threads than the number of concurrent requests using it, + * pushes data to the HTTP connection. If the executor has fewer threads than the number of concurrent requests using it, * tasks are serialized — one slow or failing request may block other requests from writing data in a timely manner, - * leading to idle HTTP connections that the server may close before data can be written. This may result in an - * unrecoverable write timeout loop where every retry also fails. + * leading to idle HTTP connections that the server may close before data can be written. This may result in + * degraded performance or request timeouts. * *
To avoid this: *
The SDK manages the threading required to perform blocking reads from the input stream
+ * without blocking the non-blocking event loop threads.
+ *
+ * @param inputStream The input stream containing the data to be sent
+ * @param contentLength The content length. If a content length smaller than the actual size of the object is set, the client
+ * will truncate the stream to the specified content length and only send exactly the number of bytes
+ * equal to the content length.
+ *
+ * @see #fromInputStream(InputStream, Long, ExecutorService)
+ * @return An AsyncRequestBody instance for the input stream
+ */
+ static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength) {
+ return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength));
+ }
+
/**
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with the provided
* {@link AsyncRequestBodyFromInputStreamConfiguration}.
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfiguration.java
index ec8272458fb7..462476574d97 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfiguration.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfiguration.java
@@ -40,7 +40,7 @@ private AsyncRequestBodyFromInputStreamConfiguration(DefaultBuilder builder) {
this.inputStream = Validate.paramNotNull(builder.inputStream, "inputStream");
this.contentLength = Validate.isNotNegativeOrNull(builder.contentLength, "contentLength");
this.maxReadLimit = Validate.isPositiveOrNull(builder.maxReadLimit, "maxReadLimit");
- this.executor = Validate.paramNotNull(builder.executor, "executor");
+ this.executor = builder.executor;
}
/**
@@ -58,7 +58,7 @@ public Long contentLength() {
}
/**
- * @return the provided {@link ExecutorService}.
+ * @return the provided {@link ExecutorService}, or {@code null} if the SDK-managed executor should be used.
*/
public ExecutorService executor() {
return executor;
@@ -137,8 +137,12 @@ public interface Builder extends CopyableBuilder If not provided, the SDK uses a shared internal cached thread pool that allocates one thread per
+ * concurrent request.
+ *
+ * If providing a custom executor, it is recommended to have a dedicated executor for SDK input stream
+ * requests and have as many threads as the number of concurrent requests sharing it. Using an undersized or
+ * shared executor may lead to degraded performance or request timeouts.
* See {@link AsyncRequestBody#fromInputStream(InputStream, Long, ExecutorService)} for details.
*
* @param executor the executor
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java
index 7ee81817ed11..a29e69f7028b 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java
@@ -22,6 +22,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -35,6 +36,7 @@
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Logger;
+import software.amazon.awssdk.utils.ThreadFactoryBuilder;
/**
* A {@link AsyncRequestBody} that allows reading data off of an {@link InputStream} using a background
@@ -53,10 +55,18 @@ public class InputStreamWithExecutorAsyncRequestBody implements AsyncRequestBody
private Future> writeFuture;
+ private static final class SharedExecutor {
+ private static final ExecutorService INSTANCE = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().threadNamePrefix("sdk-async-input-stream").daemonThreads(true).build()
+ );
+ }
+
public InputStreamWithExecutorAsyncRequestBody(AsyncRequestBodyFromInputStreamConfiguration configuration) {
this.inputStream = configuration.inputStream();
this.contentLength = configuration.contentLength();
- this.executor = configuration.executor();
+ this.executor = configuration.executor() != null
+ ? configuration.executor()
+ : SharedExecutor.INSTANCE;
IoUtils.markStreamWithMaxReadLimit(inputStream, configuration.maxReadLimit());
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfigurationTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfigurationTest.java
index 7994c9595520..45cd9c4b4a0e 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfigurationTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfigurationTest.java
@@ -59,12 +59,12 @@ void inputStreamIsNull_shouldThrowException() {
@Test
- void executorIsNull_shouldThrowException() {
- assertThatThrownBy(() ->
- AsyncRequestBodyFromInputStreamConfiguration.builder()
- .inputStream(mock(InputStream.class))
- .build())
- .isInstanceOf(NullPointerException.class).hasMessageContaining("executor");
+ void executorIsNull_shouldUseDefault() {
+ AsyncRequestBodyFromInputStreamConfiguration config =
+ AsyncRequestBodyFromInputStreamConfiguration.builder()
+ .inputStream(mock(InputStream.class))
+ .build();
+ assertThat(config.executor()).isNull();
}
@ParameterizedTest
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java
index c7b48d79cd1b..97183f280e1d 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java
@@ -19,74 +19,100 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.junit.jupiter.api.Test;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
class InputStreamWithExecutorAsyncRequestBodyTest {
- @Test
- @Timeout(10)
- public void dataFromInputStreamIsCopied() throws Exception {
- ExecutorService executor = Executors.newSingleThreadExecutor();
- try {
- PipedOutputStream os = new PipedOutputStream();
- PipedInputStream is = new PipedInputStream(os);
-
- InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
- (InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));
-
- ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
- asyncRequestBody.subscribe(subscriber);
-
- os.write(0);
- os.write(1);
- os.write(2);
- os.write(3);
- os.close();
-
- asyncRequestBody.activeWriteFuture().get();
-
- ByteBuffer output = ByteBuffer.allocate(8);
- assertThat(subscriber.transferTo(output)).isEqualTo(TransferResult.END_OF_STREAM);
- output.flip();
-
- assertThat(output.remaining()).isEqualTo(4);
- assertThat(output.get()).isEqualTo((byte) 0);
- assertThat(output.get()).isEqualTo((byte) 1);
- assertThat(output.get()).isEqualTo((byte) 2);
- assertThat(output.get()).isEqualTo((byte) 3);
- } finally {
- executor.shutdownNow();
- }
+
+ private static ExecutorService customerExecutor;
+
+ @BeforeAll
+ static void setUp() {
+ customerExecutor = Executors.newSingleThreadExecutor();
+ }
+
+ @AfterAll
+ static void tearDown() {
+ customerExecutor.shutdownNow();
+ }
+
+ static Stream