Skip to content

Commit 488d51d

Browse files
committed
Add support for maxInFlightPutObjectParts in MultipartS3AsyncClient
1 parent a95286b commit 488d51d

10 files changed

Lines changed: 193 additions & 17 deletions

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
7070
private final AtomicReferenceArray<CompletedPart> completedParts;
7171
private final Map<Integer, CompletedPart> existingParts;
7272
private final PublisherListener<Long> progressListener;
73+
private final int maxInFlightPutObjectParts;
7374
private Subscription subscription;
7475
private volatile boolean isDone;
7576
private volatile boolean isPaused;
@@ -81,7 +82,8 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
8182

8283
KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext,
8384
CompletableFuture<PutObjectResponse> returnFuture,
84-
MultipartUploadHelper multipartUploadHelper) {
85+
MultipartUploadHelper multipartUploadHelper,
86+
int maxInFlightPutObjectParts) {
8587
this.totalSize = mpuRequestContext.contentLength();
8688
this.partSize = mpuRequestContext.partSize();
8789
this.expectedNumParts = mpuRequestContext.expectedNumParts();
@@ -92,6 +94,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
9294
this.existingNumParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted());
9395
this.completedParts = new AtomicReferenceArray<>(expectedNumParts);
9496
this.multipartUploadHelper = multipartUploadHelper;
97+
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
9598
this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes()
9699
.getAttribute(JAVA_PROGRESS_LISTENER))
97100
.orElseGet(PublisherListener::noOp);
@@ -159,6 +162,7 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
159162
asyncRequestBody.subscribe(new CancelledSubscriber<>());
160163
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);
161164
asyncRequestBody.close();
165+
162166
subscription.request(1);
163167
return;
164168
}
@@ -192,10 +196,16 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
192196
subscription.cancel();
193197
}
194198
} else {
195-
completeMultipartUploadIfFinished(asyncRequestBodyInFlight.decrementAndGet());
199+
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
200+
if (!isDone && inFlight < maxInFlightPutObjectParts) {
201+
subscription.request(1);
202+
}
203+
completeMultipartUploadIfFinished(inFlight);
196204
}
197205
});
198-
subscription.request(1);
206+
if (asyncRequestBodyInFlight.get() < maxInFlightPutObjectParts) {
207+
subscription.request(1);
208+
}
199209
}
200210

201211
private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartConfigurationResolver.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public final class MultipartConfigurationResolver {
3535
private final long apiCallBufferSize;
3636
private final long thresholdInBytes;
3737
private final int maxInFlightParts;
38+
private final int maxInFlightPutObjectParts;
3839

3940
public MultipartConfigurationResolver(MultipartConfiguration multipartConfiguration) {
4041
Validate.notNull(multipartConfiguration, "multipartConfiguration");
@@ -46,9 +47,13 @@ public MultipartConfigurationResolver(MultipartConfiguration multipartConfigurat
4647
ParallelConfiguration parallelConfiguration = multipartConfiguration.parallelConfiguration();
4748
if (parallelConfiguration == null) {
4849
this.maxInFlightParts = DEFAULT_MAX_IN_FLIGHT_PARTS;
50+
this.maxInFlightPutObjectParts = DEFAULT_MAX_IN_FLIGHT_PARTS;
4951
} else {
5052
this.maxInFlightParts = Validate.getOrDefault(multipartConfiguration.parallelConfiguration().maxInFlightParts(),
5153
() -> DEFAULT_MAX_IN_FLIGHT_PARTS);
54+
this.maxInFlightPutObjectParts = Validate.getOrDefault(
55+
multipartConfiguration.parallelConfiguration().maxInFlightPutObjectParts(),
56+
() -> DEFAULT_MAX_IN_FLIGHT_PARTS);
5257
}
5358
}
5459

@@ -67,4 +72,8 @@ public long apiCallBufferSize() {
6772
public int maxInFlightParts() {
6873
return maxInFlightParts;
6974
}
75+
76+
public int maxInFlightPutObjectParts() {
77+
return maxInFlightPutObjectParts;
78+
}
7079
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ private MultipartS3AsyncClient(S3AsyncClient delegate, MultipartConfiguration mu
6363
long threshold = resolver.thresholdInBytes();
6464
long apiCallBufferSize = resolver.apiCallBufferSize();
6565
int maxInFlightParts = resolver.maxInFlightParts();
66+
int maxInFlightPutObjectParts = resolver.maxInFlightPutObjectParts();
6667
mpuHelper = new UploadObjectHelper(delegate, resolver);
6768
copyObjectHelper = new CopyObjectHelper(delegate, minPartSizeInBytes, threshold);
6869
downloadObjectHelper = new DownloadObjectHelper(delegate, apiCallBufferSize, maxInFlightParts);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,17 @@ public UploadObjectHelper(S3AsyncClient s3AsyncClient,
4848
SdkPojoConversionUtils::toPutObjectResponse);
4949
this.apiCallBufferSize = resolver.apiCallBufferSize();
5050
this.multipartUploadThresholdInBytes = resolver.thresholdInBytes();
51+
int maxInFlightPutObjectParts = resolver.maxInFlightPutObjectParts();
5152
this.uploadWithKnownContentLength = new UploadWithKnownContentLengthHelper(s3AsyncClient,
5253
partSizeInBytes,
5354
multipartUploadThresholdInBytes,
54-
apiCallBufferSize);
55+
apiCallBufferSize,
56+
maxInFlightPutObjectParts);
5557
this.uploadWithUnknownContentLength = new UploadWithUnknownContentLengthHelper(s3AsyncClient,
5658
partSizeInBytes,
5759
multipartUploadThresholdInBytes,
58-
apiCallBufferSize);
60+
apiCallBufferSize,
61+
maxInFlightPutObjectParts);
5962
}
6063

6164
public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,13 @@ public final class UploadWithKnownContentLengthHelper {
5050
private final long maxMemoryUsageInBytes;
5151
private final long multipartUploadThresholdInBytes;
5252
private final MultipartUploadHelper multipartUploadHelper;
53+
private final int maxInFlightPutObjectParts;
5354

5455
public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient,
5556
long partSizeInBytes,
5657
long multipartUploadThresholdInBytes,
57-
long maxMemoryUsageInBytes) {
58+
long maxMemoryUsageInBytes,
59+
int maxInFlightPutObjectParts) {
5860
this.s3AsyncClient = s3AsyncClient;
5961
this.partSizeInBytes = partSizeInBytes;
6062
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
@@ -64,6 +66,7 @@ public UploadWithKnownContentLengthHelper(S3AsyncClient s3AsyncClient,
6466
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
6567
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, multipartUploadThresholdInBytes,
6668
maxMemoryUsageInBytes);
69+
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
6770
}
6871

6972
public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,
@@ -181,7 +184,8 @@ private void resumePausedUpload(ResumeRequestContext resumeContext) {
181184

182185
private void splitAndSubscribe(MpuRequestContext mpuRequestContext, CompletableFuture<PutObjectResponse> returnFuture) {
183186
KnownContentLengthAsyncRequestBodySubscriber subscriber =
184-
new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper);
187+
new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper,
188+
maxInFlightPutObjectParts);
185189

186190
attachSubscriberToObservable(subscriber, mpuRequestContext.request().left());
187191

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@ public final class UploadWithUnknownContentLengthHelper {
6262
private final long multipartUploadThresholdInBytes;
6363

6464
private final MultipartUploadHelper multipartUploadHelper;
65+
private final int maxInFlightPutObjectParts;
6566

6667
public UploadWithUnknownContentLengthHelper(S3AsyncClient s3AsyncClient,
6768
long partSizeInBytes,
6869
long multipartUploadThresholdInBytes,
69-
long maxMemoryUsageInBytes) {
70+
long maxMemoryUsageInBytes,
71+
int maxInFlightPutObjectParts) {
7072
this.s3AsyncClient = s3AsyncClient;
7173
this.partSizeInBytes = partSizeInBytes;
7274
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
@@ -76,6 +78,7 @@ public UploadWithUnknownContentLengthHelper(S3AsyncClient s3AsyncClient,
7678
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
7779
this.multipartUploadHelper = new MultipartUploadHelper(s3AsyncClient, multipartUploadThresholdInBytes,
7880
maxMemoryUsageInBytes);
81+
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
7982
}
8083

8184
public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,
@@ -255,12 +258,20 @@ private void sendUploadPartRequest(String uploadId,
255258
multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest);
256259
}
257260
} else {
258-
completeMultipartUploadIfFinish(asyncRequestBodyInFlight.decrementAndGet());
261+
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
262+
if (!isDone && inFlight < maxInFlightPutObjectParts) {
263+
synchronized (UnknownContentLengthAsyncRequestBodySubscriber.this) {
264+
subscription.request(1);
265+
}
266+
}
267+
completeMultipartUploadIfFinish(inFlight);
259268
}
260269
});
261-
synchronized (this) {
262-
subscription.request(1);
263-
};
270+
if (asyncRequestBodyInFlight.get() < maxInFlightPutObjectParts) {
271+
synchronized (this) {
272+
subscription.request(1);
273+
}
274+
}
264275
}
265276

266277
private Pair<UploadPartRequest, AsyncRequestBody> uploadPart(AsyncRequestBody asyncRequestBody, int partNum) {

services/s3/src/main/java/software/amazon/awssdk/services/s3/multipart/ParallelConfiguration.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,45 @@
2929
public class ParallelConfiguration implements ToCopyableBuilder<ParallelConfiguration.Builder, ParallelConfiguration> {
3030

3131
private final Integer maxInFlightParts;
32+
private final Integer maxInFlightPutObjectParts;
3233

3334
public ParallelConfiguration(Builder builder) {
3435
this.maxInFlightParts = builder.maxInFlightParts;
36+
this.maxInFlightPutObjectParts = builder.maxInFlightPutObjectParts;
3537
}
3638

3739
public static Builder builder() {
3840
return new Builder();
3941
}
4042

4143
/**
42-
* The maximum number of concurrent GetObject the that are allowed for multipart download.
43-
* @return The value for the maximum number of concurrent GetObject the that are allowed for multipart download.
44+
* The maximum number of concurrent GetObject requests that are allowed for multipart download.
45+
* @return The value for the maximum number of concurrent GetObject requests that are allowed for multipart download.
4446
*/
4547
public Integer maxInFlightParts() {
4648
return maxInFlightParts;
4749
}
4850

51+
/**
52+
* The maximum number of concurrent PutObject/UploadPart requests that are allowed for multipart upload. This limits the
53+
* number of parts that can be in flight at any given time during a multipart putObject operation, preventing the client from
54+
* overwhelming the HTTP connection pool when uploading large objects.
55+
*
56+
* @return The value for the maximum number of concurrent UploadPart requests that are allowed for multipart upload.
57+
*/
58+
public Integer maxInFlightPutObjectParts() {
59+
return maxInFlightPutObjectParts;
60+
}
61+
4962
@Override
5063
public Builder toBuilder() {
51-
return builder().maxInFlightParts(maxInFlightParts);
64+
return builder().maxInFlightParts(maxInFlightParts)
65+
.maxInFlightPutObjectParts(maxInFlightPutObjectParts);
5266
}
5367

5468
public static class Builder implements CopyableBuilder<Builder, ParallelConfiguration> {
5569
private int maxInFlightParts;
70+
private Integer maxInFlightPutObjectParts;
5671

5772
public Builder maxInFlightParts(int maxInFlightParts) {
5873
this.maxInFlightParts = maxInFlightParts;
@@ -63,6 +78,25 @@ public int maxInFlightParts() {
6378
return maxInFlightParts;
6479
}
6580

81+
/**
82+
* Configures the maximum number of concurrent UploadPart requests that are allowed for multipart upload. This limits
83+
* the number of parts that can be in flight at any given time during a multipart putObject operation, preventing the
84+
* client from overwhelming the HTTP connection pool when uploading large objects.
85+
*
86+
* <p>Default value: 50 (matching the default HTTP client max concurrency)
87+
*
88+
* @param maxInFlightPutObjectParts the maximum number of concurrent UploadPart requests
89+
* @return an instance of this builder.
90+
*/
91+
public Builder maxInFlightPutObjectParts(Integer maxInFlightPutObjectParts) {
92+
this.maxInFlightPutObjectParts = maxInFlightPutObjectParts;
93+
return this;
94+
}
95+
96+
public Integer maxInFlightPutObjectParts() {
97+
return maxInFlightPutObjectParts;
98+
}
99+
66100
@Override
67101
public ParallelConfiguration build() {
68102
return new ParallelConfiguration(this);

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.ArgumentMatchers.any;
2020
import static org.mockito.ArgumentMatchers.eq;
2121
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.times;
2223
import static org.mockito.Mockito.verify;
2324
import static org.mockito.Mockito.when;
2425

@@ -215,6 +216,50 @@ private S3ResumeToken testPauseScenario(int numExistingParts,
215216
return subscriber.pause();
216217
}
217218

219+
@Test
220+
void maxInFlightPutObjectParts_shouldLimitConcurrentUploads() {
221+
int maxInFlight = 2;
222+
long contentSize = 5 * PART_SIZE;
223+
int totalParts = 5;
224+
225+
MpuRequestContext context = MpuRequestContext.builder()
226+
.request(Pair.of(putObjectRequest, asyncRequestBody))
227+
.contentLength(contentSize)
228+
.partSize(PART_SIZE)
229+
.uploadId(UPLOAD_ID)
230+
.numPartsCompleted(0L)
231+
.expectedNumParts(totalParts)
232+
.build();
233+
234+
// Use non-completing futures to simulate slow uploads so parts stay in-flight
235+
CompletableFuture<CompletedPart> pendingFuture1 = new CompletableFuture<>();
236+
CompletableFuture<CompletedPart> pendingFuture2 = new CompletableFuture<>();
237+
CompletableFuture<CompletedPart> pendingFuture3 = new CompletableFuture<>();
238+
239+
when(multipartUploadHelper.sendIndividualUploadPartRequest(eq(UPLOAD_ID), any(), any(), any(), any()))
240+
.thenReturn(pendingFuture1)
241+
.thenReturn(pendingFuture2)
242+
.thenReturn(pendingFuture3);
243+
244+
KnownContentLengthAsyncRequestBodySubscriber sub = createSubscriber(context, maxInFlight);
245+
Subscription mockSubscription = mock(Subscription.class);
246+
sub.onSubscribe(mockSubscription);
247+
248+
// First onNext: in-flight goes to 1, which is < maxInFlight(2), so subscription.request(1) is called
249+
sub.onNext(createMockAsyncRequestBody(PART_SIZE));
250+
// onSubscribe calls request(1), and first onNext calls request(1) since inFlight(1) < max(2)
251+
verify(mockSubscription, times(2)).request(1);
252+
253+
// Second onNext: in-flight goes to 2, which is NOT < maxInFlight(2), so no additional request
254+
sub.onNext(createMockAsyncRequestBody(PART_SIZE));
255+
// No additional request(1) call since we're at the limit
256+
verify(mockSubscription, times(2)).request(1);
257+
258+
// Complete the first part — the completion callback should call request(1) since in-flight drops to 1
259+
pendingFuture1.complete(CompletedPart.builder().partNumber(1).build());
260+
verify(mockSubscription, times(3)).request(1);
261+
}
262+
218263
private MpuRequestContext createDefaultMpuRequestContext() {
219264
return MpuRequestContext.builder()
220265
.request(Pair.of(putObjectRequest, AsyncRequestBody.fromFile(testFile)))
@@ -240,7 +285,13 @@ private MpuRequestContext createMpuRequestContextWithExistingParts(int numExisti
240285
}
241286

242287
private KnownContentLengthAsyncRequestBodySubscriber createSubscriber(MpuRequestContext mpuRequestContext) {
243-
return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper);
288+
return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper, 50);
289+
}
290+
291+
private KnownContentLengthAsyncRequestBodySubscriber createSubscriber(MpuRequestContext mpuRequestContext,
292+
int maxInFlightPutObjectParts) {
293+
return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper,
294+
maxInFlightPutObjectParts);
244295
}
245296

246297
private CloseableAsyncRequestBody createMockAsyncRequestBody(long contentLength) {

0 commit comments

Comments
 (0)