From e5708e125d0b3fb04061aece1ce434a5057339a7 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Thu, 31 Jul 2025 13:55:53 -0700 Subject: [PATCH 1/3] Add BufferingAsyncRequestBody that buffers the entire content and supports multiple concurrent subscribers --- .../async/BufferingAsyncRequestBody.java | 320 +++++++++++++++ .../BufferingAsyncRequestBodyTckTest.java | 52 +++ .../async/BufferingAsyncRequestBodyTest.java | 384 ++++++++++++++++++ 3 files changed, 756 insertions(+) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java new file mode 100644 index 000000000000..d3f080c05683 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java @@ -0,0 +1,320 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.NonRetryableException; +import software.amazon.awssdk.core.internal.util.NoopSubscription; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.SdkAutoCloseable; +import software.amazon.awssdk.utils.Validate; + +/** + * An implementation of {@link AsyncRequestBody} that buffers the entire content and supports multiple concurrent subscribers. + * + *

This class allows data to be sent incrementally via the {@link #send(ByteBuffer)} method, buffered internally, + * and then replayed to multiple subscribers independently. Each subscriber receives a complete copy of all buffered data + * when they subscribe and request it. + * + *

Usage Pattern: + * {@snippet : + * BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(contentLength); + * + * // Send data incrementally + * body.send(ByteBuffer.wrap("Hello ".getBytes())); + * body.send(ByteBuffer.wrap("World".getBytes())); + * + * // Mark data as complete and ready for subscription + * body.complete(); + * + * // Multiple subscribers can now consume the buffered data + * body.subscribe(subscriber1); + * body.subscribe(subscriber2); + * } + * + *

Thread Safety:

+ * This class is thread-safe and supports concurrent operations: + * + * + *

Subscription Behavior:

+ * + * + *

Resource Management:

+ * The body should be closed when no longer needed to free buffered data and notify active subscribers. + * Closing the body will: + * + * + */ +@ThreadSafe +@SdkInternalApi +public final class BufferingAsyncRequestBody implements AsyncRequestBody, SdkAutoCloseable { + private static final Logger log = Logger.loggerFor(BufferingAsyncRequestBody.class); + + private final Long length; + private final List bufferedData = new ArrayList<>(); + private boolean dataReady; + private boolean closed; + private final Set subscriptions; + private final Object lock = new Object(); + + /** + * Creates a new BufferingAsyncRequestBody with the specified content length. + * + * @param length the total content length in bytes, or null if unknown + */ + BufferingAsyncRequestBody(Long length) { + this.length = length; + this.subscriptions = ConcurrentHashMap.newKeySet(); + } + + /** + * Sends a chunk of data to be buffered for later consumption by subscribers. + * + * @param data the data to buffer, must not be null + * @throws NullPointerException if data is null + */ + public void send(ByteBuffer data) { + Validate.paramNotNull(data, "data"); + synchronized (lock) { + if (closed) { + throw new IllegalStateException("Cannot send data to closed body"); + } + if (dataReady) { + throw new IllegalStateException("Request body has already been completed"); + } + bufferedData.add(data); + } + } + + /** + * Marks the request body as complete and ready for subscription. + * + *

This method must be called before any subscribers can successfully subscribe + * to this request body. After calling this method, no more data should be sent + * via {@link #send(ByteBuffer)}. + * + *

Once complete, multiple subscribers can subscribe and will each receive + * the complete buffered content independently. + */ + public void complete() { + synchronized (lock) { + if (dataReady) { + return; + } + if (closed) { + throw new IllegalStateException("The AsyncRequestBody has been closed"); + } + dataReady = true; + } + } + + @Override + public Optional contentLength() { + return Optional.ofNullable(length); + } + + @Override + public void subscribe(Subscriber subscriber) { + Validate.paramNotNull(subscriber, "subscriber"); + + synchronized (lock) { + if (!dataReady) { + subscriber.onSubscribe(new NoopSubscription(subscriber)); + subscriber.onError(NonRetryableException.create( + "Unexpected error occurred. Data is not ready to be subscribed")); + return; + } + + if (closed) { + subscriber.onSubscribe(new NoopSubscription(subscriber)); + subscriber.onError(NonRetryableException.create( + "AsyncRequestBody has been closed")); + return; + } + } + + ReplayableByteBufferSubscription replayableByteBufferSubscription = + new ReplayableByteBufferSubscription(subscriber, bufferedData); + subscriber.onSubscribe(replayableByteBufferSubscription); + subscriptions.add(replayableByteBufferSubscription); + } + + @Override + public String body() { + return BodyType.BYTES.getName(); + } + + /** + *

Closes this request body and releases all resources. This will: + *

+ * + *

This method is idempotent - calling it multiple times has no additional effect. + * It is safe to call this method concurrently from multiple threads. + */ + @Override + public void close() { + synchronized (lock) { + if (closed) { + return; + } + + closed = true; + bufferedData.clear(); + subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed"))); + subscriptions.clear(); + } + + } + + @SdkTestInternalApi + List bufferedData() { + return Collections.unmodifiableList(bufferedData); + } + + private class ReplayableByteBufferSubscription implements Subscription { + private final AtomicInteger index = new AtomicInteger(0); + private final AtomicBoolean done = new AtomicBoolean(false); + private final List buffers; + private final AtomicBoolean processingRequest = new AtomicBoolean(false); + private Subscriber currentSubscriber; + private final AtomicLong outstandingDemand = new AtomicLong(); + + private ReplayableByteBufferSubscription(Subscriber subscriber, List buffers) { + this.buffers = buffers; + this.currentSubscriber = subscriber; + } + + @Override + public void request(long n) { + if (n <= 0) { + currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); + currentSubscriber = null; + return; + } + + if (done.get()) { + return; + } + + outstandingDemand.updateAndGet(current -> { + if (Long.MAX_VALUE - current < n) { + return Long.MAX_VALUE; + } + + return current + n; + }); + processRequest(); + } + + private void processRequest() { + do { + if (!processingRequest.compareAndSet(false, true)) { + // Some other thread is processing the queue, so we don't need to. + return; + } + + try { + doProcessRequest(); + } catch (Throwable e) { + notifyError(new IllegalStateException("Encountered fatal error in publisher", e)); + subscriptions.remove(this); + break; + } finally { + processingRequest.set(false); + } + + } while (shouldProcessRequest()); + } + + private boolean shouldProcessRequest() { + return !done.get() && outstandingDemand.get() > 0 && index.get() < buffers.size(); + } + + private void doProcessRequest() { + while (true) { + if (!shouldProcessRequest()) { + return; + } + + int currentIndex = this.index.getAndIncrement(); + + if (currentIndex >= buffers.size()) { + // This should never happen, but defensive programming + notifyError(new IllegalStateException("Index out of bounds")); + subscriptions.remove(this); + return; + } + + ByteBuffer buffer = buffers.get(currentIndex); + currentSubscriber.onNext(buffer.asReadOnlyBuffer()); + outstandingDemand.decrementAndGet(); + + if (currentIndex == buffers.size() - 1) { + done.set(true); + currentSubscriber.onComplete(); + subscriptions.remove(this); + break; + } + } + } + + @Override + public void cancel() { + done.set(true); + subscriptions.remove(this); + } + + public void notifyError(Exception exception) { + if (currentSubscriber != null) { + done.set(true); + currentSubscriber.onError(exception); + currentSubscriber = null; + } + } + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java new file mode 100644 index 000000000000..cf5b14c090b5 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + + +import java.nio.ByteBuffer; +import org.apache.commons.lang3.RandomStringUtils; +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.TestEnvironment; +import software.amazon.awssdk.core.SdkBytes; + +public class BufferingAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification { + public BufferingAsyncRequestBodyTckTest() { + super(new TestEnvironment(true)); + } + + @Override + public Publisher createPublisher(long elements) { + BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024 * elements); + for (int i = 0; i < elements; i++) { + bufferingAsyncRequestBody.send(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer()); + } + + bufferingAsyncRequestBody.complete(); + return bufferingAsyncRequestBody; + } + + @Override + public Publisher createFailedPublisher() { + BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L); + bufferingAsyncRequestBody.close(); + return null; + } + + public long maxElementsFromPublisher() { + return 100; + } + +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java new file mode 100644 index 000000000000..ce3566e2de3f --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java @@ -0,0 +1,384 @@ +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.exception.NonRetryableException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; + +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.utils.BinaryUtils; + +public class BufferingAsyncRequestBodyTest { + + private static final ByteBuffer TEST_DATA_SET_1 = + ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8)); + private static final ByteBuffer TEST_DATA_SET_2 = + ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8)); + + @Test + public void body_whenCalled_shouldReturnConstantBytes() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + assertEquals("Bytes", body.body()); + } + + @Test + public void close_whenCalledMultipleTimes_shouldExecuteOnlyOnce() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + Subscriber mockSubscriber = mock(Subscriber.class); + + body.subscribe(mockSubscriber); + + body.close(); + assertThat(body.bufferedData()).isEmpty(); + body.close(); + body.close(); + + verify(mockSubscriber, times(1)).onError(any(NonRetryableException.class)); + } + + @ParameterizedTest + @MethodSource("contentLengthTestCases") + public void contentLength_withVariousInputs_shouldReturnExpectedResult(Long inputLength, boolean shouldBePresent, + Long expectedValue) { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(inputLength); + Optional result = body.contentLength(); + + assertThat(result.isPresent()).isEqualTo(shouldBePresent); + if (shouldBePresent) { + assertThat(result.get()).isEqualTo(expectedValue); + } + } + + private static Stream contentLengthTestCases() { + return Stream.of( + Arguments.of(null, false, null), + Arguments.of(100L, true, 100L), + Arguments.of(0L, true, 0L) + ); + } + + @Test + public void send_whenByteBufferIsNull_shouldThrowNullPointerException() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + + assertThatThrownBy(() -> body.send(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("data must not be null"); + } + + @Test + public void subscribe_whenBodyIsClosed_shouldNotifySubscriberWithError() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.complete(); // Set dataReady to true + body.close(); // Set closed to true + Subscriber mockSubscriber = mock(Subscriber.class); + + body.subscribe(mockSubscriber); + + verify(mockSubscriber).onSubscribe(any()); + verify(mockSubscriber).onError(argThat(e -> + e instanceof NonRetryableException && + e.getMessage().equals("AsyncRequestBody has been closed") + )); + } + + @Test + public void subscribe_whenDataNotReady_shouldNotifySubscriberWithError() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + Subscriber mockSubscriber = mock(Subscriber.class); + + body.subscribe(mockSubscriber); + + verify(mockSubscriber).onSubscribe(any()); + verify(mockSubscriber).onError(argThat(e -> + e instanceof NonRetryableException && + e.getMessage().equals("Unexpected error occurred. Data is not ready to be " + + "subscribed") + )); + } + + @Test + public void subscribe_whenMultipleSubscribers_shouldSupportConcurrentSubscriptions() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.send(TEST_DATA_SET_1); + body.send(TEST_DATA_SET_2); + body.complete(); + + Subscriber firstSubscriber = mock(Subscriber.class); + Subscriber secondSubscriber = mock(Subscriber.class); + + ArgumentCaptor firstSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); + ArgumentCaptor secondSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); + + body.subscribe(firstSubscriber); + body.subscribe(secondSubscriber); + + verify(firstSubscriber).onSubscribe(firstSubscriptionCaptor.capture()); + verify(secondSubscriber).onSubscribe(secondSubscriptionCaptor.capture()); + + Subscription firstSubscription = firstSubscriptionCaptor.getValue(); + Subscription secondSubscription = secondSubscriptionCaptor.getValue(); + + firstSubscription.request(2); + secondSubscription.request(2); + verifyData(firstSubscriber); + verifyData(secondSubscriber); + verify(firstSubscriber).onComplete(); + verify(secondSubscriber).onComplete(); + } + + private static void verifyData(Subscriber subscriber) { + verify(subscriber).onNext( + argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_1.array()))); + verify(subscriber).onNext( + argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_2.array()))); + verify(subscriber).onComplete(); + } + + @Test + public void send_afterClose_shouldThrowIllegalStateException() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.close(); + + assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes()))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot send data to closed body"); + } + + @Test + public void send_afterComplete_shouldThrowIllegalStateException() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.complete(); + + assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes()))) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Request body has already been completed"); + } + + @Test + public void complete_afterClose_shouldThrowIllegalStateException() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.close(); + + assertThatThrownBy(() -> body.complete()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("The AsyncRequestBody has been closed"); + } + + @Test + public void complete_calledMultipleTimes_shouldNotThrow() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.complete(); + + // Should not throw - method returns early if already completed + body.complete(); + body.complete(); + + // Verify it's still in completed state + Subscriber mockSubscriber = mock(Subscriber.class); + body.subscribe(mockSubscriber); + verify(mockSubscriber).onSubscribe(any()); + } + + @Test + public void close_withActiveSubscriptions_shouldNotifyAllSubscribers() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.send(ByteBuffer.wrap("test".getBytes())); + body.complete(); + + Subscriber subscriber1 = mock(Subscriber.class); + Subscriber subscriber2 = mock(Subscriber.class); + Subscriber subscriber3 = mock(Subscriber.class); + + body.subscribe(subscriber1); + body.subscribe(subscriber2); + body.subscribe(subscriber3); + + body.close(); + + verify(subscriber1).onError(argThat(e -> + e instanceof IllegalStateException && + e.getMessage().contains("The publisher has been closed") + )); + verify(subscriber2).onError(argThat(e -> + e instanceof IllegalStateException && + e.getMessage().contains("The publisher has been closed") + )); + verify(subscriber3).onError(argThat(e -> + e instanceof IllegalStateException && + e.getMessage().contains("The publisher has been closed") + )); + } + + @Test + public void bufferedData_afterClose_shouldBeEmpty() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.send(ByteBuffer.wrap("test1".getBytes())); + body.send(ByteBuffer.wrap("test2".getBytes())); + + assertThat(body.bufferedData()).hasSize(2); + + body.close(); + + assertThat(body.bufferedData()).isEmpty(); + } + + @Test + public void send_withEmptyByteBuffer_shouldStoreEmptyBuffer() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + ByteBuffer emptyBuffer = ByteBuffer.allocate(0); + + body.send(emptyBuffer); + body.complete(); + + Subscriber mockSubscriber = mock(Subscriber.class); + ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); + + body.subscribe(mockSubscriber); + verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture()); + + Subscription subscription = subscriptionCaptor.getValue(); + subscription.request(1); + + verify(mockSubscriber).onNext(argThat(buffer -> buffer.remaining() == 0)); + verify(mockSubscriber).onComplete(); + } + + @Test + public void concurrentSendAndComplete_shouldBeThreadSafe() throws InterruptedException { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + ExecutorService executor = Executors.newFixedThreadPool(10); + CountDownLatch latch = new CountDownLatch(10); + AtomicInteger successfulSends = new AtomicInteger(0); + + // Start multiple threads sending data + for (int i = 0; i < 10; i++) { + final int threadId = i; + executor.submit(() -> { + try { + body.send(ByteBuffer.wrap(("data" + threadId).getBytes())); + successfulSends.incrementAndGet(); + } catch (IllegalStateException e) { + // Expected if complete() was called first + } finally { + latch.countDown(); + } + }); + } + + // Complete after a short delay + Thread.sleep(10); + body.complete(); + + latch.await(5, TimeUnit.SECONDS); + executor.shutdown(); + + // Should have some successful sends and the body should be complete + assertThat(successfulSends.get()).isGreaterThan(0); + assertThat(body.bufferedData().size()).isEqualTo(successfulSends.get()); + } + + @Test + public void concurrentSubscribeAndClose_shouldBeThreadSafe() throws InterruptedException { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + body.send(ByteBuffer.wrap("test".getBytes())); + body.complete(); + + ExecutorService executor = Executors.newFixedThreadPool(10); + CountDownLatch latch = new CountDownLatch(10); + AtomicInteger successfulSubscriptions = new AtomicInteger(0); + AtomicInteger errorNotifications = new AtomicInteger(0); + + // Start multiple threads subscribing + for (int i = 0; i < 10; i++) { + executor.submit(() -> { + try { + Subscriber subscriber = new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + successfulSubscriptions.incrementAndGet(); + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) {} + + @Override + public void onError(Throwable t) { + errorNotifications.incrementAndGet(); + } + + @Override + public void onComplete() {} + }; + body.subscribe(subscriber); + } finally { + latch.countDown(); + } + }); + } + + // Close after a short delay + Thread.sleep(10); + body.close(); + + latch.await(5, TimeUnit.SECONDS); + executor.shutdown(); + + // All subscribers should have been notified + assertThat(successfulSubscriptions.get()).isEqualTo(10); + } + + @Test + public void subscription_readOnlyBuffers_shouldNotAffectOriginalData() { + BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); + ByteBuffer originalBuffer = ByteBuffer.wrap("test".getBytes()); + int originalPosition = originalBuffer.position(); + + body.send(originalBuffer); + body.complete(); + + Subscriber mockSubscriber = mock(Subscriber.class); + ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); + ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); + + body.subscribe(mockSubscriber); + verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture()); + + Subscription subscription = subscriptionCaptor.getValue(); + subscription.request(1); + + verify(mockSubscriber).onNext(bufferCaptor.capture()); + ByteBuffer receivedBuffer = bufferCaptor.getValue(); + + // Verify the received buffer is read-only + assertThat(receivedBuffer.isReadOnly()).isTrue(); + + // Verify original buffer position is unchanged + assertThat(originalBuffer.position()).isEqualTo(originalPosition); + } +} From 0ce2e41390f0e9cc7fcfb3f1b26f1b8f2208d128 Mon Sep 17 00:00:00 2001 From: Zoe Wang Date: Fri, 1 Aug 2025 10:08:54 -0700 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../awssdk/core/internal/async/BufferingAsyncRequestBody.java | 4 +++- .../core/internal/async/BufferingAsyncRequestBodyTckTest.java | 2 +- .../core/internal/async/BufferingAsyncRequestBodyTest.java | 1 - 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java index d3f080c05683..779122cbdc2d 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java @@ -284,7 +284,9 @@ private void doProcessRequest() { int currentIndex = this.index.getAndIncrement(); if (currentIndex >= buffers.size()) { - // This should never happen, but defensive programming + // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size() + // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers + // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations. notifyError(new IllegalStateException("Index out of bounds")); subscriptions.remove(this); return; diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java index cf5b14c090b5..06418fd59534 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java @@ -42,7 +42,7 @@ public Publisher createPublisher(long elements) { public Publisher createFailedPublisher() { BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L); bufferingAsyncRequestBody.close(); - return null; + return bufferingAsyncRequestBody; } public long maxElementsFromPublisher() { diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java index ce3566e2de3f..ae3f89ad3321 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java @@ -156,7 +156,6 @@ private static void verifyData(Subscriber subscriber) { argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_1.array()))); verify(subscriber).onNext( argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_2.array()))); - verify(subscriber).onComplete(); } @Test From 62008d62dfca73478037eed98343b6ad721f492d Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Tue, 5 Aug 2025 15:12:38 -0700 Subject: [PATCH 3/3] Update existing byteBuffersAsyncRequestBody to support multicast --- .../async/BufferingAsyncRequestBody.java | 322 --------------- .../async/ByteBuffersAsyncRequestBody.java | 255 +++++++++--- .../async/BufferingAsyncRequestBodyTest.java | 383 ------------------ ...=> ByteBufferAsyncRequestBodyTckTest.java} | 18 +- .../ByteBuffersAsyncRequestBodyTest.java | 207 ++++++++-- 5 files changed, 378 insertions(+), 807 deletions(-) delete mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java delete mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java rename core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/{BufferingAsyncRequestBodyTckTest.java => ByteBufferAsyncRequestBodyTckTest.java} (65%) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java deleted file mode 100644 index 779122cbdc2d..000000000000 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBody.java +++ /dev/null @@ -1,322 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.internal.async; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.annotations.SdkTestInternalApi; -import software.amazon.awssdk.annotations.ThreadSafe; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.exception.NonRetryableException; -import software.amazon.awssdk.core.internal.util.NoopSubscription; -import software.amazon.awssdk.utils.Logger; -import software.amazon.awssdk.utils.SdkAutoCloseable; -import software.amazon.awssdk.utils.Validate; - -/** - * An implementation of {@link AsyncRequestBody} that buffers the entire content and supports multiple concurrent subscribers. - * - *

This class allows data to be sent incrementally via the {@link #send(ByteBuffer)} method, buffered internally, - * and then replayed to multiple subscribers independently. Each subscriber receives a complete copy of all buffered data - * when they subscribe and request it. - * - *

Usage Pattern: - * {@snippet : - * BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(contentLength); - * - * // Send data incrementally - * body.send(ByteBuffer.wrap("Hello ".getBytes())); - * body.send(ByteBuffer.wrap("World".getBytes())); - * - * // Mark data as complete and ready for subscription - * body.complete(); - * - * // Multiple subscribers can now consume the buffered data - * body.subscribe(subscriber1); - * body.subscribe(subscriber2); - * } - * - *

Thread Safety:

- * This class is thread-safe and supports concurrent operations: - *
    - *
  • Multiple threads can call {@link #send(ByteBuffer)} concurrently
  • - *
  • Multiple subscribers can be added concurrently
  • - *
  • Each subscriber operates independently with its own state
  • - *
- * - *

Subscription Behavior:

- *
    - *
  • Subscribers can only subscribe after {@link #complete()} has been called
  • - *
  • Each subscriber receives a read-only view of the buffered data
  • - *
  • Subscribers receive data independently based on their own demand signaling
  • - *
  • If the body is closed, new subscribers will receive an error immediately
  • - *
- * - *

Resource Management:

- * The body should be closed when no longer needed to free buffered data and notify active subscribers. - * Closing the body will: - *
    - *
  • Clear all buffered data
  • - *
  • Send error notifications to all active subscribers
  • - *
  • Prevent new subscriptions
  • - *
- * - */ -@ThreadSafe -@SdkInternalApi -public final class BufferingAsyncRequestBody implements AsyncRequestBody, SdkAutoCloseable { - private static final Logger log = Logger.loggerFor(BufferingAsyncRequestBody.class); - - private final Long length; - private final List bufferedData = new ArrayList<>(); - private boolean dataReady; - private boolean closed; - private final Set subscriptions; - private final Object lock = new Object(); - - /** - * Creates a new BufferingAsyncRequestBody with the specified content length. - * - * @param length the total content length in bytes, or null if unknown - */ - BufferingAsyncRequestBody(Long length) { - this.length = length; - this.subscriptions = ConcurrentHashMap.newKeySet(); - } - - /** - * Sends a chunk of data to be buffered for later consumption by subscribers. - * - * @param data the data to buffer, must not be null - * @throws NullPointerException if data is null - */ - public void send(ByteBuffer data) { - Validate.paramNotNull(data, "data"); - synchronized (lock) { - if (closed) { - throw new IllegalStateException("Cannot send data to closed body"); - } - if (dataReady) { - throw new IllegalStateException("Request body has already been completed"); - } - bufferedData.add(data); - } - } - - /** - * Marks the request body as complete and ready for subscription. - * - *

This method must be called before any subscribers can successfully subscribe - * to this request body. After calling this method, no more data should be sent - * via {@link #send(ByteBuffer)}. - * - *

Once complete, multiple subscribers can subscribe and will each receive - * the complete buffered content independently. - */ - public void complete() { - synchronized (lock) { - if (dataReady) { - return; - } - if (closed) { - throw new IllegalStateException("The AsyncRequestBody has been closed"); - } - dataReady = true; - } - } - - @Override - public Optional contentLength() { - return Optional.ofNullable(length); - } - - @Override - public void subscribe(Subscriber subscriber) { - Validate.paramNotNull(subscriber, "subscriber"); - - synchronized (lock) { - if (!dataReady) { - subscriber.onSubscribe(new NoopSubscription(subscriber)); - subscriber.onError(NonRetryableException.create( - "Unexpected error occurred. Data is not ready to be subscribed")); - return; - } - - if (closed) { - subscriber.onSubscribe(new NoopSubscription(subscriber)); - subscriber.onError(NonRetryableException.create( - "AsyncRequestBody has been closed")); - return; - } - } - - ReplayableByteBufferSubscription replayableByteBufferSubscription = - new ReplayableByteBufferSubscription(subscriber, bufferedData); - subscriber.onSubscribe(replayableByteBufferSubscription); - subscriptions.add(replayableByteBufferSubscription); - } - - @Override - public String body() { - return BodyType.BYTES.getName(); - } - - /** - *

Closes this request body and releases all resources. This will: - *

    - *
  • Clear all buffered data to free memory
  • - *
  • Notify all active subscribers with an error
  • - *
  • Prevent new subscriptions from succeeding
  • - *
- * - *

This method is idempotent - calling it multiple times has no additional effect. - * It is safe to call this method concurrently from multiple threads. - */ - @Override - public void close() { - synchronized (lock) { - if (closed) { - return; - } - - closed = true; - bufferedData.clear(); - subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed"))); - subscriptions.clear(); - } - - } - - @SdkTestInternalApi - List bufferedData() { - return Collections.unmodifiableList(bufferedData); - } - - private class ReplayableByteBufferSubscription implements Subscription { - private final AtomicInteger index = new AtomicInteger(0); - private final AtomicBoolean done = new AtomicBoolean(false); - private final List buffers; - private final AtomicBoolean processingRequest = new AtomicBoolean(false); - private Subscriber currentSubscriber; - private final AtomicLong outstandingDemand = new AtomicLong(); - - private ReplayableByteBufferSubscription(Subscriber subscriber, List buffers) { - this.buffers = buffers; - this.currentSubscriber = subscriber; - } - - @Override - public void request(long n) { - if (n <= 0) { - currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); - currentSubscriber = null; - return; - } - - if (done.get()) { - return; - } - - outstandingDemand.updateAndGet(current -> { - if (Long.MAX_VALUE - current < n) { - return Long.MAX_VALUE; - } - - return current + n; - }); - processRequest(); - } - - private void processRequest() { - do { - if (!processingRequest.compareAndSet(false, true)) { - // Some other thread is processing the queue, so we don't need to. - return; - } - - try { - doProcessRequest(); - } catch (Throwable e) { - notifyError(new IllegalStateException("Encountered fatal error in publisher", e)); - subscriptions.remove(this); - break; - } finally { - processingRequest.set(false); - } - - } while (shouldProcessRequest()); - } - - private boolean shouldProcessRequest() { - return !done.get() && outstandingDemand.get() > 0 && index.get() < buffers.size(); - } - - private void doProcessRequest() { - while (true) { - if (!shouldProcessRequest()) { - return; - } - - int currentIndex = this.index.getAndIncrement(); - - if (currentIndex >= buffers.size()) { - // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size() - // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers - // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations. - notifyError(new IllegalStateException("Index out of bounds")); - subscriptions.remove(this); - return; - } - - ByteBuffer buffer = buffers.get(currentIndex); - currentSubscriber.onNext(buffer.asReadOnlyBuffer()); - outstandingDemand.decrementAndGet(); - - if (currentIndex == buffers.size() - 1) { - done.set(true); - currentSubscriber.onComplete(); - subscriptions.remove(this); - break; - } - } - } - - @Override - public void cancel() { - done.set(true); - subscriptions.remove(this); - } - - public void notifyError(Exception exception) { - if (currentSubscriber != null) { - done.set(true); - currentSubscriber.onError(exception); - currentSubscriber = null; - } - } - } -} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java index c19ab8e245f8..a304d75ccf94 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java @@ -16,21 +16,47 @@ package software.amazon.awssdk.core.internal.async; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.NonRetryableException; import software.amazon.awssdk.core.internal.util.Mimetype; +import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.SdkAutoCloseable; +import software.amazon.awssdk.utils.Validate; /** * An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created * using static methods on {@link AsyncRequestBody} * + *

Subscription Behavior:

+ *
    + *
  • Each subscriber receives a read-only view of the buffered data
  • + *
  • Subscribers receive data independently based on their own demand signaling
  • + *
  • If the body is closed, new subscribers will receive an error immediately
  • + *
+ * + *

Resource Management:

+ * The body should be closed when no longer needed to free buffered data and notify active subscribers. + * Closing the body will: + *
    + *
  • Clear all buffered data
  • + *
  • Send error notifications to all active subscribers
  • + *
  • Prevent new subscriptions
  • + *
* @see AsyncRequestBody#fromBytes(byte[]) * @see AsyncRequestBody#fromBytesUnsafe(byte[]) * @see AsyncRequestBody#fromByteBuffer(ByteBuffer) @@ -40,17 +66,21 @@ * @see AsyncRequestBody#fromString(String) */ @SdkInternalApi -public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody { +public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody, SdkAutoCloseable { private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class); private final String mimetype; private final Long length; - private final ByteBuffer[] buffers; + private List buffers; + private final Set subscriptions; + private final Object lock = new Object(); + private boolean closed; - private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) { + private ByteBuffersAsyncRequestBody(String mimetype, Long length, List buffers) { this.mimetype = mimetype; - this.length = length; this.buffers = buffers; + this.length = length; + this.subscriptions = ConcurrentHashMap.newKeySet(); } @Override @@ -64,61 +94,25 @@ public String contentType() { } @Override - public void subscribe(Subscriber s) { - // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null - if (s == null) { - throw new NullPointerException("Subscription MUST NOT be null."); + public void subscribe(Subscriber subscriber) { + Validate.paramNotNull(subscriber, "subscriber"); + synchronized (lock) { + if (closed) { + subscriber.onSubscribe(new NoopSubscription(subscriber)); + subscriber.onError(NonRetryableException.create( + "AsyncRequestBody has been closed")); + return; + } } - // As per 2.13, this method must return normally (i.e. not throw). try { - s.onSubscribe( - new Subscription() { - private final AtomicInteger index = new AtomicInteger(0); - private final AtomicBoolean completed = new AtomicBoolean(false); - - @Override - public void request(long n) { - if (completed.get()) { - return; - } - - if (n > 0) { - int i = index.getAndIncrement(); - - if (buffers.length == 0 && completed.compareAndSet(false, true)) { - s.onComplete(); - } - - if (i >= buffers.length) { - return; - } - - long remaining = n; - - do { - ByteBuffer buffer = buffers[i]; - - s.onNext(buffer.asReadOnlyBuffer()); - remaining--; - } while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length); - - if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) { - s.onComplete(); - } - } else { - s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); - } - } - - @Override - public void cancel() { - completed.set(true); - } - } - ); + ReplayableByteBufferSubscription replayableByteBufferSubscription = + new ReplayableByteBufferSubscription(subscriber); + subscriber.onSubscribe(replayableByteBufferSubscription); + subscriptions.add(replayableByteBufferSubscription); } catch (Throwable ex) { - log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex); + log.error(() -> subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", + ex); } } @@ -127,34 +121,167 @@ public String body() { return BodyType.BYTES.getName(); } - public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) { - long length = Arrays.stream(buffers) - .mapToLong(ByteBuffer::remaining) - .sum(); + public static ByteBuffersAsyncRequestBody of(List buffers) { + long length = buffers.stream() + .mapToLong(ByteBuffer::remaining) + .sum(); return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers); } + public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) { + return of(Arrays.asList(buffers)); + } + public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) { - return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers); + return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, Arrays.asList(buffers)); } public static ByteBuffersAsyncRequestBody of(String mimetype, ByteBuffer... buffers) { long length = Arrays.stream(buffers) .mapToLong(ByteBuffer::remaining) .sum(); - return new ByteBuffersAsyncRequestBody(mimetype, length, buffers); + return new ByteBuffersAsyncRequestBody(mimetype, length, Arrays.asList(buffers)); } public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) { - return new ByteBuffersAsyncRequestBody(mimetype, length, buffers); + return new ByteBuffersAsyncRequestBody(mimetype, length, Arrays.asList(buffers)); } public static ByteBuffersAsyncRequestBody from(byte[] bytes) { return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length, - ByteBuffer.wrap(bytes)); + Collections.singletonList(ByteBuffer.wrap(bytes))); } public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) { - return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes)); + return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, + Collections.singletonList(ByteBuffer.wrap(bytes))); + } + + @Override + public void close() { + synchronized (lock) { + if (closed) { + return; + } + + closed = true; + buffers = new ArrayList<>(); + subscriptions.forEach(s -> s.notifyError(new IllegalStateException("The publisher has been closed"))); + subscriptions.clear(); + } + } + + @SdkTestInternalApi + public List bufferedData() { + return buffers; + } + + private class ReplayableByteBufferSubscription implements Subscription { + private final AtomicInteger index = new AtomicInteger(0); + private volatile boolean done; + private final AtomicBoolean processingRequest = new AtomicBoolean(false); + private Subscriber currentSubscriber; + private final AtomicLong outstandingDemand = new AtomicLong(); + + private ReplayableByteBufferSubscription(Subscriber subscriber) { + this.currentSubscriber = subscriber; + } + + @Override + public void request(long n) { + if (n <= 0) { + currentSubscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); + currentSubscriber = null; + return; + } + + if (done) { + return; + } + + if (buffers.size() == 0) { + currentSubscriber.onComplete(); + done = true; + subscriptions.remove(this); + return; + } + + outstandingDemand.updateAndGet(current -> { + if (Long.MAX_VALUE - current < n) { + return Long.MAX_VALUE; + } + + return current + n; + }); + processRequest(); + } + + private void processRequest() { + do { + if (!processingRequest.compareAndSet(false, true)) { + // Some other thread is processing the queue, so we don't need to. + return; + } + + try { + doProcessRequest(); + } catch (Throwable e) { + notifyError(new IllegalStateException("Encountered fatal error in publisher", e)); + subscriptions.remove(this); + break; + } finally { + processingRequest.set(false); + } + + } while (shouldProcessRequest()); + } + + private boolean shouldProcessRequest() { + return !done && outstandingDemand.get() > 0 && index.get() < buffers.size(); + } + + private void doProcessRequest() { + while (true) { + if (!shouldProcessRequest()) { + return; + } + + int currentIndex = this.index.getAndIncrement(); + + if (currentIndex >= buffers.size()) { + // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size() + // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers + // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations. + notifyError(new IllegalStateException("Index out of bounds")); + subscriptions.remove(this); + return; + } + + ByteBuffer buffer = buffers.get(currentIndex); + currentSubscriber.onNext(buffer.asReadOnlyBuffer()); + outstandingDemand.decrementAndGet(); + + if (currentIndex == buffers.size() - 1) { + done = true; + currentSubscriber.onComplete(); + subscriptions.remove(this); + break; + } + } + } + + @Override + public void cancel() { + done = true; + subscriptions.remove(this); + } + + public void notifyError(Exception exception) { + if (currentSubscriber != null) { + done = true; + currentSubscriber.onError(exception); + currentSubscriber = null; + } + } } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java deleted file mode 100644 index ae3f89ad3321..000000000000 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTest.java +++ /dev/null @@ -1,383 +0,0 @@ -package software.amazon.awssdk.core.internal.async; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; -import org.apache.commons.lang3.RandomStringUtils; -import org.junit.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.core.exception.NonRetryableException; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; -import static org.mockito.Mockito.mock; - -import org.mockito.ArgumentCaptor; -import software.amazon.awssdk.utils.BinaryUtils; - -public class BufferingAsyncRequestBodyTest { - - private static final ByteBuffer TEST_DATA_SET_1 = - ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8)); - private static final ByteBuffer TEST_DATA_SET_2 = - ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8)); - - @Test - public void body_whenCalled_shouldReturnConstantBytes() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - assertEquals("Bytes", body.body()); - } - - @Test - public void close_whenCalledMultipleTimes_shouldExecuteOnlyOnce() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - Subscriber mockSubscriber = mock(Subscriber.class); - - body.subscribe(mockSubscriber); - - body.close(); - assertThat(body.bufferedData()).isEmpty(); - body.close(); - body.close(); - - verify(mockSubscriber, times(1)).onError(any(NonRetryableException.class)); - } - - @ParameterizedTest - @MethodSource("contentLengthTestCases") - public void contentLength_withVariousInputs_shouldReturnExpectedResult(Long inputLength, boolean shouldBePresent, - Long expectedValue) { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(inputLength); - Optional result = body.contentLength(); - - assertThat(result.isPresent()).isEqualTo(shouldBePresent); - if (shouldBePresent) { - assertThat(result.get()).isEqualTo(expectedValue); - } - } - - private static Stream contentLengthTestCases() { - return Stream.of( - Arguments.of(null, false, null), - Arguments.of(100L, true, 100L), - Arguments.of(0L, true, 0L) - ); - } - - @Test - public void send_whenByteBufferIsNull_shouldThrowNullPointerException() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - - assertThatThrownBy(() -> body.send(null)) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("data must not be null"); - } - - @Test - public void subscribe_whenBodyIsClosed_shouldNotifySubscriberWithError() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.complete(); // Set dataReady to true - body.close(); // Set closed to true - Subscriber mockSubscriber = mock(Subscriber.class); - - body.subscribe(mockSubscriber); - - verify(mockSubscriber).onSubscribe(any()); - verify(mockSubscriber).onError(argThat(e -> - e instanceof NonRetryableException && - e.getMessage().equals("AsyncRequestBody has been closed") - )); - } - - @Test - public void subscribe_whenDataNotReady_shouldNotifySubscriberWithError() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - Subscriber mockSubscriber = mock(Subscriber.class); - - body.subscribe(mockSubscriber); - - verify(mockSubscriber).onSubscribe(any()); - verify(mockSubscriber).onError(argThat(e -> - e instanceof NonRetryableException && - e.getMessage().equals("Unexpected error occurred. Data is not ready to be " - + "subscribed") - )); - } - - @Test - public void subscribe_whenMultipleSubscribers_shouldSupportConcurrentSubscriptions() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.send(TEST_DATA_SET_1); - body.send(TEST_DATA_SET_2); - body.complete(); - - Subscriber firstSubscriber = mock(Subscriber.class); - Subscriber secondSubscriber = mock(Subscriber.class); - - ArgumentCaptor firstSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); - ArgumentCaptor secondSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); - - body.subscribe(firstSubscriber); - body.subscribe(secondSubscriber); - - verify(firstSubscriber).onSubscribe(firstSubscriptionCaptor.capture()); - verify(secondSubscriber).onSubscribe(secondSubscriptionCaptor.capture()); - - Subscription firstSubscription = firstSubscriptionCaptor.getValue(); - Subscription secondSubscription = secondSubscriptionCaptor.getValue(); - - firstSubscription.request(2); - secondSubscription.request(2); - verifyData(firstSubscriber); - verifyData(secondSubscriber); - verify(firstSubscriber).onComplete(); - verify(secondSubscriber).onComplete(); - } - - private static void verifyData(Subscriber subscriber) { - verify(subscriber).onNext( - argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_1.array()))); - verify(subscriber).onNext( - argThat(buffer -> Arrays.equals(BinaryUtils.copyBytesFrom(buffer), TEST_DATA_SET_2.array()))); - } - - @Test - public void send_afterClose_shouldThrowIllegalStateException() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.close(); - - assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes()))) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Cannot send data to closed body"); - } - - @Test - public void send_afterComplete_shouldThrowIllegalStateException() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.complete(); - - assertThatThrownBy(() -> body.send(ByteBuffer.wrap("test".getBytes()))) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Request body has already been completed"); - } - - @Test - public void complete_afterClose_shouldThrowIllegalStateException() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.close(); - - assertThatThrownBy(() -> body.complete()) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("The AsyncRequestBody has been closed"); - } - - @Test - public void complete_calledMultipleTimes_shouldNotThrow() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.complete(); - - // Should not throw - method returns early if already completed - body.complete(); - body.complete(); - - // Verify it's still in completed state - Subscriber mockSubscriber = mock(Subscriber.class); - body.subscribe(mockSubscriber); - verify(mockSubscriber).onSubscribe(any()); - } - - @Test - public void close_withActiveSubscriptions_shouldNotifyAllSubscribers() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.send(ByteBuffer.wrap("test".getBytes())); - body.complete(); - - Subscriber subscriber1 = mock(Subscriber.class); - Subscriber subscriber2 = mock(Subscriber.class); - Subscriber subscriber3 = mock(Subscriber.class); - - body.subscribe(subscriber1); - body.subscribe(subscriber2); - body.subscribe(subscriber3); - - body.close(); - - verify(subscriber1).onError(argThat(e -> - e instanceof IllegalStateException && - e.getMessage().contains("The publisher has been closed") - )); - verify(subscriber2).onError(argThat(e -> - e instanceof IllegalStateException && - e.getMessage().contains("The publisher has been closed") - )); - verify(subscriber3).onError(argThat(e -> - e instanceof IllegalStateException && - e.getMessage().contains("The publisher has been closed") - )); - } - - @Test - public void bufferedData_afterClose_shouldBeEmpty() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.send(ByteBuffer.wrap("test1".getBytes())); - body.send(ByteBuffer.wrap("test2".getBytes())); - - assertThat(body.bufferedData()).hasSize(2); - - body.close(); - - assertThat(body.bufferedData()).isEmpty(); - } - - @Test - public void send_withEmptyByteBuffer_shouldStoreEmptyBuffer() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - ByteBuffer emptyBuffer = ByteBuffer.allocate(0); - - body.send(emptyBuffer); - body.complete(); - - Subscriber mockSubscriber = mock(Subscriber.class); - ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); - - body.subscribe(mockSubscriber); - verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture()); - - Subscription subscription = subscriptionCaptor.getValue(); - subscription.request(1); - - verify(mockSubscriber).onNext(argThat(buffer -> buffer.remaining() == 0)); - verify(mockSubscriber).onComplete(); - } - - @Test - public void concurrentSendAndComplete_shouldBeThreadSafe() throws InterruptedException { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - ExecutorService executor = Executors.newFixedThreadPool(10); - CountDownLatch latch = new CountDownLatch(10); - AtomicInteger successfulSends = new AtomicInteger(0); - - // Start multiple threads sending data - for (int i = 0; i < 10; i++) { - final int threadId = i; - executor.submit(() -> { - try { - body.send(ByteBuffer.wrap(("data" + threadId).getBytes())); - successfulSends.incrementAndGet(); - } catch (IllegalStateException e) { - // Expected if complete() was called first - } finally { - latch.countDown(); - } - }); - } - - // Complete after a short delay - Thread.sleep(10); - body.complete(); - - latch.await(5, TimeUnit.SECONDS); - executor.shutdown(); - - // Should have some successful sends and the body should be complete - assertThat(successfulSends.get()).isGreaterThan(0); - assertThat(body.bufferedData().size()).isEqualTo(successfulSends.get()); - } - - @Test - public void concurrentSubscribeAndClose_shouldBeThreadSafe() throws InterruptedException { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - body.send(ByteBuffer.wrap("test".getBytes())); - body.complete(); - - ExecutorService executor = Executors.newFixedThreadPool(10); - CountDownLatch latch = new CountDownLatch(10); - AtomicInteger successfulSubscriptions = new AtomicInteger(0); - AtomicInteger errorNotifications = new AtomicInteger(0); - - // Start multiple threads subscribing - for (int i = 0; i < 10; i++) { - executor.submit(() -> { - try { - Subscriber subscriber = new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - successfulSubscriptions.incrementAndGet(); - s.request(1); - } - - @Override - public void onNext(ByteBuffer byteBuffer) {} - - @Override - public void onError(Throwable t) { - errorNotifications.incrementAndGet(); - } - - @Override - public void onComplete() {} - }; - body.subscribe(subscriber); - } finally { - latch.countDown(); - } - }); - } - - // Close after a short delay - Thread.sleep(10); - body.close(); - - latch.await(5, TimeUnit.SECONDS); - executor.shutdown(); - - // All subscribers should have been notified - assertThat(successfulSubscriptions.get()).isEqualTo(10); - } - - @Test - public void subscription_readOnlyBuffers_shouldNotAffectOriginalData() { - BufferingAsyncRequestBody body = new BufferingAsyncRequestBody(null); - ByteBuffer originalBuffer = ByteBuffer.wrap("test".getBytes()); - int originalPosition = originalBuffer.position(); - - body.send(originalBuffer); - body.complete(); - - Subscriber mockSubscriber = mock(Subscriber.class); - ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); - ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); - - body.subscribe(mockSubscriber); - verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture()); - - Subscription subscription = subscriptionCaptor.getValue(); - subscription.request(1); - - verify(mockSubscriber).onNext(bufferCaptor.capture()); - ByteBuffer receivedBuffer = bufferCaptor.getValue(); - - // Verify the received buffer is read-only - assertThat(receivedBuffer.isReadOnly()).isTrue(); - - // Verify original buffer position is unchanged - assertThat(originalBuffer.position()).isEqualTo(originalPosition); - } -} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBufferAsyncRequestBodyTckTest.java similarity index 65% rename from core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java rename to core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBufferAsyncRequestBodyTckTest.java index 06418fd59534..3d9df4b9e1e2 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/BufferingAsyncRequestBodyTckTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBufferAsyncRequestBodyTckTest.java @@ -17,30 +17,30 @@ import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.lang3.RandomStringUtils; import org.reactivestreams.Publisher; import org.reactivestreams.tck.TestEnvironment; import software.amazon.awssdk.core.SdkBytes; -public class BufferingAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification { - public BufferingAsyncRequestBodyTckTest() { - super(new TestEnvironment(true)); +public class ByteBufferAsyncRequestBodyTckTest extends org.reactivestreams.tck.PublisherVerification { + public ByteBufferAsyncRequestBodyTckTest() { + super(new TestEnvironment()); } @Override public Publisher createPublisher(long elements) { - BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024 * elements); + List buffers = new ArrayList<>(); for (int i = 0; i < elements; i++) { - bufferingAsyncRequestBody.send(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer()); + buffers.add(SdkBytes.fromUtf8String(RandomStringUtils.randomAscii(1024)).asByteBuffer()); } - - bufferingAsyncRequestBody.complete(); - return bufferingAsyncRequestBody; + return ByteBuffersAsyncRequestBody.of(buffers.toArray(new ByteBuffer[0])); } @Override public Publisher createFailedPublisher() { - BufferingAsyncRequestBody bufferingAsyncRequestBody = new BufferingAsyncRequestBody(1024L); + ByteBuffersAsyncRequestBody bufferingAsyncRequestBody = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes())); bufferingAsyncRequestBody.close(); return bufferingAsyncRequestBody; } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java index e1035dc25b0c..c80784b26a90 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java @@ -15,58 +15,44 @@ package software.amazon.awssdk.core.internal.async; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.NonRetryableException; import software.amazon.awssdk.utils.BinaryUtils; class ByteBuffersAsyncRequestBodyTest { - private static class TestSubscriber implements Subscriber { - private Subscription subscription; - private boolean onCompleteCalled = false; - private int callsToComplete = 0; - private final List publishedResults = Collections.synchronizedList(new ArrayList<>()); - - public void request(long n) { - subscription.request(n); - } - - @Override - public void onSubscribe(Subscription s) { - this.subscription = s; - } - - @Override - public void onNext(ByteBuffer byteBuffer) { - publishedResults.add(byteBuffer); - } - - @Override - public void onError(Throwable throwable) { - throw new IllegalStateException(throwable); - } + private static ExecutorService executor = Executors.newFixedThreadPool(10); - @Override - public void onComplete() { - onCompleteCalled = true; - callsToComplete++; - } + @AfterAll + static void tearDown() { + executor.shutdown(); } @Test @@ -214,4 +200,167 @@ public void staticFromBytesConstructorSetsLengthBasedOnArrayLength() { assertEquals(bytes.length, body.contentLength().get()); } + @Test + public void subscribe_whenBodyIsClosed_shouldNotifySubscriberWithError() { + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap("test".getBytes())); + body.close(); // Set closed to true + Subscriber mockSubscriber = mock(Subscriber.class); + + body.subscribe(mockSubscriber); + + verify(mockSubscriber).onSubscribe(any()); + verify(mockSubscriber).onError(argThat(e -> + e instanceof NonRetryableException && + e.getMessage().equals("AsyncRequestBody has been closed") + )); + } + + @Test + public void close_withActiveSubscriptions_shouldNotifyAllSubscribers() { + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes(StandardCharsets.UTF_8))); + + Subscriber subscriber1 = mock(Subscriber.class); + Subscriber subscriber2 = mock(Subscriber.class); + Subscriber subscriber3 = mock(Subscriber.class); + + body.subscribe(subscriber1); + body.subscribe(subscriber2); + body.subscribe(subscriber3); + + body.close(); + + verify(subscriber1).onError(argThat(e -> + e instanceof IllegalStateException && + e.getMessage().contains("The publisher has been closed") + )); + verify(subscriber2).onError(argThat(e -> + e instanceof IllegalStateException && + e.getMessage().contains("The publisher has been closed") + )); + verify(subscriber3).onError(argThat(e -> + e instanceof IllegalStateException && + e.getMessage().contains("The publisher has been closed") + )); + } + + @Test + public void bufferedData_afterClose_shouldBeEmpty() { + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of( + ByteBuffer.wrap("test1".getBytes()), + ByteBuffer.wrap("test2".getBytes())); + + assertThat(body.bufferedData()).hasSize(2); + body.close(); + assertThat(body.bufferedData()).isEmpty(); + } + + @Test + public void concurrentSubscribeAndClose_shouldBeThreadSafe() throws InterruptedException { + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of( + ByteBuffer.wrap("test1".getBytes()), + ByteBuffer.wrap("test2".getBytes())); + + + CountDownLatch latch = new CountDownLatch(10); + AtomicInteger successfulSubscriptions = new AtomicInteger(0); + AtomicInteger errorNotifications = new AtomicInteger(0); + + // Start multiple threads subscribing + for (int i = 0; i < 10; i++) { + executor.submit(() -> { + try { + Subscriber subscriber = new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + successfulSubscriptions.incrementAndGet(); + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) {} + + @Override + public void onError(Throwable t) { + errorNotifications.incrementAndGet(); + } + + @Override + public void onComplete() {} + }; + body.subscribe(subscriber); + } finally { + latch.countDown(); + } + }); + } + + // Close after a short delay + Thread.sleep(10); + body.close(); + + latch.await(5, TimeUnit.SECONDS); + executor.shutdown(); + + // All subscribers should have been notified + assertThat(successfulSubscriptions.get()).isEqualTo(10); + } + + @Test + public void subscription_readOnlyBuffers_shouldNotAffectOriginalData() { + ByteBuffer originalBuffer = ByteBuffer.wrap(RandomStringUtils.randomAscii(1024).getBytes()); + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of( + originalBuffer); + int originalPosition = originalBuffer.position(); + + Subscriber mockSubscriber = mock(Subscriber.class); + ArgumentCaptor subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class); + ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); + + body.subscribe(mockSubscriber); + verify(mockSubscriber).onSubscribe(subscriptionCaptor.capture()); + + Subscription subscription = subscriptionCaptor.getValue(); + subscription.request(1); + + verify(mockSubscriber).onNext(bufferCaptor.capture()); + ByteBuffer receivedBuffer = bufferCaptor.getValue(); + byte[] bytes = BinaryUtils.copyBytesFrom(receivedBuffer); + + assertThat(receivedBuffer.isReadOnly()).isTrue(); + + assertThat(originalBuffer.position()).isEqualTo(originalPosition); + } + + private static class TestSubscriber implements Subscriber { + private Subscription subscription; + private boolean onCompleteCalled = false; + private int callsToComplete = 0; + private final List publishedResults = Collections.synchronizedList(new ArrayList<>()); + + public void request(long n) { + subscription.request(n); + } + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + publishedResults.add(byteBuffer); + } + + @Override + public void onError(Throwable throwable) { + throw new IllegalStateException(throwable); + } + + @Override + public void onComplete() { + onCompleteCalled = true; + callsToComplete++; + } + } + }