-
Notifications
You must be signed in to change notification settings - Fork 997
Fix Apache5 HTTP client retry failures with non-resettable streams #6154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9e8fc15
70a120e
c7b09a2
068cada
4766e74
c81efdc
6b64589
89debeb
7f9e77f
5fb2d35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |||||||||
| package software.amazon.awssdk.http.apache.internal; | ||||||||||
|
|
||||||||||
| import static org.junit.jupiter.api.Assertions.assertArrayEquals; | ||||||||||
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||||||||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||||||||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||||||||||
| import static org.junit.jupiter.api.Assertions.assertInstanceOf; | ||||||||||
|
|
@@ -35,12 +36,7 @@ | |||||||||
| import java.io.InputStream; | ||||||||||
| import java.io.InterruptedIOException; | ||||||||||
| import java.net.URI; | ||||||||||
| import java.util.ArrayList; | ||||||||||
| import java.util.Collections; | ||||||||||
| import java.util.List; | ||||||||||
| import java.util.Random; | ||||||||||
| import java.util.concurrent.CountDownLatch; | ||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||||||||
| import org.junit.jupiter.api.BeforeEach; | ||||||||||
| import org.junit.jupiter.api.DisplayName; | ||||||||||
|
|
@@ -343,7 +339,9 @@ public int read() throws IOException { | |||||||||
| return -1; | ||||||||||
| } | ||||||||||
| hasBeenRead = true; | ||||||||||
| return data[position++] & 0xFF; | ||||||||||
| int i = data[position] & 0xFF; | ||||||||||
| position++; | ||||||||||
| return i; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
|
|
@@ -670,51 +668,6 @@ void constructor_WithoutContentType_HandlesGracefully() { | |||||||||
| assertEquals(100L, entity.getContentLength()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("Entity should handle concurrent write attempts") | ||||||||||
| void writeTo_ConcurrentWrites_HandlesCorrectly() throws Exception { | ||||||||||
| // Given | ||||||||||
| String content = "Concurrent test content"; | ||||||||||
| ContentStreamProvider provider = () -> new ByteArrayInputStream(content.getBytes()); | ||||||||||
| SdkHttpRequest httpRequest = httpRequestBuilder.build(); | ||||||||||
| HttpExecuteRequest request = HttpExecuteRequest.builder() | ||||||||||
| .request(httpRequest) | ||||||||||
| .contentStreamProvider(provider) | ||||||||||
| .build(); | ||||||||||
|
|
||||||||||
| entity = new RepeatableInputStreamRequestEntity(request); | ||||||||||
|
|
||||||||||
| // Simulate concurrent writes | ||||||||||
| int threadCount = 5; | ||||||||||
| CountDownLatch latch = new CountDownLatch(threadCount); | ||||||||||
| List<ByteArrayOutputStream> outputs = Collections.synchronizedList(new ArrayList<>()); | ||||||||||
| List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>()); | ||||||||||
|
|
||||||||||
| for (int i = 0; i < threadCount; i++) { | ||||||||||
| new Thread(() -> { | ||||||||||
| try { | ||||||||||
| ByteArrayOutputStream output = new ByteArrayOutputStream(); | ||||||||||
| entity.writeTo(output); | ||||||||||
| outputs.add(output); | ||||||||||
| } catch (Exception e) { | ||||||||||
| exceptions.add(e); | ||||||||||
| } finally { | ||||||||||
| latch.countDown(); | ||||||||||
| } | ||||||||||
| }).start(); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| latch.await(5, TimeUnit.SECONDS); | ||||||||||
|
|
||||||||||
| // At least one should succeed, others may fail due to stream state | ||||||||||
| assertFalse(outputs.isEmpty(), "At least one write should succeed"); | ||||||||||
| for (ByteArrayOutputStream output : outputs) { | ||||||||||
| if (output.size() > 0) { | ||||||||||
| assertEquals(content, output.toString()); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("Entity should handle interrupted IO operations") | ||||||||||
| void writeTo_InterruptedStream_ThrowsIOException() throws IOException { | ||||||||||
|
|
@@ -790,5 +743,161 @@ void multipleOperations_StatePreservation_WorksCorrectly() throws IOException { | |||||||||
| assertEquals(contentLength1, contentLength2); | ||||||||||
| assertEquals(contentLength2, contentLength3); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("markSupported should be be called everytime") | ||||||||||
| void markSupported_NotCachedDuringConstruction() { | ||||||||||
| // Given | ||||||||||
| AtomicInteger markSupportedCalls = new AtomicInteger(0); | ||||||||||
| InputStream trackingStream = new ByteArrayInputStream("test".getBytes()) { | ||||||||||
| @Override | ||||||||||
| public boolean markSupported() { | ||||||||||
| markSupportedCalls.incrementAndGet(); | ||||||||||
| return true; | ||||||||||
| } | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| entity = createEntity(trackingStream); | ||||||||||
| assertEquals(0, markSupportedCalls.get()); | ||||||||||
|
|
||||||||||
| // Multiple isRepeatable calls trigger new markSupported calls | ||||||||||
| assertTrue(entity.isRepeatable()); | ||||||||||
| assertTrue(entity.isRepeatable()); | ||||||||||
| assertEquals(2, markSupportedCalls.get()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("ContentStreamProvider.newStream() should only be called once") | ||||||||||
| void contentStreamProvider_NewStreamCalledOnce() { | ||||||||||
| AtomicInteger newStreamCalls = new AtomicInteger(0); | ||||||||||
| ContentStreamProvider provider = () -> { | ||||||||||
| if (newStreamCalls.incrementAndGet() > 1) { | ||||||||||
| throw new RuntimeException("Could not create new stream: Already created"); | ||||||||||
| } | ||||||||||
| return new ByteArrayInputStream("test".getBytes()); | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| entity = createEntity(provider); | ||||||||||
|
|
||||||||||
| assertEquals(1, newStreamCalls.get()); | ||||||||||
| assertTrue(entity.isRepeatable()); | ||||||||||
| assertFalse(entity.isChunked()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("writeTo should use cached markSupported for reset decision") | ||||||||||
| void writeTo_UsesCachedMarkSupported() throws IOException { | ||||||||||
| // Given - Stream that changes markSupported behavior | ||||||||||
| AtomicInteger markSupportedCalls = new AtomicInteger(0); | ||||||||||
| ByteArrayInputStream baseStream = new ByteArrayInputStream("test".getBytes()); | ||||||||||
| InputStream stream = new InputStream() { | ||||||||||
| @Override | ||||||||||
| public int read() throws IOException { | ||||||||||
| return baseStream.read(); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public boolean markSupported() { | ||||||||||
| return markSupportedCalls.incrementAndGet() == 1; // Only first call returns true | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public synchronized void reset() throws IOException { | ||||||||||
| baseStream.reset(); | ||||||||||
| } | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| entity = createEntity(stream); | ||||||||||
|
|
||||||||||
| // Write twice | ||||||||||
| ByteArrayOutputStream output1 = new ByteArrayOutputStream(); | ||||||||||
| entity.writeTo(output1); | ||||||||||
|
|
||||||||||
| ByteArrayOutputStream output2 = new ByteArrayOutputStream(); | ||||||||||
| entity.writeTo(output2); | ||||||||||
|
|
||||||||||
| // Then - Both writes succeed using cached markSupported value | ||||||||||
| assertEquals("test", output1.toString()); | ||||||||||
| assertEquals("test", output2.toString()); | ||||||||||
| assertEquals(1, markSupportedCalls.get()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("Non-repeatable stream should not attempt reset") | ||||||||||
| void nonRepeatableStream_NoResetAttempt() throws IOException { | ||||||||||
| // Given | ||||||||||
| AtomicInteger resetCalls = new AtomicInteger(0); | ||||||||||
| InputStream nonRepeatableStream = new ByteArrayInputStream("test".getBytes()) { | ||||||||||
| @Override | ||||||||||
| public boolean markSupported() { | ||||||||||
| return false; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public synchronized void reset() { | ||||||||||
| resetCalls.incrementAndGet(); | ||||||||||
| throw new RuntimeException("Reset not supported"); | ||||||||||
| } | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| entity = createEntity(nonRepeatableStream); | ||||||||||
| assertFalse(entity.isRepeatable()); | ||||||||||
| entity.writeTo(new ByteArrayOutputStream()); | ||||||||||
| entity.writeTo(new ByteArrayOutputStream()); | ||||||||||
| assertEquals(0, resetCalls.get()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("Stream should not be read during construction") | ||||||||||
| void constructor_DoesNotReadStream() { | ||||||||||
| // Given | ||||||||||
| InputStream nonReadableStream = new InputStream() { | ||||||||||
| @Override | ||||||||||
| public int read() throws IOException { | ||||||||||
| throw new IOException("Stream should not be read during construction"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public boolean markSupported() { | ||||||||||
| return true; | ||||||||||
| } | ||||||||||
| }; | ||||||||||
| assertDoesNotThrow(() -> entity = createEntity(nonReadableStream)); | ||||||||||
| assertTrue(entity.isRepeatable()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("getContent should reuse existing stream") | ||||||||||
| void getContent_ReusesExistingStream() throws IOException { | ||||||||||
| InputStream originalStream = new ByteArrayInputStream("content".getBytes()); | ||||||||||
| entity = createEntity(originalStream); | ||||||||||
| InputStream content1 = entity.getContent(); | ||||||||||
| InputStream content2 = entity.getContent(); | ||||||||||
| assertSame(content1, content2); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| @DisplayName("Empty stream should be repeatable") | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi just trying to understand, in this circumstance, is it equivalent to create an Entity with InputStream with zero length?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeap , that is done here Lines 151 to 154 in 0bdf878
|
||||||||||
| void emptyStream_IsRepeatable() { | ||||||||||
| // Given - No content provider | ||||||||||
| HttpExecuteRequest request = HttpExecuteRequest.builder() | ||||||||||
| .request(httpRequestBuilder.build()) | ||||||||||
| .build(); | ||||||||||
| entity = new RepeatableInputStreamRequestEntity(request); | ||||||||||
| assertTrue(entity.isRepeatable()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Helper methods | ||||||||||
| private RepeatableInputStreamRequestEntity createEntity(InputStream stream) { | ||||||||||
| return createEntity(() -> stream); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private RepeatableInputStreamRequestEntity createEntity(ContentStreamProvider provider) { | ||||||||||
| HttpExecuteRequest request = HttpExecuteRequest.builder() | ||||||||||
| .request(httpRequestBuilder.build()) | ||||||||||
| .contentStreamProvider(provider) | ||||||||||
| .build(); | ||||||||||
| return new RepeatableInputStreamRequestEntity(request); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add tests for concurrent access scenarios to ensure concurrent access to writeTo method is safe, or that has already been covered in other places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the reason why
writeTo_ConcurrentWrites_HandlesCorrectlywas removed is because concurrent reads from an inputStream is not thread safe?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, concurrent reads from an InputStream are not thread-safe, and if we need to implement this functionality, we would need to add proper synchronization mechanisms. Since the Apache 4.x client doesn't have support for this, I removed this test case.
Note that I added this test case in my previous PR where I included several test cases to RepeatableInputStreamRequestEntityTest to verify all possible scenarios in both 4.x and 5.x versions to ensure consistent functionality. However, at that time, I didn't test for flakiness by running it in loop mode. When I later ran it as a @Repeated test for 1000 iterations, it started failing.
So initially I thought it was fine doing concurrent access for Apache 4.x since it was passing (it fails intermittently )