-
Notifications
You must be signed in to change notification settings - Fork 330
Confluence and CloudWatch and multiple other failing tests fix #6348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
98f4456
d06269b
a766e90
b176372
62b5ebd
7c2de36
1d3d31b
17dfb37
ba129b3
0749d35
abdec4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,16 +12,16 @@ | |
| import org.opensearch.dataprepper.model.event.Event; | ||
| import org.opensearch.dataprepper.model.event.EventHandle; | ||
| import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
| import org.opensearch.dataprepper.model.log.JacksonLog; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; | ||
| import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; | ||
| import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBuffer; | ||
| import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory; | ||
| import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; | ||
| import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; | ||
| import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; | ||
| import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; | ||
| import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; | ||
| import org.opensearch.dataprepper.model.log.JacksonLog; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
|
|
@@ -31,13 +31,13 @@ | |
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.ArgumentMatchers.eq; | ||
| import static org.mockito.Mockito.atLeast; | ||
| import static org.mockito.Mockito.doAnswer; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.never; | ||
| import static org.mockito.Mockito.spy; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.times; | ||
| import static org.mockito.Mockito.never; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; | ||
| import static org.mockito.Mockito.doAnswer; | ||
|
|
||
| class CloudWatchLogsServiceTest { | ||
| private static final int LARGE_THREAD_COUNT = 1000; | ||
|
|
@@ -95,8 +95,10 @@ Collection<Record<Event>> getSampleRecordsCollection() { | |
|
|
||
| Collection<Record<Event>> getSampleRecordsOfLargerSize() { | ||
| final ArrayList<Record<Event>> returnCollection = new ArrayList<>(); | ||
| int messageSize = (int) (thresholdConfig.getMaxRequestSizeBytes() / 24); | ||
| for (int i = 0; i < thresholdConfig.getBatchSize() * 2; i++) { | ||
| JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((int) (thresholdConfig.getMaxRequestSizeBytes()/24))); | ||
| JacksonEvent mockJacksonEvent = | ||
| (JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize)); | ||
| returnCollection.add(new Record<>(mockJacksonEvent)); | ||
| } | ||
|
|
||
|
|
@@ -105,8 +107,10 @@ Collection<Record<Event>> getSampleRecordsOfLargerSize() { | |
|
|
||
| Collection<Record<Event>> getSampleRecordsOfLimitSize() { | ||
| final ArrayList<Record<Event>> returnCollection = new ArrayList<>(); | ||
| int messageSize = (int) thresholdConfig.getMaxEventSizeBytes(); | ||
| for (int i = 0; i < thresholdConfig.getBatchSize(); i++) { | ||
| JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat((int) thresholdConfig.getMaxEventSizeBytes())); | ||
| JacksonEvent mockJacksonEvent = | ||
| (JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you know why the repeat used so much memory?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. It was because the original string was already long. |
||
| returnCollection.add(new Record<>(mockJacksonEvent)); | ||
| } | ||
|
|
||
|
|
@@ -248,8 +252,8 @@ void GIVEN_large_thread_count_WHEN_processing_log_events_THEN_dispatcher_should_ | |
| } | ||
|
|
||
| private Record<Event> getLargeRecord(long size) { | ||
| final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).withEventHandle(eventHandle).build(); | ||
| final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.insecure().nextAlphabetic((int)size))).withEventHandle(eventHandle).build(); | ||
|
|
||
| return new Record<>(event); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,9 @@ | |
| import java.util.Optional; | ||
| import java.util.Random; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import static org.awaitility.Awaitility.await; | ||
| import static org.hamcrest.MatcherAssert.assertThat; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
|
|
@@ -265,8 +267,16 @@ void tryAcquireAvailablePartition_gets_first_unassigned_partition() { | |
| objectUnderTest.tryCreatePartitionItem(sourceIdentifier, | ||
| unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); | ||
|
|
||
| final Optional<SourcePartitionStoreItem> maybeAcquired = | ||
| objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); | ||
| // Wait for partition to be available in DynamoDB Local before attempting to acquire | ||
| final Optional<SourcePartitionStoreItem>[] maybeAcquiredHolder = new Optional[]{Optional.empty()}; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure this will actually fix the issue. I've tried this a few times and ended up disabling this test in #6328.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is still some sleep in that PR, which could add uncertainty. I am hoping, we can give it a try with this approach and see. |
||
| await().atMost(5, TimeUnit.SECONDS) | ||
| .pollInterval(100, TimeUnit.MILLISECONDS) | ||
| .untilAsserted(() -> { | ||
| maybeAcquiredHolder[0] = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); | ||
| assertThat(maybeAcquiredHolder[0].isPresent(), equalTo(true)); | ||
| }); | ||
|
|
||
| final Optional<SourcePartitionStoreItem> maybeAcquired = maybeAcquiredHolder[0]; | ||
|
|
||
| assertThat(maybeAcquired, notNullValue()); | ||
| assertThat(maybeAcquired.isPresent(), equalTo(true)); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know why this improves the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, it started with AWS SDK behavior/validation changes that made the consumer pattern incompatible with the default retry mode. They now changed the default retry mode to be
ADAPTIVE_V2https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/retry-strategy.html
Ideal fix is to move to using
RetryStrategybut thats a bigger change so minimized the changes here by avoiding using the consumer pattern. https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html