From ed1e4f1e5e58d9e35e144e9c2e1d47a53cf66f99 Mon Sep 17 00:00:00 2001 From: yibole Date: Mon, 18 Aug 2025 09:58:31 -0700 Subject: [PATCH 01/14] added part count and content range validation for download --- .../MultipartDownloaderSubscriber.java | 63 +++++++++++++++++++ .../multipart/MultipartDownloadTestUtil.java | 27 ++++++++ ...ipartDownloaderSubscriberWiremockTest.java | 26 ++++++++ 3 files changed, 116 insertions(+) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 1ccae234631d..edf50ff92ca2 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -23,6 +23,7 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -76,6 +77,16 @@ public class MultipartDownloaderSubscriber implements Subscriber totalParts) { + validatePartsCount(completedParts.get()); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); return; @@ -162,10 +174,20 @@ private void requestMoreIfNeeded(GetObjectResponse response) { totalParts = partCount; } + String actualContentRange = response.contentRange(); + if (actualContentRange != null && partSize == null) { + getRangeInfo(actualContentRange); + log.debug(() -> String.format("Part size of the object to download: " + partSize)); + log.debug(() -> String.format("Total Content Length of the object to download: " + totalContentLength)); + } + + validateContentRange(totalComplete, actualContentRange); + synchronized (lock) { if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { subscription.request(1); } else { + validatePartsCount(completedParts.get()); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); } @@ -198,4 +220,45 @@ private GetObjectRequest nextRequest(int nextPartToGet) { } }); } + + private void validatePartsCount(int currentGetCount) { + if (totalParts != null && currentGetCount != totalParts) { + String errorMessage = "PartsCount validation failed. Expected " + totalParts + ", downloaded" + + " " + currentGetCount + " parts."; + log.error(() -> errorMessage); + subscription.cancel(); + SdkClientException exception = SdkClientException.create(errorMessage); + onError(exception); + } + } + + private void validateContentRange(int partNumber, String contentRange) { + if (contentRange == null) { + return; + } + + long expectedStart = (partNumber - 1) * partSize; + long expectedEnd = partNumber == totalParts ? totalContentLength - 1 : expectedStart + partSize - 1; + + String expectedContentRange = String.format("bytes %d-%d/%d", expectedStart, expectedEnd, totalContentLength); + + if (!expectedContentRange.equals(contentRange)) { + String errorMessage = String.format( + "Content-Range validation failed for part %d. Expected: %s, Actual: %s", + partNumber, expectedContentRange, contentRange); + log.error(() -> errorMessage); + onError(SdkClientException.create(errorMessage)); + } + } + + private void getRangeInfo(String contentRange) { + String rangeInfo = contentRange.substring(6); + String[] parts = rangeInfo.split("/"); + + this.totalContentLength = Long.parseLong(parts[1]); + String[] rangeParts = parts[0].split("-"); + long startByte = Long.parseLong(rangeParts[0]); + long endByte = Long.parseLong(rangeParts[1]); + this.partSize = endByte - startByte + 1; + } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java index 708972b6b0d7..3df11de206b5 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java @@ -70,6 +70,21 @@ public byte[] stubForPart(String testBucket, String testKey,int part, int totalP aResponse() .withHeader("x-amz-mp-parts-count", totalPart + "") .withHeader("ETag", eTag) + .withHeader("Content-Length", String.valueOf(body.length)) + .withHeader("Content-Range", contentRange(part, totalPart, partSize)) + .withBody(body))); + return body; + } + + public byte[] stubForPartwithWrongContentRange(String testBucket, String testKey,int part, int totalPart, int partSize) { + byte[] body = new byte[partSize]; + random.nextBytes(body); + stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=%d", testBucket, testKey, part))).willReturn( + aResponse() + .withHeader("x-amz-mp-parts-count", totalPart + "") + .withHeader("ETag", eTag) + .withHeader("Content-Length", String.valueOf(body.length)) + .withHeader("Content-Range", contentRange(part, totalPart, partSize + 1)) .withBody(body))); return body; } @@ -95,4 +110,16 @@ public byte[] stubForPartSuccess(int part, int totalPart, int partSize) { .withBody(body))); return body; } + + private String contentRange(int part, int totalPart, int partSize) { + long totalObjectSize = (long) totalPart * partSize; + long startByte = (long) (part - 1) * partSize; + long endByte = startByte + partSize - 1; + + if (part == totalPart) { + endByte = totalObjectSize - 1; + } + + return String.format("bytes %d-%d/%d", startByte, endByte, totalObjectSize); + } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java index 1c6eb666a9c2..6dcf24550756 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java @@ -160,6 +160,32 @@ void errorOnThirdRequest_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo( } } + @ParameterizedTest + @MethodSource("argumentsProvider") + void wrongContentRangeOnSecondRequest_should(AsyncResponseTransformerTestSupplier supplier, + int amountOfPartToTest, + int partSize) { + util.stubForPart(testBucket, testKey, 1, 3, partSize); + util.stubForPartwithWrongContentRange(testBucket, testKey, 2, 3, partSize); + util.stubForPart(testBucket, testKey, 3, 3, partSize); + //byte[] expectedBody = util.stubAllParts(testBucket, testKey, amountOfPartToTest, partSize); + AsyncResponseTransformer transformer = supplier.transformer(); + AsyncResponseTransformer.SplitResult split = transformer.split( + SplittingTransformerConfiguration.builder() + .bufferSizeInBytes(1024 * 32L) + .build()); + Subscriber> subscriber = new MultipartDownloaderSubscriber( + s3AsyncClient, + GetObjectRequest.builder() + .bucket(testBucket) + .key(testKey) + .build()); + + split.publisher().subscribe(subscriber); + T response = split.resultFuture().join(); + + } + private static Stream argumentsProvider() { // amount of part, individual part size List> partSizes = Arrays.asList( From 391a506ca2a3e5226cbfa5795c0065fcd3bd6033 Mon Sep 17 00:00:00 2001 From: yibole Date: Mon, 18 Aug 2025 13:39:15 -0700 Subject: [PATCH 02/14] changelog added --- .changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json | 6 ++++++ .../archtests/CodingConventionWithSuppressionTest.java | 2 ++ 2 files changed, 8 insertions(+) create mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json b/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json new file mode 100644 index 000000000000..c6ce2c184d71 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Added partCount and ContentRange validation for s3 transfer manager download request " +} diff --git a/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java b/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java index d2edcaac742d..3795b30b5d3a 100644 --- a/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java +++ b/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java @@ -36,6 +36,7 @@ import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.metrics.publishers.emf.EmfMetricLoggingPublisher; import software.amazon.awssdk.metrics.publishers.emf.internal.MetricEmfConverter; +import software.amazon.awssdk.services.s3.internal.multipart.MultipartDownloaderSubscriber; import software.amazon.awssdk.utils.Logger; /** @@ -57,6 +58,7 @@ public class CodingConventionWithSuppressionTest { private static final Set ALLOWED_ERROR_LOG_SUPPRESSION = new HashSet<>( Arrays.asList( ArchUtils.classNameToPattern(EmfMetricLoggingPublisher.class), + ArchUtils.classNameToPattern(MultipartDownloaderSubscriber.class), ArchUtils.classWithInnerClassesToPattern(ResponseTransformer.class))); @Test From fc956acb749f5e74a448bcc24df9f63c7a91ee69 Mon Sep 17 00:00:00 2001 From: yibole Date: Tue, 19 Aug 2025 15:16:36 -0700 Subject: [PATCH 03/14] remove contentRange validation since part size could different --- .../MultipartDownloaderSubscriber.java | 49 ------------------- ...ipartDownloaderSubscriberWiremockTest.java | 26 ---------- 2 files changed, 75 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index edf50ff92ca2..d6e490226b65 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -77,16 +77,6 @@ public class MultipartDownloaderSubscriber implements Subscriber String.format("Part size of the object to download: " + partSize)); - log.debug(() -> String.format("Total Content Length of the object to download: " + totalContentLength)); - } - - validateContentRange(totalComplete, actualContentRange); - synchronized (lock) { if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { subscription.request(1); @@ -231,34 +212,4 @@ private void validatePartsCount(int currentGetCount) { onError(exception); } } - - private void validateContentRange(int partNumber, String contentRange) { - if (contentRange == null) { - return; - } - - long expectedStart = (partNumber - 1) * partSize; - long expectedEnd = partNumber == totalParts ? totalContentLength - 1 : expectedStart + partSize - 1; - - String expectedContentRange = String.format("bytes %d-%d/%d", expectedStart, expectedEnd, totalContentLength); - - if (!expectedContentRange.equals(contentRange)) { - String errorMessage = String.format( - "Content-Range validation failed for part %d. Expected: %s, Actual: %s", - partNumber, expectedContentRange, contentRange); - log.error(() -> errorMessage); - onError(SdkClientException.create(errorMessage)); - } - } - - private void getRangeInfo(String contentRange) { - String rangeInfo = contentRange.substring(6); - String[] parts = rangeInfo.split("/"); - - this.totalContentLength = Long.parseLong(parts[1]); - String[] rangeParts = parts[0].split("-"); - long startByte = Long.parseLong(rangeParts[0]); - long endByte = Long.parseLong(rangeParts[1]); - this.partSize = endByte - startByte + 1; - } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java index 6dcf24550756..1c6eb666a9c2 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberWiremockTest.java @@ -160,32 +160,6 @@ void errorOnThirdRequest_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo( } } - @ParameterizedTest - @MethodSource("argumentsProvider") - void wrongContentRangeOnSecondRequest_should(AsyncResponseTransformerTestSupplier supplier, - int amountOfPartToTest, - int partSize) { - util.stubForPart(testBucket, testKey, 1, 3, partSize); - util.stubForPartwithWrongContentRange(testBucket, testKey, 2, 3, partSize); - util.stubForPart(testBucket, testKey, 3, 3, partSize); - //byte[] expectedBody = util.stubAllParts(testBucket, testKey, amountOfPartToTest, partSize); - AsyncResponseTransformer transformer = supplier.transformer(); - AsyncResponseTransformer.SplitResult split = transformer.split( - SplittingTransformerConfiguration.builder() - .bufferSizeInBytes(1024 * 32L) - .build()); - Subscriber> subscriber = new MultipartDownloaderSubscriber( - s3AsyncClient, - GetObjectRequest.builder() - .bucket(testBucket) - .key(testKey) - .build()); - - split.publisher().subscribe(subscriber); - T response = split.resultFuture().join(); - - } - private static Stream argumentsProvider() { // amount of part, individual part size List> partSizes = Arrays.asList( From c1acdd867d4ac825801aa5b1491de68183078c03 Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 20 Aug 2025 13:51:34 -0700 Subject: [PATCH 04/14] feedback addressed --- .../next-release/bugfix-AWSSDKforJavav2-17f90b1.json | 6 ------ .changes/next-release/bugfix-AmazonS3-34da391.json | 6 ++++++ .../multipart/MultipartDownloaderSubscriber.java | 12 ++++++------ .../CodingConventionWithSuppressionTest.java | 2 -- 4 files changed, 12 insertions(+), 14 deletions(-) delete mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json create mode 100644 .changes/next-release/bugfix-AmazonS3-34da391.json diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json b/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json deleted file mode 100644 index c6ce2c184d71..000000000000 --- a/.changes/next-release/bugfix-AWSSDKforJavav2-17f90b1.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "type": "bugfix", - "category": "AWS SDK for Java v2", - "contributor": "", - "description": "Added partCount and ContentRange validation for s3 transfer manager download request " -} diff --git a/.changes/next-release/bugfix-AmazonS3-34da391.json b/.changes/next-release/bugfix-AmazonS3-34da391.json new file mode 100644 index 000000000000..2d177a6c2849 --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-34da391.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Added additional validations for multipart download operations in the Java multipart S3 client" +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index d6e490226b65..9fa5cff11c2b 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -114,12 +114,13 @@ public void onNext(AsyncResponseTransformer totalParts) { - validatePartsCount(completedParts.get()); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); + validatePartsCount(currentPart); subscription.cancel(); return; } @@ -168,7 +169,7 @@ private void requestMoreIfNeeded(GetObjectResponse response) { if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { subscription.request(1); } else { - validatePartsCount(completedParts.get()); + validatePartsCount(totalComplete); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); } @@ -204,9 +205,8 @@ private GetObjectRequest nextRequest(int nextPartToGet) { private void validatePartsCount(int currentGetCount) { if (totalParts != null && currentGetCount != totalParts) { - String errorMessage = "PartsCount validation failed. Expected " + totalParts + ", downloaded" - + " " + currentGetCount + " parts."; - log.error(() -> errorMessage); + String errorMessage = String.format("PartsCount validation failed. Expected %d, downloaded %d parts.", totalParts, + currentGetCount); subscription.cancel(); SdkClientException exception = SdkClientException.create(errorMessage); onError(exception); diff --git a/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java b/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java index 3795b30b5d3a..d2edcaac742d 100644 --- a/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java +++ b/test/architecture-tests/src/test/java/software/amazon/awssdk/archtests/CodingConventionWithSuppressionTest.java @@ -36,7 +36,6 @@ import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.metrics.publishers.emf.EmfMetricLoggingPublisher; import software.amazon.awssdk.metrics.publishers.emf.internal.MetricEmfConverter; -import software.amazon.awssdk.services.s3.internal.multipart.MultipartDownloaderSubscriber; import software.amazon.awssdk.utils.Logger; /** @@ -58,7 +57,6 @@ public class CodingConventionWithSuppressionTest { private static final Set ALLOWED_ERROR_LOG_SUPPRESSION = new HashSet<>( Arrays.asList( ArchUtils.classNameToPattern(EmfMetricLoggingPublisher.class), - ArchUtils.classNameToPattern(MultipartDownloaderSubscriber.class), ArchUtils.classWithInnerClassesToPattern(ResponseTransformer.class))); @Test From fd674d010637c6711649ef328724b76c1f705db6 Mon Sep 17 00:00:00 2001 From: yibole Date: Fri, 22 Aug 2025 14:49:56 -0700 Subject: [PATCH 05/14] delete unused utils --- .../multipart/MultipartDownloadTestUtil.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java index 3df11de206b5..232f138c22f6 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadTestUtil.java @@ -71,20 +71,6 @@ public byte[] stubForPart(String testBucket, String testKey,int part, int totalP .withHeader("x-amz-mp-parts-count", totalPart + "") .withHeader("ETag", eTag) .withHeader("Content-Length", String.valueOf(body.length)) - .withHeader("Content-Range", contentRange(part, totalPart, partSize)) - .withBody(body))); - return body; - } - - public byte[] stubForPartwithWrongContentRange(String testBucket, String testKey,int part, int totalPart, int partSize) { - byte[] body = new byte[partSize]; - random.nextBytes(body); - stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=%d", testBucket, testKey, part))).willReturn( - aResponse() - .withHeader("x-amz-mp-parts-count", totalPart + "") - .withHeader("ETag", eTag) - .withHeader("Content-Length", String.valueOf(body.length)) - .withHeader("Content-Range", contentRange(part, totalPart, partSize + 1)) .withBody(body))); return body; } @@ -110,16 +96,4 @@ public byte[] stubForPartSuccess(int part, int totalPart, int partSize) { .withBody(body))); return body; } - - private String contentRange(int part, int totalPart, int partSize) { - long totalObjectSize = (long) totalPart * partSize; - long startByte = (long) (part - 1) * partSize; - long endByte = startByte + partSize - 1; - - if (part == totalPart) { - endByte = totalObjectSize - 1; - } - - return String.format("bytes %d-%d/%d", startByte, endByte, totalObjectSize); - } } From 450368b5c7dfbfab1f64841e19242c93f08693fa Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 27 Aug 2025 09:26:59 -0700 Subject: [PATCH 06/14] change validation in futurn complete --- .../MultipartDownloaderSubscriber.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 9fa5cff11c2b..49b625c98e96 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -61,6 +61,11 @@ public class MultipartDownloaderSubscriber implements Subscriber totalParts) { log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); - validatePartsCount(currentPart); subscription.cancel(); return; } @@ -129,6 +134,7 @@ public void onNext(AsyncResponseTransformer "Sending GetObjectRequest for next part with partNumber=" + nextPartToGet); CompletableFuture getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer); + getObjectCallCount.incrementAndGet(); getObjectFutures.add(getObjectFuture); getObjectFuture.whenComplete((response, error) -> { if (error != null) { @@ -169,7 +175,6 @@ private void requestMoreIfNeeded(GetObjectResponse response) { if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { subscription.request(1); } else { - validatePartsCount(totalComplete); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); } @@ -187,6 +192,7 @@ public void onError(Throwable t) { @Override public void onComplete() { + validatePartsCount(); future.complete(null); } @@ -203,10 +209,11 @@ private GetObjectRequest nextRequest(int nextPartToGet) { }); } - private void validatePartsCount(int currentGetCount) { - if (totalParts != null && currentGetCount != totalParts) { + private void validatePartsCount() { + int actualGetCount = getObjectCallCount.get(); + if (totalParts != null && actualGetCount != totalParts) { String errorMessage = String.format("PartsCount validation failed. Expected %d, downloaded %d parts.", totalParts, - currentGetCount); + actualGetCount); subscription.cancel(); SdkClientException exception = SdkClientException.create(errorMessage); onError(exception); From a272a7a9f37cce6358110d274ccaad79aa962292 Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 27 Aug 2025 10:26:23 -0700 Subject: [PATCH 07/14] added unit test --- ...artDownloaderSubscriberMockClientTest.java | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java new file mode 100644 index 000000000000..ee21419ac80a --- /dev/null +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java @@ -0,0 +1,123 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.multipart; + + +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +public class MultipartDownloaderSubscriberMockClientTest { + @Mock + private S3AsyncClient s3Client; + + @Mock + private Subscription subscription; + + @Mock + private AsyncResponseTransformer responseTransformer; + + private GetObjectRequest getObjectRequest; + private MultipartDownloaderSubscriber subscriber; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + getObjectRequest = GetObjectRequest.builder() + .bucket("test-bucket") + .key("test-key") + .build(); + } + + @Test + void testValidationPassesWhenCallCountMatchesTotalParts() throws InterruptedException { + subscriber = new MultipartDownloaderSubscriber(s3Client, getObjectRequest); + GetObjectResponse response1 = createMockResponse(3, "etag1"); + GetObjectResponse response2 = createMockResponse(3, "etag2"); + GetObjectResponse response3 = createMockResponse(3, "etag3"); + + CompletableFuture future1 = CompletableFuture.completedFuture(response1); + CompletableFuture future2 = CompletableFuture.completedFuture(response2); + CompletableFuture future3 = CompletableFuture.completedFuture(response3); + + when(s3Client.getObject(any(GetObjectRequest.class), eq(responseTransformer))) + .thenReturn(future1, future2, future3); + + subscriber.onSubscribe(subscription); + subscriber.onNext(responseTransformer); + subscriber.onNext(responseTransformer); + subscriber.onNext(responseTransformer); + Thread.sleep(100); + + subscriber.onComplete(); + + assertDoesNotThrow(() -> subscriber.future().get(1, TimeUnit.SECONDS)); + } + + @Test + void testValidationFailsWhenCallCountExceedsTotalParts() throws InterruptedException { + subscriber = new MultipartDownloaderSubscriber(s3Client, getObjectRequest); + GetObjectResponse response1 = createMockResponse(3, "etag1"); + GetObjectResponse response2 = createMockResponse(3, "etag2"); + + CompletableFuture future1 = CompletableFuture.completedFuture(response1); + CompletableFuture future2 = CompletableFuture.completedFuture(response2); + + when(s3Client.getObject(any(GetObjectRequest.class), eq(responseTransformer))) + .thenReturn(future1, future2); + + subscriber.onSubscribe(subscription); + subscriber.onNext(responseTransformer); + subscriber.onNext(responseTransformer); + Thread.sleep(100); + + subscriber.onComplete(); + + ExecutionException exception = assertThrows(ExecutionException.class, + () -> subscriber.future().get(1, TimeUnit.SECONDS)); + assertTrue(exception.getCause() instanceof SdkClientException); + assertTrue(exception.getCause().getMessage().contains("PartsCount validation failed")); + assertTrue(exception.getCause().getMessage().contains("Expected 3, downloaded 2 parts")); + + } + + private GetObjectResponse createMockResponse(int partsCount, String etag) { + GetObjectResponse.Builder builder = GetObjectResponse.builder() + .eTag(etag) + .contentLength(1024L); + + builder.partsCount(partsCount); + return builder.build(); + } + +} From 13de014212768f5f28990a06ec93152fdb45b675 Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 27 Aug 2025 10:38:39 -0700 Subject: [PATCH 08/14] change test name --- ...ltipartDownloaderSubscriberPartCountValidationTest.java} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/{MultipartDownloaderSubscriberMockClientTest.java => MultipartDownloaderSubscriberPartCountValidationTest.java} (95%) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java similarity index 95% rename from services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java rename to services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java index ee21419ac80a..915013a1dc83 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberMockClientTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java @@ -37,7 +37,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; -public class MultipartDownloaderSubscriberMockClientTest { +public class MultipartDownloaderSubscriberPartCountValidationTest { @Mock private S3AsyncClient s3Client; @@ -60,7 +60,7 @@ void setUp() { } @Test - void testValidationPassesWhenCallCountMatchesTotalParts() throws InterruptedException { + void callCountMatchesTotalParts_shouldPass() throws InterruptedException { subscriber = new MultipartDownloaderSubscriber(s3Client, getObjectRequest); GetObjectResponse response1 = createMockResponse(3, "etag1"); GetObjectResponse response2 = createMockResponse(3, "etag2"); @@ -85,7 +85,7 @@ void testValidationPassesWhenCallCountMatchesTotalParts() throws InterruptedExce } @Test - void testValidationFailsWhenCallCountExceedsTotalParts() throws InterruptedException { + void callCountLessThanTotalParts_shouldThrowException() throws InterruptedException { subscriber = new MultipartDownloaderSubscriber(s3Client, getObjectRequest); GetObjectResponse response1 = createMockResponse(3, "etag1"); GetObjectResponse response2 = createMockResponse(3, "etag2"); From a556358db1e14f3ca6d168a5a9aba07445cd8d03 Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 27 Aug 2025 12:30:24 -0700 Subject: [PATCH 09/14] minor change --- .../s3/internal/multipart/MultipartDownloaderSubscriber.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 49b625c98e96..4a389fcfdbd1 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -120,8 +120,7 @@ public void onNext(AsyncResponseTransformer totalParts) { From 72e94faaa8ba55f44034edcf9ee26bb9543f4f7f Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 27 Aug 2025 12:30:58 -0700 Subject: [PATCH 10/14] minor change --- .../s3/internal/multipart/MultipartDownloaderSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 4a389fcfdbd1..27fab95c9649 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -120,7 +120,7 @@ public void onNext(AsyncResponseTransformer totalParts) { From 6eaf6dfb6d56b5f8b3067d15c89b6475c5467d2d Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 10 Sep 2025 17:37:35 -0700 Subject: [PATCH 11/14] Integration test added --- .../MultipartDownloaderSubscriber.java | 14 ++++-- ...3MultipartClientGetObjectWiremockTest.java | 44 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 27fab95c9649..0d3145814079 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -138,7 +138,7 @@ public void onNext(AsyncResponseTransformer { if (error != null) { log.debug(() -> "Error encountered during GetObjectRequest with partNumber=" + nextPartToGet); - onError(error); + handleError(error); return; } requestMoreIfNeeded(response); @@ -174,12 +174,20 @@ private void requestMoreIfNeeded(GetObjectResponse response) { if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { subscription.request(1); } else { + validatePartsCount(); log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts)); subscription.cancel(); } } } + /** + * The method used by the Subscriber itself when error occured. + */ + private void handleError(Throwable t) { + onError(t); + } + @Override public void onError(Throwable t) { CompletableFuture partFuture; @@ -191,7 +199,6 @@ public void onError(Throwable t) { @Override public void onComplete() { - validatePartsCount(); future.complete(null); } @@ -213,9 +220,8 @@ private void validatePartsCount() { if (totalParts != null && actualGetCount != totalParts) { String errorMessage = String.format("PartsCount validation failed. Expected %d, downloaded %d parts.", totalParts, actualGetCount); - subscription.cancel(); SdkClientException exception = SdkClientException.create(errorMessage); - onError(exception); + handleError(exception); } } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java index 9fd58e6a5fa0..449f245203c9 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java @@ -34,6 +34,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartDownloadTestUtils.internalErrorBody; import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartDownloadTestUtils.transformersSuppliers; +import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.MULTIPART_DOWNLOAD_RESUME_CONTEXT; import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; @@ -58,6 +59,7 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer; @@ -67,6 +69,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartDownloadTestUtils; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.utils.AsyncResponseTransformerTestSupplier; @@ -144,6 +147,47 @@ public void errorOnThirdPart_shouldCompleteExceptionallyOnlyPartsGreaterThan } } + @ParameterizedTest + @MethodSource("partSizeAndTransformerParams") + public void partCountValidationFailure_shouldThrowException( + AsyncResponseTransformerTestSupplier supplier, + int partSize) { + + // To trigger the partCount failure, the resumeContext is used to initialize the actualGetCount larger than the + // totalPart number set in the response. This won't happen in real scenario, just to test if the error can be surfaced + // to the user if the validation fails. + MultipartDownloadResumeContext resumeContext = new MultipartDownloadResumeContext(); + resumeContext.addCompletedPart(1); + resumeContext.addCompletedPart(2); + resumeContext.addCompletedPart(3); + resumeContext.addToBytesToLastCompletedParts(3 * partSize); + + GetObjectRequest request = GetObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .overrideConfiguration(config -> config + .putExecutionAttribute( + MULTIPART_DOWNLOAD_RESUME_CONTEXT, + resumeContext)) + .build(); + + util.stubForPart(BUCKET, KEY, 4, 2, partSize); + + // Skip the lazy transformer since the error won't surface unless the content is consumed + AsyncResponseTransformer transformer = supplier.transformer(); + if (transformer instanceof InputStreamResponseTransformer || transformer instanceof PublisherAsyncResponseTransformer) { + return; + } + + assertThatThrownBy(() -> { + T res = multipartClient.getObject(request, transformer).join(); + supplier.body(res); + }).isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(SdkClientException.class) + .hasMessageContaining("PartsCount validation failed. Expected 2, downloaded 4 parts"); + + } + @ParameterizedTest @MethodSource("nonRetryableResponseTransformers") public void errorOnFirstPart_shouldFail(AsyncResponseTransformerTestSupplier supplier) { From 749fc3c1f5e1ada6b4522db06066faebd3cfbc86 Mon Sep 17 00:00:00 2001 From: yibole Date: Wed, 10 Sep 2025 19:28:54 -0700 Subject: [PATCH 12/14] Unit test fixed --- ...DownloaderSubscriberPartCountValidationTest.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java index 915013a1dc83..8ff1eeac0ad5 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriberPartCountValidationTest.java @@ -85,20 +85,17 @@ void callCountMatchesTotalParts_shouldPass() throws InterruptedException { } @Test - void callCountLessThanTotalParts_shouldThrowException() throws InterruptedException { - subscriber = new MultipartDownloaderSubscriber(s3Client, getObjectRequest); - GetObjectResponse response1 = createMockResponse(3, "etag1"); - GetObjectResponse response2 = createMockResponse(3, "etag2"); + void callCountMoreThanTotalParts_shouldThrowException() throws InterruptedException { + subscriber = new MultipartDownloaderSubscriber(s3Client, getObjectRequest, 3); + GetObjectResponse response1 = createMockResponse(2, "etag1"); CompletableFuture future1 = CompletableFuture.completedFuture(response1); - CompletableFuture future2 = CompletableFuture.completedFuture(response2); when(s3Client.getObject(any(GetObjectRequest.class), eq(responseTransformer))) - .thenReturn(future1, future2); + .thenReturn(future1); subscriber.onSubscribe(subscription); subscriber.onNext(responseTransformer); - subscriber.onNext(responseTransformer); Thread.sleep(100); subscriber.onComplete(); @@ -107,7 +104,7 @@ void callCountLessThanTotalParts_shouldThrowException() throws InterruptedExcept () -> subscriber.future().get(1, TimeUnit.SECONDS)); assertTrue(exception.getCause() instanceof SdkClientException); assertTrue(exception.getCause().getMessage().contains("PartsCount validation failed")); - assertTrue(exception.getCause().getMessage().contains("Expected 3, downloaded 2 parts")); + assertTrue(exception.getCause().getMessage().contains("Expected 2, downloaded 4 parts")); } From a3c55cb00bbf49ebe317d2b79eb3734de460d734 Mon Sep 17 00:00:00 2001 From: yibole Date: Thu, 11 Sep 2025 16:01:41 -0700 Subject: [PATCH 13/14] swap method body of onError and handleError --- .../multipart/MultipartDownloaderSubscriber.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 0d3145814079..13443d2da6b9 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -185,11 +185,6 @@ private void requestMoreIfNeeded(GetObjectResponse response) { * The method used by the Subscriber itself when error occured. */ private void handleError(Throwable t) { - onError(t); - } - - @Override - public void onError(Throwable t) { CompletableFuture partFuture; while ((partFuture = getObjectFutures.poll()) != null) { partFuture.cancel(true); @@ -197,6 +192,11 @@ public void onError(Throwable t) { future.completeExceptionally(t); } + @Override + public void onError(Throwable t) { + handleError(t); + } + @Override public void onComplete() { future.complete(null); From 80ae743f2ef9ef05399264f48af1240da90973ad Mon Sep 17 00:00:00 2001 From: yibole Date: Thu, 11 Sep 2025 16:03:44 -0700 Subject: [PATCH 14/14] minor change --- .../multipart/MultipartDownloaderSubscriber.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java index 13443d2da6b9..7466bda5b2a3 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java @@ -181,6 +181,11 @@ private void requestMoreIfNeeded(GetObjectResponse response) { } } + @Override + public void onError(Throwable t) { + handleError(t); + } + /** * The method used by the Subscriber itself when error occured. */ @@ -192,11 +197,6 @@ private void handleError(Throwable t) { future.completeExceptionally(t); } - @Override - public void onError(Throwable t) { - handleError(t); - } - @Override public void onComplete() { future.complete(null);