Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -57,6 +58,7 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.lenient;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

import java.time.Duration;
Expand Down Expand Up @@ -97,13 +99,18 @@ public class CloudWatchLogsIT {
@Mock
private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig;

@Mock
private DistributionSummary summary;

@Mock
private Counter eventsSuccessCounter;
@Mock
private Counter requestsSuccessCounter;
@Mock
private Counter eventsFailedCounter;
@Mock
private Counter largeEventsDroppedCounter;
@Mock
private Counter requestsFailedCounter;
@Mock
private Counter dlqSuccessCounter;
Expand All @@ -119,6 +126,7 @@ public class CloudWatchLogsIT {
private AtomicInteger eventsFailedCount;
private AtomicInteger requestsFailedCount;
private AtomicInteger dlqSuccessCount;
private AtomicInteger largeEventsDroppedCount;
private CloudWatchLogsClient cloudWatchLogsClient;
private ObjectMapper objectMapper;
private AwsCredentialsProvider awsCredentialsProvider;
Expand All @@ -130,6 +138,7 @@ void setUp() {
eventsSuccessCount = new AtomicInteger(0);
requestsSuccessCount = new AtomicInteger(0);
eventsFailedCount = new AtomicInteger(0);
largeEventsDroppedCount = new AtomicInteger(0);
requestsFailedCount = new AtomicInteger(0);
dlqSuccessCount = new AtomicInteger(0);
objectMapper = new ObjectMapper();
Expand All @@ -151,9 +160,11 @@ void setUp() {
logGroupName = System.getProperty("tests.cloudwatch.log_group");
logStreamName = createLogStream(logGroupName);
pluginMetrics = mock(PluginMetrics.class);
summary = mock(DistributionSummary.class);
eventsSuccessCounter = mock(Counter.class);
requestsSuccessCounter = mock(Counter.class);
eventsFailedCounter = mock(Counter.class);
largeEventsDroppedCounter = mock(Counter.class);
requestsFailedCounter = mock(Counter.class);
dlqSuccessCounter = mock(Counter.class);
lenient().doAnswer((a)-> {
Expand All @@ -166,6 +177,11 @@ void setUp() {
eventsFailedCount.addAndGet(v);
return null;
}).when(eventsFailedCounter).increment(any(Double.class));
lenient().doAnswer((a)-> {
int v = (int)(double)(a.getArgument(0));
largeEventsDroppedCount.addAndGet(v);
return null;
}).when(largeEventsDroppedCounter).increment(any(Double.class));
lenient().doAnswer((a)-> {
requestsSuccessCount.addAndGet(1);
return null;
Expand Down Expand Up @@ -199,11 +215,15 @@ void setUp() {
if (s.equals(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED)) {
return eventsFailedCounter;
}
if (s.equals(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED)) {
return largeEventsDroppedCounter;
}
if (s.contains("NumDlqSuccess")) {
return dlqSuccessCounter;
}
return null;
}).when(pluginMetrics).counter(anyString());
when(pluginMetrics.summary(anyString())).thenReturn(summary);
cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class);
when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName);
when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null);
Expand Down Expand Up @@ -437,6 +457,52 @@ void testWithLargeSingleMessagesSentToDLQ() {

}

/**
 * Sends NUM_RECORDS regular records plus one record from getLargeRecord(200) through the sink
 * while the max event size is stubbed to 200 and no DLQ is configured. Expects only the
 * NUM_RECORDS regular events to appear in the log stream, the large-events-dropped counter to
 * read 1, the DLQ counter to stay at 0, and every event handle to be released with {@code true}.
 */
@Test
void testWithLargeSingleMessagesWhenDLQNotConfigured() throws Exception {
    // Lower bound of the GetLogEvents query window, captured before anything is sent.
    final long windowStart = Instant.now().toEpochMilli();
    when(thresholdConfig.getBatchSize()).thenReturn(NUM_RECORDS);
    when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(200L);
    when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);
    when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null);

    sink = createObjectUnderTest();
    final Collection<Record<Event>> batch = getRecordList(NUM_RECORDS);
    final Record<Event> oversized = getLargeRecord(200);
    batch.add(oversized);

    sink.doOutput(batch);

    // Single-slot holder so the lambda can hand the last fetched events back to the test body.
    final List<OutputLogEvent>[] lastFetched = new List[1];
    await().atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> {
                final long windowEnd = Instant.now().toEpochMilli();
                final GetLogEventsRequest query = GetLogEventsRequest.builder()
                        .logGroupName(logGroupName)
                        .logStreamName(logStreamName)
                        .startTime(windowStart)
                        .endTime(windowEnd)
                        .build();
                final GetLogEventsResponse reply = cloudWatchLogsClient.getLogEvents(query);
                lastFetched[0] = reply.events();
                assertThat(lastFetched[0].size(), equalTo(NUM_RECORDS));
            });

    final List<OutputLogEvent> delivered = lastFetched[0];
    assertThat(delivered, notNullValue());

    // Each delivered message must decode to the record generated at the same position.
    int position = 0;
    for (final OutputLogEvent logEvent : delivered) {
        final Map<String, Object> decoded = objectMapper.readValue(logEvent.message(), Map.class);
        assertThat(decoded.get("name"), equalTo("Person" + position));
        assertThat(decoded.get("age"), equalTo(Integer.toString(position)));
        position++;
    }

    assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS));
    assertThat(requestsSuccessCount.get(), equalTo(1));
    assertThat(largeEventsDroppedCount.get(), equalTo(1));
    assertThat(dlqSuccessCount.get(), equalTo(0));
    // NUM_RECORDS delivered events plus the one large record are all released as successful.
    verify(eventHandle, times(NUM_RECORDS + 1)).release(true);
}

@Test
void testWithBadCredentials_AllEventsToDLQ() {
s3Client = S3Client.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.dlqPushHandler(dlqPushHandler)
.dropIfDlqNotConfigured(true)
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
.logStream(cloudWatchLogsSinkConfig.getLogStream())
.retryCount(cloudWatchLogsSinkConfig.getMaxRetries())
Expand All @@ -104,7 +105,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
throw new InvalidBufferTypeException("Error loading buffer!");
}

cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher, dlqPushHandler);
cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsMetrics, cloudWatchLogsLimits, cloudWatchLogsDispatcher, dlqPushHandler, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ public class CloudWatchLogsDispatcher {
private CloudWatchLogsClient cloudWatchLogsClient;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private DlqPushHandler dlqPushHandler;
private boolean dropIfDlqNotConfigured;
private Executor executor;
private String logGroup;
private String logStream;
private int retryCount;
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final DlqPushHandler dlqPushHandler,
final boolean dropIfDlqNotConfigured,
final Executor executor,
final String logGroup,
final String logStream,
Expand All @@ -52,6 +54,7 @@ public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
this.logStream = logStream;
this.retryCount = retryCount;
this.dlqPushHandler = dlqPushHandler;
this.dropIfDlqNotConfigured = dropIfDlqNotConfigured;

this.executor = executor;
}
Expand Down Expand Up @@ -95,6 +98,7 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> e
.dlqPushHandler(dlqPushHandler)
.putLogEventsRequest(putLogEventsRequest)
.eventHandles(eventHandles)
.dropIfDlqNotConfigured(dropIfDlqNotConfigured)
.totalEventCount(inputLogEvents.size())
.retryCount(retryCount)
.build());
Expand All @@ -111,6 +115,7 @@ protected static class Uploader implements Runnable {
private final List<EventHandle> eventHandles;
private final int totalEventCount;
private final int retryCount;
private boolean dropIfDlqNotConfigured;

@Override
public void run() {
Expand Down Expand Up @@ -154,7 +159,7 @@ public void upload() {
cloudWatchLogsMetrics.increaseLogEventFailCounter(totalEventCount);
List<InputLogEvent> logEvents = putLogEventsRequest.logEvents();
for (int i = 0; i < logEvents.size(); i++) {
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvents.get(i).message(), failureMessage, dlqPushHandler);
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvents.get(i).message(), failureMessage, dlqPushHandler, dropIfDlqNotConfigured);
if (dlqObject != null) {
dlqObjects.add(dlqObject);
}
Expand All @@ -179,7 +184,7 @@ List<DlqObject> getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo
if (endIndex != null) {
int i = 0;
for (InputLogEvent logEvent : logEvents.subList(0, endIndex)) {
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvent.message(), "Too old log event", dlqPushHandler);
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(i), logEvent.message(), "Too old log event", dlqPushHandler, dropIfDlqNotConfigured);
if (dlqObject != null) {
dlqObjects.add(dlqObject);
}
Expand All @@ -190,7 +195,7 @@ List<DlqObject> getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo
if (startIndex != null) {
int i = 0;
for (InputLogEvent logEvent : logEvents.subList(startIndex, logEvents.size())) {
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(startIndex + i), logEvent.message(), "Too old log event", dlqPushHandler);
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, eventHandles.get(startIndex + i), logEvent.message(), "Too old log event", dlqPushHandler, dropIfDlqNotConfigured);
if (dlqObject != null) {
dlqObjects.add(dlqObject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;

/**
Expand All @@ -18,16 +19,25 @@ public class CloudWatchLogsMetrics {
public static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped";

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should have a metric specifically for this. There could be other reasons to drop events. Then we'd need a metric for each one.

It seems to me that we could solve this with a generic `cloudWatchLogsEventsDropped` metric. This could alert Data Prepper administrators that events are being dropped. Then other metrics and/or logs can clue them in to the exact issue. In this case, we can look at the distribution summary.

As it is, Data Prepper already has a lot of metrics. I think we should consider ways to avoid continuing this growth.

public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize";
public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize";
private final Counter logEventSuccessCounter;
private final Counter logEventFailCounter;
private final Counter requestSuccessCount;
private final Counter requestFailCount;
private final Counter logLargeEventsDroppedCounter;
private final DistributionSummary logSizeMetric;
private final DistributionSummary requestSizeMetric;

/**
 * Looks up every counter and distribution summary this class reports from the given
 * {@link PluginMetrics}, using the metric-name constants declared on this class.
 *
 * @param pluginMetrics source of the {@code Counter} and {@code DistributionSummary} instances
 */
public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
    // Counters (lookup order is kept as-is).
    logEventSuccessCounter = pluginMetrics.counter(CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
    requestFailCount = pluginMetrics.counter(CLOUDWATCH_LOGS_REQUESTS_FAILED);
    logEventFailCounter = pluginMetrics.counter(CLOUDWATCH_LOGS_EVENTS_FAILED);
    requestSuccessCount = pluginMetrics.counter(CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
    logLargeEventsDroppedCounter = pluginMetrics.counter(CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED);

    // Distribution summaries for per-log and per-request sizes.
    logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE);
    requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE);
}

public void increaseLogEventSuccessCounter(int value) {
Expand All @@ -45,4 +55,16 @@ public void increaseLogEventFailCounter(int value) {
/**
 * Adds {@code value} to the {@code cloudWatchLogsRequestsFailed} counter.
 *
 * @param value amount to increment the counter by
 */
public void increaseRequestFailCounter(final int value) {
    this.requestFailCount.increment(value);
}

/**
 * Adds {@code value} to the {@code cloudWatchLogsLargeEventsDropped} counter.
 *
 * @param value amount to increment the counter by
 */
public void increaseLogLargeEventsDroppedCounter(final int value) {
    this.logLargeEventsDroppedCounter.increment(value);
}

/**
 * Records one observation in the {@code cloudWatchLogsLogSize} distribution summary.
 *
 * @param value the log size to record
 */
public void recordLogSize(final int value) {
    this.logSizeMetric.record(value);
}

/**
 * Records one observation in the {@code cloudWatchLogsRequestSize} distribution summary.
 *
 * @param value the request size to record
 */
public void recordRequestSize(final int value) {
    this.requestSizeMetric.record(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,28 @@ public class CloudWatchLogsService {
private final CloudWatchLogsDispatcher cloudWatchLogsDispatcher;
private final Buffer buffer;
private final CloudWatchLogsLimits cloudWatchLogsLimits;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final SinkStopWatch sinkStopWatch;
private final ReentrantLock processLock;
private final DlqPushHandler dlqPushHandler;
private final boolean dropIfDlqNotConfigured;
/**
 * Creates a service wired to the given collaborators and initialises its own lock and stop watch.
 *
 * @param buffer buffer whose data and event handles are handed to the dispatcher
 * @param cloudWatchLogsMetrics metrics used to record log sizes, request sizes and dropped large events
 * @param cloudWatchLogsLimits limits consulted to decide whether an event exceeds the max event size
 * @param cloudWatchLogsDispatcher dispatcher that prepares and sends the buffered log events
 * @param dlqPushHandler handler passed to {@code CloudWatchLogsSinkUtils.createDlqObject} for oversized events
 * @param dropIfDlqNotConfigured forwarded to {@code createDlqObject}; when {@code true} and no DLQ
 *                               object is produced for an oversized event, the large-events-dropped
 *                               counter is incremented
 */
public CloudWatchLogsService(final Buffer buffer,
                             final CloudWatchLogsMetrics cloudWatchLogsMetrics,
                             final CloudWatchLogsLimits cloudWatchLogsLimits,
                             final CloudWatchLogsDispatcher cloudWatchLogsDispatcher,
                             final DlqPushHandler dlqPushHandler,
                             final boolean dropIfDlqNotConfigured) {

    this.buffer = buffer;
    this.cloudWatchLogsLimits = cloudWatchLogsLimits;

    this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
    this.processLock = new ReentrantLock();
    this.sinkStopWatch = new SinkStopWatch();

    this.cloudWatchLogsDispatcher = cloudWatchLogsDispatcher;
    this.dlqPushHandler = dlqPushHandler;
    this.dropIfDlqNotConfigured = dropIfDlqNotConfigured;
}

/**
Expand All @@ -77,11 +83,14 @@ public void processLogEvents(final Collection<Record<Event>> logs) {
String logString = log.getData().toJsonString();
int logLength = logString.length();

cloudWatchLogsMetrics.recordLogSize(logLength);
if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) {
final String failureMessage = String.format("Event blocked due to Max Size restriction! Event Size : %s", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler);
DlqObject dlqObject = CloudWatchLogsSinkUtils.createDlqObject(0, log.getData().getEventHandle(), logString, failureMessage, dlqPushHandler, dropIfDlqNotConfigured);
if (dlqObject != null) {
dlqObjects.add(dlqObject);
} else if (dropIfDlqNotConfigured) {
cloudWatchLogsMetrics.increaseLogLargeEventsDroppedCounter(1);
}
continue;
}
Expand Down Expand Up @@ -113,6 +122,7 @@ private void stageLogEvents() {

List<InputLogEvent> inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(buffer.getBufferedData());
cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, buffer.getEventHandles());
cloudWatchLogsMetrics.recordRequestSize(buffer.getBufferSize());

buffer.resetBuffer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class CloudWatchLogsSinkConfig {
private PluginModel dlq;

@JsonProperty("threshold")
@Valid
private ThresholdConfig thresholdConfig = new ThresholdConfig();

@JsonProperty("log_group")
Expand Down
Loading
Loading