Skip to content

Commit 59e4049

Browse files
authored
Merge branch 'master' into all-contributors/add-ryanyuan
2 parents 0b949f8 + 64181df commit 59e4049

13 files changed

Lines changed: 389 additions & 29 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fix inaccurate progress tracking for in-memory uploads in the Java-based S3TransferManager."
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon Cloudfront",
4+
"contributor": "",
5+
"description": "Add support for resourceUrlPattern to `CloudFrontUtilities.getCookiesForCustomPolicy`."
6+
}

core/auth/src/test/java/software/amazon/awssdk/auth/credentials/ProcessCredentialsProviderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,8 @@ void commandAsListOfStrings_isNotExecutedInAShell() {
355355
// executed not in a shell
356356
assertThat(e.getCause()).isInstanceOf(IOException.class);
357357
assertThat(e.getCause().getMessage())
358-
.isEqualTo("Cannot run program \"echo \"Hello, World!\" > output.txt; rm output.txt\": error=2, "
359-
+ "No such file or directory");
358+
.startsWith("Cannot run program \"echo \"Hello, World!\" > output.txt; rm output.txt\":")
359+
.contains("No such file or directory");
360360
}
361361
}
362362

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
2020
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.MULTIPART_DOWNLOAD_RESUME_CONTEXT;
2121
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE;
22+
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.REPORT_PROGRESS_IN_SINGLE_CHUNK;
2223
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN;
2324
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;
2425

@@ -138,17 +139,39 @@ public Upload upload(UploadRequest uploadRequest) {
138139
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest,
139140
requestBody.contentLength().orElse(null));
140141
progressUpdater.transferInitiated();
141-
requestBody = progressUpdater.wrapRequestBody(requestBody);
142+
boolean multipartEnabled = isS3ClientMultipartEnabled();
143+
boolean isByteBody = AsyncRequestBody.BodyType.BYTES.getName().equals(requestBody.body());
144+
// Suppress wrapper progress for byte bodies. All bytes are delivered to the publisher instantly,
145+
// so wrapper-based progress is misleading (jumps to 100% before data is sent over the wire).
146+
requestBody = progressUpdater.wrapRequestBody(requestBody, isByteBody);
147+
142148
progressUpdater.registerCompletion(returnFuture);
143149

144150
PutObjectRequest putObjectRequest = uploadRequest.putObjectRequest();
145-
if (isS3ClientMultipartEnabled()) {
146-
Consumer<AwsRequestOverrideConfiguration.Builder> attachProgressListener =
147-
b -> b.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener());
148-
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener);
151+
if (multipartEnabled) {
152+
Consumer<AwsRequestOverrideConfiguration.Builder> attachProgressAttributes =
153+
b -> b.putExecutionAttribute(JAVA_PROGRESS_LISTENER, progressUpdater.multipartClientProgressListener())
154+
.putExecutionAttribute(REPORT_PROGRESS_IN_SINGLE_CHUNK, isByteBody);
155+
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressAttributes);
149156
}
150157

151-
doUpload(putObjectRequest, requestBody, returnFuture);
158+
if (!multipartEnabled && isByteBody) {
159+
// For in-memory bodies on the non-multipart path, use an intermediate future so we can
160+
// report progress after the server responds but before completing returnFuture.
161+
CompletableFuture<CompletedUpload> uploadFuture = new CompletableFuture<>();
162+
doUpload(putObjectRequest, requestBody, uploadFuture);
163+
CompletableFutureUtils.forwardExceptionTo(returnFuture, uploadFuture);
164+
uploadFuture.whenComplete((result, error) -> {
165+
if (error != null) {
166+
returnFuture.completeExceptionally(error);
167+
} else {
168+
uploadRequest.requestBody().contentLength().ifPresent(progressUpdater::incrementBytesTransferred);
169+
returnFuture.complete(result);
170+
}
171+
});
172+
} else {
173+
doUpload(putObjectRequest, requestBody, returnFuture);
174+
}
152175

153176
return new DefaultUpload(returnFuture, progressUpdater.progress());
154177
}
@@ -192,7 +215,8 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
192215
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest,
193216
requestBody.contentLength().orElse(null));
194217
progressUpdater.transferInitiated();
195-
requestBody = progressUpdater.wrapRequestBody(requestBody);
218+
requestBody = progressUpdater.wrapRequestBody(requestBody, false);
219+
196220
progressUpdater.registerCompletion(returnFuture);
197221

198222
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,15 @@ public void transferInitiated() {
7676
listenerInvoker.transferInitiated(context);
7777
}
7878

79-
public AsyncRequestBody wrapRequestBody(AsyncRequestBody requestBody) {
79+
/**
80+
* Wraps the request body to track upload progress.
81+
*
82+
* @param requestBody the original request body
83+
* @param disableIncrementalProgress when {@code true}, the wrapper will not report byte-level progress. This is used
84+
* for in-memory byte bodies because all bytes are delivered to the publisher instantly and progress would jump to 100%
85+
* before any data is sent over the wire.
86+
*/
87+
public AsyncRequestBody wrapRequestBody(AsyncRequestBody requestBody, boolean disableIncrementalProgress) {
8088
return AsyncRequestBodyListener.wrap(
8189
requestBody,
8290
new AsyncRequestBodyListener() {
@@ -89,12 +97,14 @@ public void publisherSubscribe(Subscriber<? super ByteBuffer> subscriber) {
8997

9098
@Override
9199
public void subscriberOnNext(ByteBuffer byteBuffer) {
92-
incrementBytesTransferred(byteBuffer.limit());
93-
progress.snapshot().ratioTransferred().ifPresent(ratioTransferred -> {
94-
if (Double.compare(ratioTransferred, 1.0) == 0) {
95-
endOfStreamFutureCompleted();
96-
}
97-
});
100+
if (!disableIncrementalProgress) {
101+
incrementBytesTransferred(byteBuffer.limit());
102+
progress.snapshot().ratioTransferred().ifPresent(ratioTransferred -> {
103+
if (Double.compare(ratioTransferred, 1.0) == 0) {
104+
endOfStreamFutureCompleted();
105+
}
106+
});
107+
}
98108
}
99109

100110
@Override
@@ -117,6 +127,10 @@ private void endOfStreamFutureCompleted() {
117127

118128
/**
119129
* Progress listener for Java-based S3Client with multipart enabled.
130+
* <p>
131+
* For multipart uploads, this is the primary source of progress since the wrapper body is bypassed
132+
* by {@code splitCloseable}. For single-chunk uploads via {@code uploadInOneChunk}, this listener
133+
* reports progress after the server responds.
120134
*/
121135
public PublisherListener<Long> multipartClientProgressListener() {
122136

@@ -272,7 +286,7 @@ private void resetBytesTransferred() {
272286
progress.updateAndGet(b -> b.transferredBytes(0L));
273287
}
274288

275-
private void incrementBytesTransferred(long numBytes) {
289+
public void incrementBytesTransferred(long numBytes) {
276290
TransferProgressSnapshot snapshot = progress.updateAndGet(b -> {
277291
b.transferredBytes(b.getTransferredBytes() + numBytes);
278292
});

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,14 @@
7272
* <li>Be mindful that {@link #bytesTransferred(Context.BytesTransferred)} may be called extremely often (subject to I/O
7373
* buffer sizes). Be careful in implementing expensive operations as a side effect. Consider rate-limiting your side
7474
* effect operations, if needed.</li>
75-
* <li>In the case of uploads, there may be some delay between the bytes being fully transferred and the transfer
75+
* <li>In the case of multipart uploads, there may be some delay between the bytes being fully transferred and the transfer
7676
* successfully completing. Internally, {@link S3TransferManager} uses the Amazon S3
7777
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html">multipart upload API</a>
7878
* and must finalize uploads with a {@link CompleteMultipartUploadRequest}.</li>
79+
* <li>For single part in-memory uploads, 100% of the bytes are read immediately into memory and progress is only
80+
* reported once, after all bytes are sent and the HTTP response is received.</li>
81+
* <li>For single part file uploads, progress is reported when bytes are read from the file rather than when bytes are sent
82+
* so there will be some delay between when progress reaching 100% and the transfer successfully completing.</li>
7983
* <li>{@link TransferListener}s may be invoked by different threads. If your {@link TransferListener} is stateful,
8084
* ensure that it is also thread-safe.</li>
8185
* <li>{@link TransferListener}s are not intended to be used for control flow, and therefore your implementation

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferProgressUpdaterTest.java

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.mockito.ArgumentMatchers;
4444
import org.mockito.Mockito;
4545
import org.reactivestreams.Subscriber;
46+
import org.reactivestreams.Subscription;
4647
import software.amazon.awssdk.core.SdkResponse;
4748
import software.amazon.awssdk.core.async.AsyncRequestBody;
4849
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -111,7 +112,7 @@ void registerCompletion_differentTransferredByteRatios_alwaysCompletesOnce(Long
111112
sourceFile = new RandomTempFile(OBJ_SIZE);
112113
AsyncRequestBody requestBody = AsyncRequestBody.fromFile(sourceFile);
113114
TransferProgressUpdater transferProgressUpdater = new TransferProgressUpdater(transferRequest, givenContentLength);
114-
AsyncRequestBody asyncRequestBody = transferProgressUpdater.wrapRequestBody(requestBody);
115+
AsyncRequestBody asyncRequestBody = transferProgressUpdater.wrapRequestBody(requestBody, false);
115116

116117
CompletableFuture<CompletedObjectTransfer> completionFuture = completedObjectResponse(10);
117118
transferProgressUpdater.registerCompletion(completionFuture);
@@ -144,7 +145,7 @@ void transferFailedWhenSubscriptionErrors() throws Exception {
144145
Executors.newSingleThreadExecutor());
145146

146147
TransferProgressUpdater transferProgressUpdater = new TransferProgressUpdater(transferRequest, contentLength);
147-
AsyncRequestBody asyncRequestBody = transferProgressUpdater.wrapRequestBody(requestFileBody);
148+
AsyncRequestBody asyncRequestBody = transferProgressUpdater.wrapRequestBody(requestFileBody, false);
148149

149150
CompletableFuture<CompletedObjectTransfer> future = completedObjectResponse(10);
150151
transferProgressUpdater.registerCompletion(future);
@@ -254,6 +255,103 @@ public void exceptionOccurred(Throwable error) {
254255
}
255256

256257

258+
@Test
259+
void wrapRequestBody_disableIncrementalProgress_doesNotReportProgress() {
260+
byte[] data = new byte[1024];
261+
Arrays.fill(data, (byte) 'a');
262+
long contentLength = data.length;
263+
264+
TransferObjectRequest transferRequest = Mockito.mock(TransferObjectRequest.class);
265+
when(transferRequest.transferListeners()).thenReturn(Arrays.asList(captureTransferListener));
266+
267+
TransferProgressUpdater updater = new TransferProgressUpdater(transferRequest, contentLength);
268+
AsyncRequestBody inMemoryBody = AsyncRequestBody.fromBytes(data);
269+
AsyncRequestBody wrappedBody = updater.wrapRequestBody(inMemoryBody, true);
270+
271+
// Use a custom subscriber that captures progress snapshots at each stage
272+
AtomicReference<Long> bytesAfterOnNext = new AtomicReference<>();
273+
AtomicReference<Long> bytesAfterOnComplete = new AtomicReference<>();
274+
275+
wrappedBody.subscribe(new Subscriber<ByteBuffer>() {
276+
@Override
277+
public void onSubscribe(Subscription s) {
278+
s.request(Long.MAX_VALUE);
279+
}
280+
281+
@Override
282+
public void onNext(ByteBuffer byteBuffer) {
283+
bytesAfterOnNext.set(updater.progress().snapshot().transferredBytes());
284+
}
285+
286+
@Override
287+
public void onError(Throwable t) {
288+
}
289+
290+
@Override
291+
public void onComplete() {
292+
bytesAfterOnComplete.set(updater.progress().snapshot().transferredBytes());
293+
}
294+
});
295+
296+
// When disableIncrementalProgress is true, the wrapper should NOT report any progress.
297+
// Progress is instead reported by the JAVA_PROGRESS_LISTENER after the server responds.
298+
assertThat(bytesAfterOnNext.get()).isNotNull();
299+
assertThat(bytesAfterOnNext.get()).isEqualTo(0L);
300+
assertThat(bytesAfterOnComplete.get()).isEqualTo(0L);
301+
assertThat(updater.progress().snapshot().transferredBytes()).isEqualTo(0L);
302+
}
303+
304+
@Test
305+
void wrapRequestBody_noSuppression_reportsProgressPerByte() throws Exception {
306+
long fileSize = 1024;
307+
sourceFile = new RandomTempFile(fileSize);
308+
309+
TransferObjectRequest transferRequest = Mockito.mock(TransferObjectRequest.class);
310+
when(transferRequest.transferListeners()).thenReturn(Arrays.asList(captureTransferListener));
311+
312+
TransferProgressUpdater updater = new TransferProgressUpdater(transferRequest, fileSize);
313+
AsyncRequestBody fileBody = AsyncRequestBody.fromFile(sourceFile);
314+
AsyncRequestBody wrappedBody = updater.wrapRequestBody(fileBody, false);
315+
316+
CompletableFuture<CompletedObjectTransfer> completionFuture = completedObjectResponse(10);
317+
updater.registerCompletion(completionFuture);
318+
319+
// For file-based bodies, progress should be reported incrementally during onNext
320+
AtomicBoolean progressReportedDuringOnNext = new AtomicBoolean(false);
321+
CompletableFuture<Void> subscriberDone = new CompletableFuture<>();
322+
323+
wrappedBody.subscribe(new Subscriber<ByteBuffer>() {
324+
@Override
325+
public void onSubscribe(Subscription s) {
326+
s.request(Long.MAX_VALUE);
327+
}
328+
329+
@Override
330+
public void onNext(ByteBuffer byteBuffer) {
331+
long transferred = updater.progress().snapshot().transferredBytes();
332+
if (transferred > 0) {
333+
progressReportedDuringOnNext.set(true);
334+
}
335+
}
336+
337+
@Override
338+
public void onError(Throwable t) {
339+
subscriberDone.completeExceptionally(t);
340+
}
341+
342+
@Override
343+
public void onComplete() {
344+
subscriberDone.complete(null);
345+
}
346+
});
347+
348+
subscriberDone.get(5, TimeUnit.SECONDS);
349+
350+
// File body should have reported progress during onNext, not deferred
351+
assertThat(progressReportedDuringOnNext.get()).isTrue();
352+
assertThat(updater.progress().snapshot().transferredBytes()).isEqualTo(fileSize);
353+
}
354+
257355
private static class ExceptionThrowingByteArrayInputStream extends ByteArrayInputStream {
258356
private final int exceptionPosition;
259357

0 commit comments

Comments
 (0)