Skip to content

Commit 60e5f63

Browse files
authored
Merge branch 'opensearch-project:main' into feature/worker-partition-retry-strategy
2 parents 467a124 + 1f3c152 commit 60e5f63

39 files changed

Lines changed: 2594 additions & 223 deletions

File tree

.gitignore

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
1-
# Ignore Gradle project-specific cache directory
2-
.DS_Store
3-
.idea
4-
*.iml
5-
.gradle
6-
7-
# Ignore Gradle build output directory
1+
# Gradle directories
82
build
9-
10-
# Ignore things downloaded for gradle es plugin
3+
.gradle
114
gradle/tools
125

136
# Ignore config file generated by test
@@ -19,3 +12,10 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
1912

2013
# output folder created when we run test cases
2114
**/out/
15+
16+
# Development tools
17+
.DS_Store
18+
.idea
19+
*.iml
20+
.kiro
21+
.vscode

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.model.configuration;
77

8+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
89
import com.fasterxml.jackson.annotation.JsonCreator;
910
import com.fasterxml.jackson.annotation.JsonProperty;
1011

@@ -30,6 +31,9 @@ public SinkForwardConfig(
3031
@JsonProperty("pipelines") final List<String> pipelineNames,
3132
@JsonProperty("with_data") final Map<String, Object> withData,
3233
@JsonProperty("with_metadata") final Map<String, Object> withMetadata) {
34+
if (pipelineNames.size() != 1) {
35+
throw new InvalidPluginConfigurationException("Supports only one forwarding pipeline");
36+
}
3337
this.pipelineNames = pipelineNames;
3438
this.withData = withData;
3539
this.withMetadata = withMetadata;
@@ -46,5 +50,6 @@ public Map<String, Object> getWithMetadata() {
4650
public Map<String, Object> getWithData() {
4751
return withData;
4852
}
53+
4954
}
5055

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecords
9797
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
9898
entry.getValue().sendEvents(records);
9999
}
100+
sinkForwardRecordsContext.clearRecords();
100101
return true;
101102
}
102103

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.opensearch.dataprepper.model.record.Record;
99
import org.opensearch.dataprepper.model.event.Event;
10-
import org.opensearch.dataprepper.model.event.InternalEventHandle;
1110

1211
import java.util.ArrayList;
1312
import java.util.Collection;
@@ -25,28 +24,21 @@ public SinkForwardRecordsContext(SinkContext sinkContext) {
2524
public void addRecord(Record<Event> record) {
2625
if (!forwardPipelinesPresent)
2726
return;
28-
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
29-
if (eventHandle != null) {
30-
eventHandle.acquireReference();
31-
}
3227
records.add(record);
3328
}
3429

3530
public void addRecords(Collection<Record<Event>> newRecords) {
3631
if (!forwardPipelinesPresent)
3732
return;
38-
newRecords.forEach((record) -> {
39-
Event event = record.getData();
40-
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();
41-
if (eventHandle != null) {
42-
eventHandle.acquireReference();
43-
}
44-
});
4533
records.addAll(newRecords);
4634
}
4735

4836
public List<Record<Event>> getRecords() {
4937
return records;
5038
}
39+
40+
public void clearRecords() {
41+
records.clear();
42+
}
5143
}
5244

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkForwardConfigTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
package org.opensearch.dataprepper.model.configuration;
77

8+
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
89
import org.junit.jupiter.api.Test;
910

1011
import static org.hamcrest.CoreMatchers.equalTo;
1112
import static org.hamcrest.CoreMatchers.nullValue;
1213
import static org.mockito.Mockito.mock;
1314
import static org.hamcrest.MatcherAssert.assertThat;
15+
import static org.junit.jupiter.api.Assertions.assertThrows;
1416

1517
import java.util.List;
1618
import java.util.Map;
@@ -26,14 +28,27 @@ void testDefaults() {
2628
}
2729

2830
@Test
29-
void testCustomValues() {
30-
List<String> pipelines = mock(List.class);
31+
void pipelines_lsit_with_one_pipeline_succeeds() {
32+
List<String> pipelines = List.of("pipeline1");
3133
Map<String, Object> withData = mock(Map.class);
3234
Map<String, Object> withMetadata = mock(Map.class);
3335
SinkForwardConfig sinkForwardConfig = new SinkForwardConfig(pipelines, withData, withMetadata);
3436
assertThat(sinkForwardConfig.getPipelineNames(), equalTo(pipelines));
3537
assertThat(sinkForwardConfig.getWithData(), equalTo(withData));
3638
assertThat(sinkForwardConfig.getWithMetadata(), equalTo(withMetadata));
3739
}
40+
41+
@Test
42+
void pipelines_list_with_two_or_more_pipelines_throws_exception() {
43+
List<String> pipelines = List.of("pipeline1", "pipeline2");
44+
Map<String, Object> withData = mock(Map.class);
45+
Map<String, Object> withMetadata = mock(Map.class);
46+
assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(pipelines, withData, withMetadata));
47+
}
48+
49+
@Test
50+
void empty_pipelines_list_throws_exception() {
51+
assertThrows(InvalidPluginConfigurationException.class, ()->new SinkForwardConfig(List.of(), Map.of(), Map.of()));
52+
}
3853
}
3954

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,21 @@ public void testForwardToPipelinesWithPipelineMap() {
9393
verify(forwardPipeline2, times(0)).sendEvents(eq(records));
9494
sinkForwardRecordsContext.addRecords(records);
9595
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
96-
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
97-
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
96+
verify(forwardPipeline1, times(1)).sendEvents(any());
97+
verify(forwardPipeline2, times(1)).sendEvents(any());
9898
verify(event, times(1)).put(any(String.class), any(Object.class));
9999
verify(event, times(1)).getMetadata();
100100
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
101+
records = Collections.singletonList(record);
102+
sinkForwardRecordsContext.addRecords(records);
101103
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, null), equalTo(true));
102-
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
103-
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
104+
verify(forwardPipeline1, times(2)).sendEvents(any());
105+
verify(forwardPipeline2, times(2)).sendEvents(any());
106+
records = Collections.singletonList(record);
107+
sinkForwardRecordsContext.addRecords(records);
104108
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(true));
105-
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
106-
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
109+
verify(forwardPipeline1, times(3)).sendEvents(any());
110+
verify(forwardPipeline2, times(3)).sendEvents(any());
107111
}
108112

109113
@Test
@@ -148,6 +152,7 @@ public void testWithNoForwardToPipelines() {
148152
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
149153
sinkForwardRecordsContext.addRecords(List.of(record));
150154
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false));
155+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
151156
}
152157
}
153158

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
import org.opensearch.dataprepper.model.record.Record;
1111
import org.opensearch.dataprepper.model.event.Event;
12-
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
1312
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1413

1514
import static org.mockito.Mockito.mock;
1615
import static org.hamcrest.MatcherAssert.assertThat;
1716
import static org.hamcrest.Matchers.equalTo;
18-
import static org.mockito.Mockito.doNothing;
1917
import static org.mockito.Mockito.when;
2018

2119
import java.util.List;
@@ -30,16 +28,9 @@ public void testSinkForwardRecordContextBasic() {
3028
SinkContext sinkContext = mock(SinkContext.class);
3129
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
3230
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
33-
Event event = mock(Event.class);
34-
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
35-
doNothing().when(eventHandle).acquireReference();
3631
Record<Event> record1 = mock(Record.class);
3732
Record<Event> record2 = mock(Record.class);
3833
Record<Event> record3 = mock(Record.class);
39-
when(record1.getData()).thenReturn(event);
40-
when(record2.getData()).thenReturn(event);
41-
when(record3.getData()).thenReturn(event);
42-
when(event.getEventHandle()).thenReturn(eventHandle);
4334
sinkForwardRecordsContext.addRecord(record1);
4435
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
4536
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
@@ -52,19 +43,28 @@ public void testSinkForwardRecordContextWithForwardingPipelines() {
5243
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
5344
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
5445
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
55-
Event event = mock(Event.class);
56-
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
57-
doNothing().when(eventHandle).acquireReference();
5846
Record<Event> record1 = mock(Record.class);
5947
Record<Event> record2 = mock(Record.class);
6048
Record<Event> record3 = mock(Record.class);
61-
when(record1.getData()).thenReturn(event);
62-
when(record2.getData()).thenReturn(event);
63-
when(record3.getData()).thenReturn(event);
64-
when(event.getEventHandle()).thenReturn(eventHandle);
6549
sinkForwardRecordsContext.addRecord(record1);
6650
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
6751
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
6852
assertThat(records.size(), equalTo(3));
53+
sinkForwardRecordsContext.clearRecords();
54+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
55+
}
56+
57+
@Test
58+
public void testSinkForwardRecordContextClearRecords() {
59+
SinkContext sinkContext = mock(SinkContext.class);
60+
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
61+
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
62+
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
63+
Record<Event> record1 = mock(Record.class);
64+
Record<Event> record2 = mock(Record.class);
65+
sinkForwardRecordsContext.addRecords(List.of(record1, record2));
66+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(2));
67+
sinkForwardRecordsContext.clearRecords();
68+
assertThat(sinkForwardRecordsContext.getRecords().size(), equalTo(0));
6969
}
7070
}

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
9494
.dropIfDlqNotConfigured(true)
9595
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
9696
.logStream(cloudWatchLogsSinkConfig.getLogStream())
97-
.retryCount(cloudWatchLogsSinkConfig.getMaxRetries())
97+
.retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries())
9898
.executor(executor)
9999
.build();
100100

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> e
107107
@Builder
108108
protected static class Uploader implements Runnable {
109109
static final long INITIAL_DELAY_MS = 50;
110+
static final int MULTIPLE_FAILURES_METRIC_COUNT = 5;
110111
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
111112
private final CloudWatchLogsClient cloudWatchLogsClient;
112113
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
@@ -141,7 +142,9 @@ public void upload() {
141142
failureMessage = e.getMessage();
142143
LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
143144
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
144-
failCount++;
145+
if (++failCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) {
146+
cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1);
147+
}
145148
final long delayMillis = backoff.nextDelayMillis(failCount);
146149
if (delayMillis > 0) {
147150
Thread.sleep(delayMillis);

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,23 @@ public class CloudWatchLogsMetrics {
1919
public static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
2020
public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
2121
public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
22+
public static final String CLOUDWATCH_LOGS_REQUEST_MULTI_FAILED = "cloudWatchLogsRequestMultipleFailures";
2223
public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped";
2324
public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize";
2425
public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize";
2526
private final Counter logEventSuccessCounter;
2627
private final Counter logEventFailCounter;
2728
private final Counter requestSuccessCount;
2829
private final Counter requestFailCount;
30+
private final Counter requestMultiFailCount;
2931
private final Counter logLargeEventsDroppedCounter;
3032
private final DistributionSummary logSizeMetric;
3133
private final DistributionSummary requestSizeMetric;
3234

3335
public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
3436
this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
3537
this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
38+
this.requestMultiFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUEST_MULTI_FAILED);
3639
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
3740
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
3841
this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED);
@@ -56,6 +59,10 @@ public void increaseRequestFailCounter(int value) {
5659
requestFailCount.increment(value);
5760
}
5861

62+
public void increaseRequestMultiFailCounter(int value) {
63+
requestMultiFailCount.increment(value);
64+
}
65+
5966
public void increaseLogLargeEventsDroppedCounter(int value) {
6067
logLargeEventsDroppedCounter.increment(value);
6168
}

0 commit comments

Comments
 (0)