Skip to content

Commit 9902ec8

Browse files
committed
PR comments
- renamed EmittingSubscription, mark it ThreadSafe - Added comments - some other renaming
1 parent af94ef1 commit 9902ec8

4 files changed

Lines changed: 39 additions & 29 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ThreadSafeEmittingSubscription.java renamed to core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/EmittingSubscription.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,18 @@
2121
import org.reactivestreams.Subscriber;
2222
import org.reactivestreams.Subscription;
2323
import software.amazon.awssdk.annotations.SdkInternalApi;
24-
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
24+
import software.amazon.awssdk.annotations.ThreadSafe;
2525
import software.amazon.awssdk.utils.Logger;
2626

27+
/**
28+
* Subscription which can emit {@link Subscriber#onNext(T)} signals to a subscriber, based on the demand received with the
29+
* {@link Subscription#request(long)}. It tracks the outstandingDemand that has not yet been fulfilled and used a Supplier
30+
* passed to it to create the object it needs to emit.
31+
* @param <T> the type of obejct to emit to the subscriber.
32+
*/
2733
@SdkInternalApi
28-
public class ThreadSafeEmittingSubscription<T> implements Subscription {
34+
@ThreadSafe
35+
public final class EmittingSubscription<T> implements Subscription {
2936

3037
private Subscriber<? super T> downstreamSubscriber;
3138
private final AtomicBoolean emitting = new AtomicBoolean(false);
@@ -36,7 +43,7 @@ public class ThreadSafeEmittingSubscription<T> implements Subscription {
3643
private final Logger log;
3744

3845

39-
private ThreadSafeEmittingSubscription(Builder<T> builder) {
46+
private EmittingSubscription(Builder<T> builder) {
4047
this.downstreamSubscriber = builder.downstreamSubscriber;
4148
this.outstandingDemand = builder.outstandingDemand;
4249
this.onCancel = builder.onCancel;
@@ -113,7 +120,7 @@ public static class Builder<T> {
113120
private Subscriber<? super T> downstreamSubscriber;
114121
private AtomicLong outstandingDemand = new AtomicLong(0);
115122
private AtomicBoolean isCancelled = new AtomicBoolean(false);
116-
private Logger log = Logger.loggerFor(ThreadSafeEmittingSubscription.class);
123+
private Logger log = Logger.loggerFor(EmittingSubscription.class);
117124
private Runnable onCancel;
118125
private Supplier<T> supplier;
119126

@@ -147,8 +154,8 @@ public Builder<T> supplier(Supplier<T> supplier) {
147154
return this;
148155
}
149156

150-
public ThreadSafeEmittingSubscription<T> build() {
151-
return new ThreadSafeEmittingSubscription<>(this);
157+
public EmittingSubscription<T> build() {
158+
return new EmittingSubscription<>(this);
152159
}
153160
}
154161

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,14 @@ public FileAsyncResponseTransformerPublisher(FileAsyncResponseTransformer<?> res
6060

6161
@Override
6262
public void subscribe(Subscriber<? super AsyncResponseTransformer<T, T>> s) {
63-
if (s == null) {
64-
throw new NullPointerException("Subscription must not be null");
65-
}
63+
Validate.notNull(s, "Subscriber must not be null");
6664
this.subscriber = s;
67-
s.onSubscribe(ThreadSafeEmittingSubscription.<AsyncResponseTransformer<T, T>>builder()
68-
.downstreamSubscriber(s)
69-
.onCancel(this::onCancel)
70-
.log(log)
71-
.supplier(this::createTransformer)
72-
.build());
65+
s.onSubscribe(EmittingSubscription.<AsyncResponseTransformer<T, T>>builder()
66+
.downstreamSubscriber(s)
67+
.onCancel(this::onCancel)
68+
.log(log)
69+
.supplier(this::createTransformer)
70+
.build());
7371
}
7472

7573
private AsyncResponseTransformer<T, T> createTransformer() {
@@ -110,8 +108,7 @@ public void onResponse(T response) {
110108
if (!contentRangeList.isPresent()) {
111109
if (subscriber != null) {
112110
IllegalStateException e = new IllegalStateException("Content range header is missing");
113-
subscriber.onError(e);
114-
future.completeExceptionally(e);
111+
handleError(e);
115112
}
116113
return;
117114
}
@@ -121,8 +118,7 @@ public void onResponse(T response) {
121118
if (!contentRangePair.isPresent()) {
122119
if (subscriber != null) {
123120
IllegalStateException e = new IllegalStateException("Could not parse content range header " + contentRange);
124-
subscriber.onError(e);
125-
future.completeExceptionally(e);
121+
handleError(e);
126122
}
127123
return;
128124
}
@@ -135,6 +131,11 @@ public void onResponse(T response) {
135131
delegate.onResponse(response);
136132
}
137133

134+
private void handleError(Throwable e) {
135+
subscriber.onError(e);
136+
future.completeExceptionally(e);
137+
}
138+
138139
private AsyncResponseTransformer<T, T> getDelegateTransformer(Long startAt) {
139140
if (transformerCount.get() == 0) {
140141
return AsyncResponseTransformer.toFile(path, initialConfig);
@@ -154,7 +155,7 @@ private AsyncResponseTransformer<T, T> getDelegateTransformer(Long startAt) {
154155
.position(initialOffset + startAt));
155156
return AsyncResponseTransformer.toFile(path, newConfig);
156157
}
157-
// APPEND mode is not supported for non-serial operations,
158+
// As per design specification, APPEND mode is not supported for non-serial operations
158159
case CREATE_OR_APPEND_TO_EXISTING:
159160
default:
160161
throw new UnsupportedOperationException("Unsupported fileWriteOption: " + initialConfig.fileWriteOption());

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartConfigurationResolver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
public final class MultipartConfigurationResolver {
2828

2929
private static final long DEFAULT_MIN_PART_SIZE = 8L * 1024 * 1024;
30+
31+
// Using 50 as the default since maxConcurrency for http client is also 50
3032
private static final int DEFAULT_MAX_IN_FLIGHT_PARTS = 50;
3133

3234
private final long minimalPartSizeInBytes;

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/NonLinearMultipartDownloaderSubscriber.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public class NonLinearMultipartDownloaderSubscriber
122122
/**
123123
* Tracks if the first request was sent
124124
*/
125-
private final AtomicBoolean firstRequestSent = new AtomicBoolean(false);
125+
private final AtomicBoolean isFirstRequestSent = new AtomicBoolean(false);
126126

127127
/**
128128
* Tracks the total number of transformers requested from the subscription
@@ -139,7 +139,7 @@ public class NonLinearMultipartDownloaderSubscriber
139139
* attempted for that part, but it still failed. This is a failure state, the error should be reported back to the user
140140
* and any more request should be ignored.
141141
*/
142-
private final AtomicBoolean completedExceptionally = new AtomicBoolean(false);
142+
private final AtomicBoolean isCompletedExceptionally = new AtomicBoolean(false);
143143

144144
public NonLinearMultipartDownloaderSubscriber(S3AsyncClient s3,
145145
GetObjectRequest getObjectRequest,
@@ -185,7 +185,7 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
185185

186186
private void executeRequestOrAddToPending(
187187
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
188-
if (handleFirstRequest(asyncResponseTransformer)) {
188+
if (handleFirstRequestOrEnqueueTransformer(asyncResponseTransformer)) {
189189
return;
190190
}
191191

@@ -205,16 +205,16 @@ private void executeRequestOrAddToPending(
205205
* We need to wait for the first request to finish so we know if it is aa multipart object or not.
206206
* While we don't know yet, additional onNext signal receives are stored in pendingTransformers.
207207
*/
208-
private boolean handleFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
208+
private boolean handleFirstRequestOrEnqueueTransformer(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
209209
if (completedParts.get() != 0) {
210210
return false;
211211
}
212212

213213
firstPartLock.lock();
214214
try {
215-
if (!firstRequestSent.get()) {
215+
if (!isFirstRequestSent.get()) {
216216
sendFirstRequest(asyncResponseTransformer);
217-
firstRequestSent.set(true);
217+
isFirstRequestSent.set(true);
218218
return true;
219219
}
220220

@@ -251,7 +251,7 @@ private void sendNextRequest(AsyncResponseTransformer<GetObjectResponse, GetObje
251251
inFlightRequests.remove(partToGet);
252252

253253
completedParts.incrementAndGet();
254-
if (e != null || completedExceptionally.get()) {
254+
if (e != null || isCompletedExceptionally.get()) {
255255
// Note on retries: When this future completes exceptionally, it means we did all retries and still failed for
256256
// that part. We need to report back the failure to the user.
257257
handlePartError(e, partToGet);
@@ -286,7 +286,7 @@ private void sendFirstRequest(AsyncResponseTransformer<GetObjectResponse, GetObj
286286
inFlightRequests.remove(1);
287287

288288
completedParts.incrementAndGet();
289-
if (e != null || completedExceptionally.get()) {
289+
if (e != null || isCompletedExceptionally.get()) {
290290
// Note on retries: When this future completes exceptionally, it means we did all retries and still failed for
291291
// that part. We need to report back the failure to the user.
292292
handlePartError(e, 1);
@@ -337,7 +337,7 @@ private void setInitialPartCountAndEtag(GetObjectResponse response) {
337337
}
338338

339339
private void handlePartError(Throwable e, int part) {
340-
completedExceptionally.set(true);
340+
isCompletedExceptionally.set(true);
341341
log.error(() -> "Error on part " + part + ": " + e);
342342
resultFuture.completeExceptionally(e);
343343
inFlightRequests.values().forEach(future -> future.cancel(true));

0 commit comments

Comments
 (0)