4848import software .amazon .awssdk .utils .Pair ;
4949
5050@ SdkInternalApi
51- public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber <CloseableAsyncRequestBody > {
51+ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber <CloseableAsyncRequestBody > {
5252
5353 private static final Logger log = Logger .loggerFor (KnownContentLengthAsyncRequestBodySubscriber .class );
5454
@@ -70,6 +70,8 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
7070 private final AtomicReferenceArray <CompletedPart > completedParts ;
7171 private final Map <Integer , CompletedPart > existingParts ;
7272 private final PublisherListener <Long > progressListener ;
73+ private final int maxInFlightParts ;
74+ private final Object subscriptionLock = new Object ();
7375 private Subscription subscription ;
7476 private volatile boolean isDone ;
7577 private volatile boolean isPaused ;
@@ -80,8 +82,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
8082 private volatile CompletableFuture <CompleteMultipartUploadResponse > completeMpuFuture ;
8183
8284 KnownContentLengthAsyncRequestBodySubscriber (MpuRequestContext mpuRequestContext ,
83- CompletableFuture <PutObjectResponse > returnFuture ,
84- MultipartUploadHelper multipartUploadHelper ) {
85+ CompletableFuture <PutObjectResponse > returnFuture ,
86+ MultipartUploadHelper multipartUploadHelper ,
87+ int maxInFlightParts ) {
8588 this .totalSize = mpuRequestContext .contentLength ();
8689 this .partSize = mpuRequestContext .partSize ();
8790 this .expectedNumParts = mpuRequestContext .expectedNumParts ();
@@ -92,8 +95,10 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
9295 this .existingNumParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
9396 this .completedParts = new AtomicReferenceArray <>(expectedNumParts );
9497 this .multipartUploadHelper = multipartUploadHelper ;
95- this .progressListener = putObjectRequest .overrideConfiguration ().map (c -> c .executionAttributes ()
96- .getAttribute (JAVA_PROGRESS_LISTENER ))
98+ this .maxInFlightParts = maxInFlightParts ;
99+ this .progressListener = putObjectRequest .overrideConfiguration ()
100+ .map (c -> c .executionAttributes ()
101+ .getAttribute (JAVA_PROGRESS_LISTENER ))
97102 .orElseGet (PublisherListener ::noOp );
98103 }
99104
@@ -133,7 +138,7 @@ public void onSubscribe(Subscription s) {
133138 return ;
134139 }
135140 this .subscription = s ;
136- s .request (1 );
141+ s .request (maxInFlightParts );
137142 returnFuture .whenComplete ((r , t ) -> {
138143 if (t != null ) {
139144 s .cancel ();
@@ -153,23 +158,26 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
153158 int currentPartNum = partNumber .getAndIncrement ();
154159
155160 log .debug (() -> String .format ("Received asyncRequestBody for part number %d with length %s" , currentPartNum ,
156- asyncRequestBody .contentLength ()));
161+ asyncRequestBody .contentLength ()));
157162
158163 if (existingParts .containsKey (currentPartNum )) {
159164 asyncRequestBody .subscribe (new CancelledSubscriber <>());
160165 asyncRequestBody .contentLength ().ifPresent (progressListener ::subscriberOnNext );
161166 asyncRequestBody .close ();
162- subscription .request (1 );
167+
168+ synchronized (subscriptionLock ) {
169+ subscription .request (1 );
170+ }
163171 return ;
164172 }
165173
166174 Optional <SdkClientException > sdkClientException = validatePart (asyncRequestBody , currentPartNum );
167175 if (sdkClientException .isPresent ()) {
168176 multipartUploadHelper .failRequestsElegantly (futures ,
169- sdkClientException .get (),
170- uploadId ,
171- returnFuture ,
172- putObjectRequest );
177+ sdkClientException .get (),
178+ uploadId ,
179+ returnFuture ,
180+ putObjectRequest );
173181 subscription .cancel ();
174182 return ;
175183 }
@@ -179,8 +187,9 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
179187 currentPartNum ,
180188 uploadId );
181189
182- Consumer <CompletedPart > completedPartConsumer = completedPart -> completedParts .set (completedPart .partNumber () - 1 ,
183- completedPart );
190+ Consumer <CompletedPart > completedPartConsumer = completedPart -> completedParts .set (
191+ completedPart .partNumber () - 1 ,
192+ completedPart );
184193 multipartUploadHelper .sendIndividualUploadPartRequest (uploadId , completedPartConsumer , futures ,
185194 Pair .of (uploadRequest , asyncRequestBody ), progressListener )
186195 .whenComplete ((r , t ) -> {
@@ -192,10 +201,15 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
192201 subscription .cancel ();
193202 }
194203 } else {
195- completeMultipartUploadIfFinished (asyncRequestBodyInFlight .decrementAndGet ());
204+ int inFlight = asyncRequestBodyInFlight .decrementAndGet ();
205+ if (!isDone && inFlight < maxInFlightParts ) {
206+ synchronized (subscriptionLock ) {
207+ subscription .request (1 );
208+ }
209+ }
210+ completeMultipartUploadIfFinished (inFlight );
196211 }
197212 });
198- subscription .request (1 );
199213 }
200214
201215 private Optional <SdkClientException > validatePart (AsyncRequestBody asyncRequestBody , int currentPartNum ) {
@@ -258,10 +272,9 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
258272 CompletedPart [] parts ;
259273
260274 if (existingParts .isEmpty ()) {
261- parts =
262- IntStream .range (0 , completedParts .length ())
263- .mapToObj (completedParts ::get )
264- .toArray (CompletedPart []::new );
275+ parts = IntStream .range (0 , completedParts .length ())
276+ .mapToObj (completedParts ::get )
277+ .toArray (CompletedPart []::new );
265278 } else {
266279 // List of CompletedParts needs to be in ascending order
267280 parts = mergeCompletedParts ();
@@ -274,7 +287,8 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
274287 return ;
275288 }
276289
277- completeMpuFuture = multipartUploadHelper .completeMultipartUpload (returnFuture , uploadId , parts , putObjectRequest ,
290+ completeMpuFuture = multipartUploadHelper .completeMultipartUpload (returnFuture , uploadId , parts ,
291+ putObjectRequest ,
278292 totalSize );
279293 }
280294 }
@@ -283,8 +297,8 @@ private CompletedPart[] mergeCompletedParts() {
283297 CompletedPart [] merged = new CompletedPart [expectedNumParts ];
284298 int currPart = 1 ;
285299 while (currPart < expectedNumParts + 1 ) {
286- CompletedPart completedPart = existingParts .containsKey (currPart ) ? existingParts .get (currPart ) :
287- completedParts .get (currPart - 1 );
300+ CompletedPart completedPart = existingParts .containsKey (currPart ) ? existingParts .get (currPart )
301+ : completedParts .get (currPart - 1 );
288302 merged [currPart - 1 ] = completedPart ;
289303 currPart ++;
290304 }
0 commit comments