Skip to content

Commit fe25917

Browse files
committed
switch to single, universal maxInFlightParts config
1 parent 488d51d commit fe25917

8 files changed

Lines changed: 23 additions & 84 deletions

File tree

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
7070
private final AtomicReferenceArray<CompletedPart> completedParts;
7171
private final Map<Integer, CompletedPart> existingParts;
7272
private final PublisherListener<Long> progressListener;
73-
private final int maxInFlightPutObjectParts;
73+
private final int maxInFlightParts;
7474
private Subscription subscription;
7575
private volatile boolean isDone;
7676
private volatile boolean isPaused;
@@ -83,7 +83,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
8383
KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext,
8484
CompletableFuture<PutObjectResponse> returnFuture,
8585
MultipartUploadHelper multipartUploadHelper,
86-
int maxInFlightPutObjectParts) {
86+
int maxInFlightParts) {
8787
this.totalSize = mpuRequestContext.contentLength();
8888
this.partSize = mpuRequestContext.partSize();
8989
this.expectedNumParts = mpuRequestContext.expectedNumParts();
@@ -94,7 +94,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
9494
this.existingNumParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted());
9595
this.completedParts = new AtomicReferenceArray<>(expectedNumParts);
9696
this.multipartUploadHelper = multipartUploadHelper;
97-
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
97+
this.maxInFlightParts = maxInFlightParts;
9898
this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes()
9999
.getAttribute(JAVA_PROGRESS_LISTENER))
100100
.orElseGet(PublisherListener::noOp);
@@ -197,13 +197,13 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
197197
}
198198
} else {
199199
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
200-
if (!isDone && inFlight < maxInFlightPutObjectParts) {
200+
if (!isDone && inFlight < maxInFlightParts) {
201201
subscription.request(1);
202202
}
203203
completeMultipartUploadIfFinished(inFlight);
204204
}
205205
});
206-
if (asyncRequestBodyInFlight.get() < maxInFlightPutObjectParts) {
206+
if (asyncRequestBodyInFlight.get() < maxInFlightParts) {
207207
subscription.request(1);
208208
}
209209
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartConfigurationResolver.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public final class MultipartConfigurationResolver {
3535
private final long apiCallBufferSize;
3636
private final long thresholdInBytes;
3737
private final int maxInFlightParts;
38-
private final int maxInFlightPutObjectParts;
3938

4039
public MultipartConfigurationResolver(MultipartConfiguration multipartConfiguration) {
4140
Validate.notNull(multipartConfiguration, "multipartConfiguration");
@@ -47,13 +46,9 @@ public MultipartConfigurationResolver(MultipartConfiguration multipartConfigurat
4746
ParallelConfiguration parallelConfiguration = multipartConfiguration.parallelConfiguration();
4847
if (parallelConfiguration == null) {
4948
this.maxInFlightParts = DEFAULT_MAX_IN_FLIGHT_PARTS;
50-
this.maxInFlightPutObjectParts = DEFAULT_MAX_IN_FLIGHT_PARTS;
5149
} else {
5250
this.maxInFlightParts = Validate.getOrDefault(multipartConfiguration.parallelConfiguration().maxInFlightParts(),
5351
() -> DEFAULT_MAX_IN_FLIGHT_PARTS);
54-
this.maxInFlightPutObjectParts = Validate.getOrDefault(
55-
multipartConfiguration.parallelConfiguration().maxInFlightPutObjectParts(),
56-
() -> DEFAULT_MAX_IN_FLIGHT_PARTS);
5752
}
5853
}
5954

@@ -72,8 +67,4 @@ public long apiCallBufferSize() {
7267
public int maxInFlightParts() {
7368
return maxInFlightParts;
7469
}
75-
76-
public int maxInFlightPutObjectParts() {
77-
return maxInFlightPutObjectParts;
78-
}
7970
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration mu
6363
long threshold = resolver.thresholdInBytes();
6464
long apiCallBufferSize = resolver.apiCallBufferSize();
6565
int maxInFlightParts = resolver.maxInFlightParts();
66-
int maxInFlightPutObjectParts = resolver.maxInFlightPutObjectParts();
6766
mpuHelper = new UploadObjectHelper(delegate, resolver);
6867
copyObjectHelper = new CopyObjectHelper(delegate, minPartSizeInBytes, threshold);
6968
downloadObjectHelper = new DownloadObjectHelper(delegate, apiCallBufferSize, maxInFlightParts);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ public UploadObjectHelper(S3AsyncClient s3AsyncClient,
4848
SdkPojoConversionUtils::toPutObjectResponse);
4949
this.apiCallBufferSize = resolver.apiCallBufferSize();
5050
this.multipartUploadThresholdInBytes = resolver.thresholdInBytes();
51-
int maxInFlightPutObjectParts = resolver.maxInFlightPutObjectParts();
51+
int maxInFlightParts = resolver.maxInFlightParts();
5252
this.uploadWithKnownContentLength = new UploadWithKnownContentLengthHelper(s3AsyncClient,
5353
partSizeInBytes,
5454
multipartUploadThresholdInBytes,
5555
apiCallBufferSize,
56-
maxInFlightPutObjectParts);
56+
maxInFlightParts);
5757
this.uploadWithUnknownContentLength = new UploadWithUnknownContentLengthHelper(s3AsyncClient,
5858
partSizeInBytes,
5959
multipartUploadThresholdInBytes,
6060
apiCallBufferSize,
61-
maxInFlightPutObjectParts);
61+
maxInFlightParts);
6262
}
6363

6464
public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ public final class UploadWithKnownContentLengthHelper {
5050
private final long maxMemoryUsageInBytes;
5151
private final long multipartUploadThresholdInBytes;
5252
private final MultipartUploadHelper multipartUploadHelper;
53-
private final int maxInFlightPutObjectParts;
53+
private final int maxInFlightParts;
5454

5555
public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient,
5656
long partSizeInBytes,
5757
long multipartUploadThresholdInBytes,
5858
long maxMemoryUsageInBytes,
59-
int maxInFlightPutObjectParts) {
59+
int maxInFlightParts) {
6060
this.s3AsyncClient = s3AsyncClient;
6161
this.partSizeInBytes = partSizeInBytes;
6262
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
@@ -66,7 +66,7 @@ public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient,
6666
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
6767
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, multipartUploadThresholdInBytes,
6868
maxMemoryUsageInBytes);
69-
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
69+
this.maxInFlightParts = maxInFlightParts;
7070
}
7171

7272
public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,
@@ -185,7 +185,7 @@ private void resumePausedUpload(ResumeRequestContext resumeContext) {
185185
private void splitAndSubscribe(MpuRequestContext mpuRequestContext, CompletableFuture<PutObjectResponse> returnFuture) {
186186
KnownContentLengthAsyncRequestBodySubscriber subscriber =
187187
new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper,
188-
maxInFlightPutObjectParts);
188+
maxInFlightParts);
189189

190190
attachSubscriberToObservable(subscriber, mpuRequestContext.request().left());
191191

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,13 @@ public final class UploadWithUnknownContentLengthHelper {
6262
private final long multipartUploadThresholdInBytes;
6363

6464
private final MultipartUploadHelper multipartUploadHelper;
65-
private final int maxInFlightPutObjectParts;
65+
private final int maxInFlightParts;
6666

6767
public UploadWithUnknownContentLengthHelper(S3AsyncClient s3AsyncClient,
6868
long partSizeInBytes,
6969
long multipartUploadThresholdInBytes,
7070
long maxMemoryUsageInBytes,
71-
int maxInFlightPutObjectParts) {
71+
int maxInFlightParts) {
7272
this.s3AsyncClient = s3AsyncClient;
7373
this.partSizeInBytes = partSizeInBytes;
7474
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
@@ -78,7 +78,7 @@ public UploadWithUnknownContentLengthHelper(S3AsyncClient s3AsyncClient,
7878
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
7979
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, multipartUploadThresholdInBytes,
8080
maxMemoryUsageInBytes);
81-
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
81+
this.maxInFlightParts = maxInFlightParts;
8282
}
8383

8484
public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,
@@ -259,15 +259,15 @@ private void sendUploadPartRequest(String uploadId,
259259
}
260260
} else {
261261
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
262-
if (!isDone && inFlight < maxInFlightPutObjectParts) {
262+
if (!isDone && inFlight < maxInFlightParts) {
263263
synchronized (UnknownContentLengthAsyncRequestBodySubscriber.this) {
264264
subscription.request(1);
265265
}
266266
}
267267
completeMultipartUploadIfFinish(inFlight);
268268
}
269269
});
270-
if (asyncRequestBodyInFlight.get() < maxInFlightPutObjectParts) {
270+
if (asyncRequestBodyInFlight.get() < maxInFlightParts) {
271271
synchronized (this) {
272272
subscription.request(1);
273273
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/ParallelConfiguration.java

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,45 +29,33 @@
2929
public class ParallelConfiguration implements ToCopyableBuilder<ParallelConfiguration.Builder, ParallelConfiguration> {
3030

3131
private final Integer maxInFlightParts;
32-
private final Integer maxInFlightPutObjectParts;
3332

3433
public ParallelConfiguration(Builder builder) {
3534
this.maxInFlightParts = builder.maxInFlightParts;
36-
this.maxInFlightPutObjectParts = builder.maxInFlightPutObjectParts;
3735
}
3836

3937
public static Builder builder() {
4038
return new Builder();
4139
}
4240

4341
/**
44-
* The maximum number of concurrent GetObject requests that are allowed for multipart download.
45-
* @return The value for the maximum number of concurrent GetObject requests that are allowed for multipart download.
42+
* The maximum number of concurrent part requests that are allowed for multipart operations, including both multipart
43+
* download (GetObject) and multipart upload (PutObject). This limits the number of parts that can be in flight at any
44+
* given time, preventing the client from overwhelming the HTTP connection pool when transferring large objects.
45+
*
46+
* @return The value for the maximum number of concurrent part requests.
4647
*/
4748
public Integer maxInFlightParts() {
4849
return maxInFlightParts;
4950
}
5051

51-
/**
52-
* The maximum number of concurrent PutObject/UploadPart requests that are allowed for multipart upload. This limits the
53-
* number of parts that can be in flight at any given time during a multipart putObject operation, preventing the client from
54-
* overwhelming the HTTP connection pool when uploading large objects.
55-
*
56-
* @return The value for the maximum number of concurrent UploadPart requests that are allowed for multipart upload.
57-
*/
58-
public Integer maxInFlightPutObjectParts() {
59-
return maxInFlightPutObjectParts;
60-
}
61-
6252
@Override
6353
public Builder toBuilder() {
64-
return builder().maxInFlightParts(maxInFlightParts)
65-
.maxInFlightPutObjectParts(maxInFlightPutObjectParts);
54+
return builder().maxInFlightParts(maxInFlightParts);
6655
}
6756

6857
public static class Builder implements CopyableBuilder<Builder, ParallelConfiguration> {
6958
private int maxInFlightParts;
70-
private Integer maxInFlightPutObjectParts;
7159

7260
public Builder maxInFlightParts(int maxInFlightParts) {
7361
this.maxInFlightParts = maxInFlightParts;
@@ -78,25 +66,6 @@ public int maxInFlightParts() {
7866
return maxInFlightParts;
7967
}
8068

81-
/**
82-
* Configures the maximum number of concurrent UploadPart requests that are allowed for multipart upload. This limits
83-
* the number of parts that can be in flight at any given time during a multipart putObject operation, preventing the
84-
* client from overwhelming the HTTP connection pool when uploading large objects.
85-
*
86-
* <p>Default value: 50 (matching the default HTTP client max concurrency)
87-
*
88-
* @param maxInFlightPutObjectParts the maximum number of concurrent UploadPart requests
89-
* @return an instance of this builder.
90-
*/
91-
public Builder maxInFlightPutObjectParts(Integer maxInFlightPutObjectParts) {
92-
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
93-
return this;
94-
}
95-
96-
public Integer maxInFlightPutObjectParts() {
97-
return maxInFlightPutObjectParts;
98-
}
99-
10069
@Override
10170
public ParallelConfiguration build() {
10271
return new ParallelConfiguration(this);

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartConfigurationResolverTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,6 @@ void resolveMaxInFlightParts_valueProvidedWithBuilder_shouldHonor() {
7070
assertThat(resolver.maxInFlightParts()).isEqualTo(1);
7171
}
7272

73-
@Test
74-
void resolveMaxInFlightPutObjectParts_valueProvidedWithBuilder_shouldHonor() {
75-
MultipartConfiguration configuration =
76-
MultipartConfiguration.builder()
77-
.parallelConfiguration(p -> p.maxInFlightPutObjectParts(10))
78-
.build();
79-
MultipartConfigurationResolver resolver = new MultipartConfigurationResolver(configuration);
80-
assertThat(resolver.maxInFlightPutObjectParts()).isEqualTo(10);
81-
}
82-
83-
@Test
84-
void resolveMaxInFlightPutObjectParts_valueNotProvided_shouldUseDefault() {
85-
MultipartConfigurationResolver resolver = new MultipartConfigurationResolver(MultipartConfiguration.builder()
86-
.build());
87-
assertThat(resolver.maxInFlightPutObjectParts()).isEqualTo(50);
88-
}
89-
9073
@Test
9174
void valueProvidedForAllFields_shouldHonor() {
9275
MultipartConfiguration configuration =
@@ -96,15 +79,13 @@ void valueProvidedForAllFields_shouldHonor() {
9679
.apiCallBufferSizeInBytes(3L)
9780
.parallelConfiguration(ParallelConfiguration.builder()
9881
.maxInFlightParts(1)
99-
.maxInFlightPutObjectParts(5)
10082
.build())
10183
.build();
10284
MultipartConfigurationResolver resolver = new MultipartConfigurationResolver(configuration);
10385
assertThat(resolver.minimalPartSizeInBytes()).isEqualTo(10L);
10486
assertThat(resolver.thresholdInBytes()).isEqualTo(8L);
10587
assertThat(resolver.apiCallBufferSize()).isEqualTo(3L);
10688
assertThat(resolver.maxInFlightParts()).isEqualTo(1);
107-
assertThat(resolver.maxInFlightPutObjectParts()).isEqualTo(5);
10889
}
10990

11091
@Test
@@ -115,7 +96,6 @@ void noValueProvided_shouldUseDefault() {
11596
assertThat(resolver.thresholdInBytes()).isEqualTo(8L * 1024 * 1024);
11697
assertThat(resolver.apiCallBufferSize()).isEqualTo(8L * 1024 * 1024 * 4);
11798
assertThat(resolver.maxInFlightParts()).isEqualTo(50);
118-
assertThat(resolver.maxInFlightPutObjectParts()).isEqualTo(50);
11999
}
120100

121101
}

0 commit comments

Comments
 (0)