Skip to content

Commit e895771

Browse files
committed
Addressed review comments and fixed acknowledgements issue
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 9f49cd1 commit e895771

5 files changed

Lines changed: 44 additions & 15 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
99
import org.opensearch.dataprepper.model.record.Record;
1010
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.event.InternalEventHandle;
1112
import org.opensearch.dataprepper.model.event.EventMetadata;
1213

1314
import java.util.Collection;
@@ -63,7 +64,20 @@ public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines)
6364
}
6465
}
6566

66-
public boolean forwardRecords(final Collection<Record<Event>> records, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
67+
public SinkForwardRecordsContext prepareRecordsForForwarding(final Collection<Record<Event>> records) {
68+
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
69+
records.forEach((record) -> {
70+
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
71+
if (eventHandle != null) {
72+
eventHandle.acquireReference();
73+
}
74+
});
75+
}
76+
return new SinkForwardRecordsContext();
77+
78+
}
79+
80+
public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Collection<Record<Event>> records, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
6781
if (forwardToPipelines.size() == 0) {
6882
return false;
6983
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.sink;
7+
8+
public class SinkForwardRecordsContext {
9+
public SinkForwardRecordsContext() {}
10+
}
11+

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010

1111
import org.opensearch.dataprepper.model.record.Record;
1212
import org.opensearch.dataprepper.model.event.Event;
13+
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
1314
import org.opensearch.dataprepper.model.event.EventMetadata;
1415
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1516

1617
import static org.junit.jupiter.api.Assertions.assertThrows;
1718
import static org.hamcrest.MatcherAssert.assertThat;
1819
import static org.hamcrest.Matchers.equalTo;
1920
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.doNothing;
2022
import static org.mockito.ArgumentMatchers.eq;
2123
import static org.mockito.ArgumentMatchers.any;
2224
import static org.mockito.Mockito.when;
@@ -78,23 +80,27 @@ public void testForwardToPipelinesWithPipelineMap() {
7880
Record<Event> record = mock(Record.class);
7981
EventMetadata eventMetadata = mock(EventMetadata.class);
8082
Event event = mock(Event.class);
83+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
84+
doNothing().when(eventHandle).acquireReference();
85+
when(event.getEventHandle()).thenReturn(eventHandle);
8186
when(record.getData()).thenReturn(event);
8287
when(event.getMetadata()).thenReturn(eventMetadata);
8388
Collection<Record<Event>> records = Collections.singletonList(record);
89+
SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext.prepareRecordsForForwarding(records);
8490

85-
assertThat(sinkContext.forwardRecords(records, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
91+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
8692
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
8793
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
8894
verify(event, times(1)).put(any(String.class), any(Object.class));
8995
verify(event, times(1)).getMetadata();
9096
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
91-
assertThat(sinkContext.forwardRecords(records, null, null), equalTo(true));
97+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, null, null), equalTo(true));
9298
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
9399
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
94-
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(true));
100+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of(), Map.of()), equalTo(true));
95101
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
96102
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
97-
assertThat(sinkContext.forwardRecords(null, Map.of(), Map.of()), equalTo(true));
103+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, Map.of(), Map.of()), equalTo(true));
98104
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
99105
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
100106
}
@@ -113,7 +119,8 @@ public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
113119
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
114120
assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1)));
115121
Collection<Record<Event>> records = mock(Collection.class);
116-
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(false));
122+
SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext.prepareRecordsForForwarding(records);
123+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of(), Map.of()), equalTo(false));
117124
}
118125

119126
@Test
@@ -125,7 +132,8 @@ public void testWithNoForwardToPipelines() {
125132
final List<String> testForwardToPipelineNames = Collections.emptyList();
126133
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
127134
Collection<Record<Event>> records = mock(Collection.class);
128-
assertThat(sinkContext.forwardRecords(records, Map.of(), Map.of()), equalTo(false));
135+
SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext.prepareRecordsForForwarding(records);
136+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of(), Map.of()), equalTo(false));
129137
}
130138
}
131139

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/HeadlessPipelinesIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ void dlq_pipeline_test() {
7878

7979
final List<Record<Event>> sinkRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_DLQ);
8080

81+
assertThat(sinkRecords.size(), equalTo(recordsToCreate));
8182
for (int i = 0; i < sinkRecords.size(); i++) {
8283
final Record<Event> inputRecord = inputRecords.get(i);
8384
final Record<Event> sinkRecord = sinkRecords.get(i);

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,28 +68,23 @@ public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkConte
6868

6969
@Override
7070
public void output(final Collection<Record<Object>> records) {
71-
final boolean doForward = sinkContext.getForwardToPipelines().size() > 0;
71+
sinkContext.prepareForForwardPipelines(records);
7272
lock.lock();
7373
Collection<Record<Event>> events = new ArrayList<>();
74+
7475
try {
7576
if (isStopRequested)
7677
return;
7778

7879
for (final Record<Object> record : records) {
7980
try {
8081
checkTypeAndWriteObject(record.getData(), writer);
81-
if (doForward && record.getData() instanceof Event) {
82-
Event event = (Event)record.getData();
83-
events.add(new Record<>(event));
84-
}
8582
} catch (final IOException ex) {
8683
throw new RuntimeException(format("Encountered exception writing to file %s", outputFilePath), ex);
8784
}
8885
}
8986

90-
if (doForward) {
91-
sinkContext.forwardRecords(events, null, null);
92-
}
87+
sinkContext.forwardRecords(events, null, null);
9388
try {
9489
writer.flush();
9590
} catch (final IOException ex) {

0 commit comments

Comments
 (0)