diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java index 79aabb3bd1..ebc3ec8be0 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java @@ -211,6 +211,12 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent } private static void verifySingleThreadUsage() { + // Wait for all processor instances to be registered (one per worker) + await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat( + SingleThreadEventsTrackingTestProcessor.getProcessors().size(), + equalTo(4))); + List singleThreadProcessors = SingleThreadEventsTrackingTestProcessor.getProcessors(); assertThat(singleThreadProcessors.size(), equalTo(4)); assertAll( diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java index d6875f501a..5542ebb9ec 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java @@ -11,6 +11,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder; @@ -64,8 +65,12 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, } private static ClientOverrideConfiguration createOverrideConfiguration(final Map customHeaders) { + final RetryPolicy retryPolicy = RetryPolicy.builder() + .numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS) + .build(); + final ClientOverrideConfiguration.Builder configBuilder = ClientOverrideConfiguration.builder() - .retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS)); + .retryPolicy(retryPolicy); customHeaders.forEach(configBuilder::putHeader); diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java index c6b3d80428..5cd122d1a3 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java @@ -12,7 +12,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBuffer; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory; @@ -20,8 +22,6 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; -import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; -import org.opensearch.dataprepper.model.log.JacksonLog; import java.util.ArrayList; import java.util.Collection; @@ -31,13 +31,13 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doAnswer; class CloudWatchLogsServiceTest { private static final int LARGE_THREAD_COUNT = 1000; @@ -95,8 +95,10 @@ Collection> getSampleRecordsCollection() { Collection> getSampleRecordsOfLargerSize() { final ArrayList> returnCollection = new ArrayList<>(); + int messageSize = (int) (thresholdConfig.getMaxRequestSizeBytes() / 24); for (int i = 0; i < thresholdConfig.getBatchSize() * 2; i++) { - JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((int) (thresholdConfig.getMaxRequestSizeBytes()/24))); + JacksonEvent mockJacksonEvent = + (JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize)); returnCollection.add(new Record<>(mockJacksonEvent)); } @@ -105,8 +107,10 @@ Collection> getSampleRecordsOfLargerSize() { Collection> getSampleRecordsOfLimitSize() { final ArrayList> returnCollection = new ArrayList<>(); + int messageSize = (int) thresholdConfig.getMaxEventSizeBytes(); for (int i = 0; i < thresholdConfig.getBatchSize(); i++) { - JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat((int) thresholdConfig.getMaxEventSizeBytes())); + JacksonEvent mockJacksonEvent = + (JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize)); returnCollection.add(new Record<>(mockJacksonEvent)); } @@ -248,8 +252,8 @@ void GIVEN_large_thread_count_WHEN_processing_log_events_THEN_dispatcher_should_ } private Record getLargeRecord(long size) { - final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).withEventHandle(eventHandle).build(); + final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.insecure().nextAlphabetic((int)size))).withEventHandle(eventHandle).build(); return new Record<>(event); - } + } } diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java index 05deb5aa07..8eea5ee8d7 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java @@ -108,12 +108,18 @@ public boolean isValidPatterns() { } public static boolean isValidPattern(final String pattern) { + // Check for valid epoch patterns first if (pattern.equals("epoch_second") || pattern.equals("epoch_milli") || pattern.equals("epoch_micro") || pattern.equals("epoch_nano")) { return true; } + // Reject any other pattern starting with "epoch_" as invalid + if (pattern.startsWith("epoch_")) { + return false; + } + // Validate as DateTimeFormatter pattern try { DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); return true; diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java index b1dddfa013..efe52d2cb9 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java @@ -69,23 +69,28 @@ void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_recei assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false)); } - @Test - void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAccessException { - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", random); - assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false)); - - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_second"); - assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_milli"); - assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano"); - assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_micro"); + @ParameterizedTest + @ValueSource(strings = { + "epoch_second", + "epoch_milli", + "epoch_nano", + "epoch_micro", + "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX" + }) + void testValidOutputFormats(String outputFormat) throws NoSuchFieldException, IllegalAccessException { + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", outputFormat); assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz"); + } + + @ParameterizedTest + @ValueSource(strings = { + "invalid[pattern]format", + "epoch_xyz", + "epoch_invalid" + }) + void testInvalidOutputFormats(String outputFormat) throws NoSuchFieldException, IllegalAccessException { + setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", outputFormat); assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false)); - setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX"); - assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true)); } @Test diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/build.gradle b/data-prepper-plugins/dynamodb-source-coordination-store/build.gradle index 211c4960b0..85223aea8a 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/build.gradle +++ b/data-prepper-plugins/dynamodb-source-coordination-store/build.gradle @@ -17,6 +17,7 @@ dependencies { implementation 'software.amazon.awssdk:sts' implementation 'javax.inject:javax.inject:1' testImplementation 'com.amazonaws:DynamoDBLocal:2.2.1' + testImplementation 'org.awaitility:awaitility:4.2.0' } configurations { diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreIT.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreIT.java index 5cf092f66d..28b989ee61 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreIT.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreIT.java @@ -36,7 +36,9 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -265,8 +267,16 @@ void tryAcquireAvailablePartition_gets_first_unassigned_partition() { objectUnderTest.tryCreatePartitionItem(sourceIdentifier, unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); - final Optional maybeAcquired = - objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); + // Wait for partition to be available in DynamoDB Local before attempting to acquire + final Optional[] maybeAcquiredHolder = new Optional[]{Optional.empty()}; + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + maybeAcquiredHolder[0] = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); + assertThat(maybeAcquiredHolder[0].isPresent(), equalTo(true)); + }); + + final Optional maybeAcquired = maybeAcquiredHolder[0]; assertThat(maybeAcquired, notNullValue()); assertThat(maybeAcquired.isPresent(), equalTo(true)); diff --git a/data-prepper-plugins/saas-source-plugins/confluence-source/src/test/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceConfigHelperTest.java b/data-prepper-plugins/saas-source-plugins/confluence-source/src/test/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceConfigHelperTest.java index 80528646e4..35c8263727 100644 --- a/data-prepper-plugins/saas-source-plugins/confluence-source/src/test/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceConfigHelperTest.java +++ b/data-prepper-plugins/saas-source-plugins/confluence-source/src/test/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceConfigHelperTest.java @@ -118,7 +118,7 @@ void testValidateConfig() { @Test void testValidateConfigBasic() { - when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://test.com"); + when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net"); when(confluenceSourceConfig.getAuthType()).thenReturn(BASIC); when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); when(authenticationConfig.getBasicConfig()).thenReturn(basicConfig); @@ -137,7 +137,7 @@ void testValidateConfigBasic() { @Test void testValidateConfigOauth2() { - when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://test.com"); + when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net"); when(confluenceSourceConfig.getAuthType()).thenReturn(OAUTH2); when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); when(authenticationConfig.getOauth2Config()).thenReturn(oauth2Config); diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraConfigHelperTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraConfigHelperTest.java index 960027c659..4e27fb5528 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraConfigHelperTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraConfigHelperTest.java @@ -126,7 +126,7 @@ void testGetProjectNameFilter() { void testValidateConfig() { assertThrows(RuntimeException.class, () -> JiraConfigHelper.validateConfig(jiraSourceConfig)); - when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com"); + when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net"); assertThrows(RuntimeException.class, () -> JiraConfigHelper.validateConfig(jiraSourceConfig)); when(jiraSourceConfig.getAuthType()).thenReturn("fakeType"); @@ -135,7 +135,7 @@ void testValidateConfig() { @Test void testValidateConfigBasic() { - when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com"); + when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net"); when(jiraSourceConfig.getAuthType()).thenReturn(BASIC); when(jiraSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); when(authenticationConfig.getBasicConfig()).thenReturn(basicConfig); @@ -154,7 +154,7 @@ void testValidateConfigBasic() { @Test void testValidateConfigOauth2() { - when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com"); + when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net"); when(jiraSourceConfig.getAuthType()).thenReturn(OAUTH2); when(jiraSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); when(authenticationConfig.getOauth2Config()).thenReturn(oauth2Config); diff --git a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java index 949a8d75d7..8741fc7cb7 100644 --- a/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java +++ b/data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java @@ -10,7 +10,6 @@ package org.opensearch.dataprepper.plugins.source.microsoft_office365; import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,9 +40,6 @@ import java.util.Map; import java.util.ArrayList; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -53,7 +49,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES; diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelperTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelperTest.java index a762275a83..96546397c3 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelperTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/utils/MetricsHelperTest.java @@ -32,9 +32,16 @@ import java.util.function.BiConsumer; import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class MetricsHelperTest {