diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 46a32f89d1..fe8df7aa55 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.extension.ExtendWith; @@ -57,6 +58,7 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.lenient; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import java.time.Duration; @@ -97,6 +99,9 @@ public class CloudWatchLogsIT { @Mock private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; + @Mock + private DistributionSummary summary; + @Mock private Counter eventsSuccessCounter; @Mock @@ -104,6 +109,8 @@ public class CloudWatchLogsIT { @Mock private Counter eventsFailedCounter; @Mock + private Counter largeEventsDroppedCounter; + @Mock private Counter requestsFailedCounter; @Mock private Counter dlqSuccessCounter; @@ -119,6 +126,7 @@ public class CloudWatchLogsIT { private AtomicInteger eventsFailedCount; private AtomicInteger requestsFailedCount; private AtomicInteger dlqSuccessCount; + private AtomicInteger largeEventsDroppedCount; private CloudWatchLogsClient cloudWatchLogsClient; private ObjectMapper objectMapper; private AwsCredentialsProvider awsCredentialsProvider; @@ -130,6 +138,7 @@ void setUp() { eventsSuccessCount = new AtomicInteger(0); requestsSuccessCount = new AtomicInteger(0); eventsFailedCount = new AtomicInteger(0); + largeEventsDroppedCount = new AtomicInteger(0); requestsFailedCount = new AtomicInteger(0); dlqSuccessCount = new AtomicInteger(0); objectMapper = new ObjectMapper(); @@ -151,9 +160,11 @@ void setUp() { logGroupName = System.getProperty("tests.cloudwatch.log_group"); logStreamName = createLogStream(logGroupName); pluginMetrics = mock(PluginMetrics.class); + summary = mock(DistributionSummary.class); eventsSuccessCounter = mock(Counter.class); requestsSuccessCounter = mock(Counter.class); eventsFailedCounter = mock(Counter.class); + largeEventsDroppedCounter = mock(Counter.class); requestsFailedCounter = mock(Counter.class); dlqSuccessCounter = mock(Counter.class); lenient().doAnswer((a)-> { @@ -166,6 +177,11 @@ void setUp() { eventsFailedCount.addAndGet(v); return null; }).when(eventsFailedCounter).increment(any(Double.class)); + lenient().doAnswer((a)-> { + int v = (int)(double)(a.getArgument(0)); + largeEventsDroppedCount.addAndGet(v); + return null; + }).when(largeEventsDroppedCounter).increment(any(Double.class)); lenient().doAnswer((a)-> { requestsSuccessCount.addAndGet(1); return null; @@ -199,11 +215,15 @@ void setUp() { if (s.equals(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED)) { return eventsFailedCounter; } + if (s.equals(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED)) { + return largeEventsDroppedCounter; + } if (s.contains("NumDlqSuccess")) { return dlqSuccessCounter; } return null; }).when(pluginMetrics).counter(anyString()); + when(pluginMetrics.summary(anyString())).thenReturn(summary); cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName); when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null); @@ -437,6 +457,52 @@ void testWithLargeSingleMessagesSentToDLQ() { } + @Test + void testWithLargeSingleMessagesWhenDLQNotConfigured() throws Exception { + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(NUM_RECORDS); + when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(200L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + Record largeRecord = getLargeRecord(200); + records.add(largeRecord); + + sink.doOutput(records); + List[] foundEvents = new List[1]; + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + foundEvents[0] = events; + assertThat(events.size(), equalTo(NUM_RECORDS)); + }); + List events = foundEvents[0]; + assertThat(events, notNullValue()); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person"+i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); + assertThat(requestsSuccessCount.get(), equalTo(1)); + assertThat(largeEventsDroppedCount.get(), equalTo(1)); + assertThat(dlqSuccessCount.get(), equalTo(0)); + verify(eventHandle, times(NUM_RECORDS+1)).release(true); + + } + @Test void testWithBadCredentials_AllEventsToDLQ() { s3Client = S3Client.builder() diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index 5cdaecb5d2..a6eafe04c4 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -91,6 +91,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .cloudWatchLogsClient(cloudWatchLogsClient) .cloudWatchLogsMetrics(cloudWatchLogsMetrics) .dlqPushHandler(dlqPushHandler) + .dropIfDlqNotConfigured(true) .logGroup(cloudWatchLogsSinkConfig.getLogGroup()) .logStream(cloudWatchLogsSinkConfig.getLogStream()) .retryCount(cloudWatchLogsSinkConfig.getMaxRetries()) @@ -104,7 +105,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, throw new InvalidBufferTypeException("Error loading buffer!"); } - cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher, dlqPushHandler); + cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, cloudWatchLogsDispatcher, dlqPushHandler, true); } @Override diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 86accf4755..b1a61781e1 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -35,6 +35,7 @@ public class CloudWatchLogsDispatcher { private CloudWatchLogsClient cloudWatchLogsClient; private CloudWatchLogsMetrics cloudWatchLogsMetrics; private DlqPushHandler dlqPushHandler; + private boolean dropIfDlqNotConfigured; private Executor executor; private String logGroup; private String logStream; @@ -42,6 +43,7 @@ public class CloudWatchLogsDispatcher { public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, final CloudWatchLogsMetrics cloudWatchLogsMetrics, final DlqPushHandler dlqPushHandler, + final boolean dropIfDlqNotConfigured, final Executor executor, final String logGroup, final String logStream, @@ -52,6 +54,7 @@ public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, this.logStream = logStream; this.retryCount = retryCount; this.dlqPushHandler = dlqPushHandler; + this.dropIfDlqNotConfigured = dropIfDlqNotConfigured; this.executor = executor; } @@ -95,6 +98,7 @@ public void dispatchLogs(List inputLogEvents, List e .dlqPushHandler(dlqPushHandler) .putLogEventsRequest(putLogEventsRequest) .eventHandles(eventHandles) + .dropIfDlqNotConfigured(dropIfDlqNotConfigured) .totalEventCount(inputLogEvents.size()) .retryCount(retryCount) .build()); @@ -111,6 +115,7 @@ protected static class Uploader implements Runnable { private final List eventHandles; private final int totalEventCount; private final int retryCount; + private boolean dropIfDlqNotConfigured; @Override public void run() { @@ -154,7 +159,7 @@ public void upload() { cloudWatchLogsMetrics.increaseLogEventFailCounter(totalEventCount); List logEvents = putLogEventsRequest.logEvents(); for (int i = 0; i < logEvents.size(); i++) { - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvents.get(i).message(), failureMessage, dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvents.get(i).message(), failureMessage, dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); } @@ -179,7 +184,7 @@ List getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo if (endIndex != null) { int i = 0; for (InputLogEvent logEvent : logEvents.subList(0, endIndex)) { - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvent.message(), "Too old log event", dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvent.message(), "Too old log event", dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); } @@ -190,7 +195,7 @@ List getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo if (startIndex != null) { int i = 0; for (InputLogEvent logEvent : logEvents.subList(startIndex, logEvents.size())) { - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(startIndex + i), logEvent.message(), "Too old log event", dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(startIndex + i), logEvent.message(), "Too old log event", dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java index b73c8b8bae..3bf920c525 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.metrics.PluginMetrics; /** @@ -18,16 +19,25 @@ public class CloudWatchLogsMetrics { public static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded"; public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed"; public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed"; + public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped"; + public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize"; + public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize"; private final Counter logEventSuccessCounter; private final Counter logEventFailCounter; private final Counter requestSuccessCount; private final Counter requestFailCount; + private final Counter logLargeEventsDroppedCounter; + private final DistributionSummary logSizeMetric; + private final DistributionSummary requestSizeMetric; public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED); this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED); this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED); this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED); + this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED); + this.logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE); + this.requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE); } public void increaseLogEventSuccessCounter(int value) { @@ -45,4 +55,16 @@ public void increaseLogEventFailCounter(int value) { public void increaseRequestFailCounter(int value) { requestFailCount.increment(value); } + + public void increaseLogLargeEventsDroppedCounter(int value) { + logLargeEventsDroppedCounter.increment(value); + } + + public void recordLogSize(int value) { + logSizeMetric.record(value); + } + + public void recordRequestSize(int value) { + requestSizeMetric.record(value); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index f7e9df248b..b1e27a2e22 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -36,22 +36,28 @@ public class CloudWatchLogsService { private final CloudWatchLogsDispatcher cloudWatchLogsDispatcher; private final Buffer buffer; private final CloudWatchLogsLimits cloudWatchLogsLimits; + private CloudWatchLogsMetrics cloudWatchLogsMetrics; private final SinkStopWatch sinkStopWatch; private final ReentrantLock processLock; private final DlqPushHandler dlqPushHandler; + private final boolean dropIfDlqNotConfigured; public CloudWatchLogsService(final Buffer buffer, + final CloudWatchLogsMetrics cloudWatchLogsMetrics, final CloudWatchLogsLimits cloudWatchLogsLimits, final CloudWatchLogsDispatcher cloudWatchLogsDispatcher, - final DlqPushHandler dlqPushHandler) { + final DlqPushHandler dlqPushHandler, + final boolean dropIfDlqNotConfigured) { this.buffer = buffer; this.cloudWatchLogsLimits = cloudWatchLogsLimits; + this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; processLock = new ReentrantLock(); sinkStopWatch = new SinkStopWatch(); this.cloudWatchLogsDispatcher = cloudWatchLogsDispatcher; this.dlqPushHandler = dlqPushHandler; + this.dropIfDlqNotConfigured = dropIfDlqNotConfigured; } /** @@ -77,11 +83,14 @@ public void processLogEvents(final Collection> logs) { String logString = log.getData().toJsonString(); int logLength = logString.length(); + cloudWatchLogsMetrics.recordLogSize(logLength); if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) { final String failureMessage = String.format("Event blocked due to Max Size restriction! Event Size : %s", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE)); - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); + } else if (dropIfDlqNotConfigured) { + cloudWatchLogsMetrics.increaseLogLargeEventsDroppedCounter(1); } continue; } @@ -113,6 +122,7 @@ private void stageLogEvents() { List inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(buffer.getBufferedData()); cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, buffer.getEventHandles()); + cloudWatchLogsMetrics.recordRequestSize(buffer.getBufferSize()); buffer.resetBuffer(); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 048624ce2e..8cddc032ab 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -30,6 +30,7 @@ public class CloudWatchLogsSinkConfig { private PluginModel dlq; @JsonProperty("threshold") + @Valid private ThresholdConfig thresholdConfig = new ThresholdConfig(); @JsonProperty("log_group") diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java index 46d9b815c0..2970c051ad 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java @@ -8,10 +8,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.Size; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.model.constraints.ByteCountMax; +import org.opensearch.dataprepper.model.constraints.ByteCountMin; import java.time.Duration; @@ -21,9 +22,10 @@ * restrictions. */ public class ThresholdConfig { + public static final long ONE_MB = 1048576L; public static final int DEFAULT_BATCH_SIZE = 25; - public static final String DEFAULT_EVENT_SIZE = "256kb"; - public static final String DEFAULT_SIZE_OF_REQUEST = "1mb"; + public static final ByteCount DEFAULT_MAX_EVENT_SIZE = ByteCount.parse("1mb"); + public static final ByteCount DEFAULT_MAX_REQUEST_SIZE = ByteCount.parse("1mb"); public static final long DEFAULT_FLUSH_INTERVAL = 60; @JsonProperty(value = "batch_size", defaultValue="25") @@ -31,12 +33,15 @@ public class ThresholdConfig { @Max(10000) private int batchSize = DEFAULT_BATCH_SIZE; - @JsonProperty(value = "max_event_size", defaultValue="256k") - @Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes") - private String maxEventSize = DEFAULT_EVENT_SIZE; + @JsonProperty(value = "max_event_size", defaultValue="1mb") + @ByteCountMin("1b") + @ByteCountMax("1mb") + private ByteCount maxEventSize = DEFAULT_MAX_EVENT_SIZE; @JsonProperty(value = "max_request_size", defaultValue="1mb") - private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST; + @ByteCountMin("1b") + @ByteCountMax("1mb") + private ByteCount maxRequestSize = DEFAULT_MAX_REQUEST_SIZE; @JsonProperty("flush_interval") @DurationMin(seconds = 60) @@ -48,11 +53,11 @@ public int getBatchSize() { } public long getMaxEventSizeBytes() { - return ByteCount.parse(maxEventSize).getBytes(); + return maxEventSize.getBytes(); } public long getMaxRequestSizeBytes() { - return ByteCount.parse(maxRequestSize).getBytes(); + return maxRequestSize.getBytes(); } public long getFlushInterval() { diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java index 26bfcc84dd..2543d76f85 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java @@ -16,12 +16,12 @@ public class CloudWatchLogsSinkUtils { private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsSinkUtils.class); - public static DlqObject createDlqObject(final int status, final EventHandle eventHandle, final String message, final String failureMessage, final DlqPushHandler dlqPushHandler) { + public static DlqObject createDlqObject(final int status, final EventHandle eventHandle, final String message, final String failureMessage, final DlqPushHandler dlqPushHandler, final boolean dropIfDlqNotConfigured) { if (dlqPushHandler != null) { CloudWatchLogsSinkDlqData cloudWatchLogsSinkDlqData = CloudWatchLogsSinkDlqData.createDlqData(status, message, failureMessage); return DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), List.of(eventHandle), cloudWatchLogsSinkDlqData); } else { - eventHandle.release(false); + eventHandle.release(dropIfDlqNotConfigured); } return null; } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 91677d390b..750e9f7537 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; +import io.micrometer.core.instrument.DistributionSummary; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; @@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; @@ -68,6 +70,8 @@ void setUp() { mockHeaderOverrides.put("X-Test-Header", "test-value"); mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); mockClient = mock(CloudWatchLogsClient.class); + DistributionSummary summary = mock(DistributionSummary.class); + when(mockPluginMetrics.summary(anyString())).thenReturn(summary); when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig); when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); @@ -213,4 +217,4 @@ void WHEN_sink_initialization_with_header_overrides_THEN_sink_is_ready() { assertTrue(testCloudWatchSink.isReady()); } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 2bd9f2e809..31941b42fc 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -76,6 +76,7 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcher() { .logGroup(LOG_GROUP) .logStream(LOG_STREAM) .retryCount(RETRY_COUNT) + .dropIfDlqNotConfigured(true) .build(); } @@ -122,8 +123,8 @@ void GIVEN_too_old_events_SHOULD_not_release_old_events() { executeDispatcherRunnable(); // First two events should not be released - verify(eventHandles.get(0), never()).release(true); - verify(eventHandles.get(1), never()).release(true); + verify(eventHandles.get(0)).release(true); + verify(eventHandles.get(1)).release(true); // Remaining events should be released for (int i = 2; i < eventHandles.size(); i++) { @@ -157,7 +158,7 @@ void GIVEN_too_new_events_SHOULD_not_release_new_events() { // Last three events should not be released for (int i = eventHandles.size() - 3; i < eventHandles.size(); i++) { - verify(eventHandles.get(i), never()).release(true); + verify(eventHandles.get(i)).release(true); } } @@ -181,8 +182,8 @@ void GIVEN_both_old_and_new_rejected_events_SHOULD_only_release_valid_events() { executeDispatcherRunnable(); // First two events should not be released (too old) - verify(eventHandles.get(0), never()).release(true); - verify(eventHandles.get(1), never()).release(true); + verify(eventHandles.get(0)).release(true); + verify(eventHandles.get(1)).release(true); // Middle events should be released for (int i = 2; i < eventHandles.size() - 2; i++) { @@ -190,8 +191,8 @@ void GIVEN_both_old_and_new_rejected_events_SHOULD_only_release_valid_events() { } // Last two events should not be released (too new) - verify(eventHandles.get(eventHandles.size() - 2), never()).release(true); - verify(eventHandles.get(eventHandles.size() - 1), never()).release(true); + verify(eventHandles.get(eventHandles.size() - 2)).release(true); + verify(eventHandles.get(eventHandles.size() - 1)).release(true); } @Test @@ -247,6 +248,6 @@ void GIVEN_max_retries_exceeded_SHOULD_not_release_events() { verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); // No events should be released after max retries - eventHandles.forEach(eventHandle -> verify(eventHandle, never()).release(true)); + eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java index 61c1f71bcb..c6b3d80428 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java @@ -29,10 +29,12 @@ import java.util.Map; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -45,15 +47,19 @@ class CloudWatchLogsServiceTest { private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig; private ThresholdConfig thresholdConfig; private CloudWatchLogsLimits cloudWatchLogsLimits; + private CloudWatchLogsMetrics cloudWatchLogsMetrics; private InMemoryBufferFactory inMemoryBufferFactory; private Buffer buffer; private CloudWatchLogsDispatcher mockDispatcher; private DlqPushHandler dlqPushHandler; + private EventHandle eventHandle; @BeforeEach void setUp() { mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); + eventHandle = mock(EventHandle.class); + cloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); thresholdConfig = new ThresholdConfig(); cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(), thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getFlushInterval()); @@ -63,8 +69,8 @@ void setUp() { inMemoryBufferFactory = new InMemoryBufferFactory(); mockDispatcher = mock(CloudWatchLogsDispatcher.class); dlqPushHandler = mock(DlqPushHandler.class); - cloudWatchLogsService = new CloudWatchLogsService(buffer, - cloudWatchLogsLimits, mockDispatcher, null); + cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, + cloudWatchLogsLimits, mockDispatcher, null, true); } Collection> getSampleRecordsCollectionSmall() { @@ -116,11 +122,11 @@ void setUpRealBuffer() { } CloudWatchLogsService getSampleService(DlqPushHandler dlqPushHandler) { - return new CloudWatchLogsService(buffer, cloudWatchLogsLimits, mockDispatcher, dlqPushHandler); + return new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, mockDispatcher, dlqPushHandler, true); } CloudWatchLogsService getSampleService() { - return new CloudWatchLogsService(buffer, cloudWatchLogsLimits, mockDispatcher, null); + return new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, mockDispatcher, null, true); } @Test @@ -161,6 +167,19 @@ void SHOULD_call_dispatcher_WHEN_process_log_events_called_with_limit_sized_coll verify(dlqPushHandler, atLeast(1)).perform(any(List.class)); } + @Test + void SHOULD_call_dispatcher_WHEN_process_log_events_called_with_limit_sized_collection_with_large_record_no_dlq_are_dropped() throws Exception { + PluginSetting pluginSetting = mock(PluginSetting.class); + when(pluginSetting.getName()).thenReturn("test"); + when(pluginSetting.getPipelineName()).thenReturn("test"); + setUpRealBuffer(); + cloudWatchLogsService = getSampleService(null); + cloudWatchLogsService.processLogEvents(List.of(getLargeRecord(2*thresholdConfig.getMaxEventSizeBytes()))); + verify(mockDispatcher, never()).dispatchLogs(any(List.class), any(List.class)); + verify(eventHandle, times(1)).release(eq(true)); + verify(cloudWatchLogsMetrics, times(1)).increaseLogLargeEventsDroppedCounter(eq(1)); + } + @Test void SHOULD_call_dispatcher_WHEN_process_log_events_called_with_dispatcher_throws_sending_events_to_dlq() throws Exception { PluginSetting pluginSetting = mock(PluginSetting.class); @@ -229,7 +248,8 @@ void GIVEN_large_thread_count_WHEN_processing_log_events_THEN_dispatcher_should_ } private Record getLargeRecord(long size) { - final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).build(); + final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).withEventHandle(eventHandle).build(); + return new Record<>(event); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java index 7766fd5057..6ff12b7104 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java @@ -32,8 +32,8 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() { final ThresholdConfig thresholdConfig = new ThresholdConfig(); assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE)); - assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_EVENT_SIZE).getBytes())); - assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST).getBytes())); + assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ThresholdConfig.DEFAULT_MAX_EVENT_SIZE.getBytes())); + assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ThresholdConfig.DEFAULT_MAX_REQUEST_SIZE.getBytes())); assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL)); } @@ -49,7 +49,7 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_batch_size(final in @ValueSource(strings = {"1kb", "10kb", "256kb"}) void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(final String max_event_size) throws NoSuchFieldException, IllegalAccessException { ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", max_event_size); + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", ByteCount.parse(max_event_size)); assertThat(sampleThresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(max_event_size).getBytes())); } @@ -57,7 +57,7 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(fina @ValueSource(strings = {"1b", "100b", "1048576b"}) void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(final String max_batch_request_size) throws NoSuchFieldException, IllegalAccessException { ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", max_batch_request_size); + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", ByteCount.parse(max_batch_request_size)); assertThat(sampleThresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(max_batch_request_size).getBytes())); }