From e67f2431e770847e6bdb94fe8579b83c70a70eda Mon Sep 17 00:00:00 2001 From: Kondaka Date: Wed, 8 Oct 2025 17:15:02 -0700 Subject: [PATCH 1/3] CloudWatchLogs Sink: Update max event size and drop error events if DLQ is not configured Signed-off-by: Kondaka --- .../cloudwatch_logs/CloudWatchLogsIT.java | 60 +++++++++++++++++++ .../cloudwatch_logs/CloudWatchLogsSink.java | 3 +- .../client/CloudWatchLogsDispatcher.java | 11 +++- .../client/CloudWatchLogsMetrics.java | 7 +++ .../client/CloudWatchLogsService.java | 12 +++- .../config/ThresholdConfig.java | 25 +++++--- .../utils/CloudWatchLogsSinkUtils.java | 4 +- .../client/CloudWatchLogsDispatcherTest.java | 17 +++--- .../client/CloudWatchLogsServiceTest.java | 30 ++++++++-- .../config/ThresholdConfigTest.java | 18 +++++- 10 files changed, 157 insertions(+), 30 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 46a32f89d1..8702026dda 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -104,6 +104,8 @@ public class CloudWatchLogsIT { @Mock private Counter eventsFailedCounter; @Mock + private Counter largeEventsDroppedCounter; + @Mock private Counter requestsFailedCounter; @Mock private Counter dlqSuccessCounter; @@ -119,6 +121,7 @@ public class CloudWatchLogsIT { private AtomicInteger eventsFailedCount; private AtomicInteger requestsFailedCount; private AtomicInteger dlqSuccessCount; + private AtomicInteger largeEventsDroppedCount; private CloudWatchLogsClient cloudWatchLogsClient; private ObjectMapper objectMapper; private AwsCredentialsProvider awsCredentialsProvider; @@ -130,6 +133,7 @@ void setUp() { eventsSuccessCount = new AtomicInteger(0); requestsSuccessCount = new AtomicInteger(0); eventsFailedCount = new AtomicInteger(0); + largeEventsDroppedCount = new AtomicInteger(0); requestsFailedCount = new AtomicInteger(0); dlqSuccessCount = new AtomicInteger(0); objectMapper = new ObjectMapper(); @@ -154,6 +158,7 @@ void setUp() { eventsSuccessCounter = mock(Counter.class); requestsSuccessCounter = mock(Counter.class); eventsFailedCounter = mock(Counter.class); + largeEventsDroppedCounter = mock(Counter.class); requestsFailedCounter = mock(Counter.class); dlqSuccessCounter = mock(Counter.class); lenient().doAnswer((a)-> { @@ -166,6 +171,11 @@ void setUp() { eventsFailedCount.addAndGet(v); return null; }).when(eventsFailedCounter).increment(any(Double.class)); + lenient().doAnswer((a)-> { + int v = (int)(double)(a.getArgument(0)); + largeEventsDroppedCount.addAndGet(v); + return null; + }).when(largeEventsDroppedCounter).increment(any(Double.class)); lenient().doAnswer((a)-> { requestsSuccessCount.addAndGet(1); return null; @@ -199,6 +209,9 @@ void setUp() { if (s.equals(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED)) { return eventsFailedCounter; } + if (s.equals(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED)) { + return largeEventsDroppedCounter; + } if (s.contains("NumDlqSuccess")) { return dlqSuccessCounter; } @@ -437,6 +450,53 @@ void testWithLargeSingleMessagesSentToDLQ() { } + @Test + void testWithLargeSingleMessagesWhenDLQNotConfigured() { + s3Client = S3Client.builder() + .credentialsProvider(awsCredentialsProvider) + .region(Region.of(awsRegion)) + .build(); + + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(NUM_RECORDS); + when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(200L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + Record largeRecord = getLargeRecord(200); + records.add(largeRecord); + + sink.doOutput(records); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + assertThat(events.size(), equalTo(NUM_RECORDS)); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person"+i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + }); + assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); + assertThat(requestsSuccessCount.get(), equalTo(1)); + assertThat(largeEventsDroppedCount.get(), equalTo(1)); + assertThat(dlqSuccessCount.get(), equalTo(0)); + verify(eventHandle, times(NUM_RECORDS+1)).release(true); + + } + @Test void testWithBadCredentials_AllEventsToDLQ() { s3Client = S3Client.builder() diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index 5cdaecb5d2..a6eafe04c4 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -91,6 +91,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .cloudWatchLogsClient(cloudWatchLogsClient) .cloudWatchLogsMetrics(cloudWatchLogsMetrics) .dlqPushHandler(dlqPushHandler) + .dropIfDlqNotConfigured(true) .logGroup(cloudWatchLogsSinkConfig.getLogGroup()) .logStream(cloudWatchLogsSinkConfig.getLogStream()) .retryCount(cloudWatchLogsSinkConfig.getMaxRetries()) @@ -104,7 +105,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, throw new InvalidBufferTypeException("Error loading buffer!"); } - cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher, dlqPushHandler); + cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, cloudWatchLogsDispatcher, dlqPushHandler, true); } @Override diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 86accf4755..b1a61781e1 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -35,6 +35,7 @@ public class CloudWatchLogsDispatcher { private CloudWatchLogsClient cloudWatchLogsClient; private CloudWatchLogsMetrics cloudWatchLogsMetrics; private DlqPushHandler dlqPushHandler; + private boolean dropIfDlqNotConfigured; private Executor executor; private String logGroup; private String logStream; @@ -42,6 +43,7 @@ public class CloudWatchLogsDispatcher { public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, final CloudWatchLogsMetrics cloudWatchLogsMetrics, final DlqPushHandler dlqPushHandler, + final boolean dropIfDlqNotConfigured, final Executor executor, final String logGroup, final String logStream, @@ -52,6 +54,7 @@ public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, this.logStream = logStream; this.retryCount = retryCount; this.dlqPushHandler = dlqPushHandler; + this.dropIfDlqNotConfigured = dropIfDlqNotConfigured; this.executor = executor; } @@ -95,6 +98,7 @@ public void dispatchLogs(List inputLogEvents, List e .dlqPushHandler(dlqPushHandler) .putLogEventsRequest(putLogEventsRequest) .eventHandles(eventHandles) + .dropIfDlqNotConfigured(dropIfDlqNotConfigured) .totalEventCount(inputLogEvents.size()) .retryCount(retryCount) .build()); @@ -111,6 +115,7 @@ protected static class Uploader implements Runnable { private final List eventHandles; private final int totalEventCount; private final int retryCount; + private boolean dropIfDlqNotConfigured; @Override public void run() { @@ -154,7 +159,7 @@ public void upload() { cloudWatchLogsMetrics.increaseLogEventFailCounter(totalEventCount); List logEvents = putLogEventsRequest.logEvents(); for (int i = 0; i < logEvents.size(); i++) { - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvents.get(i).message(), failureMessage, dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvents.get(i).message(), failureMessage, dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); } @@ -179,7 +184,7 @@ List getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo if (endIndex != null) { int i = 0; for (InputLogEvent logEvent : logEvents.subList(0, endIndex)) { - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvent.message(), "Too old log event", dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvent.message(), "Too old log event", dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); } @@ -190,7 +195,7 @@ List getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo if (startIndex != null) { int i = 0; for (InputLogEvent logEvent : logEvents.subList(startIndex, logEvents.size())) { - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(startIndex + i), logEvent.message(), "Too old log event", dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(startIndex + i), logEvent.message(), "Too old log event", dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java index b73c8b8bae..aa856cb280 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java @@ -18,16 +18,19 @@ public class CloudWatchLogsMetrics { public static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded"; public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed"; public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed"; + public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped"; private final Counter logEventSuccessCounter; private final Counter logEventFailCounter; private final Counter requestSuccessCount; private final Counter requestFailCount; + private final Counter logLargeEventsDroppedCounter; public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED); this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED); this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED); this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED); + this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED); } public void increaseLogEventSuccessCounter(int value) { @@ -45,4 +48,8 @@ public void increaseLogEventFailCounter(int value) { public void increaseRequestFailCounter(int value) { requestFailCount.increment(value); } + + public void increaseLogLargeEventsDroppedCounter(int value) { + logLargeEventsDroppedCounter.increment(value); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index f7e9df248b..1d3ffca91c 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -36,22 +36,28 @@ public class CloudWatchLogsService { private final CloudWatchLogsDispatcher cloudWatchLogsDispatcher; private final Buffer buffer; private final CloudWatchLogsLimits cloudWatchLogsLimits; + private CloudWatchLogsMetrics cloudWatchLogsMetrics; private final SinkStopWatch sinkStopWatch; private final ReentrantLock processLock; private final DlqPushHandler dlqPushHandler; + private final boolean dropIfDlqNotConfigured; public CloudWatchLogsService(final Buffer buffer, + final CloudWatchLogsMetrics cloudWatchLogsMetrics, final CloudWatchLogsLimits cloudWatchLogsLimits, final CloudWatchLogsDispatcher cloudWatchLogsDispatcher, - final DlqPushHandler dlqPushHandler) { + final DlqPushHandler dlqPushHandler, + final boolean dropIfDlqNotConfigured) { this.buffer = buffer; this.cloudWatchLogsLimits = cloudWatchLogsLimits; + this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; processLock = new ReentrantLock(); sinkStopWatch = new SinkStopWatch(); this.cloudWatchLogsDispatcher = cloudWatchLogsDispatcher; this.dlqPushHandler = dlqPushHandler; + this.dropIfDlqNotConfigured = dropIfDlqNotConfigured; } /** @@ -79,9 +85,11 @@ public void processLogEvents(final Collection> logs) { if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) { final String failureMessage = String.format("Event blocked due to Max Size restriction! Event Size : %s", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE)); - DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler); + DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler, dropIfDlqNotConfigured); if (dlqObject != null) { dlqObjects.add(dlqObject); + } else if (dropIfDlqNotConfigured) { + cloudWatchLogsMetrics.increaseLogLargeEventsDroppedCounter(1); } continue; } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java index 46d9b815c0..1bdbf0499c 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java @@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.Size; +import jakarta.validation.constraints.AssertTrue; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; import org.opensearch.dataprepper.model.types.ByteCount; @@ -21,9 +21,10 @@ * restrictions. */ public class ThresholdConfig { + public static final long ONE_MB = 1048576L; public static final int DEFAULT_BATCH_SIZE = 25; - public static final String DEFAULT_EVENT_SIZE = "256kb"; - public static final String DEFAULT_SIZE_OF_REQUEST = "1mb"; + public static final String DEFAULT_MAX_EVENT_SIZE = "1mb"; + public static final String DEFAULT_MAX_REQUEST_SIZE = "1mb"; public static final long DEFAULT_FLUSH_INTERVAL = 60; @JsonProperty(value = "batch_size", defaultValue="25") @@ -31,12 +32,11 @@ public class ThresholdConfig { @Max(10000) private int batchSize = DEFAULT_BATCH_SIZE; - @JsonProperty(value = "max_event_size", defaultValue="256k") - @Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes") - private String maxEventSize = DEFAULT_EVENT_SIZE; + @JsonProperty(value = "max_event_size", defaultValue="1mb") + private String maxEventSize = DEFAULT_MAX_EVENT_SIZE; @JsonProperty(value = "max_request_size", defaultValue="1mb") - private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST; + private String maxRequestSize = DEFAULT_MAX_REQUEST_SIZE; @JsonProperty("flush_interval") @DurationMin(seconds = 60) @@ -59,4 +59,15 @@ public long getFlushInterval() { return flushInterval.getSeconds(); } + @AssertTrue(message = "Both the maximum event size and maximum request size must be configured with a value greater than zero (0) and less than 1 megabyte (MB)") + boolean isValidConfig() { + long maxEventSizeBytes = ByteCount.parse(maxEventSize).getBytes(); + long maxRequestSizeBytes = ByteCount.parse(maxRequestSize).getBytes(); + + return maxEventSizeBytes > 0 && + maxEventSizeBytes < ONE_MB && + maxRequestSizeBytes > 0 && + maxRequestSizeBytes < ONE_MB; + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java index 26bfcc84dd..2543d76f85 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java @@ -16,12 +16,12 @@ public class CloudWatchLogsSinkUtils { private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsSinkUtils.class); - public static DlqObject createDlqObject(final int status, final EventHandle eventHandle, final String message, final String failureMessage, final DlqPushHandler dlqPushHandler) { + public static DlqObject createDlqObject(final int status, final EventHandle eventHandle, final String message, final String failureMessage, final DlqPushHandler dlqPushHandler, final boolean dropIfDlqNotConfigured) { if (dlqPushHandler != null) { CloudWatchLogsSinkDlqData cloudWatchLogsSinkDlqData = CloudWatchLogsSinkDlqData.createDlqData(status, message, failureMessage); return DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), List.of(eventHandle), cloudWatchLogsSinkDlqData); } else { - eventHandle.release(false); + eventHandle.release(dropIfDlqNotConfigured); } return null; } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 2bd9f2e809..31941b42fc 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -76,6 +76,7 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcher() { .logGroup(LOG_GROUP) .logStream(LOG_STREAM) .retryCount(RETRY_COUNT) + .dropIfDlqNotConfigured(true) .build(); } @@ -122,8 +123,8 @@ void GIVEN_too_old_events_SHOULD_not_release_old_events() { executeDispatcherRunnable(); // First two events should not be released - verify(eventHandles.get(0), never()).release(true); - verify(eventHandles.get(1), never()).release(true); + verify(eventHandles.get(0)).release(true); + verify(eventHandles.get(1)).release(true); // Remaining events should be released for (int i = 2; i < eventHandles.size(); i++) { @@ -157,7 +158,7 @@ void GIVEN_too_new_events_SHOULD_not_release_new_events() { // Last three events should not be released for (int i = eventHandles.size() - 3; i < eventHandles.size(); i++) { - verify(eventHandles.get(i), never()).release(true); + verify(eventHandles.get(i)).release(true); } } @@ -181,8 +182,8 @@ void GIVEN_both_old_and_new_rejected_events_SHOULD_only_release_valid_events() { executeDispatcherRunnable(); // First two events should not be released (too old) - verify(eventHandles.get(0), never()).release(true); - verify(eventHandles.get(1), never()).release(true); + verify(eventHandles.get(0)).release(true); + verify(eventHandles.get(1)).release(true); // Middle events should be released for (int i = 2; i < eventHandles.size() - 2; i++) { @@ -190,8 +191,8 @@ void GIVEN_both_old_and_new_rejected_events_SHOULD_only_release_valid_events() { } // Last two events should not be released (too new) - verify(eventHandles.get(eventHandles.size() - 2), never()).release(true); - verify(eventHandles.get(eventHandles.size() - 1), never()).release(true); + verify(eventHandles.get(eventHandles.size() - 2)).release(true); + verify(eventHandles.get(eventHandles.size() - 1)).release(true); } @Test @@ -247,6 +248,6 @@ void GIVEN_max_retries_exceeded_SHOULD_not_release_events() { verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); // No events should be released after max retries - eventHandles.forEach(eventHandle -> verify(eventHandle, never()).release(true)); + eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java index 61c1f71bcb..c6b3d80428 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java @@ -29,10 +29,12 @@ import java.util.Map; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -45,15 +47,19 @@ class CloudWatchLogsServiceTest { private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig; private ThresholdConfig thresholdConfig; private CloudWatchLogsLimits cloudWatchLogsLimits; + private CloudWatchLogsMetrics cloudWatchLogsMetrics; private InMemoryBufferFactory inMemoryBufferFactory; private Buffer buffer; private CloudWatchLogsDispatcher mockDispatcher; private DlqPushHandler dlqPushHandler; + private EventHandle eventHandle; @BeforeEach void setUp() { mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); + eventHandle = mock(EventHandle.class); + cloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); thresholdConfig = new ThresholdConfig(); cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(), thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getFlushInterval()); @@ -63,8 +69,8 @@ void setUp() { inMemoryBufferFactory = new InMemoryBufferFactory(); mockDispatcher = mock(CloudWatchLogsDispatcher.class); dlqPushHandler = mock(DlqPushHandler.class); - cloudWatchLogsService = new CloudWatchLogsService(buffer, - cloudWatchLogsLimits, mockDispatcher, null); + cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, + cloudWatchLogsLimits, mockDispatcher, null, true); } Collection> getSampleRecordsCollectionSmall() { @@ -116,11 +122,11 @@ void setUpRealBuffer() { } CloudWatchLogsService getSampleService(DlqPushHandler dlqPushHandler) { - return new CloudWatchLogsService(buffer, cloudWatchLogsLimits, mockDispatcher, dlqPushHandler); + return new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, mockDispatcher, dlqPushHandler, true); } CloudWatchLogsService getSampleService() { - return new CloudWatchLogsService(buffer, cloudWatchLogsLimits, mockDispatcher, null); + return new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, mockDispatcher, null, true); } @Test @@ -161,6 +167,19 @@ void SHOULD_call_dispatcher_WHEN_process_log_events_called_with_limit_sized_coll verify(dlqPushHandler, atLeast(1)).perform(any(List.class)); } + @Test + void SHOULD_call_dispatcher_WHEN_process_log_events_called_with_limit_sized_collection_with_large_record_no_dlq_are_dropped() throws Exception { + PluginSetting pluginSetting = mock(PluginSetting.class); + when(pluginSetting.getName()).thenReturn("test"); + when(pluginSetting.getPipelineName()).thenReturn("test"); + setUpRealBuffer(); + cloudWatchLogsService = getSampleService(null); + cloudWatchLogsService.processLogEvents(List.of(getLargeRecord(2*thresholdConfig.getMaxEventSizeBytes()))); + verify(mockDispatcher, never()).dispatchLogs(any(List.class), any(List.class)); + verify(eventHandle, times(1)).release(eq(true)); + verify(cloudWatchLogsMetrics, times(1)).increaseLogLargeEventsDroppedCounter(eq(1)); + } + @Test void SHOULD_call_dispatcher_WHEN_process_log_events_called_with_dispatcher_throws_sending_events_to_dlq() throws Exception { PluginSetting pluginSetting = mock(PluginSetting.class); @@ -229,7 +248,8 @@ void GIVEN_large_thread_count_WHEN_processing_log_events_THEN_dispatcher_should_ } private Record getLargeRecord(long size) { - final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).build(); + final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).withEventHandle(eventHandle).build(); + return new Record<>(event); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java index 7766fd5057..edee61aa0a 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java @@ -32,8 +32,8 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() { final ThresholdConfig thresholdConfig = new ThresholdConfig(); assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE)); - assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_EVENT_SIZE).getBytes())); - assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST).getBytes())); + assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_MAX_EVENT_SIZE).getBytes())); + assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_MAX_REQUEST_SIZE).getBytes())); assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL)); } @@ -69,4 +69,18 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interv assertThat(sampleThresholdConfig.getFlushInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ; } + @ParameterizedTest + @ValueSource(strings = {"0", "2mb"}) + void GIVEN_invalid_max_request_size_SHOULD_fail(final String maxRequestSize) throws NoSuchFieldException, IllegalAccessException { + ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", maxRequestSize); + } + + @ParameterizedTest + @ValueSource(strings = {"0", "2mb"}) + void GIVEN_invalid_max_event_size_SHOULD_fail(final String maxEventSize) throws NoSuchFieldException, IllegalAccessException { + ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", maxEventSize); + } + } From 19a617054e4cf8ce9b30e89f8224e16f82cede1d Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 13 Oct 2025 15:38:56 -0700 Subject: [PATCH 2/3] Addressed review comments Signed-off-by: Kondaka --- .../cloudwatch_logs/CloudWatchLogsIT.java | 24 +++++++-------- .../config/CloudWatchLogsSinkConfig.java | 1 + .../config/ThresholdConfig.java | 30 ++++++++----------- .../config/ThresholdConfigTest.java | 22 +++----------- 4 files changed, 29 insertions(+), 48 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 8702026dda..67299b63e4 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -57,6 +57,7 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.lenient; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import java.time.Duration; @@ -451,12 +452,7 @@ void testWithLargeSingleMessagesSentToDLQ() { } @Test - void testWithLargeSingleMessagesWhenDLQNotConfigured() { - s3Client = S3Client.builder() - .credentialsProvider(awsCredentialsProvider) - .region(Region.of(awsRegion)) - .build(); - + void testWithLargeSingleMessagesWhenDLQNotConfigured() throws Exception { long startTime = Instant.now().toEpochMilli(); when(thresholdConfig.getBatchSize()).thenReturn(NUM_RECORDS); when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(200L); @@ -469,6 +465,7 @@ void testWithLargeSingleMessagesWhenDLQNotConfigured() { records.add(largeRecord); sink.doOutput(records); + List[] foundEvents = new List[1]; await().atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { long endTime = Instant.now().toEpochMilli(); @@ -481,14 +478,17 @@ void testWithLargeSingleMessagesWhenDLQNotConfigured() { .build(); GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); List events = response.events(); + foundEvents[0] = events; assertThat(events.size(), equalTo(NUM_RECORDS)); - for (int i = 0; i < events.size(); i++) { - String message = events.get(i).message(); - Map event = objectMapper.readValue(message, Map.class); - assertThat(event.get("name"), equalTo("Person"+i)); - assertThat(event.get("age"), equalTo(Integer.toString(i))); - } }); + List events = foundEvents[0]; + assertThat(events, notNullValue()); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person"+i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); assertThat(requestsSuccessCount.get(), equalTo(1)); assertThat(largeEventsDroppedCount.get(), equalTo(1)); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 048624ce2e..8cddc032ab 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -30,6 +30,7 @@ public class CloudWatchLogsSinkConfig { private PluginModel dlq; @JsonProperty("threshold") + @Valid private ThresholdConfig thresholdConfig = new ThresholdConfig(); @JsonProperty("log_group") diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java index 1bdbf0499c..2970c051ad 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java @@ -8,10 +8,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.AssertTrue; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.model.constraints.ByteCountMax; +import org.opensearch.dataprepper.model.constraints.ByteCountMin; import java.time.Duration; @@ -23,8 +24,8 @@ public class ThresholdConfig { public static final long ONE_MB = 1048576L; public static final int DEFAULT_BATCH_SIZE = 25; - public static final String DEFAULT_MAX_EVENT_SIZE = "1mb"; - public static final String DEFAULT_MAX_REQUEST_SIZE = "1mb"; + public static final ByteCount DEFAULT_MAX_EVENT_SIZE = ByteCount.parse("1mb"); + public static final ByteCount DEFAULT_MAX_REQUEST_SIZE = ByteCount.parse("1mb"); public static final long DEFAULT_FLUSH_INTERVAL = 60; @JsonProperty(value = "batch_size", defaultValue="25") @@ -33,10 +34,14 @@ public class ThresholdConfig { private int batchSize = DEFAULT_BATCH_SIZE; @JsonProperty(value = "max_event_size", defaultValue="1mb") - private String maxEventSize = DEFAULT_MAX_EVENT_SIZE; + @ByteCountMin("1b") + @ByteCountMax("1mb") + private ByteCount maxEventSize = DEFAULT_MAX_EVENT_SIZE; @JsonProperty(value = "max_request_size", defaultValue="1mb") - private String maxRequestSize = DEFAULT_MAX_REQUEST_SIZE; + @ByteCountMin("1b") + @ByteCountMax("1mb") + private ByteCount maxRequestSize = DEFAULT_MAX_REQUEST_SIZE; @JsonProperty("flush_interval") @DurationMin(seconds = 60) @@ -48,26 +53,15 @@ public int getBatchSize() { } public long getMaxEventSizeBytes() { - return ByteCount.parse(maxEventSize).getBytes(); + return maxEventSize.getBytes(); } public long getMaxRequestSizeBytes() { - return ByteCount.parse(maxRequestSize).getBytes(); + return maxRequestSize.getBytes(); } public long getFlushInterval() { return flushInterval.getSeconds(); } - @AssertTrue(message = "Both the maximum event size and maximum request size must be configured with a value greater than zero (0) and less than 1 megabyte (MB)") - boolean isValidConfig() { - long maxEventSizeBytes = ByteCount.parse(maxEventSize).getBytes(); - long maxRequestSizeBytes = ByteCount.parse(maxRequestSize).getBytes(); - - return maxEventSizeBytes > 0 && - maxEventSizeBytes < ONE_MB && - maxRequestSizeBytes > 0 && - maxRequestSizeBytes < ONE_MB; - } - } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java index edee61aa0a..6ff12b7104 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java @@ -32,8 +32,8 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() { final ThresholdConfig thresholdConfig = new ThresholdConfig(); assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE)); - assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_MAX_EVENT_SIZE).getBytes())); - assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_MAX_REQUEST_SIZE).getBytes())); + assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ThresholdConfig.DEFAULT_MAX_EVENT_SIZE.getBytes())); + assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ThresholdConfig.DEFAULT_MAX_REQUEST_SIZE.getBytes())); assertThat(thresholdConfig.getFlushInterval(), equalTo(ThresholdConfig.DEFAULT_FLUSH_INTERVAL)); } @@ -49,7 +49,7 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_batch_size(final in @ValueSource(strings = {"1kb", "10kb", "256kb"}) void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(final String max_event_size) throws NoSuchFieldException, IllegalAccessException { ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", max_event_size); + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", ByteCount.parse(max_event_size)); assertThat(sampleThresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(max_event_size).getBytes())); } @@ -57,7 +57,7 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(fina @ValueSource(strings = {"1b", "100b", "1048576b"}) void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(final String max_batch_request_size) throws NoSuchFieldException, IllegalAccessException { ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", max_batch_request_size); + ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", ByteCount.parse(max_batch_request_size)); assertThat(sampleThresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(max_batch_request_size).getBytes())); } @@ -69,18 +69,4 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interv assertThat(sampleThresholdConfig.getFlushInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ; } - @ParameterizedTest - @ValueSource(strings = {"0", "2mb"}) - void GIVEN_invalid_max_request_size_SHOULD_fail(final String maxRequestSize) throws NoSuchFieldException, IllegalAccessException { - ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", maxRequestSize); - } - - @ParameterizedTest - @ValueSource(strings = {"0", "2mb"}) - void GIVEN_invalid_max_event_size_SHOULD_fail(final String maxEventSize) throws NoSuchFieldException, IllegalAccessException { - ThresholdConfig sampleThresholdConfig = new ThresholdConfig(); - ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", maxEventSize); - } - } From 520037dca3d83f85576c77e1d1a795b43e59a220 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 14 Oct 2025 16:44:21 -0700 Subject: [PATCH 3/3] Added distribution summary metrics for log size and request size Signed-off-by: Kondaka --- .../sink/cloudwatch_logs/CloudWatchLogsIT.java | 6 ++++++ .../client/CloudWatchLogsMetrics.java | 15 +++++++++++++++ .../client/CloudWatchLogsService.java | 2 ++ .../cloudwatch_logs/CloudWatchLogsSinkTest.java | 6 +++++- 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 67299b63e4..fe8df7aa55 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.extension.ExtendWith; @@ -98,6 +99,9 @@ public class CloudWatchLogsIT { @Mock private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; + @Mock + private DistributionSummary summary; + @Mock private Counter eventsSuccessCounter; @Mock @@ -156,6 +160,7 @@ void setUp() { logGroupName = System.getProperty("tests.cloudwatch.log_group"); logStreamName = createLogStream(logGroupName); pluginMetrics = mock(PluginMetrics.class); + summary = mock(DistributionSummary.class); eventsSuccessCounter = mock(Counter.class); requestsSuccessCounter = mock(Counter.class); eventsFailedCounter = mock(Counter.class); @@ -218,6 +223,7 @@ void setUp() { } return null; }).when(pluginMetrics).counter(anyString()); + when(pluginMetrics.summary(anyString())).thenReturn(summary); cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName); when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java index aa856cb280..3bf920c525 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.metrics.PluginMetrics; /** @@ -19,11 +20,15 @@ public class CloudWatchLogsMetrics { public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed"; public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed"; public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped"; + public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize"; + public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize"; private final Counter logEventSuccessCounter; private final Counter logEventFailCounter; private final Counter requestSuccessCount; private final Counter requestFailCount; private final Counter logLargeEventsDroppedCounter; + private final DistributionSummary logSizeMetric; + private final DistributionSummary requestSizeMetric; public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED); @@ -31,6 +36,8 @@ public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED); this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED); this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED); + this.logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE); + this.requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE); } public void increaseLogEventSuccessCounter(int value) { @@ -52,4 +59,12 @@ public void increaseRequestFailCounter(int value) { public void increaseLogLargeEventsDroppedCounter(int value) { logLargeEventsDroppedCounter.increment(value); } + + public void recordLogSize(int value) { + logSizeMetric.record(value); + } + + public void recordRequestSize(int value) { + requestSizeMetric.record(value); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index 1d3ffca91c..b1e27a2e22 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -83,6 +83,7 @@ public void processLogEvents(final Collection> logs) { String logString = log.getData().toJsonString(); int logLength = logString.length(); + cloudWatchLogsMetrics.recordLogSize(logLength); if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) { final String failureMessage = String.format("Event blocked due to Max Size restriction! Event Size : %s", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE)); DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler, dropIfDlqNotConfigured); @@ -121,6 +122,7 @@ private void stageLogEvents() { List inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(buffer.getBufferedData()); cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, buffer.getEventHandles()); + cloudWatchLogsMetrics.recordRequestSize(buffer.getBufferSize()); buffer.resetBuffer(); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 91677d390b..750e9f7537 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; +import io.micrometer.core.instrument.DistributionSummary; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; @@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; @@ -68,6 +70,8 @@ void setUp() { mockHeaderOverrides.put("X-Test-Header", "test-value"); mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); mockClient = mock(CloudWatchLogsClient.class); + DistributionSummary summary = mock(DistributionSummary.class); + when(mockPluginMetrics.summary(anyString())).thenReturn(summary); when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig); when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); @@ -213,4 +217,4 @@ void WHEN_sink_initialization_with_header_overrides_THEN_sink_is_ready() { assertTrue(testCloudWatchSink.isReady()); } } -} \ No newline at end of file +}