Skip to content

Commit 6b777a8

Browse files
authored
Fix TransferListener callbacks for unknown content length uploads (#6898)
* Fix TransferListener callbacks for unknown content length uploads * Improve comments * Fix error propogation * Restore comments
1 parent 57dd539 commit 6b777a8

3 files changed

Lines changed: 158 additions & 12 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fix TransferListener callbacks (bytesTransferred, transferComplete) not firing for unknown-content-length uploads via S3TransferManager when the data fits in a single chunk."
6+
}

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3JavaMultipartTransferProgressListenerTest.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,19 @@
2323
import static com.github.tomakehurst.wiremock.client.WireMock.put;
2424
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
2525
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
26+
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
2627
import static org.assertj.core.api.Assertions.assertThat;
2728
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2829
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
30+
import static org.mockito.Mockito.atLeastOnce;
2931
import static org.mockito.Mockito.mock;
3032
import static org.mockito.Mockito.timeout;
3133
import static org.mockito.Mockito.times;
3234

3335
import com.github.tomakehurst.wiremock.client.WireMock;
3436
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
3537
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
38+
import java.io.ByteArrayInputStream;
3639
import java.io.IOException;
3740
import java.net.URI;
3841
import java.time.Duration;
@@ -47,6 +50,8 @@
4750
import org.mockito.Mockito;
4851
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
4952
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
53+
import software.amazon.awssdk.core.async.AsyncRequestBody;
54+
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
5055
import software.amazon.awssdk.regions.Region;
5156
import software.amazon.awssdk.services.s3.S3AsyncClient;
5257
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
@@ -57,6 +62,7 @@
5762
import software.amazon.awssdk.transfer.s3.S3TransferManager;
5863
import software.amazon.awssdk.transfer.s3.model.Copy;
5964
import software.amazon.awssdk.transfer.s3.model.FileUpload;
65+
import software.amazon.awssdk.transfer.s3.model.Upload;
6066
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
6167
import software.amazon.awssdk.transfer.s3.progress.TransferListener;
6268

@@ -358,6 +364,133 @@ void copyWithJavaBasedClient_listeners_reports_ProgressWhenSuccess_copy() {
358364
Mockito.verify(transferListenerMock, times(numTimesBytesTransferred)).bytesTransferred(ArgumentMatchers.any());
359365
}
360366

367+
/**
368+
* Verifies that TransferListener callbacks fire for unknown-content-length uploads that fit in a single chunk.
369+
* This is the scenario where UploadWithUnknownContentLengthHelper routes to uploadInOneChunk.
370+
*/
371+
@Test
372+
void unknownContentLength_singleChunk_transferCompleteFires() {
373+
S3AsyncClient s3Async = s3AsyncClient(true);
374+
375+
stubFor(put(urlPathEqualTo("/" + EXAMPLE_BUCKET + "/" + TEST_KEY))
376+
.willReturn(aResponse().withStatus(200).withBody("<body/>")));
377+
378+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
379+
mock(TransferManagerConfiguration.class),
380+
mock(DownloadDirectoryHelper.class));
381+
CaptureTransferListener transferListener = new CaptureTransferListener();
382+
TransferListener transferListenerMock = mock(TransferListener.class);
383+
384+
BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(null);
385+
386+
Upload upload = tm.upload(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY))
387+
.requestBody(body)
388+
.addTransferListener(transferListener)
389+
.addTransferListener(transferListenerMock)
390+
.build());
391+
392+
// Write small data (fits in one chunk) and close the stream
393+
byte[] data = new byte[1024];
394+
body.writeInputStream(new ByteArrayInputStream(data));
395+
396+
upload.completionFuture().join();
397+
398+
assertTransferListenerCompletion(transferListener);
399+
assertThat(transferListener.isTransferInitiated()).isTrue();
400+
assertThat(transferListener.isTransferComplete()).isTrue();
401+
assertThat(transferListener.getExceptionCaught()).isNull();
402+
403+
Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any());
404+
Mockito.verify(transferListenerMock, timeout(1000).times(1)).transferComplete(ArgumentMatchers.any());
405+
Mockito.verify(transferListenerMock, times(0)).transferFailed(ArgumentMatchers.any());
406+
}
407+
408+
/**
409+
* Verifies that TransferListener callbacks fire for unknown-content-length uploads that exceed the part size
410+
* and go through the multipart upload path.
411+
*/
412+
@Test
413+
void unknownContentLength_multiChunk_allCallbacksFire() {
414+
S3AsyncClient s3Async = s3AsyncClient(true);
415+
416+
String createMpuUrl = "/" + EXAMPLE_BUCKET + "/" + TEST_KEY + "?uploads";
417+
String createMpuResponse = "<CreateMultipartUploadResult><UploadId>1234</UploadId></CreateMultipartUploadResult>";
418+
stubFor(post(urlEqualTo(createMpuUrl)).willReturn(aResponse().withStatus(200).withBody(createMpuResponse)));
419+
stubFor(any(anyUrl()).atPriority(6).willReturn(aResponse().withStatus(200).withBody("<body/>")));
420+
421+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
422+
mock(TransferManagerConfiguration.class),
423+
mock(DownloadDirectoryHelper.class));
424+
CaptureTransferListener transferListener = new CaptureTransferListener();
425+
TransferListener transferListenerMock = mock(TransferListener.class);
426+
427+
BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(null);
428+
429+
Upload upload = tm.upload(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY))
430+
.requestBody(body)
431+
.addTransferListener(transferListener)
432+
.addTransferListener(transferListenerMock)
433+
.build());
434+
435+
// Write data larger than the default 8 MiB part size to force multipart
436+
byte[] data = new byte[OBJ_SIZE];
437+
body.writeInputStream(new ByteArrayInputStream(data));
438+
439+
upload.completionFuture().join();
440+
441+
assertTransferListenerCompletion(transferListener);
442+
assertThat(transferListener.isTransferInitiated()).isTrue();
443+
assertThat(transferListener.isTransferComplete()).isTrue();
444+
assertThat(transferListener.getExceptionCaught()).isNull();
445+
446+
Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any());
447+
Mockito.verify(transferListenerMock, timeout(1000).times(1)).transferComplete(ArgumentMatchers.any());
448+
Mockito.verify(transferListenerMock, times(0)).transferFailed(ArgumentMatchers.any());
449+
Mockito.verify(transferListenerMock, atLeastOnce()).bytesTransferred(ArgumentMatchers.any());
450+
}
451+
452+
/**
453+
* Verifies that when an unknown-content-length upload fails on the single-chunk path,
454+
* the completionFuture completes exceptionally and transferFailed fires.
455+
* This guards against regressions where the failure path in uploadInOneChunk does not
456+
* propagate the exception to returnFuture, causing the upload to hang indefinitely.
457+
*/
458+
@Test
459+
void unknownContentLength_singleChunk_failurePropagates() {
460+
S3AsyncClient s3Async = s3AsyncClient(true);
461+
462+
stubFor(put(urlPathEqualTo("/" + EXAMPLE_BUCKET + "/" + TEST_KEY))
463+
.willReturn(aResponse().withStatus(500).withBody(ERROR_BODY)));
464+
465+
S3TransferManager tm = new GenericS3TransferManager(s3Async, mock(UploadDirectoryHelper.class),
466+
mock(TransferManagerConfiguration.class),
467+
mock(DownloadDirectoryHelper.class));
468+
CaptureTransferListener transferListener = new CaptureTransferListener();
469+
TransferListener transferListenerMock = mock(TransferListener.class);
470+
471+
BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(null);
472+
473+
Upload upload = tm.upload(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY))
474+
.requestBody(body)
475+
.addTransferListener(transferListener)
476+
.addTransferListener(transferListenerMock)
477+
.build());
478+
479+
byte[] data = new byte[1024];
480+
body.writeInputStream(new ByteArrayInputStream(data));
481+
482+
assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> upload.completionFuture().join());
483+
484+
assertTransferListenerCompletion(transferListener);
485+
assertThat(transferListener.isTransferInitiated()).isTrue();
486+
assertThat(transferListener.isTransferComplete()).isFalse();
487+
assertThat(transferListener.getExceptionCaught()).isNotNull();
488+
489+
Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any());
490+
Mockito.verify(transferListenerMock, times(0)).transferComplete(ArgumentMatchers.any());
491+
Mockito.verify(transferListenerMock, timeout(1000).times(1)).transferFailed(ArgumentMatchers.any());
492+
}
493+
361494
private static void assertTransferListenerCompletion(CaptureTransferListener transferListener) {
362495
Duration waitDuration = Duration.ofSeconds(5);
363496
assertTimeoutPreemptively(

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -159,23 +159,30 @@ void uploadInOneChunk(PutObjectRequest putObjectRequest,
159159
.getAttribute(REPORT_PROGRESS_IN_SINGLE_CHUNK))
160160
.orElse(Boolean.FALSE);
161161

162+
PublisherListener<Long> progressListener = putObjectRequest.overrideConfiguration()
163+
.map(c -> c.executionAttributes()
164+
.getAttribute(JAVA_PROGRESS_LISTENER))
165+
.orElseGet(PublisherListener::noOp);
166+
162167
CompletableFuture<PutObjectResponse> putObjectResponseCompletableFuture = s3AsyncClient.putObject(putObjectRequest,
163168
asyncRequestBody);
164169
CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectResponseCompletableFuture);
165170

166-
if (reportProgress) {
167-
PublisherListener<Long> progressListener = putObjectRequest.overrideConfiguration()
168-
.map(c -> c.executionAttributes()
169-
.getAttribute(JAVA_PROGRESS_LISTENER))
170-
.orElseGet(PublisherListener::noOp);
171-
putObjectResponseCompletableFuture.thenAccept(response -> {
171+
putObjectResponseCompletableFuture.whenComplete((response, throwable) -> {
172+
if (throwable != null) {
173+
returnFuture.completeExceptionally(throwable);
174+
return;
175+
}
176+
if (reportProgress) {
172177
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);
173-
progressListener.subscriberOnComplete();
174-
returnFuture.complete(response);
175-
});
176-
} else {
177-
CompletableFutureUtils.forwardResultTo(putObjectResponseCompletableFuture, returnFuture);
178-
}
178+
}
179+
// Always signal completion so that TransferProgressUpdater's endOfStreamFuture completes
180+
// and the TransferListener's transferComplete callback fires.
181+
// For unknown content length we don't know if it wil lbe one chunk or not ahead of time
182+
// and so don't set REPORT_PROGRESS_IN_SINGLE_CHUNK attribute.
183+
progressListener.subscriberOnComplete();
184+
returnFuture.complete(response);
185+
});
179186
}
180187

181188
static SdkClientException contentLengthMissingForPart(int currentPartNum) {

0 commit comments

Comments
 (0)