Skip to content

Commit 474111b

Browse files
committed
cleanup and changelog
1 parent d37056d commit 474111b

11 files changed

Lines changed: 33 additions & 35 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "S3",
4+
"contributor": "",
5+
"description": "Add support for parallel download for individual part-get for multipart GetObject in s3 async client and Transfer Manager"
6+
}

.idea/codeStyles/Project.xml

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ public void exceptionOccurred(Throwable throwable) {
179179
}
180180
}
181181
if (cf != null) {
182-
log.info(() -> "completing file future exceptionally");
183182
cf.completeExceptionally(throwable);
184183
} else {
185184
log.warn(() -> "An exception occurred before the call to prepare() was able to instantiate the CompletableFuture."
@@ -192,6 +191,9 @@ public String name() {
192191
return TransformerType.FILE.getName();
193192
}
194193

194+
/**
195+
* {@link Subscriber} implementation that writes chunks to a file.
196+
*/
195197
static class FileSubscriber implements Subscriber<ByteBuffer> {
196198
private final AtomicLong position;
197199
private final AsynchronousFileChannel fileChannel;
@@ -205,7 +207,7 @@ static class FileSubscriber implements Subscriber<ByteBuffer> {
205207
private Subscription subscription;
206208

207209
FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future,
208-
Consumer<Throwable> onErrorMethod, long startingPosition) {
210+
Consumer<Throwable> onErrorMethod, long startingPosition) {
209211
this.fileChannel = fileChannel;
210212
this.path = path;
211213
this.future = future;

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSplittingTransformer.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisherTckTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,4 @@ public long maxElementsFromPublisher() {
5757
return Long.MAX_VALUE;
5858
}
5959

60-
}
60+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileSubscriberTckTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
import org.reactivestreams.Subscription;
3232
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
3333
import org.reactivestreams.tck.TestEnvironment;
34+
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer.FileSubscriber;
35+
3436

3537
/**
36-
* TCK verification test for {@link FileAsyncResponseTransformer.FileSubscriber}.
38+
* TCK verification test for {@link FileSubscriber}.
3739
*/
3840
public class FileSubscriberTckTest extends SubscriberWhiteboxVerification<ByteBuffer> {
3941
private static final byte[] CONTENT = new byte[16];
@@ -46,7 +48,7 @@ public FileSubscriberTckTest() {
4648
@Override
4749
public Subscriber<ByteBuffer> createSubscriber(WhiteboxSubscriberProbe<ByteBuffer> whiteboxSubscriberProbe) {
4850
Path tempFile = getNewTempFile();
49-
return new FileAsyncResponseTransformer.FileSubscriber(openChannel(tempFile), tempFile, new CompletableFuture<>(), (t) -> {}, 0) {
51+
return new FileSubscriber(openChannel(tempFile), tempFile, new CompletableFuture<>(), (t) -> {}, 0) {
5052
@Override
5153
public void onSubscribe(Subscription s) {
5254
super.onSubscribe(s);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ private <T> CompletableFuture<T> downloadPartsNonSerially(
6262
GetObjectRequest getObjectRequest,
6363
AsyncResponseTransformer.SplitResult<GetObjectResponse, T> split,
6464
int maxInFlight) {
65-
// TODO pause & resume
6665
NonLinearMultipartDownloaderSubscriber subscriber = new NonLinearMultipartDownloaderSubscriber(
6766
s3AsyncClient, getObjectRequest, (CompletableFuture<GetObjectResponse>) split.resultFuture(), maxInFlight);
6867
split.publisher().subscribe(subscriber);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration mu
6262
long minPartSizeInBytes = resolver.minimalPartSizeInBytes();
6363
long threshold = resolver.thresholdInBytes();
6464
long apiCallBufferSize = resolver.apiCallBufferSize();
65-
int maxInFLight = resolver.maxInFlightParts();
65+
int maxInFlightParts = resolver.maxInFlightParts();
6666
mpuHelper = new UploadObjectHelper(delegate, resolver);
6767
copyObjectHelper = new CopyObjectHelper(delegate, minPartSizeInBytes, threshold);
68-
downloadObjectHelper = new DownloadObjectHelper(delegate, apiCallBufferSize, maxInFLight);
68+
downloadObjectHelper = new DownloadObjectHelper(delegate, apiCallBufferSize, maxInFlightParts);
6969
this.checksumEnabled = checksumEnabled;
7070
}
7171

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/NonLinearMultipartDownloaderSubscriber.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,12 @@ private void request(int amount) {
163163
@Override
164164
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
165165
outstandingDemand.decrementAndGet();
166-
log.trace(() -> "=== On Next ===\nTotal in flight parts: " + inFlightRequests.size()
167-
+ "\nOutstanding Demand : " + outstandingDemand.get()
168-
+ "\nTotal completed parts: " + completedParts
169-
+ "\nTotal transformers requested: " + transformersRequested.get()
170-
+ "\nTotal pending transformers: " + pendingTransformers.size()
171-
+ "\nCurrent in flight requests: " + inFlightRequests.keySet());
166+
log.trace(() -> "On Next - Total in flight parts: " + inFlightRequests.size()
167+
+ " - Demand : " + outstandingDemand.get()
168+
+ " - Total completed parts: " + completedParts
169+
+ " - Total transformers requested: " + transformersRequested.get()
170+
+ " - Total pending transformers: " + pendingTransformers.size()
171+
+ " - Current in flight requests: " + inFlightRequests.keySet());
172172
if (asyncResponseTransformer == null) {
173173
subscription.cancel();
174174
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");

services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/ParallelConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@
1616
package software.amazon.awssdk.services.s3.multipart;
1717

1818
import software.amazon.awssdk.annotations.SdkPublicApi;
19+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
20+
import software.amazon.awssdk.services.s3.S3AsyncClient;
1921
import software.amazon.awssdk.utils.builder.CopyableBuilder;
2022
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
2123

24+
/**
25+
* Class that holds configuration properties related to multipart operations for a {@link S3AsyncClient}, related specifically
26+
* to non-linear, parallel operations, that is, when the {@link AsyncResponseTransformer} supports non-serial split.
27+
*/
2228
@SdkPublicApi
2329
public class ParallelConfiguration implements ToCopyableBuilder<ParallelConfiguration.Builder, ParallelConfiguration> {
2430

@@ -32,6 +38,10 @@ public static Builder builder() {
3238
return new Builder();
3339
}
3440

41+
/**
42+
* The maximum number of concurrent GetObject the that are allowed for multipart download.
43+
* @return The value for the maximum number of concurrent GetObject the that are allowed for multipart download.
44+
*/
3545
public Integer maxInFlightParts() {
3646
return maxInFlightParts;
3747
}

0 commit comments

Comments
 (0)