3333import software .amazon .awssdk .utils .CompletableFutureUtils ;
3434import software .amazon .awssdk .utils .Logger ;
3535
36+ /**
37+ * A subscriber implementation that will download all individual parts for a multipart get-object request in parallel,
38+ * concurrently. The amount of concurrent get-object is limited by the {@code maxInFlightParts} configuration. It receives the
39+ * individual {@link AsyncResponseTransformer} which will be used to perform the individual part requests. These
40+ * AsyncResponseTransformer should be able to handle receiving data in parts potentially out of order, For example, the
41+ * AsyncResponseTransformer for part 4 might may have any of its callback called before part 1, 2 or 3 if it finishes before. This
42+ * is a 'one-shot' class, it should <em>NOT</em> be reused for more than one multipart download.
43+ */
3644@ SdkInternalApi
3745public class NonLinearMultipartDownloaderSubscriber
3846 implements Subscriber <AsyncResponseTransformer <GetObjectResponse , GetObjectResponse >> {
@@ -41,7 +49,7 @@ public class NonLinearMultipartDownloaderSubscriber
4149 /**
4250 * Maximum number of concurrent GetObject requests
4351 */
44- private final int maxInFlight ;
52+ private final int maxInFlightParts ;
4553
4654 /**
4755 * The s3 client used to make the individual part requests
@@ -61,7 +69,7 @@ public class NonLinearMultipartDownloaderSubscriber
6169 private final AtomicInteger completedParts = new AtomicInteger ();
6270
6371 /**
64- * Queue of all remaining parts in ascending order. i.e. 4, 5, 6, 8, 11
72+ * Queue of all remaining parts in ascending order. i.e. 4, 5, 6, 8, 11, etc
6573 */
6674 private final Queue <Integer > allRemainingParts = new ArrayDeque <>();
6775
@@ -107,8 +115,7 @@ public class NonLinearMultipartDownloaderSubscriber
107115 private final ReentrantLock firstPartLock = new ReentrantLock ();
108116
109117 /**
110- * Pending transformers waiting for the first request to be executed. We cant execute multiple part get until the first
111- * requests completes, and we know the object in s3 is a multipart object.
118+ * Pending transformers received through onNext that are waiting to be executed.
112119 */
113120 private final Queue <AsyncResponseTransformer <GetObjectResponse , GetObjectResponse >> pendingTransformers = new ArrayDeque <>();
114121
@@ -134,7 +141,7 @@ public NonLinearMultipartDownloaderSubscriber(S3AsyncClient s3,
134141 this .s3 = s3 ;
135142 this .getObjectRequest = getObjectRequest ;
136143 this .resultFuture = resultFuture ;
137- this .maxInFlight = maxInFLight ;
144+ this .maxInFlightParts = maxInFLight ;
138145 }
139146
140147 @ Override
@@ -155,14 +162,13 @@ private void request(int amount) {
155162
156163 @ Override
157164 public void onNext (AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > asyncResponseTransformer ) {
158- // todo change to trace log
159165 outstandingDemand .decrementAndGet ();
160- log .info (() -> "=== On Next ===\n Total in flight parts: " + inFlightRequests .size ()
161- + "\n Outstanding Demand : " + outstandingDemand .get ()
162- + "\n Total completed parts: " + completedParts
163- + "\n Total transformers requested: " + transformersRequested .get ()
164- + "\n Total pending transformers: " + pendingTransformers .size ()
165- + "\n Current in flight requests: " + inFlightRequests .keySet ());
166+ log .trace (() -> "=== On Next ===\n Total in flight parts: " + inFlightRequests .size ()
167+ + "\n Outstanding Demand : " + outstandingDemand .get ()
168+ + "\n Total completed parts: " + completedParts
169+ + "\n Total transformers requested: " + transformersRequested .get ()
170+ + "\n Total pending transformers: " + pendingTransformers .size ()
171+ + "\n Current in flight requests: " + inFlightRequests .keySet ());
166172 if (asyncResponseTransformer == null ) {
167173 subscription .cancel ();
168174 throw new NullPointerException ("onNext must not be called with null asyncResponseTransformer" );
@@ -175,7 +181,7 @@ private void executeRequestOrAddToPending(AsyncResponseTransformer<GetObjectResp
175181 return ;
176182 }
177183
178- if (inFlightRequests .size () >= maxInFlight ) {
184+ if (inFlightRequests .size () >= maxInFlightParts ) {
179185 pendingTransformers .offer (asyncResponseTransformer );
180186 return ;
181187 }
@@ -218,16 +224,15 @@ private void sendNextRequest(AsyncResponseTransformer<GetObjectResponse, GetObje
218224 Integer partToGet = nextPart ();
219225
220226 GetObjectRequest request = nextRequest (partToGet );
221- // todo change to trace log
222- log .info (() -> "Sending next request for part: " + partToGet );
227+ log .debug (() -> "Sending next request for part: " + partToGet );
223228
224229 CompletableFuture <GetObjectResponse > response = s3 .getObject (request , asyncResponseTransformer );
225230
226231 inFlightRequests .put (partToGet , response );
227232 CompletableFutureUtils .forwardExceptionTo (resultFuture , response );
228233
229234 response .whenComplete ((res , e ) -> {
230- log .info (() -> "Completed part: " + partToGet );
235+ log .debug (() -> "Completed part: " + partToGet );
231236 inFlightRequests .remove (partToGet );
232237
233238 completedParts .incrementAndGet ();
@@ -240,8 +245,6 @@ private void sendNextRequest(AsyncResponseTransformer<GetObjectResponse, GetObje
240245 return ;
241246 }
242247 if (completedParts .get () >= totalParts ) {
243- // todo remove log
244- log .info (() -> "================= ALL PARTS COMPLETED ==================" );
245248 subscription .cancel ();
246249 resultFuture .complete (getObjectResponse );
247250 } else {
@@ -253,7 +256,7 @@ private void sendNextRequest(AsyncResponseTransformer<GetObjectResponse, GetObje
253256
254257 // returns true if the first request was sent and is still in flight
255258 private void sendFirstRequest (AsyncResponseTransformer <GetObjectResponse , GetObjectResponse > asyncResponseTransformer ) {
256- log .info (() -> "Sending first request" );
259+ log .debug (() -> "Sending first request" );
257260 GetObjectRequest request = nextRequest (1 );
258261 CompletableFuture <GetObjectResponse > responseFuture = s3 .getObject (request , asyncResponseTransformer );
259262 inFlightRequests .put (1 , responseFuture );
@@ -262,7 +265,8 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
262265 CompletableFutureUtils .forwardExceptionTo (resultFuture , responseFuture );
263266
264267 responseFuture .whenComplete ((res , e ) -> {
265- inFlightRequests .remove (1 );
268+ log .debug (() -> "Completed part: 1" );
269+ inFlightRequests .remove (1 );
266270
267271 completedParts .incrementAndGet ();
268272 if (e != null ) {
@@ -275,8 +279,7 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
275279 Integer partCount = res .partsCount ();
276280 eTag = res .eTag ();
277281 if (partCount != null && totalParts == null ) {
278- // todo change debug log
279- log .info (() -> String .format ("Total amount of parts of the object to download: %d" , partCount ));
282+ log .debug (() -> String .format ("Total amount of parts of the object to download: %d" , partCount ));
280283 // MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest)
281284 // .ifPresent(ctx -> ctx.totalParts(partCount));
282285 totalParts = partCount ;
@@ -286,14 +289,13 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
286289 if (totalParts == null || totalParts == 1 ) {
287290 // Single part object detected, skip multipart and complete everything now
288291 // todo change debug log
289- log .info (() -> "Single Part object detected, skipping multipart download" );
292+ log .debug (() -> "Single Part object detected, skipping multipart download" );
290293 subscription .cancel ();
291294 resultFuture .complete (res );
292295 return ;
293296 }
294297
295- // todo change debug log
296- log .info (() -> "Multipart object detected, performing multipart download" );
298+ log .debug (() -> "Multipart object detected, performing multipart download" );
297299 // part 1 already completed, so skip it
298300 for (int i = 2 ; i <= totalParts ; i ++) {
299301 allRemainingParts .add (i );
@@ -330,9 +332,6 @@ private void requestMoreIfNeeded() {
330332 // Don't request more if we already have enough work in progress
331333 int totalWorkInProgress = inFlightRequests .size () + completedParts .get ();
332334 if (totalWorkInProgress >= totalParts ) {
333- // todo change trace log
334- // log.info(() -> "Not requesting more. Total work in progress (" + totalWorkInProgress
335- // + ") >= totalParts (" + totalParts + ")");
336335 return ;
337336 }
338337
@@ -343,22 +342,20 @@ private void requestMoreIfNeeded() {
343342 }
344343
345344 // don't request if we already have enough work in progress
346- if (inFlightRequests .size () >= maxInFlight ) {
345+ if (inFlightRequests .size () >= maxInFlightParts ) {
347346 return ;
348347 }
349348
350349 // don't request if we have already requested more work than we can handle
351- if (outstandingDemand .get () >= maxInFlight ) {
350+ if (outstandingDemand .get () >= maxInFlightParts ) {
352351 return ;
353352 }
354353
355354 int partsNeeded = Math .min (Math .min (totalParts - currentRequested , remainingParts ),
356- maxInFlight - inFlightRequests .size ());
355+ maxInFlightParts - inFlightRequests .size ());
357356 if (partsNeeded > 0 ) {
358- // todo change trace log
359- // log.info(() -> "Requesting " + partsNeeded + " more transformers. Total requested will be: "
360- // + (currentRequested + partsNeeded) + ". Remaining parts: " + remainingParts
361- // + ". InFlight: " + inFlightRequests.size());
357+ log .trace (() -> "Requesting " + partsNeeded + " more transformers. Total requested will be: "
358+ + (currentRequested + partsNeeded ));
362359 request (partsNeeded );
363360 }
364361
@@ -372,9 +369,9 @@ private Integer nextPart() {
372369
373370 @ Override
374371 public void onError (Throwable t ) {
375- // Signal received from the publisher this is subscribed to, in the case of file download, that's
376- // FileAsyncResponseTransformerPublisher.
377- // failed state, something really wrong has happened, cancel everything
372+ // Signal received from the publisher this is subscribed to
373+ // (in the case of file download, that's FileAsyncResponseTransformerPublisher)
374+ // Failed state, something really wrong has happened, cancel everything
378375 inFlightRequests .values ().forEach (future -> future .cancel (true ));
379376 inFlightRequests .clear ();
380377 resultFuture .completeExceptionally (t );
0 commit comments