Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void setUp() {
when(cloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3);

thresholdConfig = mock(ThresholdConfig.class);
when(thresholdConfig.getLogSendInterval()).thenReturn(60L);
when(thresholdConfig.getFlushInterval()).thenReturn(60L);
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
}
Expand Down Expand Up @@ -266,7 +266,7 @@ private String createLogStream(final String logGroupName) {
void TestSinkOperationWithLogSendInterval() throws Exception {
long startTime = Instant.now().toEpochMilli();
when(thresholdConfig.getBatchSize()).thenReturn(10);
when(thresholdConfig.getLogSendInterval()).thenReturn(10L);
when(thresholdConfig.getFlushInterval()).thenReturn(10L);
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);
when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getLogSendInterval());
thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getFlushInterval());

if (awsConfig == null && awsCredentialsSupplier == null) {
throw new RuntimeException("Missing awsConfig and awsCredentialsSupplier");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Max;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
Expand Down Expand Up @@ -37,8 +38,9 @@ public class CloudWatchLogsSinkConfig {
@NotNull
private String logStream;

@JsonProperty("max_retries")
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
@JsonProperty(value = "max_retries", defaultValue = "5")
@Min(1)
@Max(15)
private int maxRetries = DEFAULT_RETRY_COUNT;

public AwsConfig getAwsConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Size;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
Expand All @@ -22,23 +24,24 @@ public class ThresholdConfig {
public static final int DEFAULT_BATCH_SIZE = 25;
public static final String DEFAULT_EVENT_SIZE = "256kb";
public static final String DEFAULT_SIZE_OF_REQUEST = "1mb";
public static final long DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final long DEFAULT_FLUSH_INTERVAL = 60;

@JsonProperty("batch_size")
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
@JsonProperty(value = "batch_size", defaultValue="25")
@Min(1)
Comment thread
kkondaka marked this conversation as resolved.
@Max(10000)
private int batchSize = DEFAULT_BATCH_SIZE;

@JsonProperty("max_event_size")
@JsonProperty(value = "max_event_size", defaultValue="256k")
@Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes")
private String maxEventSize = DEFAULT_EVENT_SIZE;

@JsonProperty("max_request_size")
@JsonProperty(value = "max_request_size", defaultValue="1mb")
private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST;

@JsonProperty("log_send_interval")
@JsonProperty("flush_interval")
@DurationMin(seconds = 60)
@DurationMax(seconds = 3600)
private Duration logSendInterval = Duration.ofSeconds(DEFAULT_LOG_SEND_INTERVAL_TIME);
private Duration flushInterval = Duration.ofSeconds(DEFAULT_FLUSH_INTERVAL);

public int getBatchSize() {
return batchSize;
Expand All @@ -52,8 +55,8 @@ public long getMaxRequestSizeBytes() {
return ByteCount.parse(maxRequestSize).getBytes();
}

public long getLogSendInterval() {
return logSendInterval.getSeconds();
public long getFlushInterval() {
return flushInterval.getSeconds();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void setUp() {

thresholdConfig = new ThresholdConfig();
cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getLogSendInterval());
thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getFlushInterval());

mockClient = mock(CloudWatchLogsClient.class);
mockMetrics = mock(CloudWatchLogsMetrics.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() {
assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE));
assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_EVENT_SIZE).getBytes()));
assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST).getBytes()));
assertThat(thresholdConfig.getLogSendInterval(), equalTo(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME));
assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL));
}

@ParameterizedTest
Expand Down Expand Up @@ -65,8 +65,8 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(fi
@ValueSource(ints = {5, 10, 300})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interval(final int log_send_interval) throws NoSuchFieldException, IllegalAccessException {
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "logSendInterval", Duration.ofSeconds(log_send_interval));
assertThat(sampleThresholdConfig.getLogSendInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ;
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "flushInterval", Duration.ofSeconds(log_send_interval));
assertThat(sampleThresholdConfig.getFlushInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CloudWatchLogsLimitsTest {
static void setUp() {
thresholdConfig = new ThresholdConfig();
cloudWatchLogsLimits = new CloudWatchLogsLimits(ThresholdConfig.DEFAULT_BATCH_SIZE, thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSizeBytes(), ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME);
thresholdConfig.getMaxRequestSizeBytes(), ThresholdConfig.DEFAULT_FLUSH_INTERVAL);
}

@Test
Expand All @@ -43,62 +43,62 @@ void GIVEN_greater_than_limit_method_WHEN_event_size_equal_to_max_event_size_THE

@Test
void GIVEN_greater_than_limit_method_WHEN_log_send_interval_equal_to_max_log_send_interval_THEN_return_true() {
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
assertTrue(thresholdMetTime);
}

@Test
void SGIVEN_greater_than_limit_method_WHEN_log_send_interval_greater_than_max_log_send_interval_THEN_return_true() {
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME + 1, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL + 1, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
assertTrue(thresholdMetTime);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_log_send_interval_less_than_max_log_send_interval_THEN_return_false() {
long validRequestSize = thresholdConfig.getMaxRequestSizeBytes() - ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, validRequestSize ,ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, validRequestSize ,ThresholdConfig.DEFAULT_BATCH_SIZE);
assertFalse(thresholdMetTime);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_request_size_greater_than_max_request_size_THEN_return_true() {
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes() + 1) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
assertTrue(thresholdMetRequestSize);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_request_size_equal_to_max_request_size_THEN_return_false() {
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes()) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
assertFalse(thresholdMetRequestSize);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_request_size_less_than_max_request_size_THEN_return_false() {
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes() - 1) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
assertFalse(thresholdMetRequestSize);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_batch_size_greater_than_max_batch_size_THEN_return_true() {
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes()) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE + 1);
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE + 1);
assertTrue(thresholdMetBatchSize);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_batch_size_equal_to_max_batch_size_THEN_return_false() {
long validRequestSize = thresholdConfig.getMaxRequestSizeBytes() - ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE);
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE);
assertFalse(thresholdMetBatchSize);
}

@Test
void GIVEN_greater_than_limit_method_WHEN_batch_size_less_than_max_batch_size_THEN_return_false() {
long validRequestSize = thresholdConfig.getMaxRequestSizeBytes()- ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1);
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1);
assertFalse(thresholdMetBatchSize);
}

Expand Down Expand Up @@ -143,4 +143,4 @@ void GIVEN_equal_to_limit_method_WHEN_batch_size_less_than_max_batch_size_THEN_r
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isEqualToLimitReached(validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1);
assertFalse(thresholdMetBatchSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package org.opensearch.dataprepper.plugins.sink.sqs;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Max;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.types.ByteCount;
Expand All @@ -18,8 +19,9 @@ public class SqsThresholdConfig {
public static final ByteCount DEFAULT_MAX_MESSAGE_SIZE = ByteCount.parse("256kb");
public static final long DEFAULT_FLUSH_INTERVAL_TIME = 30;

@JsonProperty("max_events_per_message")
@Size(min = 1, max = 1000, message = "batch_size amount should be between 1 to 1000")
@JsonProperty(value = "max_events_per_message", defaultValue="25")
@Min(1)
@Max(1000)
private int maxEventsPerMessage = DEFAULT_MESSAGES_PER_EVENT;

@JsonProperty("max_message_size")
Expand Down
Loading