Skip to content

Commit 520037d

Browse files
committed
Added distribution summary metrics for log size and request size
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 19a6170 commit 520037d

4 files changed

Lines changed: 28 additions & 1 deletion

File tree

data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;
77

88
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.DistributionSummary;
910
import com.fasterxml.jackson.databind.ObjectMapper;
1011
import org.apache.commons.lang3.RandomStringUtils;
1112
import org.junit.jupiter.api.extension.ExtendWith;
@@ -98,6 +99,9 @@ public class CloudWatchLogsIT {
9899
@Mock
99100
private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig;
100101

102+
@Mock
103+
private DistributionSummary summary;
104+
101105
@Mock
102106
private Counter eventsSuccessCounter;
103107
@Mock
@@ -156,6 +160,7 @@ void setUp() {
156160
logGroupName = System.getProperty("tests.cloudwatch.log_group");
157161
logStreamName = createLogStream(logGroupName);
158162
pluginMetrics = mock(PluginMetrics.class);
163+
summary = mock(DistributionSummary.class);
159164
eventsSuccessCounter = mock(Counter.class);
160165
requestsSuccessCounter = mock(Counter.class);
161166
eventsFailedCounter = mock(Counter.class);
@@ -218,6 +223,7 @@ void setUp() {
218223
}
219224
return null;
220225
}).when(pluginMetrics).counter(anyString());
226+
when(pluginMetrics.summary(anyString())).thenReturn(summary);
221227
cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class);
222228
when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName);
223229
when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null);

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;
77

88
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.DistributionSummary;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011

1112
/**
@@ -19,18 +20,24 @@ public class CloudWatchLogsMetrics {
1920
public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
2021
public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
2122
public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped";
23+
public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize";
24+
public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize";
2225
private final Counter logEventSuccessCounter;
2326
private final Counter logEventFailCounter;
2427
private final Counter requestSuccessCount;
2528
private final Counter requestFailCount;
2629
private final Counter logLargeEventsDroppedCounter;
30+
private final DistributionSummary logSizeMetric;
31+
private final DistributionSummary requestSizeMetric;
2732

2833
public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
2934
this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
3035
this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
3136
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
3237
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
3338
this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED);
39+
this.logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE);
40+
this.requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE);
3441
}
3542

3643
public void increaseLogEventSuccessCounter(int value) {
@@ -52,4 +59,12 @@ public void increaseRequestFailCounter(int value) {
5259
public void increaseLogLargeEventsDroppedCounter(int value) {
5360
logLargeEventsDroppedCounter.increment(value);
5461
}
62+
63+
public void recordLogSize(int value) {
64+
logSizeMetric.record(value);
65+
}
66+
67+
public void recordRequestSize(int value) {
68+
requestSizeMetric.record(value);
69+
}
5570
}

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public void processLogEvents(final Collection<Record<Event>> logs) {
8383
String logString = log.getData().toJsonString();
8484
int logLength = logString.length();
8585

86+
cloudWatchLogsMetrics.recordLogSize(logLength);
8687
if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) {
8788
final String failureMessage = String.format("Event blocked due to Max Size restriction! Event Size : %s", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
8889
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler, dropIfDlqNotConfigured);
@@ -121,6 +122,7 @@ private void stageLogEvents() {
121122

122123
List<InputLogEvent> inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(buffer.getBufferedData());
123124
cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, buffer.getEventHandles());
125+
cloudWatchLogsMetrics.recordRequestSize(buffer.getBufferSize());
124126

125127
buffer.resetBuffer();
126128
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;
77

8+
import io.micrometer.core.instrument.DistributionSummary;
89
import org.junit.jupiter.api.BeforeEach;
910
import org.junit.jupiter.api.Test;
1011
import org.mockito.MockedStatic;
@@ -30,6 +31,7 @@
3031
import static org.junit.jupiter.api.Assertions.assertTrue;
3132
import static org.junit.jupiter.api.Assertions.assertThrows;
3233
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.ArgumentMatchers.anyString;
3335
import static org.mockito.ArgumentMatchers.eq;
3436
import static org.mockito.Mockito.atLeast;
3537
import static org.mockito.Mockito.mock;
@@ -68,6 +70,8 @@ void setUp() {
6870
mockHeaderOverrides.put("X-Test-Header", "test-value");
6971
mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class);
7072
mockClient = mock(CloudWatchLogsClient.class);
73+
DistributionSummary summary = mock(DistributionSummary.class);
74+
when(mockPluginMetrics.summary(anyString())).thenReturn(summary);
7175

7276
when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig);
7377
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
@@ -213,4 +217,4 @@ void WHEN_sink_initialization_with_header_overrides_THEN_sink_is_ready() {
213217
assertTrue(testCloudWatchSink.isReady());
214218
}
215219
}
216-
}
220+
}

0 commit comments

Comments
 (0)