Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent
}

private static void verifySingleThreadUsage() {
// Wait for all processor instances to be registered (one per worker)
await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(
SingleThreadEventsTrackingTestProcessor.getProcessors().size(),
equalTo(4)));

List<SingleThreadEventsTrackingTestProcessor> singleThreadProcessors = SingleThreadEventsTrackingTestProcessor.getProcessors();
assertThat(singleThreadProcessors.size(), equalTo(4));
assertAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder;
Expand Down Expand Up @@ -64,8 +65,12 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig,
}

private static ClientOverrideConfiguration createOverrideConfiguration(final Map<String, String> customHeaders) {
final RetryPolicy retryPolicy = RetryPolicy.builder()
.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS)
.build();

final ClientOverrideConfiguration.Builder configBuilder = ClientOverrideConfiguration.builder()
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS));
.retryPolicy(retryPolicy);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why this improves the tests?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, it started with AWS SDK behavior/validation changes that made the consumer pattern incompatible with the default retry mode. They now changed the default retry mode to be ADAPTIVE_V2

https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/retry-strategy.html

Ideal fix is to move to using RetryStrategy but thats a bigger change so minimized the changes here by avoiding using the consumer pattern. https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/api/RetryStrategy.html


customHeaders.forEach(configBuilder::putHeader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBuffer;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -31,13 +31,13 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;

class CloudWatchLogsServiceTest {
private static final int LARGE_THREAD_COUNT = 1000;
Expand Down Expand Up @@ -95,8 +95,10 @@ Collection<Record<Event>> getSampleRecordsCollection() {

Collection<Record<Event>> getSampleRecordsOfLargerSize() {
final ArrayList<Record<Event>> returnCollection = new ArrayList<>();
int messageSize = (int) (thresholdConfig.getMaxRequestSizeBytes() / 24);
for (int i = 0; i < thresholdConfig.getBatchSize() * 2; i++) {
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((int) (thresholdConfig.getMaxRequestSizeBytes()/24)));
JacksonEvent mockJacksonEvent =
(JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize));
returnCollection.add(new Record<>(mockJacksonEvent));
}

Expand All @@ -105,8 +107,10 @@ Collection<Record<Event>> getSampleRecordsOfLargerSize() {

Collection<Record<Event>> getSampleRecordsOfLimitSize() {
final ArrayList<Record<Event>> returnCollection = new ArrayList<>();
int messageSize = (int) thresholdConfig.getMaxEventSizeBytes();
for (int i = 0; i < thresholdConfig.getBatchSize(); i++) {
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat((int) thresholdConfig.getMaxEventSizeBytes()));
JacksonEvent mockJacksonEvent =
(JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why the repeat used so much memory?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It was because the original string was already long.

returnCollection.add(new Record<>(mockJacksonEvent));
}

Expand Down Expand Up @@ -248,8 +252,8 @@ void GIVEN_large_thread_count_WHEN_processing_log_events_THEN_dispatcher_should_
}

private Record<Event> getLargeRecord(long size) {
final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).withEventHandle(eventHandle).build();
final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.insecure().nextAlphabetic((int)size))).withEventHandle(eventHandle).build();

return new Record<>(event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,18 @@ public boolean isValidPatterns() {
}

public static boolean isValidPattern(final String pattern) {
// Check for valid epoch patterns first
if (pattern.equals("epoch_second") ||
pattern.equals("epoch_milli") ||
pattern.equals("epoch_micro") ||
pattern.equals("epoch_nano")) {
return true;
}
// Reject any other pattern starting with "epoch_" as invalid
if (pattern.startsWith("epoch_")) {
return false;
}
// Validate as DateTimeFormatter pattern
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,28 @@ void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_recei
assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false));
}

@Test
void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAccessException {
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", random);
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));

setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_second");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_milli");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_micro");
@ParameterizedTest
@ValueSource(strings = {
"epoch_second",
"epoch_milli",
"epoch_nano",
"epoch_micro",
"yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX"
})
void testValidOutputFormats(String outputFormat) throws NoSuchFieldException, IllegalAccessException {
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", outputFormat);
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz");
}

@ParameterizedTest
@ValueSource(strings = {
"invalid[pattern]format",
"epoch_xyz",
"epoch_invalid"
})
void testInvalidOutputFormats(String outputFormat) throws NoSuchFieldException, IllegalAccessException {
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", outputFormat);
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation 'software.amazon.awssdk:sts'
implementation 'javax.inject:javax.inject:1'
testImplementation 'com.amazonaws:DynamoDBLocal:2.2.1'
testImplementation 'org.awaitility:awaitility:4.2.0'
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -265,8 +267,16 @@ void tryAcquireAvailablePartition_gets_first_unassigned_partition() {
objectUnderTest.tryCreatePartitionItem(sourceIdentifier,
unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false);

final Optional<SourcePartitionStoreItem> maybeAcquired =
objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20));
// Wait for partition to be available in DynamoDB Local before attempting to acquire
final Optional<SourcePartitionStoreItem>[] maybeAcquiredHolder = new Optional[]{Optional.empty()};

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this will actually fix the issue. I've tried this a few times and ended up disabling this test in #6328.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still some sleep in that PR, which could add uncertainty. I am hoping, we can give it a try with this approach and see.

await().atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
maybeAcquiredHolder[0] = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20));
assertThat(maybeAcquiredHolder[0].isPresent(), equalTo(true));
});

final Optional<SourcePartitionStoreItem> maybeAcquired = maybeAcquiredHolder[0];

assertThat(maybeAcquired, notNullValue());
assertThat(maybeAcquired.isPresent(), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void testValidateConfig() {

@Test
void testValidateConfigBasic() {
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://test.com");
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
when(confluenceSourceConfig.getAuthType()).thenReturn(BASIC);
when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
when(authenticationConfig.getBasicConfig()).thenReturn(basicConfig);
Expand All @@ -137,7 +137,7 @@ void testValidateConfigBasic() {

@Test
void testValidateConfigOauth2() {
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://test.com");
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
when(confluenceSourceConfig.getAuthType()).thenReturn(OAUTH2);
when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
when(authenticationConfig.getOauth2Config()).thenReturn(oauth2Config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void testGetProjectNameFilter() {
void testValidateConfig() {
assertThrows(RuntimeException.class, () -> JiraConfigHelper.validateConfig(jiraSourceConfig));

when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com");
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
assertThrows(RuntimeException.class, () -> JiraConfigHelper.validateConfig(jiraSourceConfig));

when(jiraSourceConfig.getAuthType()).thenReturn("fakeType");
Expand All @@ -135,7 +135,7 @@ void testValidateConfig() {

@Test
void testValidateConfigBasic() {
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com");
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
when(jiraSourceConfig.getAuthType()).thenReturn(BASIC);
when(jiraSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
when(authenticationConfig.getBasicConfig()).thenReturn(basicConfig);
Expand All @@ -154,7 +154,7 @@ void testValidateConfigBasic() {

@Test
void testValidateConfigOauth2() {
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com");
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
when(jiraSourceConfig.getAuthType()).thenReturn(OAUTH2);
when(jiraSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
when(authenticationConfig.getOauth2Config()).thenReturn(oauth2Config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -41,9 +40,6 @@
import java.util.Map;
import java.util.ArrayList;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -53,7 +49,6 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@
import java.util.function.BiConsumer;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class MetricsHelperTest {
Expand Down
Loading