Skip to content

Commit 19a6170

Browse files
committed
Addressed review comments
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent e67f243 commit 19a6170

4 files changed

Lines changed: 29 additions & 48 deletions

File tree

data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.mockito.Mockito.when;
5858
import static org.mockito.Mockito.lenient;
5959
import static org.hamcrest.CoreMatchers.equalTo;
60+
import static org.hamcrest.CoreMatchers.notNullValue;
6061
import static org.hamcrest.MatcherAssert.assertThat;
6162

6263
import java.time.Duration;
@@ -451,12 +452,7 @@ void testWithLargeSingleMessagesSentToDLQ() {
451452
}
452453

453454
@Test
454-
void testWithLargeSingleMessagesWhenDLQNotConfigured() {
455-
s3Client = S3Client.builder()
456-
.credentialsProvider(awsCredentialsProvider)
457-
.region(Region.of(awsRegion))
458-
.build();
459-
455+
void testWithLargeSingleMessagesWhenDLQNotConfigured() throws Exception {
460456
long startTime = Instant.now().toEpochMilli();
461457
when(thresholdConfig.getBatchSize()).thenReturn(NUM_RECORDS);
462458
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(200L);
@@ -469,6 +465,7 @@ void testWithLargeSingleMessagesWhenDLQNotConfigured() {
469465
records.add(largeRecord);
470466

471467
sink.doOutput(records);
468+
List<OutputLogEvent>[] foundEvents = new List[1];
472469
await().atMost(Duration.ofSeconds(30))
473470
.untilAsserted(() -> {
474471
long endTime = Instant.now().toEpochMilli();
@@ -481,14 +478,17 @@ void testWithLargeSingleMessagesWhenDLQNotConfigured() {
481478
.build();
482479
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
483480
List<OutputLogEvent> events = response.events();
481+
foundEvents[0] = events;
484482
assertThat(events.size(), equalTo(NUM_RECORDS));
485-
for (int i = 0; i < events.size(); i++) {
486-
String message = events.get(i).message();
487-
Map<String, Object> event = objectMapper.readValue(message, Map.class);
488-
assertThat(event.get("name"), equalTo("Person"+i));
489-
assertThat(event.get("age"), equalTo(Integer.toString(i)));
490-
}
491483
});
484+
List<OutputLogEvent> events = foundEvents[0];
485+
assertThat(events, notNullValue());
486+
for (int i = 0; i < events.size(); i++) {
487+
String message = events.get(i).message();
488+
Map<String, Object> event = objectMapper.readValue(message, Map.class);
489+
assertThat(event.get("name"), equalTo("Person"+i));
490+
assertThat(event.get("age"), equalTo(Integer.toString(i)));
491+
}
492492
assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS));
493493
assertThat(requestsSuccessCount.get(), equalTo(1));
494494
assertThat(largeEventsDroppedCount.get(), equalTo(1));

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class CloudWatchLogsSinkConfig {
3030
private PluginModel dlq;
3131

3232
@JsonProperty("threshold")
33+
@Valid
3334
private ThresholdConfig thresholdConfig = new ThresholdConfig();
3435

3536
@JsonProperty("log_group")

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
import com.fasterxml.jackson.annotation.JsonProperty;
99
import jakarta.validation.constraints.Min;
1010
import jakarta.validation.constraints.Max;
11-
import jakarta.validation.constraints.AssertTrue;
1211
import org.hibernate.validator.constraints.time.DurationMax;
1312
import org.hibernate.validator.constraints.time.DurationMin;
1413
import org.opensearch.dataprepper.model.types.ByteCount;
14+
import org.opensearch.dataprepper.model.constraints.ByteCountMax;
15+
import org.opensearch.dataprepper.model.constraints.ByteCountMin;
1516

1617
import java.time.Duration;
1718

@@ -23,8 +24,8 @@
2324
public class ThresholdConfig {
2425
public static final long ONE_MB = 1048576L;
2526
public static final int DEFAULT_BATCH_SIZE = 25;
26-
public static final String DEFAULT_MAX_EVENT_SIZE = "1mb";
27-
public static final String DEFAULT_MAX_REQUEST_SIZE = "1mb";
27+
public static final ByteCount DEFAULT_MAX_EVENT_SIZE = ByteCount.parse("1mb");
28+
public static final ByteCount DEFAULT_MAX_REQUEST_SIZE = ByteCount.parse("1mb");
2829
public static final long DEFAULT_FLUSH_INTERVAL = 60;
2930

3031
@JsonProperty(value = "batch_size", defaultValue="25")
@@ -33,10 +34,14 @@ public class ThresholdConfig {
3334
private int batchSize = DEFAULT_BATCH_SIZE;
3435

3536
@JsonProperty(value = "max_event_size", defaultValue="1mb")
36-
private String maxEventSize = DEFAULT_MAX_EVENT_SIZE;
37+
@ByteCountMin("1b")
38+
@ByteCountMax("1mb")
39+
private ByteCount maxEventSize = DEFAULT_MAX_EVENT_SIZE;
3740

3841
@JsonProperty(value = "max_request_size", defaultValue="1mb")
39-
private String maxRequestSize = DEFAULT_MAX_REQUEST_SIZE;
42+
@ByteCountMin("1b")
43+
@ByteCountMax("1mb")
44+
private ByteCount maxRequestSize = DEFAULT_MAX_REQUEST_SIZE;
4045

4146
@JsonProperty("flush_interval")
4247
@DurationMin(seconds = 60)
@@ -48,26 +53,15 @@ public int getBatchSize() {
4853
}
4954

5055
public long getMaxEventSizeBytes() {
51-
return ByteCount.parse(maxEventSize).getBytes();
56+
return maxEventSize.getBytes();
5257
}
5358

5459
public long getMaxRequestSizeBytes() {
55-
return ByteCount.parse(maxRequestSize).getBytes();
60+
return maxRequestSize.getBytes();
5661
}
5762

5863
public long getFlushInterval() {
5964
return flushInterval.getSeconds();
6065
}
6166

62-
@AssertTrue(message = "Both the maximum event size and maximum request size must be configured with a value greater than zero (0) and less than 1 megabyte (MB)")
63-
boolean isValidConfig() {
64-
long maxEventSizeBytes = ByteCount.parse(maxEventSize).getBytes();
65-
long maxRequestSizeBytes = ByteCount.parse(maxRequestSize).getBytes();
66-
67-
return maxEventSizeBytes > 0 &&
68-
maxEventSizeBytes < ONE_MB &&
69-
maxRequestSizeBytes > 0 &&
70-
maxRequestSizeBytes < ONE_MB;
71-
}
72-
7367
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() {
3232
final ThresholdConfig thresholdConfig = new ThresholdConfig();
3333

3434
assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE));
35-
assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_MAX_EVENT_SIZE).getBytes()));
36-
assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_MAX_REQUEST_SIZE).getBytes()));
35+
assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ThresholdConfig.DEFAULT_MAX_EVENT_SIZE.getBytes()));
36+
assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ThresholdConfig.DEFAULT_MAX_REQUEST_SIZE.getBytes()));
3737
assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL));
3838
}
3939

@@ -49,15 +49,15 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_batch_size(final in
4949
@ValueSource(strings = {"1kb", "10kb", "256kb"})
5050
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(final String max_event_size) throws NoSuchFieldException, IllegalAccessException {
5151
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
52-
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", max_event_size);
52+
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", ByteCount.parse(max_event_size));
5353
assertThat(sampleThresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(max_event_size).getBytes()));
5454
}
5555

5656
@ParameterizedTest
5757
@ValueSource(strings = {"1b", "100b", "1048576b"})
5858
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(final String max_batch_request_size) throws NoSuchFieldException, IllegalAccessException {
5959
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
60-
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", max_batch_request_size);
60+
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", ByteCount.parse(max_batch_request_size));
6161
assertThat(sampleThresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(max_batch_request_size).getBytes()));
6262
}
6363

@@ -69,18 +69,4 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interv
6969
assertThat(sampleThresholdConfig.getFlushInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ;
7070
}
7171

72-
@ParameterizedTest
73-
@ValueSource(strings = {"0", "2mb"})
74-
void GIVEN_invalid_max_request_size_SHOULD_fail(final String maxRequestSize) throws NoSuchFieldException, IllegalAccessException {
75-
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
76-
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", maxRequestSize);
77-
}
78-
79-
@ParameterizedTest
80-
@ValueSource(strings = {"0", "2mb"})
81-
void GIVEN_invalid_max_event_size_SHOULD_fail(final String maxEventSize) throws NoSuchFieldException, IllegalAccessException {
82-
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
83-
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", maxEventSize);
84-
}
85-
8672
}

0 commit comments

Comments
 (0)