Skip to content

Commit d76fe17

Browse files
authored
Fix CloudwatchLogs and Sqs sink config to use correct Jakarta annotations (#5714)
* Fix CloudwatchLogs sink config to use correct Jakarta annotations Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Also, fixed SQS sink Jakarta annotation issue Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 1031c23 commit d76fe17

8 files changed

Lines changed: 40 additions & 33 deletions

File tree

data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ void setUp() {
205205
when(cloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3);
206206

207207
thresholdConfig = mock(ThresholdConfig.class);
208-
when(thresholdConfig.getLogSendInterval()).thenReturn(60L);
208+
when(thresholdConfig.getFlushInterval()).thenReturn(60L);
209209
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
210210
when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
211211
}
@@ -266,7 +266,7 @@ private String createLogStream(final String logGroupName) {
266266
void TestSinkOperationWithLogSendInterval() throws Exception {
267267
long startTime = Instant.now().toEpochMilli();
268268
when(thresholdConfig.getBatchSize()).thenReturn(10);
269-
when(thresholdConfig.getLogSendInterval()).thenReturn(10L);
269+
when(thresholdConfig.getFlushInterval()).thenReturn(10L);
270270
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);
271271
when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null);
272272

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
5555
CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
5656
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
5757
thresholdConfig.getMaxEventSizeBytes(),
58-
thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getLogSendInterval());
58+
thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getFlushInterval());
5959

6060
if (awsConfig == null && awsCredentialsSupplier == null) {
6161
throw new RuntimeException("Missing awsConfig and awsCredentialsSupplier");

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;
77

88
import com.fasterxml.jackson.annotation.JsonProperty;
9-
import jakarta.validation.constraints.Size;
9+
import jakarta.validation.constraints.Min;
10+
import jakarta.validation.constraints.Max;
1011
import jakarta.validation.Valid;
1112
import jakarta.validation.constraints.NotEmpty;
1213
import jakarta.validation.constraints.NotNull;
@@ -37,8 +38,9 @@ public class CloudWatchLogsSinkConfig {
3738
@NotNull
3839
private String logStream;
3940

40-
@JsonProperty("max_retries")
41-
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
41+
@JsonProperty(value = "max_retries", defaultValue = "5")
42+
@Min(1)
43+
@Max(15)
4244
private int maxRetries = DEFAULT_RETRY_COUNT;
4345

4446
public AwsConfig getAwsConfig() {

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;
77

88
import com.fasterxml.jackson.annotation.JsonProperty;
9+
import jakarta.validation.constraints.Min;
10+
import jakarta.validation.constraints.Max;
911
import jakarta.validation.constraints.Size;
1012
import org.hibernate.validator.constraints.time.DurationMax;
1113
import org.hibernate.validator.constraints.time.DurationMin;
@@ -22,23 +24,24 @@ public class ThresholdConfig {
2224
public static final int DEFAULT_BATCH_SIZE = 25;
2325
public static final String DEFAULT_EVENT_SIZE = "256kb";
2426
public static final String DEFAULT_SIZE_OF_REQUEST = "1mb";
25-
public static final long DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
27+
public static final long DEFAULT_FLUSH_INTERVAL = 60;
2628

27-
@JsonProperty("batch_size")
28-
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
29+
@JsonProperty(value = "batch_size", defaultValue="25")
30+
@Min(1)
31+
@Max(10000)
2932
private int batchSize = DEFAULT_BATCH_SIZE;
3033

31-
@JsonProperty("max_event_size")
34+
@JsonProperty(value = "max_event_size", defaultValue="256k")
3235
@Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes")
3336
private String maxEventSize = DEFAULT_EVENT_SIZE;
3437

35-
@JsonProperty("max_request_size")
38+
@JsonProperty(value = "max_request_size", defaultValue="1mb")
3639
private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST;
3740

38-
@JsonProperty("log_send_interval")
41+
@JsonProperty("flush_interval")
3942
@DurationMin(seconds = 60)
4043
@DurationMax(seconds = 3600)
41-
private Duration logSendInterval = Duration.ofSeconds(DEFAULT_LOG_SEND_INTERVAL_TIME);
44+
private Duration flushInterval = Duration.ofSeconds(DEFAULT_FLUSH_INTERVAL);
4245

4346
public int getBatchSize() {
4447
return batchSize;
@@ -52,8 +55,8 @@ public long getMaxRequestSizeBytes() {
5255
return ByteCount.parse(maxRequestSize).getBytes();
5356
}
5457

55-
public long getLogSendInterval() {
56-
return logSendInterval.getSeconds();
58+
public long getFlushInterval() {
59+
return flushInterval.getSeconds();
5760
}
5861

5962
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void setUp() {
5656

5757
thresholdConfig = new ThresholdConfig();
5858
cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(),
59-
thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getLogSendInterval());
59+
thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getFlushInterval());
6060

6161
mockClient = mock(CloudWatchLogsClient.class);
6262
mockMetrics = mock(CloudWatchLogsMetrics.class);

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() {
3434
assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE));
3535
assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_EVENT_SIZE).getBytes()));
3636
assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST).getBytes()));
37-
assertThat(thresholdConfig.getLogSendInterval(), equalTo(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME));
37+
assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL));
3838
}
3939

4040
@ParameterizedTest
@@ -65,8 +65,8 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(fi
6565
@ValueSource(ints = {5, 10, 300})
6666
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interval(final int log_send_interval) throws NoSuchFieldException, IllegalAccessException {
6767
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
68-
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "logSendInterval", Duration.ofSeconds(log_send_interval));
69-
assertThat(sampleThresholdConfig.getLogSendInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ;
68+
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "flushInterval", Duration.ofSeconds(log_send_interval));
69+
assertThat(sampleThresholdConfig.getFlushInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ;
7070
}
7171

7272
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class CloudWatchLogsLimitsTest {
2020
static void setUp() {
2121
thresholdConfig = new ThresholdConfig();
2222
cloudWatchLogsLimits = new CloudWatchLogsLimits(ThresholdConfig.DEFAULT_BATCH_SIZE, thresholdConfig.getMaxEventSizeBytes(),
23-
thresholdConfig.getMaxRequestSizeBytes(), ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME);
23+
thresholdConfig.getMaxRequestSizeBytes(), ThresholdConfig.DEFAULT_FLUSH_INTERVAL);
2424
}
2525

2626
@Test
@@ -43,62 +43,62 @@ void GIVEN_greater_than_limit_method_WHEN_event_size_equal_to_max_event_size_THE
4343

4444
@Test
4545
void GIVEN_greater_than_limit_method_WHEN_log_send_interval_equal_to_max_log_send_interval_THEN_return_true() {
46-
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
46+
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
4747
assertTrue(thresholdMetTime);
4848
}
4949

5050
@Test
5151
void SGIVEN_greater_than_limit_method_WHEN_log_send_interval_greater_than_max_log_send_interval_THEN_return_true() {
52-
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME + 1, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
52+
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL + 1, thresholdConfig.getMaxRequestSizeBytes(),ThresholdConfig.DEFAULT_BATCH_SIZE);
5353
assertTrue(thresholdMetTime);
5454
}
5555

5656
@Test
5757
void GIVEN_greater_than_limit_method_WHEN_log_send_interval_less_than_max_log_send_interval_THEN_return_false() {
5858
long validRequestSize = thresholdConfig.getMaxRequestSizeBytes() - ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
59-
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, validRequestSize ,ThresholdConfig.DEFAULT_BATCH_SIZE);
59+
boolean thresholdMetTime = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, validRequestSize ,ThresholdConfig.DEFAULT_BATCH_SIZE);
6060
assertFalse(thresholdMetTime);
6161
}
6262

6363
@Test
6464
void GIVEN_greater_than_limit_method_WHEN_request_size_greater_than_max_request_size_THEN_return_true() {
6565
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes() + 1) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
66-
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
66+
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
6767
assertTrue(thresholdMetRequestSize);
6868
}
6969

7070
@Test
7171
void GIVEN_greater_than_limit_method_WHEN_request_size_equal_to_max_request_size_THEN_return_false() {
7272
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes()) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
73-
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
73+
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
7474
assertFalse(thresholdMetRequestSize);
7575
}
7676

7777
@Test
7878
void GIVEN_greater_than_limit_method_WHEN_request_size_less_than_max_request_size_THEN_return_false() {
7979
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes() - 1) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
80-
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
80+
boolean thresholdMetRequestSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL - 1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE);
8181
assertFalse(thresholdMetRequestSize);
8282
}
8383

8484
@Test
8585
void GIVEN_greater_than_limit_method_WHEN_batch_size_greater_than_max_batch_size_THEN_return_true() {
8686
long requestSizeWithoutOverhead = (thresholdConfig.getMaxRequestSizeBytes()) - ThresholdConfig.DEFAULT_BATCH_SIZE * (CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
87-
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE + 1);
87+
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, requestSizeWithoutOverhead, ThresholdConfig.DEFAULT_BATCH_SIZE + 1);
8888
assertTrue(thresholdMetBatchSize);
8989
}
9090

9191
@Test
9292
void GIVEN_greater_than_limit_method_WHEN_batch_size_equal_to_max_batch_size_THEN_return_false() {
9393
long validRequestSize = thresholdConfig.getMaxRequestSizeBytes() - ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
94-
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE);
94+
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE);
9595
assertFalse(thresholdMetBatchSize);
9696
}
9797

9898
@Test
9999
void GIVEN_greater_than_limit_method_WHEN_batch_size_less_than_max_batch_size_THEN_return_false() {
100100
long validRequestSize = thresholdConfig.getMaxRequestSizeBytes()- ((ThresholdConfig.DEFAULT_BATCH_SIZE) * CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE);
101-
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1);
101+
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isGreaterThanLimitReached(ThresholdConfig.DEFAULT_FLUSH_INTERVAL -1, validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1);
102102
assertFalse(thresholdMetBatchSize);
103103
}
104104

@@ -143,4 +143,4 @@ void GIVEN_equal_to_limit_method_WHEN_batch_size_less_than_max_batch_size_THEN_r
143143
boolean thresholdMetBatchSize = cloudWatchLogsLimits.isEqualToLimitReached(validRequestSize, ThresholdConfig.DEFAULT_BATCH_SIZE - 1);
144144
assertFalse(thresholdMetBatchSize);
145145
}
146-
}
146+
}

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsThresholdConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
package org.opensearch.dataprepper.plugins.sink.sqs;
77

88
import com.fasterxml.jackson.annotation.JsonProperty;
9-
import jakarta.validation.constraints.Size;
9+
import jakarta.validation.constraints.Min;
10+
import jakarta.validation.constraints.Max;
1011
import org.hibernate.validator.constraints.time.DurationMax;
1112
import org.hibernate.validator.constraints.time.DurationMin;
1213
import org.opensearch.dataprepper.model.types.ByteCount;
@@ -18,8 +19,9 @@ public class SqsThresholdConfig {
1819
public static final ByteCount DEFAULT_MAX_MESSAGE_SIZE = ByteCount.parse("256kb");
1920
public static final long DEFAULT_FLUSH_INTERVAL_TIME = 30;
2021

21-
@JsonProperty("max_events_per_message")
22-
@Size(min = 1, max = 1000, message = "batch_size amount should be between 1 to 1000")
22+
@JsonProperty(value = "max_events_per_message", defaultValue="25")
23+
@Min(1)
24+
@Max(1000)
2325
private int maxEventsPerMessage = DEFAULT_MESSAGES_PER_EVENT;
2426

2527
@JsonProperty("max_message_size")

0 commit comments

Comments
 (0)