Skip to content

Commit 6466fcb

Browse files
Add Range-based Multipart download Subscriber for Pre-signed URLs (#6331)
* Range based multipart download subscriber * Refactor code and add tests * Fixed sonarcube issue * Address PR comments: use Long type, inline variables, allow empty files * Refactored subscriber and tests * delete testutil * Added validation tests * Fixed failing tests for IfMatch header * Refactor: split validatePartAndRequestMore into separate methods * Fixed validation tests * Remove onComplete after subscription cancel * Make validatePart boolean function
1 parent 1a4e832 commit 6466fcb

File tree

10 files changed

+983
-23
lines changed

10 files changed

+983
-23
lines changed

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@
2020
import java.util.Collections;
2121
import java.util.List;
2222
import java.util.Optional;
23+
import java.util.regex.Matcher;
24+
import java.util.regex.Pattern;
2325
import software.amazon.awssdk.annotations.SdkInternalApi;
2426
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
27+
import software.amazon.awssdk.services.s3.model.S3Request;
2528

2629
@SdkInternalApi
2730
public final class MultipartDownloadUtils {
2831

32+
private static final Pattern CONTENT_RANGE_PATTERN = Pattern.compile("bytes\\s+(\\d+)-(\\d+)/(\\d+)");
33+
2934
private MultipartDownloadUtils() {
3035
}
3136

@@ -58,4 +63,52 @@ public static Optional<MultipartDownloadResumeContext> multipartDownloadResumeCo
5863
.flatMap(conf -> Optional.ofNullable(conf.executionAttributes().getAttribute(MULTIPART_DOWNLOAD_RESUME_CONTEXT)));
5964
}
6065

66+
/**
67+
* This method checks the
68+
* {@link software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute#MULTIPART_DOWNLOAD_RESUME_CONTEXT}
69+
* execution attributes for a context object and returns it if it finds one. Otherwise, returns an empty Optional.
70+
*
71+
* @param request the request to look for execution attributes
72+
* @return the MultipartDownloadResumeContext if one is found, otherwise an empty Optional.
73+
*/
74+
public static Optional<MultipartDownloadResumeContext> multipartDownloadResumeContext(S3Request request) {
75+
return request
76+
.overrideConfiguration()
77+
.flatMap(conf -> Optional.ofNullable(conf.executionAttributes().getAttribute(MULTIPART_DOWNLOAD_RESUME_CONTEXT)));
78+
}
79+
80+
/**
81+
* Parses the start byte from a Content-Range header.
82+
*
83+
* @param contentRange the Content-Range header value (e.g., "bytes 0-1023/2048")
84+
* @return the start byte position, or -1 if parsing fails
85+
*/
86+
public static long parseStartByteFromContentRange(String contentRange) {
87+
if (contentRange == null) {
88+
return -1;
89+
}
90+
Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange);
91+
if (!matcher.matches()) {
92+
return -1;
93+
}
94+
return Long.parseLong(matcher.group(1));
95+
}
96+
97+
/**
98+
* Parses the total size from a Content-Range header.
99+
*
100+
* @param contentRange the Content-Range header value (e.g., "bytes 0-1023/2048")
101+
* @return the total size, or empty if parsing fails
102+
*/
103+
public static Optional<Long> parseContentRangeForTotalSize(String contentRange) {
104+
if (contentRange == null) {
105+
return Optional.empty();
106+
}
107+
Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange);
108+
if (!matcher.matches()) {
109+
return Optional.empty();
110+
}
111+
return Optional.of(Long.parseLong(matcher.group(3)));
112+
}
113+
61114
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlDownloadHelper.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import software.amazon.awssdk.annotations.SdkInternalApi;
2020
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
2121
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
22+
import software.amazon.awssdk.core.exception.SdkClientException;
2223
import software.amazon.awssdk.services.s3.S3AsyncClient;
2324
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2425
import software.amazon.awssdk.services.s3.presignedurl.AsyncPresignedUrlExtension;
@@ -63,15 +64,25 @@ public <T> CompletableFuture<T> downloadObject(
6364
.build();
6465
AsyncResponseTransformer.SplitResult<GetObjectResponse, T> split =
6566
asyncResponseTransformer.split(splittingConfig);
66-
// TODO: PresignedUrlMultipartDownloaderSubscriber needs to be implemented in next PR
67-
// PresignedUrlMultipartDownloaderSubscriber subscriber =
68-
// new PresignedUrlMultipartDownloaderSubscriber(
69-
// s3AsyncClient,
70-
// presignedRequest,
71-
// configuredPartSizeInBytes);
72-
//
73-
// split.publisher().subscribe(subscriber);
74-
// return split.resultFuture();
75-
throw new UnsupportedOperationException("Multipart presigned URL download not yet implemented - TODO in next PR");
67+
PresignedUrlMultipartDownloaderSubscriber subscriber =
68+
new PresignedUrlMultipartDownloaderSubscriber(
69+
s3AsyncClient,
70+
presignedRequest,
71+
configuredPartSizeInBytes);
72+
73+
split.publisher().subscribe(subscriber);
74+
return split.resultFuture();
75+
}
76+
77+
static SdkClientException invalidContentRangeHeader(String contentRange) {
78+
return SdkClientException.create("Invalid Content-Range header: " + contentRange);
79+
}
80+
81+
static SdkClientException missingContentRangeHeader() {
82+
return SdkClientException.create("No Content-Range header in response");
83+
}
84+
85+
static SdkClientException invalidContentLength() {
86+
return SdkClientException.create("Invalid or missing Content-Length in response");
7687
}
7788
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.multipart;
17+
18+
import java.util.Optional;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
import software.amazon.awssdk.annotations.Immutable;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.annotations.ThreadSafe;
26+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
27+
import software.amazon.awssdk.core.exception.SdkClientException;
28+
import software.amazon.awssdk.services.s3.S3AsyncClient;
29+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
30+
import software.amazon.awssdk.services.s3.presignedurl.model.PresignedUrlDownloadRequest;
31+
import software.amazon.awssdk.utils.Logger;
32+
33+
/**
34+
* A subscriber implementation that will download all individual parts for a multipart presigned URL download request.
35+
* It receives individual {@link AsyncResponseTransformer} instances which will be used to perform the individual
36+
* range-based part requests using presigned URLs. This is a 'one-shot' class, it should <em>NOT</em> be reused
37+
* for more than one multipart download.
38+
*
39+
* <p>Unlike the standard {@link MultipartDownloaderSubscriber} which uses S3's native multipart API with part numbers,
40+
* this subscriber uses HTTP range requests against presigned URLs to achieve multipart download functionality.
41+
* <p>This implementation is thread-safe and handles concurrent part downloads while maintaining proper
42+
* ordering and validation of responses.</p>
43+
*/
44+
@ThreadSafe
45+
@Immutable
46+
@SdkInternalApi
47+
public class PresignedUrlMultipartDownloaderSubscriber
48+
implements Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> {
49+
50+
private static final Logger log = Logger.loggerFor(PresignedUrlMultipartDownloaderSubscriber.class);
51+
private static final String BYTES_RANGE_PREFIX = "bytes=";
52+
53+
private final S3AsyncClient s3AsyncClient;
54+
private final PresignedUrlDownloadRequest presignedUrlDownloadRequest;
55+
private final Long configuredPartSizeInBytes;
56+
private final CompletableFuture<Void> future;
57+
private final Object lock = new Object();
58+
private final AtomicInteger completedParts;
59+
private final AtomicInteger requestsSent;
60+
61+
private volatile Long totalContentLength;
62+
private volatile Integer totalParts;
63+
private volatile String eTag;
64+
private Subscription subscription;
65+
66+
public PresignedUrlMultipartDownloaderSubscriber(
67+
S3AsyncClient s3AsyncClient,
68+
PresignedUrlDownloadRequest presignedUrlDownloadRequest,
69+
long configuredPartSizeInBytes) {
70+
this.s3AsyncClient = s3AsyncClient;
71+
this.presignedUrlDownloadRequest = presignedUrlDownloadRequest;
72+
this.configuredPartSizeInBytes = configuredPartSizeInBytes;
73+
this.completedParts = new AtomicInteger(0);
74+
this.requestsSent = new AtomicInteger(0);
75+
this.future = new CompletableFuture<>();
76+
}
77+
78+
@Override
79+
public void onSubscribe(Subscription s) {
80+
if (subscription != null) {
81+
s.cancel();
82+
return;
83+
}
84+
this.subscription = s;
85+
s.request(1);
86+
}
87+
88+
@Override
89+
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
90+
if (asyncResponseTransformer == null) {
91+
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");
92+
}
93+
94+
int nextPartIndex;
95+
synchronized (lock) {
96+
nextPartIndex = completedParts.get();
97+
if (totalParts != null && nextPartIndex >= totalParts) {
98+
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
99+
subscription.cancel();
100+
return;
101+
}
102+
completedParts.incrementAndGet();
103+
}
104+
makeRangeRequest(nextPartIndex, asyncResponseTransformer);
105+
}
106+
107+
private void makeRangeRequest(int partIndex,
108+
AsyncResponseTransformer<GetObjectResponse,
109+
GetObjectResponse> asyncResponseTransformer) {
110+
PresignedUrlDownloadRequest partRequest = createRangedGetRequest(partIndex);
111+
log.debug(() -> "Sending range request for part " + partIndex + " with range=" + partRequest.range());
112+
113+
requestsSent.incrementAndGet();
114+
s3AsyncClient.presignedUrlExtension()
115+
.getObject(partRequest, asyncResponseTransformer)
116+
.whenComplete((response, error) -> {
117+
if (error != null) {
118+
log.debug(() -> "Error encountered during part request for part " + partIndex);
119+
handleError(error);
120+
return;
121+
}
122+
if (validatePart(response, partIndex, asyncResponseTransformer)) {
123+
requestMoreIfNeeded(completedParts.get());
124+
}
125+
});
126+
}
127+
128+
private boolean validatePart(GetObjectResponse response, int partIndex,
129+
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
130+
int totalComplete = completedParts.get();
131+
log.debug(() -> String.format("Completed part %d", totalComplete));
132+
133+
String responseETag = response.eTag();
134+
String responseContentRange = response.contentRange();
135+
if (eTag == null) {
136+
this.eTag = responseETag;
137+
log.debug(() -> String.format("Multipart object ETag: %s", this.eTag));
138+
}
139+
140+
Optional<SdkClientException> validationError = validateResponse(response, partIndex);
141+
if (validationError.isPresent()) {
142+
log.debug(() -> "Response validation failed", validationError.get());
143+
asyncResponseTransformer.exceptionOccurred(validationError.get());
144+
handleError(validationError.get());
145+
return false;
146+
}
147+
148+
if (totalContentLength == null && responseContentRange != null) {
149+
Optional<Long> parsedContentLength = MultipartDownloadUtils.parseContentRangeForTotalSize(responseContentRange);
150+
if (!parsedContentLength.isPresent()) {
151+
SdkClientException error = PresignedUrlDownloadHelper.invalidContentRangeHeader(responseContentRange);
152+
log.debug(() -> "Failed to parse content range", error);
153+
asyncResponseTransformer.exceptionOccurred(error);
154+
handleError(error);
155+
return false;
156+
}
157+
158+
this.totalContentLength = parsedContentLength.get();
159+
this.totalParts = calculateTotalParts(totalContentLength, configuredPartSizeInBytes);
160+
log.debug(() -> String.format("Total content length: %d, Total parts: %d", totalContentLength, totalParts));
161+
}
162+
return true;
163+
}
164+
165+
private void requestMoreIfNeeded(int totalComplete) {
166+
synchronized (lock) {
167+
if (hasMoreParts(totalComplete)) {
168+
subscription.request(1);
169+
} else {
170+
if (totalParts != null && requestsSent.get() != totalParts) {
171+
handleError(new IllegalStateException(
172+
"Request count mismatch. Expected: " + totalParts + ", sent: " + requestsSent.get()));
173+
return;
174+
}
175+
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
176+
subscription.cancel();
177+
}
178+
}
179+
}
180+
181+
private Optional<SdkClientException> validateResponse(GetObjectResponse response, int partIndex) {
182+
if (response == null) {
183+
return Optional.of(SdkClientException.create("Response cannot be null"));
184+
}
185+
String contentRange = response.contentRange();
186+
if (contentRange == null) {
187+
return Optional.of(PresignedUrlDownloadHelper.missingContentRangeHeader());
188+
}
189+
190+
Long contentLength = response.contentLength();
191+
if (contentLength == null || contentLength < 0) {
192+
return Optional.of(PresignedUrlDownloadHelper.invalidContentLength());
193+
}
194+
195+
long expectedStartByte = partIndex * configuredPartSizeInBytes;
196+
long expectedEndByte;
197+
if (totalContentLength != null) {
198+
expectedEndByte = Math.min(expectedStartByte + configuredPartSizeInBytes - 1, totalContentLength - 1);
199+
} else {
200+
expectedEndByte = expectedStartByte + configuredPartSizeInBytes - 1;
201+
}
202+
String expectedRange = "bytes " + expectedStartByte + "-" + expectedEndByte + "/";
203+
if (!contentRange.startsWith(expectedRange)) {
204+
return Optional.of(SdkClientException.create(
205+
"Content-Range mismatch. Expected range starting with: " + expectedRange +
206+
", but got: " + contentRange));
207+
}
208+
209+
long expectedPartSize;
210+
if (totalContentLength != null && partIndex == totalParts - 1) {
211+
expectedPartSize = totalContentLength - (partIndex * configuredPartSizeInBytes);
212+
} else {
213+
expectedPartSize = configuredPartSizeInBytes;
214+
}
215+
if (!contentLength.equals(expectedPartSize)) {
216+
return Optional.of(SdkClientException.create(
217+
String.format("Part content length validation failed for part %d. Expected: %d, but got: %d",
218+
partIndex, expectedPartSize, contentLength)));
219+
}
220+
221+
long actualStartByte = MultipartDownloadUtils.parseStartByteFromContentRange(contentRange);
222+
if (actualStartByte != expectedStartByte) {
223+
return Optional.of(SdkClientException.create(
224+
"Content range offset mismatch for part " + partIndex +
225+
". Expected start: " + expectedStartByte + ", but got: " + actualStartByte));
226+
}
227+
228+
return Optional.empty();
229+
}
230+
231+
private int calculateTotalParts(long contentLength, long partSize) {
232+
return (int) Math.ceil((double) contentLength / partSize);
233+
}
234+
235+
private boolean hasMoreParts(int completedPartsCount) {
236+
return totalParts != null && completedPartsCount < totalParts;
237+
}
238+
239+
private PresignedUrlDownloadRequest createRangedGetRequest(int partIndex) {
240+
long startByte = partIndex * configuredPartSizeInBytes;
241+
long endByte;
242+
if (totalContentLength != null) {
243+
endByte = Math.min(startByte + configuredPartSizeInBytes - 1, totalContentLength - 1);
244+
} else {
245+
endByte = startByte + configuredPartSizeInBytes - 1;
246+
}
247+
String rangeHeader = BYTES_RANGE_PREFIX + startByte + "-" + endByte;
248+
PresignedUrlDownloadRequest.Builder builder = presignedUrlDownloadRequest.toBuilder()
249+
.range(rangeHeader);
250+
if (partIndex > 0 && eTag != null) {
251+
builder.ifMatch(eTag);
252+
}
253+
return builder.build();
254+
}
255+
256+
private void handleError(Throwable t) {
257+
synchronized (lock) {
258+
if (subscription != null) {
259+
subscription.cancel();
260+
}
261+
}
262+
onError(t);
263+
}
264+
265+
@Override
266+
public void onError(Throwable t) {
267+
log.debug(() -> "Error in multipart download", t);
268+
future.completeExceptionally(t);
269+
}
270+
271+
@Override
272+
public void onComplete() {
273+
future.complete(null);
274+
}
275+
276+
}

0 commit comments

Comments
 (0)