Skip to content

Commit e49cd21

Browse files
committed
Addressed review comments
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent f85094d commit e49cd21

5 files changed

Lines changed: 119 additions & 32 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
99
import org.opensearch.dataprepper.model.record.Record;
1010
import org.opensearch.dataprepper.model.event.Event;
11-
import org.opensearch.dataprepper.model.event.InternalEventHandle;
1211
import org.opensearch.dataprepper.model.event.EventMetadata;
1312

1413
import java.util.Collection;
@@ -64,20 +63,7 @@ public void setForwardToPipelines(final Map<String, HeadlessPipeline> pipelines)
6463
}
6564
}
6665

67-
public SinkForwardRecordsContext prepareRecordsForForwarding(final Collection<Record<Event>> records) {
68-
for (Map.Entry<String, HeadlessPipeline> entry: forwardToPipelines.entrySet()) {
69-
records.forEach((record) -> {
70-
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
71-
if (eventHandle != null) {
72-
eventHandle.acquireReference();
73-
}
74-
});
75-
}
76-
return new SinkForwardRecordsContext();
77-
78-
}
79-
80-
public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Collection<Record<Event>> records, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
66+
public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecordsContext, final Map<String, Object> withData, final Map<String, Object> withMetadata) {
8167
if (forwardToPipelines.size() == 0) {
8268
return false;
8369
}
@@ -87,8 +73,9 @@ public boolean forwardRecords(final SinkForwardRecordsContext sinkForwardRecords
8773
return false;
8874
}
8975
}
76+
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
9077

91-
if (records == null) {
78+
if (records.size() == 0) {
9279
return true;
9380
}
9481

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkForwardRecordsContext.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,42 @@
55

66
package org.opensearch.dataprepper.model.sink;
77

8+
import org.opensearch.dataprepper.model.record.Record;
9+
import org.opensearch.dataprepper.model.event.Event;
10+
import org.opensearch.dataprepper.model.event.InternalEventHandle;
11+
12+
import java.util.ArrayList;
13+
import java.util.Collection;
14+
import java.util.List;
15+
816
public class SinkForwardRecordsContext {
9-
public SinkForwardRecordsContext() {}
17+
List<Record<Event>> records;
18+
19+
public SinkForwardRecordsContext() {
20+
records = new ArrayList<>();
21+
}
22+
23+
public void addRecord(Record<Event> record) {
24+
InternalEventHandle eventHandle = (InternalEventHandle)record.getData().getEventHandle();
25+
if (eventHandle != null) {
26+
eventHandle.acquireReference();
27+
}
28+
records.add(record);
29+
}
30+
31+
public void addRecords(Collection<Record<Event>> newRecords) {
32+
newRecords.forEach((record) -> {
33+
Event event = record.getData();
34+
InternalEventHandle eventHandle = (InternalEventHandle)event.getEventHandle();
35+
if (eventHandle != null) {
36+
eventHandle.acquireReference();
37+
}
38+
});
39+
records.addAll(newRecords);
40+
}
41+
42+
public List<Record<Event>> getRecords() {
43+
return records;
44+
}
1045
}
1146

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,22 @@ public void testForwardToPipelinesWithPipelineMap() {
8686
when(record.getData()).thenReturn(event);
8787
when(event.getMetadata()).thenReturn(eventMetadata);
8888
Collection<Record<Event>> records = Collections.singletonList(record);
89-
SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext.prepareRecordsForForwarding(records);
89+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
9090

91-
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
91+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
92+
verify(forwardPipeline1, times(0)).sendEvents(eq(records));
93+
verify(forwardPipeline2, times(0)).sendEvents(eq(records));
94+
sinkForwardRecordsContext.addRecords(records);
95+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of("datakey1", "datavalue1"), Map.of("metadataKey1", "metadataValue1")), equalTo(true));
9296
verify(forwardPipeline1, times(1)).sendEvents(eq(records));
9397
verify(forwardPipeline2, times(1)).sendEvents(eq(records));
9498
verify(event, times(1)).put(any(String.class), any(Object.class));
9599
verify(event, times(1)).getMetadata();
96100
verify(eventMetadata, times(1)).setAttribute(any(String.class), any(Object.class));
97-
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, null, null), equalTo(true));
101+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, null), equalTo(true));
98102
verify(forwardPipeline1, times(2)).sendEvents(eq(records));
99103
verify(forwardPipeline2, times(2)).sendEvents(eq(records));
100-
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of(), Map.of()), equalTo(true));
101-
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
102-
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
103-
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, null, Map.of(), Map.of()), equalTo(true));
104+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(true));
104105
verify(forwardPipeline1, times(3)).sendEvents(eq(records));
105106
verify(forwardPipeline2, times(3)).sendEvents(eq(records));
106107
}
@@ -118,9 +119,15 @@ public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
118119
assertThat(forwardToPipelines.get("forward-pipeline2"), equalTo(null));
119120
HeadlessPipeline forwardPipeline1 = mock(HeadlessPipeline.class);
120121
assertThrows(RuntimeException.class, () -> sinkContext.setForwardToPipelines(Map.of("forward-pipeline1", forwardPipeline1)));
121-
Collection<Record<Event>> records = mock(Collection.class);
122-
SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext.prepareRecordsForForwarding(records);
123-
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of(), Map.of()), equalTo(false));
122+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
123+
Record<Event> record = mock(Record.class);
124+
Event event = mock(Event.class);
125+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
126+
doNothing().when(eventHandle).acquireReference();
127+
when(record.getData()).thenReturn(event);
128+
when(event.getEventHandle()).thenReturn(eventHandle);
129+
sinkForwardRecordsContext.addRecords(List.of(record));
130+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false));
124131
}
125132

126133
@Test
@@ -131,9 +138,16 @@ public void testWithNoForwardToPipelines() {
131138
final List<String> testExcludeKeys = Collections.emptyList();
132139
final List<String> testForwardToPipelineNames = Collections.emptyList();
133140
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testForwardToPipelineNames);
134-
Collection<Record<Event>> records = mock(Collection.class);
135-
SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext.prepareRecordsForForwarding(records);
136-
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, records, Map.of(), Map.of()), equalTo(false));
141+
Record<Event> record = mock(Record.class);
142+
Event event = mock(Event.class);
143+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
144+
doNothing().when(eventHandle).acquireReference();
145+
when(record.getData()).thenReturn(event);
146+
when(event.getEventHandle()).thenReturn(eventHandle);
147+
148+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
149+
sinkForwardRecordsContext.addRecords(List.of(record));
150+
assertThat(sinkContext.forwardRecords(sinkForwardRecordsContext, Map.of(), Map.of()), equalTo(false));
137151
}
138152
}
139153

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.sink;
7+
8+
import org.junit.jupiter.api.Test;
9+
10+
import org.opensearch.dataprepper.model.record.Record;
11+
import org.opensearch.dataprepper.model.event.Event;
12+
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
13+
14+
import static org.mockito.Mockito.mock;
15+
import static org.hamcrest.MatcherAssert.assertThat;
16+
import static org.hamcrest.Matchers.equalTo;
17+
import static org.mockito.Mockito.doNothing;
18+
import static org.mockito.Mockito.when;
19+
20+
import java.util.List;
21+
22+
public class SinkForwardRecordsContextTest {
23+
24+
SinkForwardRecordsContext sinkForwardRecordsContext;
25+
26+
@Test
27+
public void testSinkForwardRecordContextBasic() {
28+
sinkForwardRecordsContext = new SinkForwardRecordsContext();
29+
Event event = mock(Event.class);
30+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
31+
doNothing().when(eventHandle).acquireReference();
32+
Record<Event> record1 = mock(Record.class);
33+
Record<Event> record2 = mock(Record.class);
34+
Record<Event> record3 = mock(Record.class);
35+
when(record1.getData()).thenReturn(event);
36+
when(record2.getData()).thenReturn(event);
37+
when(record3.getData()).thenReturn(event);
38+
when(event.getEventHandle()).thenReturn(eventHandle);
39+
sinkForwardRecordsContext.addRecord(record1);
40+
sinkForwardRecordsContext.addRecords(List.of(record2, record3));
41+
List<Record<Event>> records = sinkForwardRecordsContext.getRecords();
42+
assertThat(records.size(), equalTo(3));
43+
}
44+
}
45+

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.dataprepper.model.record.Record;
1414
import org.opensearch.dataprepper.model.sink.Sink;
1515
import org.opensearch.dataprepper.model.sink.SinkContext;
16+
import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext;
1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
1819

@@ -68,7 +69,7 @@ public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkConte
6869

6970
@Override
7071
public void output(final Collection<Record<Object>> records) {
71-
sinkContext.prepareForForwardPipelines(records);
72+
SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext();
7273
lock.lock();
7374
Collection<Record<Event>> events = new ArrayList<>();
7475

@@ -77,14 +78,19 @@ public void output(final Collection<Record<Object>> records) {
7778
return;
7879

7980
for (final Record<Object> record : records) {
81+
if (record.getData() instanceof Event) {
82+
Event event = (Event)record.getData();
83+
sinkForwardRecordsContext.addRecord(new Record<>(event));
84+
}
85+
8086
try {
8187
checkTypeAndWriteObject(record.getData(), writer);
8288
} catch (final IOException ex) {
8389
throw new RuntimeException(format("Encountered exception writing to file %s", outputFilePath), ex);
8490
}
8591
}
8692

87-
sinkContext.forwardRecords(events, null, null);
93+
sinkContext.forwardRecords(sinkForwardRecordsContext, null, null);
8894
try {
8995
writer.flush();
9096
} catch (final IOException ex) {

0 commit comments

Comments
 (0)