Skip to content

Commit 3b7565d

Browse files
committed
Cleanup tests, refactor UnknownContentLengthAsyncRequestBodySubscriber into its own class (following pattern from known content)
1 parent fe25917 commit 3b7565d

5 files changed

Lines changed: 509 additions & 296 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Add support for maxInFlightParts to multipart upload (PutObject) in MultipartS3AsyncClient."
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.multipart;
17+
18+
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMismatchForPart;
19+
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMissingForPart;
20+
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
21+
22+
import java.util.Collection;
23+
import java.util.Comparator;
24+
import java.util.Optional;
25+
import java.util.Queue;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ConcurrentLinkedQueue;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
import org.reactivestreams.Subscriber;
32+
import org.reactivestreams.Subscription;
33+
import software.amazon.awssdk.annotations.SdkInternalApi;
34+
import software.amazon.awssdk.core.async.AsyncRequestBody;
35+
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
36+
import software.amazon.awssdk.core.async.listener.PublisherListener;
37+
import software.amazon.awssdk.core.exception.SdkClientException;
38+
import software.amazon.awssdk.services.s3.model.CompletedPart;
39+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
40+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
41+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
42+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
43+
import software.amazon.awssdk.utils.CompletableFutureUtils;
44+
import software.amazon.awssdk.utils.Logger;
45+
import software.amazon.awssdk.utils.Pair;
46+
47+
@SdkInternalApi
48+
public class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
49+
private static final Logger log = Logger.loggerFor(UnknownContentLengthAsyncRequestBodySubscriber.class);
50+
51+
/**
52+
* Indicates whether this is the first async request body or not.
53+
*/
54+
private final AtomicBoolean firstAsyncRequestBodyReceived = new AtomicBoolean(false);
55+
56+
/**
57+
* Indicates whether CreateMultipartUpload has been initiated or not
58+
*/
59+
private final AtomicBoolean createMultipartUploadInitiated = new AtomicBoolean(false);
60+
61+
/**
62+
* Indicates whether CompleteMultipart has been initiated or not.
63+
*/
64+
private final AtomicBoolean completedMultipartInitiated = new AtomicBoolean(false);
65+
66+
/**
67+
* The number of AsyncRequestBody has been received but yet to be processed
68+
*/
69+
private final AtomicInteger asyncRequestBodyInFlight = new AtomicInteger(0);
70+
71+
private final AtomicBoolean failureActionInitiated = new AtomicBoolean(false);
72+
73+
private final AtomicInteger partNumber = new AtomicInteger(0);
74+
private final AtomicLong contentLength = new AtomicLong(0);
75+
76+
private final Queue<CompletedPart> completedParts = new ConcurrentLinkedQueue<>();
77+
private final Collection<CompletableFuture<CompletedPart>> futures = new ConcurrentLinkedQueue<>();
78+
79+
private final CompletableFuture<String> uploadIdFuture = new CompletableFuture<>();
80+
81+
private final long partSizeInBytes;
82+
private final PutObjectRequest putObjectRequest;
83+
private final CompletableFuture<PutObjectResponse> returnFuture;
84+
private final PublisherListener<Long> progressListener;
85+
private final MultipartUploadHelper multipartUploadHelper;
86+
private final GenericMultipartHelper<PutObjectRequest, PutObjectResponse> genericMultipartHelper;
87+
private final int maxInFlightParts;
88+
89+
private Subscription subscription;
90+
private CloseableAsyncRequestBody firstRequestBody;
91+
private String uploadId;
92+
private volatile boolean isDone;
93+
94+
UnknownContentLengthAsyncRequestBodySubscriber(
95+
long partSizeInBytes,
96+
PutObjectRequest putObjectRequest,
97+
CompletableFuture<PutObjectResponse> returnFuture,
98+
MultipartUploadHelper multipartUploadHelper,
99+
GenericMultipartHelper<PutObjectRequest, PutObjectResponse> genericMultipartHelper,
100+
int maxInFlightParts) {
101+
this.partSizeInBytes = partSizeInBytes;
102+
this.putObjectRequest = putObjectRequest;
103+
this.returnFuture = returnFuture;
104+
this.multipartUploadHelper = multipartUploadHelper;
105+
this.genericMultipartHelper = genericMultipartHelper;
106+
this.maxInFlightParts = maxInFlightParts;
107+
this.progressListener = putObjectRequest.overrideConfiguration()
108+
.map(c -> c.executionAttributes().getAttribute(JAVA_PROGRESS_LISTENER))
109+
.orElseGet(PublisherListener::noOp);
110+
}
111+
112+
@Override
113+
public void onSubscribe(Subscription s) {
114+
if (this.subscription != null) {
115+
log.warn(() -> "The subscriber has already been subscribed. Cancelling the incoming subscription");
116+
subscription.cancel();
117+
return;
118+
}
119+
this.subscription = s;
120+
s.request(1);
121+
returnFuture.whenComplete((r, t) -> {
122+
if (t != null) {
123+
s.cancel();
124+
MultipartUploadHelper.cancelingOtherOngoingRequests(futures, t);
125+
}
126+
});
127+
}
128+
129+
@Override
130+
public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
131+
if (asyncRequestBody == null) {
132+
NullPointerException exception = new NullPointerException("asyncRequestBody passed to onNext MUST NOT be null.");
133+
multipartUploadHelper.failRequestsElegantly(futures,
134+
exception, uploadId, returnFuture, putObjectRequest);
135+
throw exception;
136+
}
137+
138+
if (isDone) {
139+
return;
140+
}
141+
142+
int currentPartNum = partNumber.incrementAndGet();
143+
log.trace(() -> "Received asyncRequestBody " + asyncRequestBody.contentLength());
144+
asyncRequestBodyInFlight.incrementAndGet();
145+
146+
Optional<SdkClientException> sdkClientException = validatePart(asyncRequestBody, currentPartNum);
147+
if (sdkClientException.isPresent()) {
148+
multipartUploadHelper.failRequestsElegantly(futures, sdkClientException.get(), uploadId, returnFuture,
149+
putObjectRequest);
150+
subscription.cancel();
151+
return;
152+
}
153+
154+
if (firstAsyncRequestBodyReceived.compareAndSet(false, true)) {
155+
log.trace(() -> "Received first async request body");
156+
// If this is the first AsyncRequestBody received, request another one because we don't know if there is more
157+
firstRequestBody = asyncRequestBody;
158+
subscription.request(1);
159+
return;
160+
}
161+
162+
// If there are more than 1 AsyncRequestBodies, then we know we need to upload this
163+
// object using MPU
164+
if (createMultipartUploadInitiated.compareAndSet(false, true)) {
165+
log.debug(() -> "Starting the upload as multipart upload request");
166+
CompletableFuture<CreateMultipartUploadResponse> createMultipartUploadFuture =
167+
multipartUploadHelper.createMultipartUpload(putObjectRequest, returnFuture);
168+
169+
createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
170+
if (throwable != null) {
171+
genericMultipartHelper.handleException(returnFuture, () -> "Failed to initiate multipart upload",
172+
throwable);
173+
subscription.cancel();
174+
} else {
175+
uploadId = createMultipartUploadResponse.uploadId();
176+
log.debug(() -> "Initiated a new multipart upload, uploadId: " + uploadId);
177+
178+
sendUploadPartRequest(uploadId, firstRequestBody, 1);
179+
sendUploadPartRequest(uploadId, asyncRequestBody, 2);
180+
181+
// We need to complete the uploadIdFuture *after* the first two requests have been sent
182+
uploadIdFuture.complete(uploadId);
183+
}
184+
});
185+
CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture);
186+
} else {
187+
uploadIdFuture.whenComplete((r, t) -> {
188+
sendUploadPartRequest(uploadId, asyncRequestBody, currentPartNum);
189+
});
190+
}
191+
}
192+
193+
private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
194+
Optional<Long> contentLength = asyncRequestBody.contentLength();
195+
if (!contentLength.isPresent()) {
196+
return Optional.of(contentLengthMissingForPart(currentPartNum));
197+
}
198+
199+
Long contentLengthCurrentPart = contentLength.get();
200+
if (contentLengthCurrentPart > partSizeInBytes) {
201+
return Optional.of(contentLengthMismatchForPart(partSizeInBytes, contentLengthCurrentPart, currentPartNum));
202+
}
203+
return Optional.empty();
204+
}
205+
206+
private void sendUploadPartRequest(String uploadId,
207+
CloseableAsyncRequestBody asyncRequestBody,
208+
int currentPartNum) {
209+
Long contentLengthCurrentPart = asyncRequestBody.contentLength().get();
210+
this.contentLength.getAndAdd(contentLengthCurrentPart);
211+
212+
multipartUploadHelper
213+
.sendIndividualUploadPartRequest(uploadId, completedParts::add, futures,
214+
uploadPart(asyncRequestBody, currentPartNum), progressListener)
215+
.whenComplete((r, t) -> {
216+
asyncRequestBody.close();
217+
if (t != null) {
218+
if (failureActionInitiated.compareAndSet(false, true)) {
219+
multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest);
220+
}
221+
} else {
222+
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
223+
if (!isDone && inFlight < maxInFlightParts) {
224+
synchronized (this) {
225+
subscription.request(1);
226+
}
227+
}
228+
completeMultipartUploadIfFinish(inFlight);
229+
}
230+
});
231+
if (asyncRequestBodyInFlight.get() < maxInFlightParts) {
232+
synchronized (this) {
233+
subscription.request(1);
234+
}
235+
}
236+
}
237+
238+
private Pair<UploadPartRequest, AsyncRequestBody> uploadPart(AsyncRequestBody asyncRequestBody, int partNum) {
239+
UploadPartRequest uploadRequest =
240+
SdkPojoConversionUtils.toUploadPartRequest(putObjectRequest,
241+
partNum,
242+
uploadId);
243+
244+
return Pair.of(uploadRequest, asyncRequestBody);
245+
}
246+
247+
@Override
248+
public void onError(Throwable t) {
249+
log.debug(() -> "Received onError() ", t);
250+
if (failureActionInitiated.compareAndSet(false, true)) {
251+
isDone = true;
252+
multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest);
253+
}
254+
}
255+
256+
@Override
257+
public void onComplete() {
258+
log.debug(() -> "Received onComplete()");
259+
// If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload, and if no
260+
// async request body has been received, it's an empty stream
261+
if (createMultipartUploadInitiated.get() == false) {
262+
log.debug(() -> "Starting the upload as a single object upload request");
263+
AsyncRequestBody entireRequestBody = firstAsyncRequestBodyReceived.get() ? firstRequestBody :
264+
AsyncRequestBody.empty();
265+
multipartUploadHelper.uploadInOneChunk(putObjectRequest, entireRequestBody, returnFuture);
266+
} else {
267+
isDone = true;
268+
completeMultipartUploadIfFinish(asyncRequestBodyInFlight.get());
269+
}
270+
}
271+
272+
private void completeMultipartUploadIfFinish(int requestsInFlight) {
273+
if (isDone && requestsInFlight == 0 && completedMultipartInitiated.compareAndSet(false, true)) {
274+
CompletedPart[] parts = completedParts.stream()
275+
.sorted(Comparator.comparingInt(CompletedPart::partNumber))
276+
.toArray(CompletedPart[]::new);
277+
278+
long totalLength = contentLength.get();
279+
int expectedNumParts = genericMultipartHelper.determinePartCount(totalLength, partSizeInBytes);
280+
if (parts.length != expectedNumParts) {
281+
SdkClientException exception = SdkClientException.create(
282+
String.format("The number of UploadParts requests is not equal to the expected number of parts. "
283+
+ "Expected: %d, Actual: %d", expectedNumParts, parts.length));
284+
multipartUploadHelper.failRequestsElegantly(futures, exception, uploadId, returnFuture, putObjectRequest);
285+
return;
286+
}
287+
288+
multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest,
289+
totalLength);
290+
}
291+
}
292+
}

0 commit comments

Comments
 (0)