diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index c4c821abef..46f4e95c92 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -205,7 +205,7 @@ void setUp() { when(cloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3); thresholdConfig = mock(ThresholdConfig.class); - when(thresholdConfig.getLogSendInterval()).thenReturn(60L); + when(thresholdConfig.getFlushInterval()).thenReturn(60L); when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); } @@ -266,7 +266,7 @@ private String createLogStream(final String logGroupName) { void TestSinkOperationWithLogSendInterval() throws Exception { long startTime = Instant.now().toEpochMilli(); when(thresholdConfig.getBatchSize()).thenReturn(10); - when(thresholdConfig.getLogSendInterval()).thenReturn(10L); + when(thresholdConfig.getFlushInterval()).thenReturn(10L); when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index e634c06351..83ddd7c786 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -55,7 +55,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics); CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(), - thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getLogSendInterval()); + thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getFlushInterval()); if (awsConfig == null && awsCredentialsSupplier == null) { throw new RuntimeException("Missing awsConfig and awsCredentialsSupplier"); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index a187c4ae5d..bbb8825435 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -6,7 +6,8 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.constraints.Size; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.Max; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -37,8 +38,9 @@ public class CloudWatchLogsSinkConfig { @NotNull private String logStream; - @JsonProperty("max_retries") - @Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15") + @JsonProperty(value = "max_retries", defaultValue = "5") + @Min(1) + @Max(15) private int maxRetries = DEFAULT_RETRY_COUNT; public AwsConfig getAwsConfig() { diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java index af18446142..46d9b815c0 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Size; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; @@ -22,23 +24,24 @@ public class ThresholdConfig { public static final int DEFAULT_BATCH_SIZE = 25; public static final String DEFAULT_EVENT_SIZE = "256kb"; public static final String DEFAULT_SIZE_OF_REQUEST = "1mb"; - public static final long DEFAULT_LOG_SEND_INTERVAL_TIME = 60; + public static final long DEFAULT_FLUSH_INTERVAL = 60; - @JsonProperty("batch_size") - @Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000") + @JsonProperty(value = "batch_size", defaultValue="25") + @Min(1) + @Max(10000) private int batchSize = DEFAULT_BATCH_SIZE; - @JsonProperty("max_event_size") + @JsonProperty(value = "max_event_size", defaultValue="256k") @Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes") private String maxEventSize = DEFAULT_EVENT_SIZE; - @JsonProperty("max_request_size") + @JsonProperty(value = "max_request_size", defaultValue="1mb") private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST; - @JsonProperty("log_send_interval") + @JsonProperty("flush_interval") @DurationMin(seconds = 60) @DurationMax(seconds = 3600) - private Duration logSendInterval = Duration.ofSeconds(DEFAULT_LOG_SEND_INTERVAL_TIME); + private Duration flushInterval = Duration.ofSeconds(DEFAULT_FLUSH_INTERVAL); public int getBatchSize() { return batchSize; @@ -52,8 +55,8 @@ public long getMaxRequestSizeBytes() { return ByteCount.parse(maxRequestSize).getBytes(); } - public long getLogSendInterval() { - return logSendInterval.getSeconds(); + public long getFlushInterval() { + return flushInterval.getSeconds(); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java index f9dd174a80..61c1f71bcb 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java @@ -56,7 +56,7 @@ void setUp() { thresholdConfig = new ThresholdConfig(); cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(), - thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getLogSendInterval()); + thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getFlushInterval()); mockClient = mock(CloudWatchLogsClient.class); mockMetrics = mock(CloudWatchLogsMetrics.class); diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java index 28825af795..7766fd5057 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java @@ -34,7 +34,7 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() { assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE)); assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_EVENT_SIZE).getBytes())); assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST).getBytes())); - assertThat(thresholdConfig.getLogSendInterval(), equalTo(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME)); + assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL)); } @ParameterizedTest @@ -65,8 +65,8 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(fi @ValueSource(ints = {5, 10, 300}) void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interval(final int log_send_interval) throws NoSuchFieldException, IllegalAccessException { ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "logSendInterval", Duration.ofSeconds(log_send_interval)); - assertThat(sampleThresholdConfig.getLogSendInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ; + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "flushInterval", Duration.ofSeconds(log_send_interval)); + assertThat(sampleThresholdConfig.getFlushInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ; } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java index 5370fd09fb..0a0e741a6e 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java @@ -20,7 +20,7 @@ class CloudWatchLogsLimitsTest { static void setUp() { thresholdConfig = new ThresholdConfig(); cloudWatchLogsLimits = new CloudWatchLogsLimits(ThresholdConfig.DEFAULT_BATCH_SIZE, thresholdConfig.getMaxEventSizeBytes(), - thresholdConfig.getMaxRequestSizeBytes(), ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME); + thresholdConfig.getMaxRequestSizeBytes(), ThresholdConfig.DEFAULT_FLUSH_INTERVAL); } @Test @@ -43,62 +43,62 @@ void GIVEN_greater_than_limit_method_WHEN_event_size_equal_to_max_event_size_THE @Test void GIVEN_greater_than_limit_method_WHEN_log_send_interval_equal_to_max_log_send_interval_THEN_return_true() { - boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE); assertTrue(thresholdMetTime); } @Test void SGIVEN_greater_than_limit_method_WHEN_log_send_interval_greater_than_max_log_send_interval_THEN_return_true() { - boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME + 1, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL + 1, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE); assertTrue(thresholdMetTime); } @Test void GIVEN_greater_than_limit_method_WHEN_log_send_interval_less_than_max_log_send_interval_THEN_return_false() { long validRequestSize = thresholdConfig.getMaxRequestSizeBytes() - ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, validRequestSize ,ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, validRequestSize ,ThresholdConfig.DEFAULT_BATCH_SIZE); assertFalse(thresholdMetTime); } @Test void GIVEN_greater_than_limit_method_WHEN_request_size_greater_than_max_request_size_THEN_return_true() { long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes() + 1) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE); assertTrue(thresholdMetRequestSize); } @Test void GIVEN_greater_than_limit_method_WHEN_request_size_equal_to_max_request_size_THEN_return_false() { long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes()) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE); assertFalse(thresholdMetRequestSize); } @Test void GIVEN_greater_than_limit_method_WHEN_request_size_less_than_max_request_size_THEN_return_false() { long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes() - 1) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE); assertFalse(thresholdMetRequestSize); } @Test void GIVEN_greater_than_limit_method_WHEN_batch_size_greater_than_max_batch_size_THEN_return_true() { long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes()) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE + 1); + boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE + 1); assertTrue(thresholdMetBatchSize); } @Test void GIVEN_greater_than_limit_method_WHEN_batch_size_equal_to_max_batch_size_THEN_return_false() { long validRequestSize = thresholdConfig.getMaxRequestSizeBytes() - ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE); + boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE); assertFalse(thresholdMetBatchSize); } @Test void GIVEN_greater_than_limit_method_WHEN_batch_size_less_than_max_batch_size_THEN_return_false() { long validRequestSize = thresholdConfig.getMaxRequestSizeBytes()- ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE); - boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1); + boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1); assertFalse(thresholdMetBatchSize); } @@ -143,4 +143,4 @@ void GIVEN_equal_to_limit_method_WHEN_batch_size_less_than_max_batch_size_THEN_r boolean thresholdMetBatchSize = cloudWatchLogsLimits.isEqualToLimitReached(validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1); assertFalse(thresholdMetBatchSize); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsThresholdConfig.java b/data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsThresholdConfig.java index 04c9d80a75..a45468f34e 100644 --- a/data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsThresholdConfig.java +++ b/data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsThresholdConfig.java @@ -6,7 +6,8 @@ package org.opensearch.dataprepper.plugins.sink.sqs; import com.fasterxml.jackson.annotation.JsonProperty; -import jakarta.validation.constraints.Size; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.Max; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; import org.opensearch.dataprepper.model.types.ByteCount; @@ -18,8 +19,9 @@ public class SqsThresholdConfig { public static final ByteCount DEFAULT_MAX_MESSAGE_SIZE = ByteCount.parse("256kb"); public static final long DEFAULT_FLUSH_INTERVAL_TIME = 30; - @JsonProperty("max_events_per_message") - @Size(min = 1, max = 1000, message = "batch_size amount should be between 1 to 1000") + @JsonProperty(value = "max_events_per_message", defaultValue="25") + @Min(1) + @Max(1000) private int maxEventsPerMessage = DEFAULT_MESSAGES_PER_EVENT; @JsonProperty("max_message_size")