@@ -86,21 +86,22 @@ public void testForwardToPipelinesWithPipelineMap() {
8686 when (record .getData ()).thenReturn (event );
8787 when (event .getMetadata ()).thenReturn (eventMetadata );
8888 Collection <Record <Event >> records = Collections .singletonList (record );
89- SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext . prepareRecordsForForwarding ( records );
89+ SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext ( );
9090
91- assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , records , Map .of ("datakey1" , "datavalue1" ), Map .of ("metadataKey1" , "metadataValue1" )), equalTo (true ));
91+ assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , Map .of ("datakey1" , "datavalue1" ), Map .of ("metadataKey1" , "metadataValue1" )), equalTo (true ));
92+ verify (forwardPipeline1 , times (0 )).sendEvents (eq (records ));
93+ verify (forwardPipeline2 , times (0 )).sendEvents (eq (records ));
94+ sinkForwardRecordsContext .addRecords (records );
95+ assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , Map .of ("datakey1" , "datavalue1" ), Map .of ("metadataKey1" , "metadataValue1" )), equalTo (true ));
9296 verify (forwardPipeline1 , times (1 )).sendEvents (eq (records ));
9397 verify (forwardPipeline2 , times (1 )).sendEvents (eq (records ));
9498 verify (event , times (1 )).put (any (String .class ), any (Object .class ));
9599 verify (event , times (1 )).getMetadata ();
96100 verify (eventMetadata , times (1 )).setAttribute (any (String .class ), any (Object .class ));
97- assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , records , null , null ), equalTo (true ));
101+ assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , null , null ), equalTo (true ));
98102 verify (forwardPipeline1 , times (2 )).sendEvents (eq (records ));
99103 verify (forwardPipeline2 , times (2 )).sendEvents (eq (records ));
100- assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , records , Map .of (), Map .of ()), equalTo (true ));
101- verify (forwardPipeline1 , times (3 )).sendEvents (eq (records ));
102- verify (forwardPipeline2 , times (3 )).sendEvents (eq (records ));
103- assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , null , Map .of (), Map .of ()), equalTo (true ));
104+ assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , Map .of (), Map .of ()), equalTo (true ));
104105 verify (forwardPipeline1 , times (3 )).sendEvents (eq (records ));
105106 verify (forwardPipeline2 , times (3 )).sendEvents (eq (records ));
106107 }
@@ -118,9 +119,15 @@ public void testForwardToPipelinesWithPipelineMapAndFailureCases() {
118119 assertThat (forwardToPipelines .get ("forward-pipeline2" ), equalTo (null ));
119120 HeadlessPipeline forwardPipeline1 = mock (HeadlessPipeline .class );
120121 assertThrows (RuntimeException .class , () -> sinkContext .setForwardToPipelines (Map .of ("forward-pipeline1" , forwardPipeline1 )));
121- Collection <Record <Event >> records = mock (Collection .class );
122- SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext .prepareRecordsForForwarding (records );
123- assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , records , Map .of (), Map .of ()), equalTo (false ));
122+ SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext ();
123+ Record <Event > record = mock (Record .class );
124+ Event event = mock (Event .class );
125+ DefaultEventHandle eventHandle = mock (DefaultEventHandle .class );
126+ doNothing ().when (eventHandle ).acquireReference ();
127+ when (record .getData ()).thenReturn (event );
128+ when (event .getEventHandle ()).thenReturn (eventHandle );
129+ sinkForwardRecordsContext .addRecords (List .of (record ));
130+ assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , Map .of (), Map .of ()), equalTo (false ));
124131 }
125132
126133 @ Test
@@ -131,9 +138,16 @@ public void testWithNoForwardToPipelines() {
131138 final List <String > testExcludeKeys = Collections .emptyList ();
132139 final List <String > testForwardToPipelineNames = Collections .emptyList ();
133140 sinkContext = new SinkContext (testTagsTargetKey , testRoutes , testIncludeKeys , testExcludeKeys , testForwardToPipelineNames );
134- Collection <Record <Event >> records = mock (Collection .class );
135- SinkForwardRecordsContext sinkForwardRecordsContext = sinkContext .prepareRecordsForForwarding (records );
136- assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , records , Map .of (), Map .of ()), equalTo (false ));
141+ Record <Event > record = mock (Record .class );
142+ Event event = mock (Event .class );
143+ DefaultEventHandle eventHandle = mock (DefaultEventHandle .class );
144+ doNothing ().when (eventHandle ).acquireReference ();
145+ when (record .getData ()).thenReturn (event );
146+ when (event .getEventHandle ()).thenReturn (eventHandle );
147+
148+ SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext ();
149+ sinkForwardRecordsContext .addRecords (List .of (record ));
150+ assertThat (sinkContext .forwardRecords (sinkForwardRecordsContext , Map .of (), Map .of ()), equalTo (false ));
137151 }
138152}
139153
0 commit comments