Skip to content

Commit 8f194b7

Browse files
committed
Addressed review comments
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent e49cd21 commit 8f194b7

7 files changed

Lines changed: 147 additions & 59 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@
1515

1616
public class SinkForwardRecordsContext {
1717
List<Record<Event>> records;
18+
boolean forwardPipelinesPresent;
1819

19-
public SinkForwardRecordsContext() {
20+
public SinkForwardRecordsContext(SinkContext sinkContext) {
21+
forwardPipelinesPresent = (sinkContext != null && sinkContext.getForwardToPipelines().size() > 0);
2022
records = new ArrayList<>();
2123
}
2224

2325
public void addRecord(Record<Event> record) {
26+
if (!forwardPipelinesPresent)
27+
return;
2428
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
2529
if (eventHandle != null) {
2630
eventHandle.acquireReference();
@@ -29,6 +33,8 @@ public void addRecord(Record<Event> record) {
2933
}
3034

3135
public void addRecords(Collection<Record<Event>> newRecords) {
36+
if (!forwardPipelinesPresent)
37+
return;
3238
newRecords.forEach((record) -> {
3339
Event event = record.getData();
3440
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void testForwardToPipelinesWithPipelineMap() {
8686
when(record.getData()).thenReturn(event);
8787
when(event.getMetadata()).thenReturn(eventMetadata);
8888
Collection<Record<Event>> records = Collections.singletonList(record);
89-
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
89+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
9090

9191
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
9292
verify(forwardPipeline1, times(0)).sendEvents(eq(records));
@@ -119,7 +119,7 @@ public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
119119
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(null));
120120
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
121121
assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1)));
122-
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
122+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
123123
Record<Event> record = mock(Record.class);
124124
Event event = mock(Event.class);
125125
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
@@ -145,7 +145,7 @@ public void testWithNoForwardToPipelines() {
145145
when(record.getData()).thenReturn(event);
146146
when(event.getEventHandle()).thenReturn(eventHandle);
147147

148-
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
148+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
149149
sinkForwardRecordsContext.addRecords(List.of(record));
150150
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false));
151151
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContextTest.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.opensearch.dataprepper.model.record.Record;
1111
import org.opensearch.dataprepper.model.event.Event;
1212
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
13+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1314

1415
import static org.mockito.Mockito.mock;
1516
import static org.hamcrest.MatcherAssert.assertThat;
@@ -18,14 +19,39 @@
1819
import static org.mockito.Mockito.when;
1920

2021
import java.util.List;
22+
import java.util.Map;
2123

2224
public class SinkForwardRecordsContextTest {
2325

2426
SinkForwardRecordsContext sinkForwardRecordsContext;
2527

2628
@Test
2729
public void testSinkForwardRecordContextBasic() {
28-
sinkForwardRecordsContext = new SinkForwardRecordsContext();
30+
SinkContext sinkContext = mock(SinkContext.class);
31+
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of());
32+
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
33+
Event event = mock(Event.class);
34+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
35+
doNothing().when(eventHandle).acquireReference();
36+
Record<Event> record1 = mock(Record.class);
37+
Record<Event> record2 = mock(Record.class);
38+
Record<Event> record3 = mock(Record.class);
39+
when(record1.getData()).thenReturn(event);
40+
when(record2.getData()).thenReturn(event);
41+
when(record3.getData()).thenReturn(event);
42+
when(event.getEventHandle()).thenReturn(eventHandle);
43+
sinkForwardRecordsContext.addRecord(record1);
44+
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
45+
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
46+
assertThat(records.size(), equalTo(0));
47+
}
48+
49+
@Test
50+
public void testSinkForwardRecordContextWithForwardingPipelines() {
51+
SinkContext sinkContext = mock(SinkContext.class);
52+
HeadlessPipeline headlessPipeline = mock(HeadlessPipeline.class);
53+
when(sinkContext.getForwardToPipelines()).thenReturn(Map.of("pipeline1", headlessPipeline));
54+
sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
2955
Event event = mock(Event.class);
3056
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
3157
doNothing().when(eventHandle).acquireReference();
@@ -42,4 +68,3 @@ public void testSinkForwardRecordContextBasic() {
4268
assertThat(records.size(), equalTo(3));
4369
}
4470
}
45-
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.integration;
7+
8+
import org.junit.jupiter.api.AfterEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.event.JacksonEvent;
12+
import org.opensearch.dataprepper.model.record.Record;
13+
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
14+
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
15+
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import java.util.List;
20+
import java.util.UUID;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.IntStream;
24+
25+
import static org.awaitility.Awaitility.await;
26+
import static org.hamcrest.CoreMatchers.equalTo;
27+
import static org.hamcrest.CoreMatchers.not;
28+
import static org.hamcrest.CoreMatchers.notNullValue;
29+
import static org.hamcrest.MatcherAssert.assertThat;
30+
import static org.hamcrest.Matchers.empty;
31+
32+
class DLQHeadlessPipelinesIT {
33+
private static final Logger LOG = LoggerFactory.getLogger(ProcessorPipelineIT.class);
34+
private static final String IN_MEMORY_IDENTIFIER_DLQ = "PipelineDLQIT";
35+
private static final String IN_MEMORY_IDENTIFIER_FORWARD = "ForwardPipelineIT";
36+
private static final String PIPELINE_DLQ_TEST_CONFIGURATION= "pipeline-dlq.yaml";
37+
private static final String FORWARD_PIPELINE_TEST_CONFIGURATION= "forward-pipeline.yaml";
38+
private DataPrepperTestRunner dataPrepperTestRunner;
39+
private InMemorySourceAccessor inMemorySourceAccessor;
40+
private InMemorySinkAccessor inMemorySinkAccessor;
41+
42+
private void createPipeline(final String pipelineConfiguration) {
43+
dataPrepperTestRunner = DataPrepperTestRunner.builder()
44+
.withPipelinesDirectoryOrFile(pipelineConfiguration)
45+
.build();
46+
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
47+
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
48+
dataPrepperTestRunner.start();
49+
LOG.info("Started test runner.");
50+
}
51+
52+
@AfterEach
53+
void tearDown() {
54+
LOG.info("Test tear down. Stop the test runner.");
55+
dataPrepperTestRunner.stop();
56+
}
57+
58+
@Test
59+
void dlq_pipeline_test() {
60+
createPipeline(PIPELINE_DLQ_TEST_CONFIGURATION);
61+
62+
final int recordsToCreate = 200;
63+
final List<Record<Event>> inputRecords = IntStream.range(0, recordsToCreate)
64+
.mapToObj(i -> UUID.randomUUID().toString())
65+
.map(JacksonEvent::fromMessage)
66+
.map(Record::new)
67+
.collect(Collectors.toList());
68+
69+
LOG.info("Submitting a batch of record.");
70+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER_DLQ, inputRecords);
71+
72+
await().atMost(400, TimeUnit.MILLISECONDS)
73+
.untilAsserted(() -> {
74+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ), not(empty()));
75+
});
76+
77+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ).size(), equalTo(recordsToCreate));
78+
79+
final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ);
80+
81+
assertThat(sinkRecords.size(), equalTo(recordsToCreate));
82+
for (int i = 0; i < sinkRecords.size(); i++) {
83+
final Record<Event> inputRecord = inputRecords.get(i);
84+
final Record<Event> sinkRecord = sinkRecords.get(i);
85+
assertThat(sinkRecord, notNullValue());
86+
final Event recordData = sinkRecord.getData();
87+
assertThat(recordData, notNullValue());
88+
assertThat(
89+
recordData.get("message", String.class),
90+
equalTo(inputRecord.getData().get("message", String.class)));
91+
assertThat(recordData.get("test1", String.class),
92+
equalTo(null));
93+
94+
}
95+
}
96+
97+
}
98+

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/HeadlessPipelinesIT.java renamed to data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ForwardingHeadlessPipelinesIT.java

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import static org.hamcrest.MatcherAssert.assertThat;
3030
import static org.hamcrest.Matchers.empty;
3131

32-
class HeadlessPipelinesIT {
32+
class ForwardingHeadlessPipelinesIT {
3333
private static final Logger LOG = LoggerFactory.getLogger(ProcessorPipelineIT.class);
3434
private static final String IN_MEMORY_IDENTIFIER_DLQ = "PipelineDLQIT";
3535
private static final String IN_MEMORY_IDENTIFIER_FORWARD = "ForwardPipelineIT";
@@ -55,45 +55,6 @@ void tearDown() {
5555
dataPrepperTestRunner.stop();
5656
}
5757

58-
@Test
59-
void dlq_pipeline_test() {
60-
createPipeline(PIPELINE_DLQ_TEST_CONFIGURATION);
61-
62-
final int recordsToCreate = 200;
63-
final List<Record<Event>> inputRecords = IntStream.range(0, recordsToCreate)
64-
.mapToObj(i -> UUID.randomUUID().toString())
65-
.map(JacksonEvent::fromMessage)
66-
.map(Record::new)
67-
.collect(Collectors.toList());
68-
69-
LOG.info("Submitting a batch of record.");
70-
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER_DLQ, inputRecords);
71-
72-
await().atMost(400, TimeUnit.MILLISECONDS)
73-
.untilAsserted(() -> {
74-
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ), not(empty()));
75-
});
76-
77-
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ).size(), equalTo(recordsToCreate));
78-
79-
final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ);
80-
81-
assertThat(sinkRecords.size(), equalTo(recordsToCreate));
82-
for (int i = 0; i < sinkRecords.size(); i++) {
83-
final Record<Event> inputRecord = inputRecords.get(i);
84-
final Record<Event> sinkRecord = sinkRecords.get(i);
85-
assertThat(sinkRecord, notNullValue());
86-
final Event recordData = sinkRecord.getData();
87-
assertThat(recordData, notNullValue());
88-
assertThat(
89-
recordData.get("message", String.class),
90-
equalTo(inputRecord.getData().get("message", String.class)));
91-
assertThat(recordData.get("test1", String.class),
92-
equalTo(null));
93-
94-
}
95-
}
96-
9758
@Test
9859
void pipeline_forward_test() {
9960
createPipeline(FORWARD_PIPELINE_TEST_CONFIGURATION);

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.dataprepper.model.record.Record;
1414
import org.opensearch.dataprepper.model.sink.Sink;
1515
import org.opensearch.dataprepper.model.sink.SinkContext;
16+
import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext;
1617

1718
import java.util.Collection;
1819

@@ -23,7 +24,6 @@ public class InMemorySink implements Sink<Record<Event>> {
2324
private final InMemorySinkAccessor inMemorySinkAccessor;
2425
private final AcknowledgementSetManager acknowledgementSetManager;
2526
private final Boolean acknowledgements;
26-
private final boolean doForward;
2727
private final SinkContext sinkContext;
2828

2929
@DataPrepperPluginConstructor
@@ -33,7 +33,6 @@ public InMemorySink(final InMemoryConfig inMemoryConfig,
3333
final InMemorySinkAccessor inMemorySinkAccessor) {
3434
testingKey = inMemoryConfig.getTestingKey();
3535
this.sinkContext = sinkContext;
36-
this.doForward = sinkContext.getForwardToPipelines().size() > 0;
3736
this.inMemorySinkAccessor = inMemorySinkAccessor;
3837
this.acknowledgementSetManager = acknowledgementSetManager;
3938
acknowledgements = inMemoryConfig.getAcknowledgements();
@@ -43,16 +42,15 @@ public InMemorySink(final InMemoryConfig inMemoryConfig,
4342
public void output(final Collection<Record<Event>> records) {
4443
inMemorySinkAccessor.addEvents(testingKey, records);
4544
boolean result = inMemorySinkAccessor.getResult();
46-
if (doForward) {
47-
sinkContext.forwardRecords(records, null, null);
48-
} else {
49-
records.stream().forEach((record) -> {
50-
EventHandle eventHandle = ((Event)record.getData()).getEventHandle();
51-
if (acknowledgements) {
52-
eventHandle.release(result);
53-
}
54-
});
55-
}
45+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
46+
sinkForwardRecordsContext.addRecords(records);
47+
records.stream().forEach((record) -> {
48+
EventHandle eventHandle = ((Event)record.getData()).getEventHandle();
49+
if (acknowledgements) {
50+
eventHandle.release(result);
51+
}
52+
});
53+
sinkContext.forwardRecords(sinkForwardRecordsContext, null, null);
5654
}
5755

5856
@Override

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkConte
6969

7070
@Override
7171
public void output(final Collection<Record<Object>> records) {
72-
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
72+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext);
7373
lock.lock();
7474
Collection<Record<Event>> events = new ArrayList<>();
7575

0 commit comments

Comments
 (0)