Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-899dcc8.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Added `AsyncRequestBody.fromInputStream(InputStream, Long)` overload that uses an SDK-managed thread pool, removing the need for users to provide their own ExecutorService."
}
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,17 @@ static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers
}

/**
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with a customer-provided {@link ExecutorService}.
*
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
* non-blocking event loop threads owned by the SDK.
* non-blocking event loop threads owned by the SDK. Consider using {@link #fromInputStream(InputStream, Long)} instead,
* which lets the SDK manage threading internally.
*
* <p><b>Executor Guidance:</b> The provided executor is used to run a blocking task that reads from the input stream and
* pushes data to the HTTP channel. If the executor has fewer threads than the number of concurrent requests using it,
* pushes data to the HTTP connection. If the executor has fewer threads than the number of concurrent requests using it,
* tasks are serialized — one slow or failing request may block other requests from writing data in a timely manner,
* leading to idle HTTP connections that the server may close before data can be written. This may result in an
* unrecoverable write timeout loop where every retry also fails.
* leading to idle HTTP connections that the server may close before data can be written. This may result in
* degraded performance or request timeouts.
*
* <p>To avoid this:
* <ul>
Expand All @@ -399,11 +400,30 @@ static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers
*
* @return An AsyncRequestBody instance for the input stream
*
* @see #fromInputStream(InputStream, Long)
*/
static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) {
return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength).executor(executor));
}

/**
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with SDK-managed executor.
*
* <p>The SDK manages the threading required to perform blocking reads from the input stream
* without blocking the non-blocking event loop threads.
*
* @param inputStream The input stream containing the data to be sent
* @param contentLength The content length. If a content length smaller than the actual size of the object is set, the client
* will truncate the stream to the specified content length and only send exactly the number of bytes
* equal to the content length.
*
* @see #fromInputStream(InputStream, Long, ExecutorService)
* @return An AsyncRequestBody instance for the input stream
*/
static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength) {
return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength));
}

/**
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with the provided
* {@link AsyncRequestBodyFromInputStreamConfiguration}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private AsyncRequestBodyFromInputStreamConfiguration(DefaultBuilder builder) {
this.inputStream = Validate.paramNotNull(builder.inputStream, "inputStream");
this.contentLength = Validate.isNotNegativeOrNull(builder.contentLength, "contentLength");
this.maxReadLimit = Validate.isPositiveOrNull(builder.maxReadLimit, "maxReadLimit");
this.executor = Validate.paramNotNull(builder.executor, "executor");
this.executor = builder.executor;
}

/**
Expand All @@ -58,7 +58,7 @@ public Long contentLength() {
}

/**
* @return the provided {@link ExecutorService}.
* @return the provided {@link ExecutorService}, or {@code null} if the SDK-managed executor should be used.
*/
public ExecutorService executor() {
return executor;
Expand Down Expand Up @@ -137,8 +137,12 @@ public interface Builder extends CopyableBuilder<AsyncRequestBodyFromInputStream
/**
* Configures the {@link ExecutorService} to perform the blocking data reads.
*
* <p>It is recommended to have a dedicated executor for SDK input stream requests and have as many threads as the
* number of concurrent requests sharing it. Using an undersized or shared executor may lead to unrecoverable failures.
* <p>If not provided, the SDK uses a shared internal cached thread pool that allocates one thread per
* concurrent request.
*
* <p>If providing a custom executor, it is recommended to have a dedicated executor for SDK input stream
* requests and have as many threads as the number of concurrent requests sharing it. Using an undersized or
* shared executor may lead to degraded performance or request timeouts.
* See {@link AsyncRequestBody#fromInputStream(InputStream, Long, ExecutorService)} for details.
*
* @param executor the executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -35,6 +36,7 @@
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;

/**
* A {@link AsyncRequestBody} that allows reading data off of an {@link InputStream} using a background
Expand All @@ -53,10 +55,18 @@ public class InputStreamWithExecutorAsyncRequestBody implements AsyncRequestBody

private Future<?> writeFuture;

private static final class SharedExecutor {
private static final ExecutorService INSTANCE = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().threadNamePrefix("sdk-async-input-stream").daemonThreads(true).build()
);
}

public InputStreamWithExecutorAsyncRequestBody(AsyncRequestBodyFromInputStreamConfiguration configuration) {
this.inputStream = configuration.inputStream();
this.contentLength = configuration.contentLength();
this.executor = configuration.executor();
this.executor = configuration.executor() != null
? configuration.executor()
: SharedExecutor.INSTANCE;
IoUtils.markStreamWithMaxReadLimit(inputStream, configuration.maxReadLimit());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ void inputStreamIsNull_shouldThrowException() {


@Test
void executorIsNull_shouldThrowException() {
assertThatThrownBy(() ->
AsyncRequestBodyFromInputStreamConfiguration.builder()
.inputStream(mock(InputStream.class))
.build())
.isInstanceOf(NullPointerException.class).hasMessageContaining("executor");
void executorIsNull_shouldUseDefault() {
AsyncRequestBodyFromInputStreamConfiguration config =
AsyncRequestBodyFromInputStreamConfiguration.builder()
.inputStream(mock(InputStream.class))
.build();
assertThat(config.executor()).isNull();
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,74 +19,100 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;

class InputStreamWithExecutorAsyncRequestBodyTest {
@Test
@Timeout(10)
public void dataFromInputStreamIsCopied() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
PipedOutputStream os = new PipedOutputStream();
PipedInputStream is = new PipedInputStream(os);

InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
(InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));

ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
asyncRequestBody.subscribe(subscriber);

os.write(0);
os.write(1);
os.write(2);
os.write(3);
os.close();

asyncRequestBody.activeWriteFuture().get();

ByteBuffer output = ByteBuffer.allocate(8);
assertThat(subscriber.transferTo(output)).isEqualTo(TransferResult.END_OF_STREAM);
output.flip();

assertThat(output.remaining()).isEqualTo(4);
assertThat(output.get()).isEqualTo((byte) 0);
assertThat(output.get()).isEqualTo((byte) 1);
assertThat(output.get()).isEqualTo((byte) 2);
assertThat(output.get()).isEqualTo((byte) 3);
} finally {
executor.shutdownNow();
}

private static ExecutorService customerExecutor;

@BeforeAll
static void setUp() {
customerExecutor = Executors.newSingleThreadExecutor();
}

@AfterAll
static void tearDown() {
customerExecutor.shutdownNow();
}

static Stream<Arguments> requestBodyFactories() {
return Stream.of(
Arguments.of("customerProvidedExecutor",
(BiFunction<InputStream, Long, AsyncRequestBody>) (is, len) ->
AsyncRequestBody.fromInputStream(is, len, customerExecutor)),
Arguments.of("sdkManagedExecutor_twoArgOverload",
(BiFunction<InputStream, Long, AsyncRequestBody>) AsyncRequestBody::fromInputStream),
Arguments.of("sdkManagedExecutor_builderWithoutExecutor",
(BiFunction<InputStream, Long, AsyncRequestBody>) (is, len) ->
AsyncRequestBody.fromInputStream(b -> b.inputStream(is).contentLength(len)))
);
}

@Test
@ParameterizedTest(name = "{0}")
@MethodSource("requestBodyFactories")
@Timeout(10)
public void errorsReadingInputStreamAreForwardedToSubscriber() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
PipedOutputStream os = new PipedOutputStream();
PipedInputStream is = new PipedInputStream(os);
public void dataFromInputStreamIsCopied(String name,

Check warning on line 70 in core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ2XOqd0ebChCgSk5ywH&open=AZ2XOqd0ebChCgSk5ywH&pullRequest=6868
BiFunction<InputStream, Long, AsyncRequestBody> factory) throws Exception {
PipedOutputStream os = new PipedOutputStream();
PipedInputStream is = new PipedInputStream(os);

InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
(InputStreamWithExecutorAsyncRequestBody) factory.apply(is, 4L);

is.close();
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
asyncRequestBody.subscribe(subscriber);

os.write(0);
os.write(1);
os.write(2);
os.write(3);
os.close();

asyncRequestBody.activeWriteFuture().get();

ByteBuffer output = ByteBuffer.allocate(8);
assertThat(subscriber.transferTo(output)).isEqualTo(TransferResult.END_OF_STREAM);
output.flip();

assertThat(output.remaining()).isEqualTo(4);
assertThat(output.get()).isEqualTo((byte) 0);
assertThat(output.get()).isEqualTo((byte) 1);
assertThat(output.get()).isEqualTo((byte) 2);
assertThat(output.get()).isEqualTo((byte) 3);
}

@ParameterizedTest(name = "{0}")
@MethodSource("requestBodyFactories")
@Timeout(10)
public void errorsReadingInputStreamAreForwardedToSubscriber(String name,

Check warning on line 103 in core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ2XOqd0ebChCgSk5ywI&open=AZ2XOqd0ebChCgSk5ywI&pullRequest=6868
BiFunction<InputStream, Long, AsyncRequestBody> factory) throws Exception {
PipedOutputStream os = new PipedOutputStream();
PipedInputStream is = new PipedInputStream(os);

InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
(InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));
is.close();

InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
(InputStreamWithExecutorAsyncRequestBody) factory.apply(is, 4L);

ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
asyncRequestBody.subscribe(subscriber);
assertThatThrownBy(() -> asyncRequestBody.activeWriteFuture().get()).hasRootCauseInstanceOf(IOException.class);
assertThatThrownBy(() -> subscriber.transferTo(ByteBuffer.allocate(8))).hasRootCauseInstanceOf(IOException.class);
} finally {
executor.shutdownNow();
}
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
asyncRequestBody.subscribe(subscriber);
assertThatThrownBy(() -> asyncRequestBody.activeWriteFuture().get()).hasRootCauseInstanceOf(IOException.class);
assertThatThrownBy(() -> subscriber.transferTo(ByteBuffer.allocate(8))).hasRootCauseInstanceOf(IOException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,23 @@ private void assertWriteThroughputReported(MetricCollection metrics) {
assertThat(writeThroughput.get(0)).isGreaterThan(0);
}

@ParameterizedTest
@MethodSource("s3Clients")
public void putObject_fromInputStreamWithSdkManagedExecutor_shouldSucceed(S3AsyncClient client) {
String key = "sdk-managed-executor-" + ASYNC_KEY;
byte[] content = RandomStringUtils.randomAlphanumeric(1024).getBytes(StandardCharsets.UTF_8);

AsyncRequestBody body = AsyncRequestBody.fromInputStream(new ByteArrayInputStream(content), (long) content.length);
client.putObject(b -> b.bucket(BUCKET).key(key), body).join();

ResponseInputStream<GetObjectResponse> response =
client.getObject(b -> b.bucket(BUCKET).key(key), AsyncResponseTransformer.toBlockingInputStream()).join();

assertThat(response.response().contentLength()).isEqualTo(content.length);
byte[] downloaded = invokeSafely(() -> IoUtils.toByteArray(response));
assertThat(downloaded).isEqualTo(content);
}

private static class TestContentProvider implements ContentStreamProvider {
private final byte[] content;
private final List<CloseTrackingInputStream> createdStreams = new ArrayList<>();
Expand Down
Loading