-
Notifications
You must be signed in to change notification settings - Fork 996
Retry support for Java-based S3 multipart client for multipart GET to bytes #6328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
davidh44
merged 22 commits into
master
from
feature/master/multipart-download-byteArrayAsyncResponseTransformer-retry
Aug 22, 2025
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
7c279ef
Override split method in byte array based response transformer
zoewangg 00e97c1
Rename duplicate test
davidh44 24aedc6
Merge branch 'master' into zoewang/splitAsyncResponseTransformer
davidh44 49c7ef2
Update tests and refactor
davidh44 78058b1
Clean up comments
davidh44 76d3277
Remove unused import
davidh44 f386bfe
Refactor tests
davidh44 7bae1ed
Revert logging enabled
davidh44 292df0c
Revert logging enabled
davidh44 1d2dc6c
Revert logging enabled
davidh44 523b2bd
Add changelog
davidh44 76281d6
Update javadocs
davidh44 1d136e5
Use maximumBufferSize in ByteArraySplittingTransformer
davidh44 c381f8a
Revert "Use maximumBufferSize in ByteArraySplittingTransformer"
davidh44 4a927cb
Address comments and refactor tests
davidh44 4a8d6e2
Address comments
davidh44 4eb9324
Remove unused import
davidh44 0ea7599
Merge branch 'master' into feature/master/multipart-download-byteArra…
davidh44 8c2b5a6
Address comments and avoid copying buffer
davidh44 7e1cf14
Address comments
davidh44 0fa8725
Merge branch 'master' into feature/master/multipart-download-byteArra…
davidh44 33c2234
Add check for total parts
davidh44 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "type": "feature", | ||
| "category": "Amazon S3", | ||
| "contributor": "", | ||
| "description": "Add retry support for Java based S3 multipart client download to Byte array" | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
251 changes: 251 additions & 0 deletions
251
...c/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,251 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). | ||
| * You may not use this file except in compliance with the License. | ||
| * A copy of the License is located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed | ||
| * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package software.amazon.awssdk.core.internal.async; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import org.reactivestreams.Subscriber; | ||
| import org.reactivestreams.Subscription; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
| import software.amazon.awssdk.core.ResponseBytes; | ||
| import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
| import software.amazon.awssdk.core.async.SdkPublisher; | ||
| import software.amazon.awssdk.core.exception.SdkClientException; | ||
| import software.amazon.awssdk.utils.CompletableFutureUtils; | ||
| import software.amazon.awssdk.utils.Logger; | ||
| import software.amazon.awssdk.utils.async.SimplePublisher; | ||
|
|
||
| /** | ||
| * A splitting transformer that creates individual {@link ByteArrayAsyncResponseTransformer} instances for each part of a | ||
| * multipart download. This is necessary to support retries of individual part downloads. | ||
| * | ||
| * <p> | ||
| * This class is created by {@link ByteArrayAsyncResponseTransformer#split} and used internally by the multipart | ||
| * download logic. | ||
| */ | ||
|
|
||
| @SdkInternalApi | ||
| public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> { | ||
davidh44 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private static final Logger log = Logger.loggerFor(ByteArraySplittingTransformer.class); | ||
| private final AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer; | ||
| private final CompletableFuture<ResponseBytes<ResponseT>> resultFuture; | ||
| private Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> downstreamSubscriber; | ||
| private final AtomicInteger nextPartNumber = new AtomicInteger(1); | ||
| private final AtomicReference<ResponseT> responseT = new AtomicReference<>(); | ||
|
|
||
| private final SimplePublisher<ByteBuffer> publisherToUpstream = new SimplePublisher<>(); | ||
| /** | ||
| * The amount requested by the downstream subscriber that is still left to fulfill. Updated when the | ||
| * {@link Subscription#request(long) request} method is called on the downstream subscriber's subscription. Corresponds to the | ||
| * number of {@code AsyncResponseTransformer} that will be published to the downstream subscriber. | ||
| */ | ||
| private final AtomicLong outstandingDemand = new AtomicLong(0); | ||
|
|
||
| /** | ||
| * This flag stops the current thread from publishing transformers while another thread is already publishing. | ||
| */ | ||
| private final AtomicBoolean emitting = new AtomicBoolean(false); | ||
|
|
||
| /** | ||
| * Synchronization lock that protects the {@code onStreamCalled} flag and cancellation | ||
| * workflow from concurrent access. Ensures thread-safety of subscription cancellation. | ||
| */ | ||
| private final Object lock = new Object(); | ||
davidh44 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Set to true once {@code .onStream()} is called on the upstreamResponseTransformer | ||
| */ | ||
| private boolean onStreamCalled; | ||
|
|
||
| /** | ||
| * Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the | ||
| * {@code resultFuture} is cancelled. | ||
| */ | ||
| private final AtomicBoolean isCancelled = new AtomicBoolean(false); | ||
|
|
||
| private final Map<Integer, ByteBuffer> buffers; | ||
|
|
||
| public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> | ||
| upstreamResponseTransformer, | ||
| CompletableFuture<ResponseBytes<ResponseT>> resultFuture) { | ||
| this.upstreamResponseTransformer = upstreamResponseTransformer; | ||
| this.resultFuture = resultFuture; | ||
| this.buffers = new ConcurrentHashMap<>(); | ||
| } | ||
|
|
||
| @Override | ||
| public void subscribe(Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> subscriber) { | ||
| this.downstreamSubscriber = subscriber; | ||
| subscriber.onSubscribe(new DownstreamSubscription()); | ||
| } | ||
|
|
||
| private final class DownstreamSubscription implements Subscription { | ||
|
|
||
| @Override | ||
| public void request(long n) { | ||
| if (n <= 0) { | ||
| downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive")); | ||
| return; | ||
| } | ||
| long newDemand = outstandingDemand.updateAndGet(current -> { | ||
| if (Long.MAX_VALUE - current < n) { | ||
| return Long.MAX_VALUE; | ||
| } | ||
| return current + n; | ||
| }); | ||
| log.trace(() -> String.format("new outstanding demand: %s", newDemand)); | ||
| emit(); | ||
| } | ||
|
|
||
| @Override | ||
| public void cancel() { | ||
| log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get())); | ||
| if (isCancelled.compareAndSet(false, true)) { | ||
| handleSubscriptionCancel(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void emit() { | ||
| do { | ||
| if (!emitting.compareAndSet(false, true)) { | ||
| return; | ||
| } | ||
| try { | ||
| if (doEmit()) { | ||
| return; | ||
| } | ||
| } finally { | ||
| emitting.compareAndSet(true, false); | ||
| } | ||
| } while (outstandingDemand.get() > 0); | ||
| } | ||
|
|
||
| private boolean doEmit() { | ||
| long demand = outstandingDemand.get(); | ||
|
|
||
| while (demand > 0) { | ||
| if (isCancelled.get()) { | ||
| return true; | ||
| } | ||
|
|
||
| demand = outstandingDemand.decrementAndGet(); | ||
| downstreamSubscriber.onNext(new IndividualTransformer(nextPartNumber.getAndIncrement())); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /** | ||
| * Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream | ||
| * transformer need to finish processing before we complete. One typical use case for this is completing the multipart | ||
| * download, the subscriber having reached the final part will signal that it doesn't need more | ||
| * {@link AsyncResponseTransformer}s by calling {@code .cancel()} on the subscription. | ||
| */ | ||
| private void handleSubscriptionCancel() { | ||
| synchronized (lock) { | ||
| if (downstreamSubscriber == null) { | ||
| log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()"); | ||
| return; | ||
| } | ||
| if (!onStreamCalled) { | ||
| // we never subscribe publisherToUpstream to the upstream, it would not complete | ||
| downstreamSubscriber = null; | ||
| return; | ||
| } | ||
|
|
||
| // if result future is already complete (likely by exception propagation), skip. | ||
| if (resultFuture.isDone()) { | ||
| return; | ||
| } | ||
dagnir marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| try { | ||
| CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare(); | ||
| CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture); | ||
|
|
||
| upstreamResponseTransformer.onResponse(responseT.get()); | ||
|
|
||
| int totalPartCount = nextPartNumber.get() - 1; | ||
| if (buffers.size() != totalPartCount) { | ||
| resultFuture.completeExceptionally( | ||
| SdkClientException.create(String.format("Number of parts in buffer [%d] does not match total part count" | ||
| + " [%d], some parts did not complete successfully.", | ||
| buffers.size(), totalPartCount))); | ||
| return; | ||
| } | ||
| for (int i = 1; i <= totalPartCount; ++i) { | ||
| publisherToUpstream.send(buffers.get(i)).exceptionally(ex -> { | ||
| resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex)); | ||
| return null; | ||
| }); | ||
| } | ||
|
|
||
| publisherToUpstream.complete().exceptionally(ex -> { | ||
| resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", ex)); | ||
| return null; | ||
| }); | ||
| upstreamResponseTransformer.onStream(SdkPublisher.adapt(publisherToUpstream)); | ||
|
|
||
| } catch (Throwable throwable) { | ||
| resultFuture.completeExceptionally(SdkClientException.create("unexpected error occurred", throwable)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private final class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> { | ||
| private final int partNumber; | ||
| private final ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>(); | ||
|
|
||
| private IndividualTransformer(int partNumber) { | ||
| this.partNumber = partNumber; | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<ResponseT> prepare() { | ||
| CompletableFuture<ResponseBytes<ResponseT>> prepare = delegate.prepare(); | ||
| return prepare.thenApply(responseBytes -> { | ||
| buffers.put(partNumber, responseBytes.asByteBuffer()); | ||
| return responseBytes.response(); | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void onResponse(ResponseT response) { | ||
| responseT.compareAndSet(null, response); | ||
| delegate.onResponse(response); | ||
| } | ||
|
|
||
| @Override | ||
| public void onStream(SdkPublisher<ByteBuffer> publisher) { | ||
| delegate.onStream(publisher); | ||
| synchronized (lock) { | ||
| if (!onStreamCalled) { | ||
| onStreamCalled = true; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void exceptionOccurred(Throwable error) { | ||
| delegate.exceptionOccurred(error); | ||
| } | ||
| } | ||
| } | ||
49 changes: 49 additions & 0 deletions
49
...java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformerTckTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /* | ||
| * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"). | ||
| * You may not use this file except in compliance with the License. | ||
| * A copy of the License is located at | ||
| * | ||
| * http://aws.amazon.com/apache2.0 | ||
| * | ||
| * or in the "license" file accompanying this file. This file is distributed | ||
| * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing | ||
| * permissions and limitations under the License. | ||
| */ | ||
|
|
||
| package software.amazon.awssdk.core.internal.async; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import org.reactivestreams.Publisher; | ||
| import org.reactivestreams.tck.PublisherVerification; | ||
| import org.reactivestreams.tck.TestEnvironment; | ||
| import software.amazon.awssdk.core.ResponseBytes; | ||
| import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
| import software.amazon.awssdk.core.async.SdkPublisher; | ||
|
|
||
| public class ByteArraySplittingTransformerTckTest extends PublisherVerification<AsyncResponseTransformer<Object, Object>> { | ||
|
|
||
| public ByteArraySplittingTransformerTckTest() { | ||
| super(new TestEnvironment()); | ||
| } | ||
|
|
||
| @Override | ||
| public Publisher<AsyncResponseTransformer<Object, Object>> createPublisher(long l) { | ||
| CompletableFuture<ResponseBytes<Object>> future = new CompletableFuture<>(); | ||
| AsyncResponseTransformer<Object, ResponseBytes<Object>> upstreamTransformer = AsyncResponseTransformer.toBytes(); | ||
| ByteArraySplittingTransformer<Object> transformer = new ByteArraySplittingTransformer<>(upstreamTransformer, future); | ||
| return SdkPublisher.adapt(transformer).limit(Math.toIntExact(l)); | ||
| } | ||
|
|
||
| @Override | ||
| public Publisher<AsyncResponseTransformer<Object, Object>> createFailedPublisher() { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public long maxElementsFromPublisher() { | ||
| return Long.MAX_VALUE; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.