Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-336f8cc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix TransferListener callbacks (bytesTransferred, transferComplete) not firing for unknown-content-length uploads via S3TransferManager when the data fits in a single chunk."
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@
import static com.github.tomakehurst.wiremock.client.WireMock.put;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;

import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
Expand All @@ -47,6 +50,8 @@
import org.mockito.Mockito;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
Expand All @@ -57,6 +62,7 @@
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;

Expand Down Expand Up @@ -358,6 +364,133 @@ void copyWithJavaBasedClient_listeners_reports_ProgressWhenSuccess_copy() {
Mockito.verify(transferListenerMock, times(numTimesBytesTransferred)).bytesTransferred(ArgumentMatchers.any());
}

/**
 * Unknown-content-length upload that is small enough to fit in one chunk:
 * the TransferListener completion callbacks must still fire. Exercises the
 * path where UploadWithUnknownContentLengthHelper routes to uploadInOneChunk.
 */
@Test
void unknownContentLength_singleChunk_transferCompleteFires() {
    S3AsyncClient asyncClient = s3AsyncClient(true);

    stubFor(put(urlPathEqualTo("/" + EXAMPLE_BUCKET + "/" + TEST_KEY))
                .willReturn(aResponse().withStatus(200).withBody("<body/>")));

    // One capturing listener for state assertions, one mock for verify() calls.
    CaptureTransferListener capturingListener = new CaptureTransferListener();
    TransferListener mockListener = mock(TransferListener.class);

    S3TransferManager transferManager = new GenericS3TransferManager(asyncClient,
                                                                     mock(UploadDirectoryHelper.class),
                                                                     mock(TransferManagerConfiguration.class),
                                                                     mock(DownloadDirectoryHelper.class));

    BlockingInputStreamAsyncRequestBody requestBody = AsyncRequestBody.forBlockingInputStream(null);

    Upload upload = transferManager.upload(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY))
                                                 .requestBody(requestBody)
                                                 .addTransferListener(capturingListener)
                                                 .addTransferListener(mockListener)
                                                 .build());

    // A 1 KiB payload fits comfortably in a single chunk; writing the stream
    // also signals end-of-data to the blocking request body.
    requestBody.writeInputStream(new ByteArrayInputStream(new byte[1024]));

    upload.completionFuture().join();

    assertTransferListenerCompletion(capturingListener);
    assertThat(capturingListener.isTransferInitiated()).isTrue();
    assertThat(capturingListener.isTransferComplete()).isTrue();
    assertThat(capturingListener.getExceptionCaught()).isNull();

    Mockito.verify(mockListener, times(1)).transferInitiated(ArgumentMatchers.any());
    Mockito.verify(mockListener, timeout(1000).times(1)).transferComplete(ArgumentMatchers.any());
    Mockito.verify(mockListener, times(0)).transferFailed(ArgumentMatchers.any());
}

/**
 * Unknown-content-length upload larger than the part size: the transfer goes
 * down the multipart path and every TransferListener callback (initiated,
 * bytesTransferred, complete) must fire.
 */
@Test
void unknownContentLength_multiChunk_allCallbacksFire() {
    S3AsyncClient asyncClient = s3AsyncClient(true);

    // Stub CreateMultipartUpload explicitly; a low-priority catch-all answers
    // the subsequent UploadPart/CompleteMultipartUpload calls.
    String createMpuUrl = "/" + EXAMPLE_BUCKET + "/" + TEST_KEY + "?uploads";
    String createMpuResponse = "<CreateMultipartUploadResult><UploadId>1234</UploadId></CreateMultipartUploadResult>";
    stubFor(post(urlEqualTo(createMpuUrl)).willReturn(aResponse().withStatus(200).withBody(createMpuResponse)));
    stubFor(any(anyUrl()).atPriority(6).willReturn(aResponse().withStatus(200).withBody("<body/>")));

    CaptureTransferListener capturingListener = new CaptureTransferListener();
    TransferListener mockListener = mock(TransferListener.class);

    S3TransferManager transferManager = new GenericS3TransferManager(asyncClient,
                                                                     mock(UploadDirectoryHelper.class),
                                                                     mock(TransferManagerConfiguration.class),
                                                                     mock(DownloadDirectoryHelper.class));

    BlockingInputStreamAsyncRequestBody requestBody = AsyncRequestBody.forBlockingInputStream(null);

    Upload upload = transferManager.upload(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY))
                                                 .requestBody(requestBody)
                                                 .addTransferListener(capturingListener)
                                                 .addTransferListener(mockListener)
                                                 .build());

    // OBJ_SIZE exceeds the default 8 MiB part size, forcing multipart.
    requestBody.writeInputStream(new ByteArrayInputStream(new byte[OBJ_SIZE]));

    upload.completionFuture().join();

    assertTransferListenerCompletion(capturingListener);
    assertThat(capturingListener.isTransferInitiated()).isTrue();
    assertThat(capturingListener.isTransferComplete()).isTrue();
    assertThat(capturingListener.getExceptionCaught()).isNull();

    Mockito.verify(mockListener, times(1)).transferInitiated(ArgumentMatchers.any());
    Mockito.verify(mockListener, timeout(1000).times(1)).transferComplete(ArgumentMatchers.any());
    Mockito.verify(mockListener, times(0)).transferFailed(ArgumentMatchers.any());
    Mockito.verify(mockListener, atLeastOnce()).bytesTransferred(ArgumentMatchers.any());
}

/**
 * Unknown-content-length upload whose single-chunk PutObject fails: the
 * completionFuture must complete exceptionally and transferFailed must fire.
 * Guards against regressions where the failure path in uploadInOneChunk does
 * not propagate the exception to returnFuture, hanging the upload forever.
 */
@Test
void unknownContentLength_singleChunk_failurePropagates() {
    S3AsyncClient asyncClient = s3AsyncClient(true);

    // Respond with a server error so the single-chunk PutObject fails.
    stubFor(put(urlPathEqualTo("/" + EXAMPLE_BUCKET + "/" + TEST_KEY))
                .willReturn(aResponse().withStatus(500).withBody(ERROR_BODY)));

    CaptureTransferListener capturingListener = new CaptureTransferListener();
    TransferListener mockListener = mock(TransferListener.class);

    S3TransferManager transferManager = new GenericS3TransferManager(asyncClient,
                                                                     mock(UploadDirectoryHelper.class),
                                                                     mock(TransferManagerConfiguration.class),
                                                                     mock(DownloadDirectoryHelper.class));

    BlockingInputStreamAsyncRequestBody requestBody = AsyncRequestBody.forBlockingInputStream(null);

    Upload upload = transferManager.upload(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key(TEST_KEY))
                                                 .requestBody(requestBody)
                                                 .addTransferListener(capturingListener)
                                                 .addTransferListener(mockListener)
                                                 .build());

    requestBody.writeInputStream(new ByteArrayInputStream(new byte[1024]));

    // join() surfaces the failure as a CompletionException.
    assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> upload.completionFuture().join());

    assertTransferListenerCompletion(capturingListener);
    assertThat(capturingListener.isTransferInitiated()).isTrue();
    assertThat(capturingListener.isTransferComplete()).isFalse();
    assertThat(capturingListener.getExceptionCaught()).isNotNull();

    Mockito.verify(mockListener, times(1)).transferInitiated(ArgumentMatchers.any());
    Mockito.verify(mockListener, times(0)).transferComplete(ArgumentMatchers.any());
    Mockito.verify(mockListener, timeout(1000).times(1)).transferFailed(ArgumentMatchers.any());
}

private static void assertTransferListenerCompletion(CaptureTransferListener transferListener) {
Duration waitDuration = Duration.ofSeconds(5);
assertTimeoutPreemptively(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,30 @@ void uploadInOneChunk(PutObjectRequest putObjectRequest,
.getAttribute(REPORT_PROGRESS_IN_SINGLE_CHUNK))
.orElse(Boolean.FALSE);

PublisherListener<Long> progressListener = putObjectRequest.overrideConfiguration()
.map(c -> c.executionAttributes()
.getAttribute(JAVA_PROGRESS_LISTENER))
.orElseGet(PublisherListener::noOp);

CompletableFuture<PutObjectResponse> putObjectResponseCompletableFuture = s3AsyncClient.putObject(putObjectRequest,
asyncRequestBody);
CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectResponseCompletableFuture);

if (reportProgress) {
PublisherListener<Long> progressListener = putObjectRequest.overrideConfiguration()
.map(c -> c.executionAttributes()
.getAttribute(JAVA_PROGRESS_LISTENER))
.orElseGet(PublisherListener::noOp);
putObjectResponseCompletableFuture.thenAccept(response -> {
putObjectResponseCompletableFuture.whenComplete((response, throwable) -> {
if (throwable != null) {
returnFuture.completeExceptionally(throwable);
return;
}
if (reportProgress) {
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);
progressListener.subscriberOnComplete();
returnFuture.complete(response);
});
} else {
CompletableFutureUtils.forwardResultTo(putObjectResponseCompletableFuture, returnFuture);
}
}
// Always signal completion so that TransferProgressUpdater's endOfStreamFuture completes
// and the TransferListener's transferComplete callback fires.
// For unknown content length we don't know ahead of time whether it will be one chunk or not,
// and so don't set REPORT_PROGRESS_IN_SINGLE_CHUNK attribute.
progressListener.subscriberOnComplete();
returnFuture.complete(response);
});
}

static SdkClientException contentLengthMissingForPart(int currentPartNum) {
Expand Down
Loading