1+ /*
2+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License").
5+ * You may not use this file except in compliance with the License.
6+ * A copy of the License is located at
7+ *
8+ * http://aws.amazon.com/apache2.0
9+ *
10+ * or in the "license" file accompanying this file. This file is distributed
11+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+ * express or implied. See the License for the specific language governing
13+ * permissions and limitations under the License.
14+ */
15+
16+ package software .amazon .awssdk .services .s3 .internal .multipart ;
17+
18+ import java .util .concurrent .CompletableFuture ;
19+ import java .util .concurrent .atomic .AtomicInteger ;
20+ import java .util .regex .Matcher ;
21+ import java .util .regex .Pattern ;
22+ import org .reactivestreams .Subscriber ;
23+ import org .reactivestreams .Subscription ;
24+ import software .amazon .awssdk .annotations .Immutable ;
25+ import software .amazon .awssdk .annotations .SdkInternalApi ;
26+ import software .amazon .awssdk .annotations .ThreadSafe ;
27+ import software .amazon .awssdk .core .async .AsyncResponseTransformer ;
28+ import software .amazon .awssdk .services .s3 .S3AsyncClient ;
29+ import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
30+ import software .amazon .awssdk .services .s3 .presignedurl .model .PresignedUrlDownloadRequest ;
31+ import software .amazon .awssdk .utils .Logger ;
32+
33+ /**
34+ * A subscriber implementation that will download all individual parts for a multipart presigned URL download request.
35+ * It receives individual {@link AsyncResponseTransformer} instances which will be used to perform the individual
36+ * range-based part requests using presigned URLs. This is a 'one-shot' class, it should <em>NOT</em> be reused
37+ * for more than one multipart download.
38+ *
39+ * <p>Unlike the standard {@link MultipartDownloaderSubscriber} which uses S3's native multipart API with part numbers,
40+ * this subscriber uses HTTP range requests against presigned URLs to achieve multipart download functionality.
41+ * <p>This implementation is thread-safe and handles concurrent part downloads while maintaining proper
42+ * ordering and validation of responses.</p>
43+ */
44+ @ ThreadSafe
45+ @ Immutable
46+ @ SdkInternalApi
47+ public class PresignedUrlMultipartDownloaderSubscriber
48+ implements Subscriber <AsyncResponseTransformer <GetObjectResponse , GetObjectResponse >> {
49+
50+ private static final Logger log = Logger .loggerFor (PresignedUrlMultipartDownloaderSubscriber .class );
51+ private static final String BYTES_RANGE_PREFIX = "bytes=" ;
52+ private static final Pattern CONTENT_RANGE_PATTERN = Pattern .compile ("bytes\\ s+(\\ d+)-(\\ d+)/(\\ d+)" );
53+
54+ private final S3AsyncClient s3AsyncClient ;
55+ private final PresignedUrlDownloadRequest presignedUrlDownloadRequest ;
56+ private final long configuredPartSizeInBytes ;
57+ private final int completedParts ;
58+ private final CompletableFuture <Void > future ;
59+ private final Object lock = new Object ();
60+ private volatile MultipartDownloadState state ;
61+ private Subscription subscription ;
62+
63+ private static class MultipartDownloadState {
64+ final long totalContentLength ;
65+ final long actualPartSizeInBytes ;
66+ final int totalParts ;
67+ final AtomicInteger completedParts ;
68+ final String etag ;
69+
70+ MultipartDownloadState (long totalLength , long partSize , int totalParts , String etag , int completedParts ) {
71+ this .totalContentLength = totalLength ;
72+ this .actualPartSizeInBytes = partSize ;
73+ this .totalParts = totalParts ;
74+ this .completedParts = new AtomicInteger (completedParts );
75+ this .etag = etag ;
76+ }
77+ }
78+
79+ public PresignedUrlMultipartDownloaderSubscriber (
80+ S3AsyncClient s3AsyncClient ,
81+ PresignedUrlDownloadRequest presignedUrlDownloadRequest ,
82+ long configuredPartSizeInBytes ) {
83+ this .s3AsyncClient = s3AsyncClient ;
84+ this .presignedUrlDownloadRequest = presignedUrlDownloadRequest ;
85+ this .configuredPartSizeInBytes = configuredPartSizeInBytes ;
86+ this .completedParts = 0 ;
87+ this .future = new CompletableFuture <>();
88+ }
89+
90+ @ Override
91+ public void onSubscribe (Subscription s ) {
92+ synchronized (lock ) {
93+ if (subscription != null ) {
94+ s .cancel ();
95+ return ;
96+ }
97+ this .subscription = s ;
98+ s .request (1 );
99+ }
100+ }
101+
102+ @ Override
103+ public void onNext (AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > asyncResponseTransformer ) {
104+ if (asyncResponseTransformer == null ) {
105+ subscription .cancel ();
106+ throw new NullPointerException ("onNext must not be called with null asyncResponseTransformer" );
107+ }
108+ synchronized (lock ) {
109+ if (state == null ) {
110+ performSizeDiscoveryAndFirstPart (asyncResponseTransformer );
111+ } else {
112+ downloadNextPart (asyncResponseTransformer );
113+ }
114+ }
115+ }
116+
117+ private void performSizeDiscoveryAndFirstPart (AsyncResponseTransformer <GetObjectResponse ,
118+ GetObjectResponse > asyncResponseTransformer ) {
119+ if (completedParts > 0 ) {
120+ performSizeDiscoveryOnly (asyncResponseTransformer );
121+ return ;
122+ }
123+ long endByte = configuredPartSizeInBytes - 1 ;
124+ String firstPartRange = String .format ("%s0-%d" , BYTES_RANGE_PREFIX , endByte );
125+ PresignedUrlDownloadRequest firstPartRequest = presignedUrlDownloadRequest .toBuilder ()
126+ .range (firstPartRange )
127+ .build ();
128+ s3AsyncClient .presignedUrlExtension ().getObject (firstPartRequest , asyncResponseTransformer )
129+ .whenComplete ((response , error ) -> {
130+ if (error != null ) {
131+ log .debug (() -> "Error encountered during first part request" );
132+ onError (error );
133+ return ;
134+ }
135+ try {
136+ String contentRange = response .contentRange ();
137+ if (contentRange == null ) {
138+ onError (new IllegalStateException ("No Content-Range header in response" ));
139+ return ;
140+ }
141+ long totalSize = parseContentRangeForTotalSize (contentRange );
142+ if (totalSize <= configuredPartSizeInBytes ) {
143+ subscription .cancel ();
144+ return ;
145+ }
146+ String etag = response .eTag ();
147+ initializeStateAfterFirstPart (totalSize , etag );
148+ if (state .totalParts > 1 ) {
149+ subscription .request (1 );
150+ } else {
151+ subscription .cancel ();
152+ }
153+ } catch (Exception e ) {
154+ log .debug (() -> "Error during first part processing" , e );
155+ onError (e );
156+ }
157+ });
158+ }
159+
160+ private void performSizeDiscoveryOnly (
161+ AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > asyncResponseTransformer ) {
162+ String sizeDiscoveryRange = String .format ("%s0-0" , BYTES_RANGE_PREFIX );
163+ PresignedUrlDownloadRequest sizeDiscoveryRequest = presignedUrlDownloadRequest .toBuilder ()
164+ .range (sizeDiscoveryRange )
165+ .build ();
166+
167+ s3AsyncClient .presignedUrlExtension ().getObject (sizeDiscoveryRequest , asyncResponseTransformer )
168+ .whenComplete ((response , error ) -> {
169+ if (error != null ) {
170+ log .debug (() -> "Error encountered during size discovery request" );
171+ onError (error );
172+ return ;
173+ }
174+ try {
175+ String contentRange = response .contentRange ();
176+ if (contentRange == null ) {
177+ onError (new IllegalStateException ("No Content-Range header in response" ));
178+ return ;
179+ }
180+ long totalSize = parseContentRangeForTotalSize (contentRange );
181+ String etag = response .eTag ();
182+ if (etag == null ) {
183+ onError (new IllegalStateException ("No ETag in response, cannot ensure consistency" ));
184+ return ;
185+ }
186+ int totalParts = calculateTotalParts (totalSize , configuredPartSizeInBytes );
187+ this .state = new MultipartDownloadState (totalSize , configuredPartSizeInBytes ,
188+ totalParts , etag , completedParts );
189+ if (completedParts < state .totalParts ) {
190+ subscription .request (1 );
191+ } else {
192+ subscription .cancel ();
193+ }
194+ } catch (Exception e ) {
195+ log .debug (() -> "Error during size discovery processing" , e );
196+ onError (e );
197+ }
198+ });
199+ }
200+
201+ private void downloadNextPart (AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > transformer ) {
202+ int nextPartIndex = state .completedParts .getAndIncrement ();
203+ if (nextPartIndex >= state .totalParts ) {
204+ subscription .cancel ();
205+ return ;
206+ }
207+ PresignedUrlDownloadRequest partRequest = createPartRequest (nextPartIndex );
208+ String expectedRange = partRequest .range ();
209+ s3AsyncClient .presignedUrlExtension ().getObject (partRequest , transformer )
210+ .whenComplete ((response , error ) -> {
211+ if (error != null ) {
212+ log .debug (() -> "Error encountered during part request with range=" + expectedRange );
213+ onError (error );
214+ } else {
215+ try {
216+ validatePartResponse (response , nextPartIndex , expectedRange );
217+ int completedCount = nextPartIndex + 1 ;
218+ if (completedCount < state .totalParts ) {
219+ subscription .request (1 );
220+ } else {
221+ subscription .cancel ();
222+ }
223+ } catch (Exception validationError ) {
224+ log .debug (() -> "Validation failed for part " + (nextPartIndex + 1 ));
225+ onError (validationError );
226+ }
227+ }
228+ });
229+ }
230+
231+ private void initializeStateAfterFirstPart (long totalSize , String etag ) {
232+ int totalParts = calculateTotalParts (totalSize , configuredPartSizeInBytes );
233+ this .state = new MultipartDownloadState (totalSize , configuredPartSizeInBytes , totalParts , etag , completedParts + 1 );
234+ }
235+
236+ private long parseContentRangeForTotalSize (String contentRange ) {
237+ Matcher matcher = CONTENT_RANGE_PATTERN .matcher (contentRange );
238+ if (!matcher .matches ()) {
239+ throw new IllegalArgumentException ("Invalid Content-Range header: " + contentRange );
240+ }
241+ return Long .parseLong (matcher .group (3 ));
242+ }
243+
244+ private int calculateTotalParts (long contentLength , long partSize ) {
245+ return (int ) Math .ceil ((double ) contentLength / partSize );
246+ }
247+
248+ private PresignedUrlDownloadRequest createPartRequest (int partIndex ) {
249+ long startByte = partIndex * state .actualPartSizeInBytes ;
250+ long endByte = Math .min (startByte + state .actualPartSizeInBytes - 1 , state .totalContentLength - 1 );
251+ String rangeHeader = BYTES_RANGE_PREFIX + startByte + "-" + endByte ;
252+
253+ return presignedUrlDownloadRequest .toBuilder ()
254+ .range (rangeHeader )
255+ .build ();
256+ }
257+
258+ @ Override
259+ public void onError (Throwable t ) {
260+ log .debug (() -> "Error in multipart download" , t );
261+ future .completeExceptionally (t );
262+ }
263+
264+ @ Override
265+ public void onComplete () {
266+ future .complete (null );
267+ }
268+
269+ public CompletableFuture <Void > future () {
270+ return this .future ;
271+ }
272+
273+ private void validatePartResponse (GetObjectResponse response , int partIndex , String expectedRange ) {
274+ if (response == null ) {
275+ throw new IllegalArgumentException ("Response cannot be null" );
276+ }
277+ String responseETag = response .eTag ();
278+ if (responseETag != null && state .etag != null && !state .etag .equals (responseETag )) {
279+ throw new IllegalStateException ("ETag mismatch - object may have changed during download" );
280+ }
281+ }
282+ }
0 commit comments