Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add tests for concurrent access scenarios to ensure concurrent access to writeTo method is safe, or that has already been covered in other places?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reason why writeTo_ConcurrentWrites_HandlesCorrectly was removed is because concurrent reads from an inputStream is not thread safe?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, concurrent reads from an InputStream are not thread-safe, and if we need to implement this functionality, we would need to add proper synchronization mechanisms. Since the Apache 4.x client doesn't have support for this, I removed this test case.
Note that I added this test case in my previous PR where I included several test cases to RepeatableInputStreamRequestEntityTest to verify all possible scenarios in both 4.x and 5.x versions to ensure consistent functionality. However, at that time, I didn't test for flakiness by running it in loop mode. When I later ran it as a @Repeated test for 1000 iterations, it started failing.
So initially I thought it was fine doing concurrent access for Apache 4.x since it was passing (it fails intermittently )

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.http.apache.internal;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand All @@ -35,12 +36,7 @@
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -343,7 +339,9 @@ public int read() throws IOException {
return -1;
}
hasBeenRead = true;
return data[position++] & 0xFF;
int i = data[position] & 0xFF;
position++;
return i;
}

@Override
Expand Down Expand Up @@ -670,51 +668,6 @@ void constructor_WithoutContentType_HandlesGracefully() {
assertEquals(100L, entity.getContentLength());
}

@Test
@DisplayName("Entity should handle concurrent write attempts")
void writeTo_ConcurrentWrites_HandlesCorrectly() throws Exception {
// Given
String content = "Concurrent test content";
ContentStreamProvider provider = () -> new ByteArrayInputStream(content.getBytes());
SdkHttpRequest httpRequest = httpRequestBuilder.build();
HttpExecuteRequest request = HttpExecuteRequest.builder()
.request(httpRequest)
.contentStreamProvider(provider)
.build();

entity = new RepeatableInputStreamRequestEntity(request);

// Simulate concurrent writes
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
List<ByteArrayOutputStream> outputs = Collections.synchronizedList(new ArrayList<>());
List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
entity.writeTo(output);
outputs.add(output);
} catch (Exception e) {
exceptions.add(e);
} finally {
latch.countDown();
}
}).start();
}

latch.await(5, TimeUnit.SECONDS);

// At least one should succeed, others may fail due to stream state
assertFalse(outputs.isEmpty(), "At least one write should succeed");
for (ByteArrayOutputStream output : outputs) {
if (output.size() > 0) {
assertEquals(content, output.toString());
}
}
}

@Test
@DisplayName("Entity should handle interrupted IO operations")
void writeTo_InterruptedStream_ThrowsIOException() throws IOException {
Expand Down Expand Up @@ -790,5 +743,161 @@ void multipleOperations_StatePreservation_WorksCorrectly() throws IOException {
assertEquals(contentLength1, contentLength2);
assertEquals(contentLength2, contentLength3);
}

@Test
@DisplayName("markSupported should be be called everytime")
void markSupported_NotCachedDuringConstruction() {
// Given
AtomicInteger markSupportedCalls = new AtomicInteger(0);
InputStream trackingStream = new ByteArrayInputStream("test".getBytes()) {
@Override
public boolean markSupported() {
markSupportedCalls.incrementAndGet();
return true;
}
};

entity = createEntity(trackingStream);
assertEquals(0, markSupportedCalls.get());

// Multiple isRepeatable calls trigger new markSupported calls
assertTrue(entity.isRepeatable());
assertTrue(entity.isRepeatable());
assertEquals(2, markSupportedCalls.get());
}

@Test
@DisplayName("ContentStreamProvider.newStream() should only be called once")
void contentStreamProvider_NewStreamCalledOnce() {
AtomicInteger newStreamCalls = new AtomicInteger(0);
ContentStreamProvider provider = () -> {
if (newStreamCalls.incrementAndGet() > 1) {
throw new RuntimeException("Could not create new stream: Already created");
}
return new ByteArrayInputStream("test".getBytes());
};

entity = createEntity(provider);

assertEquals(1, newStreamCalls.get());
assertTrue(entity.isRepeatable());
assertFalse(entity.isChunked());
}

@Test
@DisplayName("writeTo should use cached markSupported for reset decision")
void writeTo_UsesCachedMarkSupported() throws IOException {
// Given - Stream that changes markSupported behavior
AtomicInteger markSupportedCalls = new AtomicInteger(0);
ByteArrayInputStream baseStream = new ByteArrayInputStream("test".getBytes());
InputStream stream = new InputStream() {
@Override
public int read() throws IOException {
return baseStream.read();
}

@Override
public boolean markSupported() {
return markSupportedCalls.incrementAndGet() == 1; // Only first call returns true
}

@Override
public synchronized void reset() throws IOException {
baseStream.reset();
}
};

entity = createEntity(stream);

// Write twice
ByteArrayOutputStream output1 = new ByteArrayOutputStream();
entity.writeTo(output1);

ByteArrayOutputStream output2 = new ByteArrayOutputStream();
entity.writeTo(output2);

// Then - Both writes succeed using cached markSupported value
assertEquals("test", output1.toString());
assertEquals("test", output2.toString());
assertEquals(1, markSupportedCalls.get());
}

@Test
@DisplayName("Non-repeatable stream should not attempt reset")
void nonRepeatableStream_NoResetAttempt() throws IOException {
// Given
AtomicInteger resetCalls = new AtomicInteger(0);
InputStream nonRepeatableStream = new ByteArrayInputStream("test".getBytes()) {
@Override
public boolean markSupported() {
return false;
}

@Override
public synchronized void reset() {
resetCalls.incrementAndGet();
throw new RuntimeException("Reset not supported");
}
};

entity = createEntity(nonRepeatableStream);
assertFalse(entity.isRepeatable());
entity.writeTo(new ByteArrayOutputStream());
entity.writeTo(new ByteArrayOutputStream());
assertEquals(0, resetCalls.get());
}

@Test
@DisplayName("Stream should not be read during construction")
void constructor_DoesNotReadStream() {
// Given
InputStream nonReadableStream = new InputStream() {
@Override
public int read() throws IOException {
throw new IOException("Stream should not be read during construction");
}

@Override
public boolean markSupported() {
return true;
}
};
assertDoesNotThrow(() -> entity = createEntity(nonReadableStream));
assertTrue(entity.isRepeatable());
}

@Test
@DisplayName("getContent should reuse existing stream")
void getContent_ReusesExistingStream() throws IOException {
InputStream originalStream = new ByteArrayInputStream("content".getBytes());
entity = createEntity(originalStream);
InputStream content1 = entity.getContent();
InputStream content2 = entity.getContent();
assertSame(content1, content2);
}

@Test
@DisplayName("Empty stream should be repeatable")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi just trying to understand, in this circumstance, is it equivalent to create an Entity with InputStream with zero length?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap , that is done here

private static InputStream getContent(Optional<ContentStreamProvider> contentStreamProvider) {
return contentStreamProvider.map(ContentStreamProvider::newStream)
.orElseGet(() -> new ByteArrayInputStream(new byte[0]));
}

void emptyStream_IsRepeatable() {
// Given - No content provider
HttpExecuteRequest request = HttpExecuteRequest.builder()
.request(httpRequestBuilder.build())
.build();
entity = new RepeatableInputStreamRequestEntity(request);
assertTrue(entity.isRepeatable());
}

// Helper methods
private RepeatableInputStreamRequestEntity createEntity(InputStream stream) {
return createEntity(() -> stream);
}

private RepeatableInputStreamRequestEntity createEntity(ContentStreamProvider provider) {
HttpExecuteRequest request = HttpExecuteRequest.builder()
.request(httpRequestBuilder.build())
.contentStreamProvider(provider)
.build();
return new RepeatableInputStreamRequestEntity(request);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,11 @@ public class RepeatableInputStreamRequestEntity extends HttpEntityWrapper {
/**
* True if the "Transfer-Encoding:chunked" header is present
*/
private boolean isChunked;

private final boolean isChunked;
/**
* The underlying InputStreamEntity being delegated to
* The underlying reference of content
*/
private InputStreamEntity inputStreamRequestEntity;

/**
* The InputStream containing the content to write out
*/
private InputStream content;

private final InputStream content;
/**
* Record the original exception if we do attempt a retry, so that if the
* retry fails, we can report the original exception. Otherwise, we're most
Expand All @@ -70,18 +63,36 @@ public class RepeatableInputStreamRequestEntity extends HttpEntityWrapper {
private IOException originalException;

/**
* Creates a new RepeatableInputStreamRequestEntity using the information
* from the specified request. If the input stream containing the request's
* contents is repeatable, then this RequestEntity will report as being
* repeatable.
*
* @param request The details of the request being written out (content type,
* content length, and content).
* Helper class to capture both the created entity and the original content stream reference.
* <p>
* We store the content stream reference to avoid calling {@code getContent()} on the wrapped
* entity multiple times, which could potentially create new stream instances or perform
* unnecessary operations. This ensures we consistently use the same stream instance for
* {@code markSupported()} checks and {@code reset()} operations throughout the entity's lifecycle.
*/

private static class EntityCreationResult {
final InputStreamEntity entity;
final InputStream content;

EntityCreationResult(InputStreamEntity entity, InputStream content) {
this.entity = entity;
this.content = content;
}
}

public RepeatableInputStreamRequestEntity(HttpExecuteRequest request) {
super(createInputStreamEntity(request));
this(createInputStreamEntityWithMetadata(request), request);
}

private RepeatableInputStreamRequestEntity(EntityCreationResult result, HttpExecuteRequest request) {
super(result.entity);
this.content = result.content;
this.isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
}

isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
private static EntityCreationResult createInputStreamEntityWithMetadata(HttpExecuteRequest request) {
InputStream content = getContent(request.contentStreamProvider());

/*
* If we don't specify a content length when we instantiate our
Expand All @@ -93,35 +104,14 @@ public RepeatableInputStreamRequestEntity(HttpExecuteRequest request) {
.map(RepeatableInputStreamRequestEntity::parseContentLength)
.orElse(-1L);

content = getContent(request.contentStreamProvider());

// Create InputStreamEntity with proper ContentType handling for HttpClient 5.x
ContentType contentType = request.httpRequest().firstMatchingHeader("Content-Type")
.map(RepeatableInputStreamRequestEntity::parseContentType)
.orElse(null);

if (contentLength >= 0) {
inputStreamRequestEntity = new InputStreamEntity(content, contentLength, contentType);
} else {
inputStreamRequestEntity = new InputStreamEntity(content, contentType);
}
}

private static InputStreamEntity createInputStreamEntity(HttpExecuteRequest request) {
InputStream content = getContent(request.contentStreamProvider());

long contentLength = request.httpRequest().firstMatchingHeader("Content-Length")
.map(RepeatableInputStreamRequestEntity::parseContentLength)
.orElse(-1L);

ContentType contentType = request.httpRequest().firstMatchingHeader("Content-Type")
.map(RepeatableInputStreamRequestEntity::parseContentType)
.orElse(null);

if (contentLength >= 0) {
return new InputStreamEntity(content, contentLength, contentType);
}
return new InputStreamEntity(content, contentType);
InputStreamEntity entity = contentLength >= 0
? new InputStreamEntity(content, contentLength, contentType)
: new InputStreamEntity(content, contentType);
return new EntityCreationResult(entity, content);
}

private static long parseContentLength(String contentLength) {
Expand Down Expand Up @@ -164,13 +154,9 @@ public boolean isChunked() {
*/
@Override
public boolean isRepeatable() {
boolean markSupported = content.markSupported();
boolean entityRepeatable = inputStreamRequestEntity.isRepeatable();
boolean result = markSupported || entityRepeatable;
return result;
return content.markSupported() || super.isRepeatable();
}


/**
* Resets the underlying InputStream if this isn't the first attempt to
* write out the request, otherwise simply delegates to
Expand All @@ -189,7 +175,7 @@ public void writeTo(OutputStream output) throws IOException {
}

firstAttempt = false;
inputStreamRequestEntity.writeTo(output);
super.writeTo(output);
} catch (IOException ioe) {
if (originalException == null) {
originalException = ioe;
Expand All @@ -200,12 +186,8 @@ public void writeTo(OutputStream output) throws IOException {

@Override
public void close() throws IOException {
try {
if (content != null) {
content.close();
}
} finally {
super.close();
}
// The InputStreamEntity handles closing the stream when it's closed
// We don't need to close our reference separately to avoid double-closing
super.close();
}
}
Loading
Loading