subscriptions;
+ private final Object lock = new Object();
+
+ /**
+ * Creates a new BufferingAsyncRequestBody with the specified content length.
+ *
+ * @param length the total content length in bytes, or null if unknown
+ */
+ BufferingAsyncRequestBody(Long length) {
+ this.length = length;
+ this.subscriptions = ConcurrentHashMap.newKeySet();
+ }
+
+ /**
+ * Sends a chunk of data to be buffered for later consumption by subscribers.
+ *
+ * @param data the data to buffer, must not be null
+ * @throws NullPointerException if data is null
+ */
+ public void send(ByteBuffer data) {
+ Validate.paramNotNull(data, "data");
+ synchronized (lock) {
+ if (closed) {
+ throw new IllegalStateException("Cannot send data to closed body");
+ }
+ if (dataReady) {
+ throw new IllegalStateException("Request body has already been completed");
+ }
+ bufferedData.add(data);
+ }
+ }
+
+ /**
+ * Marks the request body as complete and ready for subscription.
+ *
+ * This method must be called before any subscribers can successfully subscribe
+ * to this request body. After calling this method, no more data should be sent
+ * via {@link #send(ByteBuffer)}.
+ *
+ *
Once complete, multiple subscribers can subscribe and will each receive
+ * the complete buffered content independently.
+ */
+ public void complete() {
+ synchronized (lock) {
+ if (dataReady) {
+ return;
+ }
+ if (closed) {
+ throw new IllegalStateException("The AsyncRequestBody has been closed");
+ }
+ dataReady = true;
+ }
+ }
+
+ @Override
+ public Optional contentLength() {
+ return Optional.ofNullable(length);
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ Validate.paramNotNull(subscriber, "subscriber");
+
+ synchronized (lock) {
+ if (!dataReady) {
+ subscriber.onSubscribe(new NoopSubscription(subscriber));
+ subscriber.onError(NonRetryableException.create(
+ "Unexpected error occurred. Data is not ready to be subscribed"));
+ return;
+ }
+
+ if (closed) {
+ subscriber.onSubscribe(new NoopSubscription(subscriber));
+ subscriber.onError(NonRetryableException.create(
+ "AsyncRequestBody has been closed"));
+ return;
+ }
+ }
+
+ ReplayableByteBufferSubscription replayableByteBufferSubscription =
+ new ReplayableByteBufferSubscription(subscriber, bufferedData);
+ subscriber.onSubscribe(replayableByteBufferSubscription);
+ subscriptions.add(replayableByteBufferSubscription);
+ }
+
+ @Override
+ public String body() {
+ return BodyType.BYTES.getName();
+ }
+
+ /**
+ * Closes this request body and releases all resources. This will:
+ *
+ * Clear all buffered data to free memory
+ * Notify all active subscribers with an error
+ * Prevent new subscriptions from succeeding
+ *
+ *
+ * This method is idempotent - calling it multiple times has no additional effect.
+ * It is safe to call this method concurrently from multiple threads.
+ */
+ @Override
+ public void close() {
+ synchronized (lock) {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
+ bufferedData.clear();
+ subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed")));
+ subscriptions.clear();
+ }
+
+ }
+
+ @SdkTestInternalApi
+ List bufferedData() {
+ return Collections.unmodifiableList(bufferedData);
+ }
+
+ private class ReplayableByteBufferSubscription implements Subscription {
+ private final AtomicInteger index = new AtomicInteger(0);
+ private final AtomicBoolean done = new AtomicBoolean(false);
+ private final List buffers;
+ private final AtomicBoolean processingRequest = new AtomicBoolean(false);
+ private Subscriber super ByteBuffer> currentSubscriber;
+ private final AtomicLong outstandingDemand = new AtomicLong();
+
+ private ReplayableByteBufferSubscription(Subscriber super ByteBuffer> subscriber, List buffers) {
+ this.buffers = buffers;
+ this.currentSubscriber = subscriber;
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
+ currentSubscriber = null;
+ return;
+ }
+
+ if (done.get()) {
+ return;
+ }
+
+ outstandingDemand.updateAndGet(current -> {
+ if (Long.MAX_VALUE - current < n) {
+ return Long.MAX_VALUE;
+ }
+
+ return current + n;
+ });
+ processRequest();
+ }
+
+ private void processRequest() {
+ do {
+ if (!processingRequest.compareAndSet(false, true)) {
+ // Some other thread is processing the queue, so we don't need to.
+ return;
+ }
+
+ try {
+ doProcessRequest();
+ } catch (Throwable e) {
+ notifyError(new IllegalStateException("Encountered fatal error in publisher", e));
+ subscriptions.remove(this);
+ break;
+ } finally {
+ processingRequest.set(false);
+ }
+
+ } while (shouldProcessRequest());
+ }
+
+ private boolean shouldProcessRequest() {
+ return !done.get() && outstandingDemand.get() > 0 && index.get() < buffers.size();
+ }
+
+ private void doProcessRequest() {
+ while (true) {
+ if (!shouldProcessRequest()) {
+ return;
+ }
+
+ int currentIndex = this.index.getAndIncrement();
+
+ if (currentIndex >= buffers.size()) {
+ // This should never happen, but defensive programming
+ notifyError(new IllegalStateException("Index out of bounds"));
+ subscriptions.remove(this);
+ return;
+ }
+
+ ByteBuffer buffer = buffers.get(currentIndex);
+ currentSubscriber.onNext(buffer.asReadOnlyBuffer());
+ outstandingDemand.decrementAndGet();
+
+ if (currentIndex == buffers.size() - 1) {
+ done.set(true);
+ currentSubscriber.onComplete();
+ subscriptions.remove(this);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ done.set(true);
+ subscriptions.remove(this);
+ }
+
+ public void notifyError(Exception exception) {
+ if (currentSubscriber != null) {
+ done.set(true);
+ currentSubscriber.onError(exception);
+ currentSubscriber = null;
+ }
+ }
+ }
+}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
new file mode 100644
index 000000000000..cf5b14c090b5
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.async;
+
+
+import java.nio.ByteBuffer;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.tck.TestEnvironment;
+import software.amazon.awssdk.core.SdkBytes;
+
+public class BufferingAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification {
+ public BufferingAsyncRequestBodyTckTest() {
+ super(new TestEnvironment(true));
+ }
+
+ @Override
+ public Publisher createPublisher(long elements) {
+ BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024 * elements);
+ for (int i = 0; i < elements; i++) {
+ bufferingAsyncRequestBody.send(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer());
+ }
+
+ bufferingAsyncRequestBody.complete();
+ return bufferingAsyncRequestBody;
+ }
+
+ @Override
+ public Publisher createFailedPublisher() {
+ BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L);
+ bufferingAsyncRequestBody.close();
+ return null;
+ }
+
+ public long maxElementsFromPublisher() {
+ return 100;
+ }
+
+}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
new file mode 100644
index 000000000000..ce3566e2de3f
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
@@ -0,0 +1,384 @@
+package software.amazon.awssdk.core.internal.async;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.core.exception.NonRetryableException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.utils.BinaryUtils;
+
+public class BufferingAsyncRequestBodyTest {
+
+ private static final ByteBuffer TEST_DATA_SET_1 =
+ ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8));
+ private static final ByteBuffer TEST_DATA_SET_2 =
+ ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8));
+
+ @Test
+ public void body_whenCalled_shouldReturnConstantBytes() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ assertEquals("Bytes", body.body());
+ }
+
+ @Test
+ public void close_whenCalledMultipleTimes_shouldExecuteOnlyOnce() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ Subscriber mockSubscriber = mock(Subscriber.class);
+
+ body.subscribe(mockSubscriber);
+
+ body.close();
+ assertThat(body.bufferedData()).isEmpty();
+ body.close();
+ body.close();
+
+ verify(mockSubscriber, times(1)).onError(any(NonRetryableException.class));
+ }
+
+ @ParameterizedTest
+ @MethodSource("contentLengthTestCases")
+ public void contentLength_withVariousInputs_shouldReturnExpectedResult(Long inputLength, boolean shouldBePresent,
+ Long expectedValue) {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(inputLength);
+ Optional result = body.contentLength();
+
+ assertThat(result.isPresent()).isEqualTo(shouldBePresent);
+ if (shouldBePresent) {
+ assertThat(result.get()).isEqualTo(expectedValue);
+ }
+ }
+
+ private static Stream contentLengthTestCases() {
+ return Stream.of(
+ Arguments.of(null, false, null),
+ Arguments.of(100L, true, 100L),
+ Arguments.of(0L, true, 0L)
+ );
+ }
+
+ @Test
+ public void send_whenByteBufferIsNull_shouldThrowNullPointerException() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+
+ assertThatThrownBy(() -> body.send(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("data must not be null");
+ }
+
+ @Test
+ public void subscribe_whenBodyIsClosed_shouldNotifySubscriberWithError() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.complete(); // Set dataReady to true
+ body.close(); // Set closed to true
+ Subscriber mockSubscriber = mock(Subscriber.class);
+
+ body.subscribe(mockSubscriber);
+
+ verify(mockSubscriber).onSubscribe(any());
+ verify(mockSubscriber).onError(argThat(e ->
+ e instanceof NonRetryableException &&
+ e.getMessage().equals("AsyncRequestBody has been closed")
+ ));
+ }
+
+ @Test
+ public void subscribe_whenDataNotReady_shouldNotifySubscriberWithError() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ Subscriber mockSubscriber = mock(Subscriber.class);
+
+ body.subscribe(mockSubscriber);
+
+ verify(mockSubscriber).onSubscribe(any());
+ verify(mockSubscriber).onError(argThat(e ->
+ e instanceof NonRetryableException &&
+ e.getMessage().equals("Unexpected error occurred. Data is not ready to be "
+ + "subscribed")
+ ));
+ }
+
+ @Test
+ public void subscribe_whenMultipleSubscribers_shouldSupportConcurrentSubscriptions() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.send(TEST_DATA_SET_1);
+ body.send(TEST_DATA_SET_2);
+ body.complete();
+
+ Subscriber firstSubscriber = mock(Subscriber.class);
+ Subscriber secondSubscriber = mock(Subscriber.class);
+
+ ArgumentCaptor firstSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
+ ArgumentCaptor secondSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
+
+ body.subscribe(firstSubscriber);
+ body.subscribe(secondSubscriber);
+
+ verify(firstSubscriber).onSubscribe(firstSubscriptionCaptor.capture());
+ verify(secondSubscriber).onSubscribe(secondSubscriptionCaptor.capture());
+
+ Subscription firstSubscription = firstSubscriptionCaptor.getValue();
+ Subscription secondSubscription = secondSubscriptionCaptor.getValue();
+
+ firstSubscription.request(2);
+ secondSubscription.request(2);
+ verifyData(firstSubscriber);
+ verifyData(secondSubscriber);
+ verify(firstSubscriber).onComplete();
+ verify(secondSubscriber).onComplete();
+ }
+
+ private static void verifyData(Subscriber subscriber) {
+ verify(subscriber).onNext(
+ argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_1.array())));
+ verify(subscriber).onNext(
+ argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_2.array())));
+ verify(subscriber).onComplete();
+ }
+
+ @Test
+ public void send_afterClose_shouldThrowIllegalStateException() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.close();
+
+ assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes())))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Cannot send data to closed body");
+ }
+
+ @Test
+ public void send_afterComplete_shouldThrowIllegalStateException() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.complete();
+
+ assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes())))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Request body has already been completed");
+ }
+
+ @Test
+ public void complete_afterClose_shouldThrowIllegalStateException() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.close();
+
+ assertThatThrownBy(() -> body.complete())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("The AsyncRequestBody has been closed");
+ }
+
+ @Test
+ public void complete_calledMultipleTimes_shouldNotThrow() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.complete();
+
+ // Should not throw - method returns early if already completed
+ body.complete();
+ body.complete();
+
+ // Verify it's still in completed state
+ Subscriber mockSubscriber = mock(Subscriber.class);
+ body.subscribe(mockSubscriber);
+ verify(mockSubscriber).onSubscribe(any());
+ }
+
+ @Test
+ public void close_withActiveSubscriptions_shouldNotifyAllSubscribers() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.send(ByteBuffer.wrap("test".getBytes()));
+ body.complete();
+
+ Subscriber subscriber1 = mock(Subscriber.class);
+ Subscriber subscriber2 = mock(Subscriber.class);
+ Subscriber subscriber3 = mock(Subscriber.class);
+
+ body.subscribe(subscriber1);
+ body.subscribe(subscriber2);
+ body.subscribe(subscriber3);
+
+ body.close();
+
+ verify(subscriber1).onError(argThat(e ->
+ e instanceof IllegalStateException &&
+ e.getMessage().contains("The publisher has been closed")
+ ));
+ verify(subscriber2).onError(argThat(e ->
+ e instanceof IllegalStateException &&
+ e.getMessage().contains("The publisher has been closed")
+ ));
+ verify(subscriber3).onError(argThat(e ->
+ e instanceof IllegalStateException &&
+ e.getMessage().contains("The publisher has been closed")
+ ));
+ }
+
+ @Test
+ public void bufferedData_afterClose_shouldBeEmpty() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.send(ByteBuffer.wrap("test1".getBytes()));
+ body.send(ByteBuffer.wrap("test2".getBytes()));
+
+ assertThat(body.bufferedData()).hasSize(2);
+
+ body.close();
+
+ assertThat(body.bufferedData()).isEmpty();
+ }
+
+ @Test
+ public void send_withEmptyByteBuffer_shouldStoreEmptyBuffer() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ ByteBuffer emptyBuffer = ByteBuffer.allocate(0);
+
+ body.send(emptyBuffer);
+ body.complete();
+
+ Subscriber mockSubscriber = mock(Subscriber.class);
+ ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
+
+ body.subscribe(mockSubscriber);
+ verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture());
+
+ Subscription subscription = subscriptionCaptor.getValue();
+ subscription.request(1);
+
+ verify(mockSubscriber).onNext(argThat(buffer -> buffer.remaining() == 0));
+ verify(mockSubscriber).onComplete();
+ }
+
+ @Test
+ public void concurrentSendAndComplete_shouldBeThreadSafe() throws InterruptedException {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ CountDownLatch latch = new CountDownLatch(10);
+ AtomicInteger successfulSends = new AtomicInteger(0);
+
+ // Start multiple threads sending data
+ for (int i = 0; i < 10; i++) {
+ final int threadId = i;
+ executor.submit(() -> {
+ try {
+ body.send(ByteBuffer.wrap(("data" + threadId).getBytes()));
+ successfulSends.incrementAndGet();
+ } catch (IllegalStateException e) {
+ // Expected if complete() was called first
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // Complete after a short delay
+ Thread.sleep(10);
+ body.complete();
+
+ latch.await(5, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ // Should have some successful sends and the body should be complete
+ assertThat(successfulSends.get()).isGreaterThan(0);
+ assertThat(body.bufferedData().size()).isEqualTo(successfulSends.get());
+ }
+
+ @Test
+ public void concurrentSubscribeAndClose_shouldBeThreadSafe() throws InterruptedException {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ body.send(ByteBuffer.wrap("test".getBytes()));
+ body.complete();
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ CountDownLatch latch = new CountDownLatch(10);
+ AtomicInteger successfulSubscriptions = new AtomicInteger(0);
+ AtomicInteger errorNotifications = new AtomicInteger(0);
+
+ // Start multiple threads subscribing
+ for (int i = 0; i < 10; i++) {
+ executor.submit(() -> {
+ try {
+ Subscriber subscriber = new Subscriber() {
+ @Override
+ public void onSubscribe(Subscription s) {
+ successfulSubscriptions.incrementAndGet();
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {}
+
+ @Override
+ public void onError(Throwable t) {
+ errorNotifications.incrementAndGet();
+ }
+
+ @Override
+ public void onComplete() {}
+ };
+ body.subscribe(subscriber);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // Close after a short delay
+ Thread.sleep(10);
+ body.close();
+
+ latch.await(5, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ // All subscribers should have been notified
+ assertThat(successfulSubscriptions.get()).isEqualTo(10);
+ }
+
+ @Test
+ public void subscription_readOnlyBuffers_shouldNotAffectOriginalData() {
+ BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
+ ByteBuffer originalBuffer = ByteBuffer.wrap("test".getBytes());
+ int originalPosition = originalBuffer.position();
+
+ body.send(originalBuffer);
+ body.complete();
+
+ Subscriber mockSubscriber = mock(Subscriber.class);
+ ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
+ ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+
+ body.subscribe(mockSubscriber);
+ verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture());
+
+ Subscription subscription = subscriptionCaptor.getValue();
+ subscription.request(1);
+
+ verify(mockSubscriber).onNext(bufferCaptor.capture());
+ ByteBuffer receivedBuffer = bufferCaptor.getValue();
+
+ // Verify the received buffer is read-only
+ assertThat(receivedBuffer.isReadOnly()).isTrue();
+
+ // Verify original buffer position is unchanged
+ assertThat(originalBuffer.position()).isEqualTo(originalPosition);
+ }
+}
From 0ce2e41390f0e9cc7fcfb3f1b26f1b8f2208d128 Mon Sep 17 00:00:00 2001
From: Zoe Wang
Date: Fri, 1 Aug 2025 10:08:54 -0700
Subject: [PATCH 2/3] Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
.../awssdk/core/internal/async/BufferingAsyncRequestBody.java | 4 +++-
.../core/internal/async/BufferingAsyncRequestBodyTckTest.java | 2 +-
.../core/internal/async/BufferingAsyncRequestBodyTest.java | 1 -
3 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
index d3f080c05683..779122cbdc2d 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
@@ -284,7 +284,9 @@ private void doProcessRequest() {
int currentIndex = this.index.getAndIncrement();
if (currentIndex >= buffers.size()) {
- // This should never happen, but defensive programming
+ // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size()
+ // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers
+ // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations.
notifyError(new IllegalStateException("Index out of bounds"));
subscriptions.remove(this);
return;
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
index cf5b14c090b5..06418fd59534 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
@@ -42,7 +42,7 @@ public Publisher createPublisher(long elements) {
public Publisher createFailedPublisher() {
BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L);
bufferingAsyncRequestBody.close();
- return null;
+ return bufferingAsyncRequestBody;
}
public long maxElementsFromPublisher() {
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
index ce3566e2de3f..ae3f89ad3321 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
@@ -156,7 +156,6 @@ private static void verifyData(Subscriber subscriber) {
argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_1.array())));
verify(subscriber).onNext(
argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_2.array())));
- verify(subscriber).onComplete();
}
@Test
From 62008d62dfca73478037eed98343b6ad721f492d Mon Sep 17 00:00:00 2001
From: Zoe Wang <33073555+zoewangg@users.noreply.github.com>
Date: Tue, 5 Aug 2025 15:12:38 -0700
Subject: [PATCH 3/3] Update existing byteBuffersAsyncRequestBody to support
multicast
---
.../async/BufferingAsyncRequestBody.java | 322 ---------------
.../async/ByteBuffersAsyncRequestBody.java | 255 +++++++++---
.../async/BufferingAsyncRequestBodyTest.java | 383 ------------------
...=> ByteBufferAsyncRequestBodyTckTest.java} | 18 +-
.../ByteBuffersAsyncRequestBodyTest.java | 207 ++++++++--
5 files changed, 378 insertions(+), 807 deletions(-)
delete mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
delete mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
rename core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/{BufferingAsyncRequestBodyTckTest.java => ByteBufferAsyncRequestBodyTckTest.java} (65%)
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
deleted file mode 100644
index 779122cbdc2d..000000000000
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.core.internal.async;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.annotations.SdkTestInternalApi;
-import software.amazon.awssdk.annotations.ThreadSafe;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.core.exception.NonRetryableException;
-import software.amazon.awssdk.core.internal.util.NoopSubscription;
-import software.amazon.awssdk.utils.Logger;
-import software.amazon.awssdk.utils.SdkAutoCloseable;
-import software.amazon.awssdk.utils.Validate;
-
-/**
- * An implementation of {@link AsyncRequestBody} that buffers the entire content and supports multiple concurrent subscribers.
- *
- * This class allows data to be sent incrementally via the {@link #send(ByteBuffer)} method, buffered internally,
- * and then replayed to multiple subscribers independently. Each subscriber receives a complete copy of all buffered data
- * when they subscribe and request it.
- *
- *
Usage Pattern:
- * {@snippet :
- * BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(contentLength);
- *
- * // Send data incrementally
- * body.send(ByteBuffer.wrap("Hello ".getBytes()));
- * body.send(ByteBuffer.wrap("World".getBytes()));
- *
- * // Mark data as complete and ready for subscription
- * body.complete();
- *
- * // Multiple subscribers can now consume the buffered data
- * body.subscribe(subscriber1);
- * body.subscribe(subscriber2);
- * }
- *
- *
Thread Safety:
- * This class is thread-safe and supports concurrent operations:
- *
- * Multiple threads can call {@link #send(ByteBuffer)} concurrently
- * Multiple subscribers can be added concurrently
- * Each subscriber operates independently with its own state
- *
- *
- * Subscription Behavior:
- *
- * Subscribers can only subscribe after {@link #complete()} has been called
- * Each subscriber receives a read-only view of the buffered data
- * Subscribers receive data independently based on their own demand signaling
- * If the body is closed, new subscribers will receive an error immediately
- *
- *
- * Resource Management:
- * The body should be closed when no longer needed to free buffered data and notify active subscribers.
- * Closing the body will:
- *
- * Clear all buffered data
- * Send error notifications to all active subscribers
- * Prevent new subscriptions
- *
- *
- */
-@ThreadSafe
-@SdkInternalApi
-public final class BufferingAsyncRequestBody implements AsyncRequestBody, SdkAutoCloseable {
- private static final Logger log = Logger.loggerFor(BufferingAsyncRequestBody.class);
-
- private final Long length;
- private final List bufferedData = new ArrayList<>();
- private boolean dataReady;
- private boolean closed;
- private final Set subscriptions;
- private final Object lock = new Object();
-
- /**
- * Creates a new BufferingAsyncRequestBody with the specified content length.
- *
- * @param length the total content length in bytes, or null if unknown
- */
- BufferingAsyncRequestBody(Long length) {
- this.length = length;
- this.subscriptions = ConcurrentHashMap.newKeySet();
- }
-
- /**
- * Sends a chunk of data to be buffered for later consumption by subscribers.
- *
- * @param data the data to buffer, must not be null
- * @throws NullPointerException if data is null
- */
- public void send(ByteBuffer data) {
- Validate.paramNotNull(data, "data");
- synchronized (lock) {
- if (closed) {
- throw new IllegalStateException("Cannot send data to closed body");
- }
- if (dataReady) {
- throw new IllegalStateException("Request body has already been completed");
- }
- bufferedData.add(data);
- }
- }
-
- /**
- * Marks the request body as complete and ready for subscription.
- *
- * This method must be called before any subscribers can successfully subscribe
- * to this request body. After calling this method, no more data should be sent
- * via {@link #send(ByteBuffer)}.
- *
- *
Once complete, multiple subscribers can subscribe and will each receive
- * the complete buffered content independently.
- */
- public void complete() {
- synchronized (lock) {
- if (dataReady) {
- return;
- }
- if (closed) {
- throw new IllegalStateException("The AsyncRequestBody has been closed");
- }
- dataReady = true;
- }
- }
-
- @Override
- public Optional contentLength() {
- return Optional.ofNullable(length);
- }
-
- @Override
- public void subscribe(Subscriber super ByteBuffer> subscriber) {
- Validate.paramNotNull(subscriber, "subscriber");
-
- synchronized (lock) {
- if (!dataReady) {
- subscriber.onSubscribe(new NoopSubscription(subscriber));
- subscriber.onError(NonRetryableException.create(
- "Unexpected error occurred. Data is not ready to be subscribed"));
- return;
- }
-
- if (closed) {
- subscriber.onSubscribe(new NoopSubscription(subscriber));
- subscriber.onError(NonRetryableException.create(
- "AsyncRequestBody has been closed"));
- return;
- }
- }
-
- ReplayableByteBufferSubscription replayableByteBufferSubscription =
- new ReplayableByteBufferSubscription(subscriber, bufferedData);
- subscriber.onSubscribe(replayableByteBufferSubscription);
- subscriptions.add(replayableByteBufferSubscription);
- }
-
- @Override
- public String body() {
- return BodyType.BYTES.getName();
- }
-
- /**
- * Closes this request body and releases all resources. This will:
- *
- * Clear all buffered data to free memory
- * Notify all active subscribers with an error
- * Prevent new subscriptions from succeeding
- *
- *
- * This method is idempotent - calling it multiple times has no additional effect.
- * It is safe to call this method concurrently from multiple threads.
- */
- @Override
- public void close() {
- synchronized (lock) {
- if (closed) {
- return;
- }
-
- closed = true;
- bufferedData.clear();
- subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed")));
- subscriptions.clear();
- }
-
- }
-
- @SdkTestInternalApi
- List bufferedData() {
- return Collections.unmodifiableList(bufferedData);
- }
-
- private class ReplayableByteBufferSubscription implements Subscription {
- private final AtomicInteger index = new AtomicInteger(0);
- private final AtomicBoolean done = new AtomicBoolean(false);
- private final List buffers;
- private final AtomicBoolean processingRequest = new AtomicBoolean(false);
- private Subscriber super ByteBuffer> currentSubscriber;
- private final AtomicLong outstandingDemand = new AtomicLong();
-
- private ReplayableByteBufferSubscription(Subscriber super ByteBuffer> subscriber, List buffers) {
- this.buffers = buffers;
- this.currentSubscriber = subscriber;
- }
-
- @Override
- public void request(long n) {
- if (n <= 0) {
- currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
- currentSubscriber = null;
- return;
- }
-
- if (done.get()) {
- return;
- }
-
- outstandingDemand.updateAndGet(current -> {
- if (Long.MAX_VALUE - current < n) {
- return Long.MAX_VALUE;
- }
-
- return current + n;
- });
- processRequest();
- }
-
- private void processRequest() {
- do {
- if (!processingRequest.compareAndSet(false, true)) {
- // Some other thread is processing the queue, so we don't need to.
- return;
- }
-
- try {
- doProcessRequest();
- } catch (Throwable e) {
- notifyError(new IllegalStateException("Encountered fatal error in publisher", e));
- subscriptions.remove(this);
- break;
- } finally {
- processingRequest.set(false);
- }
-
- } while (shouldProcessRequest());
- }
-
- private boolean shouldProcessRequest() {
- return !done.get() && outstandingDemand.get() > 0 && index.get() < buffers.size();
- }
-
- private void doProcessRequest() {
- while (true) {
- if (!shouldProcessRequest()) {
- return;
- }
-
- int currentIndex = this.index.getAndIncrement();
-
- if (currentIndex >= buffers.size()) {
- // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size()
- // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers
- // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations.
- notifyError(new IllegalStateException("Index out of bounds"));
- subscriptions.remove(this);
- return;
- }
-
- ByteBuffer buffer = buffers.get(currentIndex);
- currentSubscriber.onNext(buffer.asReadOnlyBuffer());
- outstandingDemand.decrementAndGet();
-
- if (currentIndex == buffers.size() - 1) {
- done.set(true);
- currentSubscriber.onComplete();
- subscriptions.remove(this);
- break;
- }
- }
- }
-
- @Override
- public void cancel() {
- done.set(true);
- subscriptions.remove(this);
- }
-
- public void notifyError(Exception exception) {
- if (currentSubscriber != null) {
- done.set(true);
- currentSubscriber.onError(exception);
- currentSubscriber = null;
- }
- }
- }
-}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
index c19ab8e245f8..a304d75ccf94 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
@@ -16,21 +16,47 @@
package software.amazon.awssdk.core.internal.async;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.util.Mimetype;
+import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Logger;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+import software.amazon.awssdk.utils.Validate;
/**
* An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
* using static methods on {@link AsyncRequestBody}
*
+ * Subscription Behavior:
+ *
+ * Each subscriber receives a read-only view of the buffered data
+ * Subscribers receive data independently based on their own demand signaling
+ * If the body is closed, new subscribers will receive an error immediately
+ *
+ *
+ * Resource Management:
+ * The body should be closed when no longer needed to free buffered data and notify active subscribers.
+ * Closing the body will:
+ *
+ * Clear all buffered data
+ * Send error notifications to all active subscribers
+ * Prevent new subscriptions
+ *
* @see AsyncRequestBody#fromBytes(byte[])
* @see AsyncRequestBody#fromBytesUnsafe(byte[])
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
@@ -40,17 +66,21 @@
* @see AsyncRequestBody#fromString(String)
*/
@SdkInternalApi
-public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
+public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody, SdkAutoCloseable {
private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);
private final String mimetype;
private final Long length;
- private final ByteBuffer[] buffers;
+ private List buffers;
+ private final Set subscriptions;
+ private final Object lock = new Object();
+ private boolean closed;
- private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
+ private ByteBuffersAsyncRequestBody(String mimetype, Long length, List buffers) {
this.mimetype = mimetype;
- this.length = length;
this.buffers = buffers;
+ this.length = length;
+ this.subscriptions = ConcurrentHashMap.newKeySet();
}
@Override
@@ -64,61 +94,25 @@ public String contentType() {
}
@Override
- public void subscribe(Subscriber super ByteBuffer> s) {
- // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
- if (s == null) {
- throw new NullPointerException("Subscription MUST NOT be null.");
+ public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ Validate.paramNotNull(subscriber, "subscriber");
+ synchronized (lock) {
+ if (closed) {
+ subscriber.onSubscribe(new NoopSubscription(subscriber));
+ subscriber.onError(NonRetryableException.create(
+ "AsyncRequestBody has been closed"));
+ return;
+ }
}
- // As per 2.13, this method must return normally (i.e. not throw).
try {
- s.onSubscribe(
- new Subscription() {
- private final AtomicInteger index = new AtomicInteger(0);
- private final AtomicBoolean completed = new AtomicBoolean(false);
-
- @Override
- public void request(long n) {
- if (completed.get()) {
- return;
- }
-
- if (n > 0) {
- int i = index.getAndIncrement();
-
- if (buffers.length == 0 && completed.compareAndSet(false, true)) {
- s.onComplete();
- }
-
- if (i >= buffers.length) {
- return;
- }
-
- long remaining = n;
-
- do {
- ByteBuffer buffer = buffers[i];
-
- s.onNext(buffer.asReadOnlyBuffer());
- remaining--;
- } while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);
-
- if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
- s.onComplete();
- }
- } else {
- s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
- }
- }
-
- @Override
- public void cancel() {
- completed.set(true);
- }
- }
- );
+ ReplayableByteBufferSubscription replayableByteBufferSubscription =
+ new ReplayableByteBufferSubscription(subscriber);
+ subscriber.onSubscribe(replayableByteBufferSubscription);
+ subscriptions.add(replayableByteBufferSubscription);
} catch (Throwable ex) {
- log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
+ log.error(() -> subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.",
+ ex);
}
}
@@ -127,34 +121,167 @@ public String body() {
return BodyType.BYTES.getName();
}
- public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) {
- long length = Arrays.stream(buffers)
- .mapToLong(ByteBuffer::remaining)
- .sum();
+ public static ByteBuffersAsyncRequestBody of(List buffers) {
+ long length = buffers.stream()
+ .mapToLong(ByteBuffer::remaining)
+ .sum();
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
}
+ public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) {
+ return of(Arrays.asList(buffers));
+ }
+
public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
- return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
+ return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, Arrays.asList(buffers));
}
public static ByteBuffersAsyncRequestBody of(String mimetype, ByteBuffer... buffers) {
long length = Arrays.stream(buffers)
.mapToLong(ByteBuffer::remaining)
.sum();
- return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
+ return new ByteBuffersAsyncRequestBody(mimetype, length, Arrays.asList(buffers));
}
public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
- return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
+ return new ByteBuffersAsyncRequestBody(mimetype, length, Arrays.asList(buffers));
}
public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
- ByteBuffer.wrap(bytes));
+ Collections.singletonList(ByteBuffer.wrap(bytes)));
}
public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
- return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
+ return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length,
+ Collections.singletonList(ByteBuffer.wrap(bytes)));
+ }
+
+ @Override
+ public void close() {
+ synchronized (lock) {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
+ buffers = new ArrayList<>();
+ subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed")));
+ subscriptions.clear();
+ }
+ }
+
+ @SdkTestInternalApi
+ public List bufferedData() {
+ return buffers;
+ }
+
+ private class ReplayableByteBufferSubscription implements Subscription {
+ private final AtomicInteger index = new AtomicInteger(0);
+ private volatile boolean done;
+ private final AtomicBoolean processingRequest = new AtomicBoolean(false);
+ private Subscriber super ByteBuffer> currentSubscriber;
+ private final AtomicLong outstandingDemand = new AtomicLong();
+
+ private ReplayableByteBufferSubscription(Subscriber super ByteBuffer> subscriber) {
+ this.currentSubscriber = subscriber;
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
+ currentSubscriber = null;
+ return;
+ }
+
+ if (done) {
+ return;
+ }
+
+ if (buffers.size() == 0) {
+ currentSubscriber.onComplete();
+ done = true;
+ subscriptions.remove(this);
+ return;
+ }
+
+ outstandingDemand.updateAndGet(current -> {
+ if (Long.MAX_VALUE - current < n) {
+ return Long.MAX_VALUE;
+ }
+
+ return current + n;
+ });
+ processRequest();
+ }
+
+ private void processRequest() {
+ do {
+ if (!processingRequest.compareAndSet(false, true)) {
+ // Some other thread is processing the queue, so we don't need to.
+ return;
+ }
+
+ try {
+ doProcessRequest();
+ } catch (Throwable e) {
+ notifyError(new IllegalStateException("Encountered fatal error in publisher", e));
+ subscriptions.remove(this);
+ break;
+ } finally {
+ processingRequest.set(false);
+ }
+
+ } while (shouldProcessRequest());
+ }
+
+ private boolean shouldProcessRequest() {
+ return !done && outstandingDemand.get() > 0 && index.get() < buffers.size();
+ }
+
+ private void doProcessRequest() {
+ while (true) {
+ if (!shouldProcessRequest()) {
+ return;
+ }
+
+ int currentIndex = this.index.getAndIncrement();
+
+ if (currentIndex >= buffers.size()) {
+ // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size()
+ // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers
+ // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations.
+ notifyError(new IllegalStateException("Index out of bounds"));
+ subscriptions.remove(this);
+ return;
+ }
+
+ ByteBuffer buffer = buffers.get(currentIndex);
+ currentSubscriber.onNext(buffer.asReadOnlyBuffer());
+ outstandingDemand.decrementAndGet();
+
+ if (currentIndex == buffers.size() - 1) {
+ done = true;
+ currentSubscriber.onComplete();
+ subscriptions.remove(this);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ done = true;
+ subscriptions.remove(this);
+ }
+
+ public void notifyError(Exception exception) {
+ if (currentSubscriber != null) {
+ done = true;
+ currentSubscriber.onError(exception);
+ currentSubscriber = null;
+ }
+ }
}
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
deleted file mode 100644
index ae3f89ad3321..000000000000
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java
+++ /dev/null
@@ -1,383 +0,0 @@
-package software.amazon.awssdk.core.internal.async;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.junit.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import software.amazon.awssdk.core.exception.NonRetryableException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
-import static org.mockito.Mockito.mock;
-
-import org.mockito.ArgumentCaptor;
-import software.amazon.awssdk.utils.BinaryUtils;
-
-public class BufferingAsyncRequestBodyTest {
-
- private static final ByteBuffer TEST_DATA_SET_1 =
- ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8));
- private static final ByteBuffer TEST_DATA_SET_2 =
- ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8));
-
- @Test
- public void body_whenCalled_shouldReturnConstantBytes() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- assertEquals("Bytes", body.body());
- }
-
- @Test
- public void close_whenCalledMultipleTimes_shouldExecuteOnlyOnce() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- Subscriber mockSubscriber = mock(Subscriber.class);
-
- body.subscribe(mockSubscriber);
-
- body.close();
- assertThat(body.bufferedData()).isEmpty();
- body.close();
- body.close();
-
- verify(mockSubscriber, times(1)).onError(any(NonRetryableException.class));
- }
-
- @ParameterizedTest
- @MethodSource("contentLengthTestCases")
- public void contentLength_withVariousInputs_shouldReturnExpectedResult(Long inputLength, boolean shouldBePresent,
- Long expectedValue) {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(inputLength);
- Optional result = body.contentLength();
-
- assertThat(result.isPresent()).isEqualTo(shouldBePresent);
- if (shouldBePresent) {
- assertThat(result.get()).isEqualTo(expectedValue);
- }
- }
-
- private static Stream contentLengthTestCases() {
- return Stream.of(
- Arguments.of(null, false, null),
- Arguments.of(100L, true, 100L),
- Arguments.of(0L, true, 0L)
- );
- }
-
- @Test
- public void send_whenByteBufferIsNull_shouldThrowNullPointerException() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
-
- assertThatThrownBy(() -> body.send(null))
- .isInstanceOf(NullPointerException.class)
- .hasMessageContaining("data must not be null");
- }
-
- @Test
- public void subscribe_whenBodyIsClosed_shouldNotifySubscriberWithError() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.complete(); // Set dataReady to true
- body.close(); // Set closed to true
- Subscriber mockSubscriber = mock(Subscriber.class);
-
- body.subscribe(mockSubscriber);
-
- verify(mockSubscriber).onSubscribe(any());
- verify(mockSubscriber).onError(argThat(e ->
- e instanceof NonRetryableException &&
- e.getMessage().equals("AsyncRequestBody has been closed")
- ));
- }
-
- @Test
- public void subscribe_whenDataNotReady_shouldNotifySubscriberWithError() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- Subscriber mockSubscriber = mock(Subscriber.class);
-
- body.subscribe(mockSubscriber);
-
- verify(mockSubscriber).onSubscribe(any());
- verify(mockSubscriber).onError(argThat(e ->
- e instanceof NonRetryableException &&
- e.getMessage().equals("Unexpected error occurred. Data is not ready to be "
- + "subscribed")
- ));
- }
-
- @Test
- public void subscribe_whenMultipleSubscribers_shouldSupportConcurrentSubscriptions() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.send(TEST_DATA_SET_1);
- body.send(TEST_DATA_SET_2);
- body.complete();
-
- Subscriber firstSubscriber = mock(Subscriber.class);
- Subscriber secondSubscriber = mock(Subscriber.class);
-
- ArgumentCaptor firstSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
- ArgumentCaptor secondSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
-
- body.subscribe(firstSubscriber);
- body.subscribe(secondSubscriber);
-
- verify(firstSubscriber).onSubscribe(firstSubscriptionCaptor.capture());
- verify(secondSubscriber).onSubscribe(secondSubscriptionCaptor.capture());
-
- Subscription firstSubscription = firstSubscriptionCaptor.getValue();
- Subscription secondSubscription = secondSubscriptionCaptor.getValue();
-
- firstSubscription.request(2);
- secondSubscription.request(2);
- verifyData(firstSubscriber);
- verifyData(secondSubscriber);
- verify(firstSubscriber).onComplete();
- verify(secondSubscriber).onComplete();
- }
-
- private static void verifyData(Subscriber subscriber) {
- verify(subscriber).onNext(
- argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_1.array())));
- verify(subscriber).onNext(
- argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_2.array())));
- }
-
- @Test
- public void send_afterClose_shouldThrowIllegalStateException() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.close();
-
- assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes())))
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Cannot send data to closed body");
- }
-
- @Test
- public void send_afterComplete_shouldThrowIllegalStateException() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.complete();
-
- assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes())))
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Request body has already been completed");
- }
-
- @Test
- public void complete_afterClose_shouldThrowIllegalStateException() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.close();
-
- assertThatThrownBy(() -> body.complete())
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("The AsyncRequestBody has been closed");
- }
-
- @Test
- public void complete_calledMultipleTimes_shouldNotThrow() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.complete();
-
- // Should not throw - method returns early if already completed
- body.complete();
- body.complete();
-
- // Verify it's still in completed state
- Subscriber mockSubscriber = mock(Subscriber.class);
- body.subscribe(mockSubscriber);
- verify(mockSubscriber).onSubscribe(any());
- }
-
- @Test
- public void close_withActiveSubscriptions_shouldNotifyAllSubscribers() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.send(ByteBuffer.wrap("test".getBytes()));
- body.complete();
-
- Subscriber subscriber1 = mock(Subscriber.class);
- Subscriber subscriber2 = mock(Subscriber.class);
- Subscriber subscriber3 = mock(Subscriber.class);
-
- body.subscribe(subscriber1);
- body.subscribe(subscriber2);
- body.subscribe(subscriber3);
-
- body.close();
-
- verify(subscriber1).onError(argThat(e ->
- e instanceof IllegalStateException &&
- e.getMessage().contains("The publisher has been closed")
- ));
- verify(subscriber2).onError(argThat(e ->
- e instanceof IllegalStateException &&
- e.getMessage().contains("The publisher has been closed")
- ));
- verify(subscriber3).onError(argThat(e ->
- e instanceof IllegalStateException &&
- e.getMessage().contains("The publisher has been closed")
- ));
- }
-
- @Test
- public void bufferedData_afterClose_shouldBeEmpty() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.send(ByteBuffer.wrap("test1".getBytes()));
- body.send(ByteBuffer.wrap("test2".getBytes()));
-
- assertThat(body.bufferedData()).hasSize(2);
-
- body.close();
-
- assertThat(body.bufferedData()).isEmpty();
- }
-
- @Test
- public void send_withEmptyByteBuffer_shouldStoreEmptyBuffer() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- ByteBuffer emptyBuffer = ByteBuffer.allocate(0);
-
- body.send(emptyBuffer);
- body.complete();
-
- Subscriber mockSubscriber = mock(Subscriber.class);
- ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
-
- body.subscribe(mockSubscriber);
- verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture());
-
- Subscription subscription = subscriptionCaptor.getValue();
- subscription.request(1);
-
- verify(mockSubscriber).onNext(argThat(buffer -> buffer.remaining() == 0));
- verify(mockSubscriber).onComplete();
- }
-
- @Test
- public void concurrentSendAndComplete_shouldBeThreadSafe() throws InterruptedException {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- ExecutorService executor = Executors.newFixedThreadPool(10);
- CountDownLatch latch = new CountDownLatch(10);
- AtomicInteger successfulSends = new AtomicInteger(0);
-
- // Start multiple threads sending data
- for (int i = 0; i < 10; i++) {
- final int threadId = i;
- executor.submit(() -> {
- try {
- body.send(ByteBuffer.wrap(("data" + threadId).getBytes()));
- successfulSends.incrementAndGet();
- } catch (IllegalStateException e) {
- // Expected if complete() was called first
- } finally {
- latch.countDown();
- }
- });
- }
-
- // Complete after a short delay
- Thread.sleep(10);
- body.complete();
-
- latch.await(5, TimeUnit.SECONDS);
- executor.shutdown();
-
- // Should have some successful sends and the body should be complete
- assertThat(successfulSends.get()).isGreaterThan(0);
- assertThat(body.bufferedData().size()).isEqualTo(successfulSends.get());
- }
-
- @Test
- public void concurrentSubscribeAndClose_shouldBeThreadSafe() throws InterruptedException {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- body.send(ByteBuffer.wrap("test".getBytes()));
- body.complete();
-
- ExecutorService executor = Executors.newFixedThreadPool(10);
- CountDownLatch latch = new CountDownLatch(10);
- AtomicInteger successfulSubscriptions = new AtomicInteger(0);
- AtomicInteger errorNotifications = new AtomicInteger(0);
-
- // Start multiple threads subscribing
- for (int i = 0; i < 10; i++) {
- executor.submit(() -> {
- try {
- Subscriber subscriber = new Subscriber() {
- @Override
- public void onSubscribe(Subscription s) {
- successfulSubscriptions.incrementAndGet();
- s.request(1);
- }
-
- @Override
- public void onNext(ByteBuffer byteBuffer) {}
-
- @Override
- public void onError(Throwable t) {
- errorNotifications.incrementAndGet();
- }
-
- @Override
- public void onComplete() {}
- };
- body.subscribe(subscriber);
- } finally {
- latch.countDown();
- }
- });
- }
-
- // Close after a short delay
- Thread.sleep(10);
- body.close();
-
- latch.await(5, TimeUnit.SECONDS);
- executor.shutdown();
-
- // All subscribers should have been notified
- assertThat(successfulSubscriptions.get()).isEqualTo(10);
- }
-
- @Test
- public void subscription_readOnlyBuffers_shouldNotAffectOriginalData() {
- BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null);
- ByteBuffer originalBuffer = ByteBuffer.wrap("test".getBytes());
- int originalPosition = originalBuffer.position();
-
- body.send(originalBuffer);
- body.complete();
-
- Subscriber mockSubscriber = mock(Subscriber.class);
- ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
- ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
-
- body.subscribe(mockSubscriber);
- verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture());
-
- Subscription subscription = subscriptionCaptor.getValue();
- subscription.request(1);
-
- verify(mockSubscriber).onNext(bufferCaptor.capture());
- ByteBuffer receivedBuffer = bufferCaptor.getValue();
-
- // Verify the received buffer is read-only
- assertThat(receivedBuffer.isReadOnly()).isTrue();
-
- // Verify original buffer position is unchanged
- assertThat(originalBuffer.position()).isEqualTo(originalPosition);
- }
-}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBufferAsyncRequestBodyTckTest.java
similarity index 65%
rename from core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
rename to core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBufferAsyncRequestBodyTckTest.java
index 06418fd59534..3d9df4b9e1e2 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBufferAsyncRequestBodyTckTest.java
@@ -17,30 +17,30 @@
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.TestEnvironment;
import software.amazon.awssdk.core.SdkBytes;
-public class BufferingAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification {
- public BufferingAsyncRequestBodyTckTest() {
- super(new TestEnvironment(true));
+public class ByteBufferAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification {
+ public ByteBufferAsyncRequestBodyTckTest() {
+ super(new TestEnvironment());
}
@Override
public Publisher createPublisher(long elements) {
- BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024 * elements);
+ List buffers = new ArrayList<>();
for (int i = 0; i < elements; i++) {
- bufferingAsyncRequestBody.send(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer());
+ buffers.add(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer());
}
-
- bufferingAsyncRequestBody.complete();
- return bufferingAsyncRequestBody;
+ return ByteBuffersAsyncRequestBody.of(buffers.toArray(new ByteBuffer[0]));
}
@Override
public Publisher createFailedPublisher() {
- BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L);
+ ByteBuffersAsyncRequestBody bufferingAsyncRequestBody = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes()));
bufferingAsyncRequestBody.close();
return bufferingAsyncRequestBody;
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
index e1035dc25b0c..c80784b26a90 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
@@ -15,58 +15,44 @@
package software.amazon.awssdk.core.internal.async;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.utils.BinaryUtils;
class ByteBuffersAsyncRequestBodyTest {
- private static class TestSubscriber implements Subscriber {
- private Subscription subscription;
- private boolean onCompleteCalled = false;
- private int callsToComplete = 0;
- private final List publishedResults = Collections.synchronizedList(new ArrayList<>());
-
- public void request(long n) {
- subscription.request(n);
- }
-
- @Override
- public void onSubscribe(Subscription s) {
- this.subscription = s;
- }
-
- @Override
- public void onNext(ByteBuffer byteBuffer) {
- publishedResults.add(byteBuffer);
- }
-
- @Override
- public void onError(Throwable throwable) {
- throw new IllegalStateException(throwable);
- }
+ private static ExecutorService executor = Executors.newFixedThreadPool(10);
- @Override
- public void onComplete() {
- onCompleteCalled = true;
- callsToComplete++;
- }
+ @AfterAll
+ static void tearDown() {
+ executor.shutdown();
}
@Test
@@ -214,4 +200,167 @@ public void staticFromBytesConstructorSetsLengthBasedOnArrayLength() {
assertEquals(bytes.length, body.contentLength().get());
}
+ @Test
+ public void subscribe_whenBodyIsClosed_shouldNotifySubscriberWithError() {
+ ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap("test".getBytes()));
+ body.close(); // Set closed to true
+ Subscriber mockSubscriber = mock(Subscriber.class);
+
+ body.subscribe(mockSubscriber);
+
+ verify(mockSubscriber).onSubscribe(any());
+ verify(mockSubscriber).onError(argThat(e ->
+ e instanceof NonRetryableException &&
+ e.getMessage().equals("AsyncRequestBody has been closed")
+ ));
+ }
+
+ @Test
+ public void close_withActiveSubscriptions_shouldNotifyAllSubscribers() {
+ ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8)));
+
+ Subscriber subscriber1 = mock(Subscriber.class);
+ Subscriber subscriber2 = mock(Subscriber.class);
+ Subscriber subscriber3 = mock(Subscriber.class);
+
+ body.subscribe(subscriber1);
+ body.subscribe(subscriber2);
+ body.subscribe(subscriber3);
+
+ body.close();
+
+ verify(subscriber1).onError(argThat(e ->
+ e instanceof IllegalStateException &&
+ e.getMessage().contains("The publisher has been closed")
+ ));
+ verify(subscriber2).onError(argThat(e ->
+ e instanceof IllegalStateException &&
+ e.getMessage().contains("The publisher has been closed")
+ ));
+ verify(subscriber3).onError(argThat(e ->
+ e instanceof IllegalStateException &&
+ e.getMessage().contains("The publisher has been closed")
+ ));
+ }
+
+ @Test
+ public void bufferedData_afterClose_shouldBeEmpty() {
+ ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(
+ ByteBuffer.wrap("test1".getBytes()),
+ ByteBuffer.wrap("test2".getBytes()));
+
+ assertThat(body.bufferedData()).hasSize(2);
+ body.close();
+ assertThat(body.bufferedData()).isEmpty();
+ }
+
+ @Test
+ public void concurrentSubscribeAndClose_shouldBeThreadSafe() throws InterruptedException {
+ ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(
+ ByteBuffer.wrap("test1".getBytes()),
+ ByteBuffer.wrap("test2".getBytes()));
+
+
+ CountDownLatch latch = new CountDownLatch(10);
+ AtomicInteger successfulSubscriptions = new AtomicInteger(0);
+ AtomicInteger errorNotifications = new AtomicInteger(0);
+
+ // Start multiple threads subscribing
+ for (int i = 0; i < 10; i++) {
+ executor.submit(() -> {
+ try {
+ Subscriber subscriber = new Subscriber() {
+ @Override
+ public void onSubscribe(Subscription s) {
+ successfulSubscriptions.incrementAndGet();
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {}
+
+ @Override
+ public void onError(Throwable t) {
+ errorNotifications.incrementAndGet();
+ }
+
+ @Override
+ public void onComplete() {}
+ };
+ body.subscribe(subscriber);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // Close after a short delay
+ Thread.sleep(10);
+ body.close();
+
+ latch.await(5, TimeUnit.SECONDS);
+ executor.shutdown();
+
+ // All subscribers should have been notified
+ assertThat(successfulSubscriptions.get()).isEqualTo(10);
+ }
+
+ @Test
+ public void subscription_readOnlyBuffers_shouldNotAffectOriginalData() {
+ ByteBuffer originalBuffer = ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes());
+ ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(
+ originalBuffer);
+ int originalPosition = originalBuffer.position();
+
+ Subscriber mockSubscriber = mock(Subscriber.class);
+ ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
+ ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+
+ body.subscribe(mockSubscriber);
+ verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture());
+
+ Subscription subscription = subscriptionCaptor.getValue();
+ subscription.request(1);
+
+ verify(mockSubscriber).onNext(bufferCaptor.capture());
+ ByteBuffer receivedBuffer = bufferCaptor.getValue();
+ byte[] bytes = BinaryUtils.copyBytesFrom(receivedBuffer);
+
+ assertThat(receivedBuffer.isReadOnly()).isTrue();
+
+ assertThat(originalBuffer.position()).isEqualTo(originalPosition);
+ }
+
+ private static class TestSubscriber implements Subscriber {
+ private Subscription subscription;
+ private boolean onCompleteCalled = false;
+ private int callsToComplete = 0;
+ private final List publishedResults = Collections.synchronizedList(new ArrayList<>());
+
+ public void request(long n) {
+ subscription.request(n);
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ this.subscription = s;
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+ publishedResults.add(byteBuffer);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throw new IllegalStateException(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ onCompleteCalled = true;
+ callsToComplete++;
+ }
+ }
+
}