Skip to content

Commit 86e5c9e

Browse files
authored
fix transfer progress overshoot (#6903)
* fix transfer progress overshoot * add changelog
1 parent 9568628 commit 86e5c9e

3 files changed

Lines changed: 138 additions & 2 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fix S3 Transfer Manager progress tracking overshoot when a multipart download part-get is retried after partial data delivery"
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Optional;
2121
import java.util.concurrent.CompletableFuture;
2222
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicLong;
2324
import org.reactivestreams.Subscriber;
2425
import software.amazon.awssdk.annotations.SdkInternalApi;
2526
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
@@ -253,19 +254,31 @@ public String name() {
253254
private <ResultT> SdkPublisher<AsyncResponseTransformer<GetObjectResponse, ResultT>> wrapIndividualTransformer(
254255
SdkPublisher<AsyncResponseTransformer<GetObjectResponse, ResultT>> publisher, GetObjectRequest request) {
255256
// each of the individual transformer for multipart file download
256-
return publisher.map(art -> AsyncResponseTransformerListener.wrap(
257+
return publisher.map(art -> {
258+
AtomicLong partBytesTransferred = new AtomicLong(0);
259+
return AsyncResponseTransformerListener.wrap(
257260
art,
258261
new AsyncResponseTransformerListener<GetObjectResponse>() {
259262
@Override
260263
public void transformerOnResponse(GetObjectResponse response) {
261264
multipartDownloadOnResponse(request, response);
262265
}
263266

267+
@Override
268+
public void publisherSubscribe(Subscriber<? super ByteBuffer> subscriber) {
269+
long previousPartBytes = partBytesTransferred.getAndSet(0);
270+
if (previousPartBytes > 0) {
271+
progress.updateAndGet(b -> b.transferredBytes(b.getTransferredBytes() - previousPartBytes));
272+
}
273+
}
274+
264275
@Override
265276
public void subscriberOnNext(ByteBuffer byteBuffer) {
277+
partBytesTransferred.addAndGet(byteBuffer.limit());
266278
incrementBytesTransferred(byteBuffer.limit());
267279
}
268-
}));
280+
});
281+
});
269282
}
270283

271284
public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformer(

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/TransferProgressUpdaterTest.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.Executors;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435
import java.util.concurrent.atomic.AtomicReference;
3536
import java.util.stream.Stream;
3637
import org.apache.commons.lang3.RandomStringUtils;
@@ -45,6 +46,7 @@
4546
import org.reactivestreams.Subscriber;
4647
import org.reactivestreams.Subscription;
4748
import software.amazon.awssdk.core.SdkResponse;
49+
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
4850
import software.amazon.awssdk.core.async.AsyncRequestBody;
4951
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
5052
import software.amazon.awssdk.core.async.SdkPublisher;
@@ -352,6 +354,121 @@ public void onComplete() {
352354
assertThat(updater.progress().snapshot().transferredBytes()).isEqualTo(fileSize);
353355
}
354356

357+
@Test
358+
void wrapForNonSerialFileDownload_whenPartRetriedAfterPartialDelivery_progressShouldNotOvershoot() {
359+
TransferObjectRequest transferRequest = Mockito.mock(TransferObjectRequest.class);
360+
TransferProgressUpdater updater = new TransferProgressUpdater(transferRequest, null);
361+
362+
long objectSize = 100L;
363+
long partSize = 100L;
364+
365+
AtomicInteger upstreamBytesReceived = new AtomicInteger(0);
366+
AsyncResponseTransformer<GetObjectResponse, Void> delegate =
367+
new AsyncResponseTransformer<GetObjectResponse, Void>() {
368+
@Override
369+
public CompletableFuture<Void> prepare() {
370+
return new CompletableFuture<>();
371+
}
372+
373+
@Override
374+
public void onResponse(GetObjectResponse response) {
375+
}
376+
377+
@Override
378+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
379+
publisher.subscribe(new Subscriber<ByteBuffer>() {
380+
@Override
381+
public void onSubscribe(Subscription s) {
382+
s.request(Long.MAX_VALUE);
383+
}
384+
385+
@Override
386+
public void onNext(ByteBuffer byteBuffer) {
387+
upstreamBytesReceived.addAndGet(byteBuffer.remaining());
388+
}
389+
390+
@Override
391+
public void onError(Throwable t) {
392+
}
393+
394+
@Override
395+
public void onComplete() {
396+
}
397+
});
398+
}
399+
400+
@Override
401+
public void exceptionOccurred(Throwable error) {
402+
}
403+
};
404+
405+
GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket("b").key("k").build();
406+
AsyncResponseTransformer<GetObjectResponse, Void> wrapped =
407+
updater.wrapForNonSerialFileDownload(delegate, getObjectRequest);
408+
409+
AsyncResponseTransformer.SplitResult<GetObjectResponse, Void> splitResult =
410+
wrapped.split(SplittingTransformerConfiguration.builder()
411+
.bufferSizeInBytes(8 * 1024 * 1024L)
412+
.build());
413+
414+
AtomicReference<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> capturedTransformer =
415+
new AtomicReference<>();
416+
splitResult.publisher().subscribe(new Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>>() {
417+
@Override
418+
public void onSubscribe(Subscription s) {
419+
s.request(1);
420+
}
421+
422+
@Override
423+
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> t) {
424+
capturedTransformer.set(t);
425+
}
426+
427+
@Override
428+
public void onError(Throwable t) {
429+
}
430+
431+
@Override
432+
public void onComplete() {
433+
}
434+
});
435+
436+
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> partTransformer = capturedTransformer.get();
437+
assertThat(partTransformer).isNotNull();
438+
439+
// First attempt
440+
partTransformer.prepare();
441+
partTransformer.onResponse(GetObjectResponse.builder()
442+
.contentRange("bytes 0-99/100")
443+
.contentLength(partSize)
444+
.build());
445+
assertThat(updater.progress().snapshot().totalBytes()).hasValue(objectSize);
446+
447+
SimplePublisher<ByteBuffer> attempt1Publisher = new SimplePublisher<>();
448+
partTransformer.onStream(SdkPublisher.adapt(attempt1Publisher));
449+
attempt1Publisher.send(ByteBuffer.wrap(new byte[60])).join();
450+
assertThat(updater.progress().snapshot().transferredBytes()).isEqualTo(60L);
451+
452+
//Retry: prepare/onResponse/onStream called again on same transformer
453+
partTransformer.prepare();
454+
partTransformer.onResponse(GetObjectResponse.builder()
455+
.contentRange("bytes 0-99/100")
456+
.contentLength(partSize)
457+
.build());
458+
459+
SimplePublisher<ByteBuffer> attempt2Publisher = new SimplePublisher<>();
460+
partTransformer.onStream(SdkPublisher.adapt(attempt2Publisher));
461+
462+
assertThat(updater.progress().snapshot().transferredBytes())
463+
.as("transferredBytes should be reset on retry")
464+
.isEqualTo(0L);
465+
466+
attempt2Publisher.send(ByteBuffer.wrap(new byte[(int) partSize])).join();
467+
assertThat(updater.progress().snapshot().transferredBytes()).isEqualTo(partSize);
468+
469+
assertThat(upstreamBytesReceived.get()).isEqualTo(60 + (int) partSize);
470+
}
471+
355472
private static class ExceptionThrowingByteArrayInputStream extends ByteArrayInputStream {
356473
private final int exceptionPosition;
357474

0 commit comments

Comments
 (0)