Skip to content

Commit a4ec777

Browse files
committed
Add a new fromInputStream(InputStream, Long) overload that uses an SDK-managed shared cached thread pool
1 parent f608643 commit a4ec777

7 files changed

Lines changed: 151 additions & 68 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Added `AsyncRequestBody.fromInputStream(InputStream, Long)` overload that uses an SDK-managed thread pool, removing the need for users to provide their own ExecutorService."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -370,16 +370,17 @@ static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers
370370
}
371371

372372
/**
373-
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
373+
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with a customer-provided {@link ExecutorService}.
374374
*
375375
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
376-
* non-blocking event loop threads owned by the SDK.
376+
* non-blocking event loop threads owned by the SDK. Consider using {@link #fromInputStream(InputStream, Long)} instead,
377+
* which lets the SDK manage threading internally.
377378
*
378379
* <p><b>Executor Guidance:</b> The provided executor is used to run a blocking task that reads from the input stream and
379-
* pushes data to the HTTP channel. If the executor has fewer threads than the number of concurrent requests using it,
380+
* pushes data to the HTTP connection. If the executor has fewer threads than the number of concurrent requests using it,
380381
* tasks are serialized — one slow or failing request may block other requests from writing data in a timely manner,
381-
* leading to idle HTTP connections that the server may close before data can be written. This may result in an
382-
* unrecoverable write timeout loop where every retry also fails.
382+
* leading to idle HTTP connections that the server may close before data can be written. This may result in
383+
* degraded performance or request timeouts.
383384
*
384385
* <p>To avoid this:
385386
* <ul>
@@ -399,11 +400,30 @@ static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers
399400
*
400401
* @return An AsyncRequestBody instance for the input stream
401402
*
403+
* @see #fromInputStream(InputStream, Long)
402404
*/
403405
static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) {
404406
return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength).executor(executor));
405407
}
406408

409+
/**
410+
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with SDK-managed executor.
411+
*
412+
* <p>The SDK manages the threading required to perform blocking reads from the input stream
413+
* without blocking the non-blocking event loop threads.
414+
*
415+
* @param inputStream The input stream containing the data to be sent
416+
* @param contentLength The content length. If a content length smaller than the actual size of the object is set, the client
417+
* will truncate the stream to the specified content length and only send exactly the number of bytes
418+
* equal to the content length.
419+
*
420+
* @see #fromInputStream(InputStream, Long, ExecutorService)
421+
* @return An AsyncRequestBody instance for the input stream
422+
*/
423+
static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength) {
424+
return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength));
425+
}
426+
407427
/**
408428
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with the provided
409429
* {@link AsyncRequestBodyFromInputStreamConfiguration}.

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfiguration.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private AsyncRequestBodyFromInputStreamConfiguration(DefaultBuilder builder) {
4040
this.inputStream = Validate.paramNotNull(builder.inputStream, "inputStream");
4141
this.contentLength = Validate.isNotNegativeOrNull(builder.contentLength, "contentLength");
4242
this.maxReadLimit = Validate.isPositiveOrNull(builder.maxReadLimit, "maxReadLimit");
43-
this.executor = Validate.paramNotNull(builder.executor, "executor");
43+
this.executor = builder.executor;
4444
}
4545

4646
/**
@@ -58,7 +58,7 @@ public Long contentLength() {
5858
}
5959

6060
/**
61-
* @return the provided {@link ExecutorService}.
61+
* @return the provided {@link ExecutorService}, or {@code null} if the SDK-managed executor should be used.
6262
*/
6363
public ExecutorService executor() {
6464
return executor;
@@ -137,8 +137,12 @@ public interface Builder extends CopyableBuilder<AsyncRequestBodyFromInputStream
137137
/**
138138
* Configures the {@link ExecutorService} to perform the blocking data reads.
139139
*
140-
* <p>It is recommended to have a dedicated executor for SDK input stream requests and have as many threads as the
141-
* number of concurrent requests sharing it. Using an undersized or shared executor may lead to unrecoverable failures.
140+
* <p>If not provided, the SDK uses a shared internal cached thread pool that allocates one thread per
141+
* concurrent request.
142+
*
143+
* <p>If providing a custom executor, it is recommended to have a dedicated executor for SDK input stream
144+
* requests and have as many threads as the number of concurrent requests sharing it. Using an undersized or
145+
* shared executor may lead to degraded performance or request timeouts.
142146
* See {@link AsyncRequestBody#fromInputStream(InputStream, Long, ExecutorService)} for details.
143147
*
144148
* @param executor the executor

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CancellationException;
2323
import java.util.concurrent.ExecutionException;
2424
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
2526
import java.util.concurrent.Future;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.TimeoutException;
@@ -35,6 +36,7 @@
3536
import software.amazon.awssdk.core.internal.util.NoopSubscription;
3637
import software.amazon.awssdk.utils.IoUtils;
3738
import software.amazon.awssdk.utils.Logger;
39+
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
3840

3941
/**
4042
* A {@link AsyncRequestBody} that allows reading data off of an {@link InputStream} using a background
@@ -53,10 +55,18 @@ public class InputStreamWithExecutorAsyncRequestBody implements AsyncRequestBody
5355

5456
private Future<?> writeFuture;
5557

58+
private static final class SharedExecutor {
59+
private static final ExecutorService INSTANCE = Executors.newCachedThreadPool(
60+
new ThreadFactoryBuilder().threadNamePrefix("sdk-async-input-stream").daemonThreads(true).build()
61+
);
62+
}
63+
5664
public InputStreamWithExecutorAsyncRequestBody(AsyncRequestBodyFromInputStreamConfiguration configuration) {
5765
this.inputStream = configuration.inputStream();
5866
this.contentLength = configuration.contentLength();
59-
this.executor = configuration.executor();
67+
this.executor = configuration.executor() != null
68+
? configuration.executor()
69+
: SharedExecutor.INSTANCE;
6070
IoUtils.markStreamWithMaxReadLimit(inputStream, configuration.maxReadLimit());
6171
}
6272

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyFromInputStreamConfigurationTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ void inputStreamIsNull_shouldThrowException() {
5959

6060

6161
@Test
62-
void executorIsNull_shouldThrowException() {
63-
assertThatThrownBy(() ->
64-
AsyncRequestBodyFromInputStreamConfiguration.builder()
65-
.inputStream(mock(InputStream.class))
66-
.build())
67-
.isInstanceOf(NullPointerException.class).hasMessageContaining("executor");
62+
void executorIsNull_shouldUseDefault() {
63+
AsyncRequestBodyFromInputStreamConfiguration config =
64+
AsyncRequestBodyFromInputStreamConfiguration.builder()
65+
.inputStream(mock(InputStream.class))
66+
.build();
67+
assertThat(config.executor()).isNull();
6868
}
6969

7070
@ParameterizedTest

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java

Lines changed: 78 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,74 +19,100 @@
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020

2121
import java.io.IOException;
22+
import java.io.InputStream;
2223
import java.io.PipedInputStream;
2324
import java.io.PipedOutputStream;
2425
import java.nio.ByteBuffer;
2526
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.Executors;
27-
import org.junit.jupiter.api.Test;
28+
import java.util.function.BiFunction;
29+
import java.util.stream.Stream;
30+
import org.junit.jupiter.api.AfterAll;
31+
import org.junit.jupiter.api.BeforeAll;
2832
import org.junit.jupiter.api.Timeout;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.Arguments;
35+
import org.junit.jupiter.params.provider.MethodSource;
2936
import software.amazon.awssdk.core.async.AsyncRequestBody;
3037
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
3138
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
3239

3340
class InputStreamWithExecutorAsyncRequestBodyTest {
34-
@Test
35-
@Timeout(10)
36-
public void dataFromInputStreamIsCopied() throws Exception {
37-
ExecutorService executor = Executors.newSingleThreadExecutor();
38-
try {
39-
PipedOutputStream os = new PipedOutputStream();
40-
PipedInputStream is = new PipedInputStream(os);
41-
42-
InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
43-
(InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));
44-
45-
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
46-
asyncRequestBody.subscribe(subscriber);
47-
48-
os.write(0);
49-
os.write(1);
50-
os.write(2);
51-
os.write(3);
52-
os.close();
53-
54-
asyncRequestBody.activeWriteFuture().get();
55-
56-
ByteBuffer output = ByteBuffer.allocate(8);
57-
assertThat(subscriber.transferTo(output)).isEqualTo(TransferResult.END_OF_STREAM);
58-
output.flip();
59-
60-
assertThat(output.remaining()).isEqualTo(4);
61-
assertThat(output.get()).isEqualTo((byte) 0);
62-
assertThat(output.get()).isEqualTo((byte) 1);
63-
assertThat(output.get()).isEqualTo((byte) 2);
64-
assertThat(output.get()).isEqualTo((byte) 3);
65-
} finally {
66-
executor.shutdownNow();
67-
}
41+
42+
private static ExecutorService customerExecutor;
43+
44+
@BeforeAll
45+
static void setUp() {
46+
customerExecutor = Executors.newSingleThreadExecutor();
47+
}
48+
49+
@AfterAll
50+
static void tearDown() {
51+
customerExecutor.shutdownNow();
52+
}
53+
54+
static Stream<Arguments> requestBodyFactories() {
55+
return Stream.of(
56+
Arguments.of("customerProvidedExecutor",
57+
(BiFunction<InputStream, Long, AsyncRequestBody>) (is, len) ->
58+
AsyncRequestBody.fromInputStream(is, len, customerExecutor)),
59+
Arguments.of("sdkManagedExecutor_twoArgOverload",
60+
(BiFunction<InputStream, Long, AsyncRequestBody>) AsyncRequestBody::fromInputStream),
61+
Arguments.of("sdkManagedExecutor_builderWithoutExecutor",
62+
(BiFunction<InputStream, Long, AsyncRequestBody>) (is, len) ->
63+
AsyncRequestBody.fromInputStream(b -> b.inputStream(is).contentLength(len)))
64+
);
6865
}
6966

70-
@Test
67+
@ParameterizedTest(name = "{0}")
68+
@MethodSource("requestBodyFactories")
7169
@Timeout(10)
72-
public void errorsReadingInputStreamAreForwardedToSubscriber() throws Exception {
73-
ExecutorService executor = Executors.newSingleThreadExecutor();
74-
try {
75-
PipedOutputStream os = new PipedOutputStream();
76-
PipedInputStream is = new PipedInputStream(os);
70+
public void dataFromInputStreamIsCopied(String name,
71+
BiFunction<InputStream, Long, AsyncRequestBody> factory) throws Exception {
72+
PipedOutputStream os = new PipedOutputStream();
73+
PipedInputStream is = new PipedInputStream(os);
74+
75+
InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
76+
(InputStreamWithExecutorAsyncRequestBody) factory.apply(is, 4L);
7777

78-
is.close();
78+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
79+
asyncRequestBody.subscribe(subscriber);
80+
81+
os.write(0);
82+
os.write(1);
83+
os.write(2);
84+
os.write(3);
85+
os.close();
86+
87+
asyncRequestBody.activeWriteFuture().get();
88+
89+
ByteBuffer output = ByteBuffer.allocate(8);
90+
assertThat(subscriber.transferTo(output)).isEqualTo(TransferResult.END_OF_STREAM);
91+
output.flip();
92+
93+
assertThat(output.remaining()).isEqualTo(4);
94+
assertThat(output.get()).isEqualTo((byte) 0);
95+
assertThat(output.get()).isEqualTo((byte) 1);
96+
assertThat(output.get()).isEqualTo((byte) 2);
97+
assertThat(output.get()).isEqualTo((byte) 3);
98+
}
99+
100+
@ParameterizedTest(name = "{0}")
101+
@MethodSource("requestBodyFactories")
102+
@Timeout(10)
103+
public void errorsReadingInputStreamAreForwardedToSubscriber(String name,
104+
BiFunction<InputStream, Long, AsyncRequestBody> factory) throws Exception {
105+
PipedOutputStream os = new PipedOutputStream();
106+
PipedInputStream is = new PipedInputStream(os);
79107

80-
InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
81-
(InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));
108+
is.close();
82109

110+
InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
111+
(InputStreamWithExecutorAsyncRequestBody) factory.apply(is, 4L);
83112

84-
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
85-
asyncRequestBody.subscribe(subscriber);
86-
assertThatThrownBy(() -> asyncRequestBody.activeWriteFuture().get()).hasRootCauseInstanceOf(IOException.class);
87-
assertThatThrownBy(() -> subscriber.transferTo(ByteBuffer.allocate(8))).hasRootCauseInstanceOf(IOException.class);
88-
} finally {
89-
executor.shutdownNow();
90-
}
113+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
114+
asyncRequestBody.subscribe(subscriber);
115+
assertThatThrownBy(() -> asyncRequestBody.activeWriteFuture().get()).hasRootCauseInstanceOf(IOException.class);
116+
assertThatThrownBy(() -> subscriber.transferTo(ByteBuffer.allocate(8))).hasRootCauseInstanceOf(IOException.class);
91117
}
92-
}
118+
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/PutObjectIntegrationTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,23 @@ private void assertWriteThroughputReported(MetricCollection metrics) {
277277
assertThat(writeThroughput.get(0)).isGreaterThan(0);
278278
}
279279

280+
@ParameterizedTest
281+
@MethodSource("s3Clients")
282+
public void putObject_fromInputStreamWithSdkManagedExecutor_shouldSucceed(S3AsyncClient client) {
283+
String key = "sdk-managed-executor-" + ASYNC_KEY;
284+
byte[] content = RandomStringUtils.randomAlphanumeric(1024).getBytes(StandardCharsets.UTF_8);
285+
286+
AsyncRequestBody body = AsyncRequestBody.fromInputStream(new ByteArrayInputStream(content), (long) content.length);
287+
client.putObject(b -> b.bucket(BUCKET).key(key), body).join();
288+
289+
ResponseInputStream<GetObjectResponse> response =
290+
client.getObject(b -> b.bucket(BUCKET).key(key), AsyncResponseTransformer.toBlockingInputStream()).join();
291+
292+
assertThat(response.response().contentLength()).isEqualTo(content.length);
293+
byte[] downloaded = invokeSafely(() -> IoUtils.toByteArray(response));
294+
assertThat(downloaded).isEqualTo(content);
295+
}
296+
280297
private static class TestContentProvider implements ContentStreamProvider {
281298
private final byte[] content;
282299
private final List<CloseTrackingInputStream> createdStreams = new ArrayList<>();

0 commit comments

Comments
 (0)