@@ -126,8 +126,9 @@ private void insert_data(long timestamp)
126126 session_src .insertTablet (tablet );
127127 }
128128
129- private void consume_data_noCommit (SubscriptionTreePullConsumer consumer , Session session )
129+ private int consume_data_noCommit (SubscriptionTreePullConsumer consumer , Session session )
130130 throws InterruptedException , StatementExecutionException , IoTDBConnectionException {
131+ int rowConsumed = 0 ;
131132 while (true ) {
132133 Thread .sleep (1000 );
133134 List <SubscriptionMessage > messages = consumer .poll (Duration .ofMillis (10000 ));
@@ -149,11 +150,15 @@ private void consume_data_noCommit(SubscriptionTreePullConsumer consumer, Sessio
149150 .map (IMeasurementSchema ::getMeasurementName )
150151 .collect (Collectors .toList ()));
151152 session .insertTablet (tablet );
153+ rowConsumed +=
154+ (int )
155+ Arrays .stream (tablet .getTimestamps (), 0 , tablet .getRowSize ()).distinct ().count ();
152156 System .out .println (
153157 FORMAT .format (new Date ()) + " consume data no commit:" + tablet .getRowSize ());
154158 }
155159 }
156160 }
161+ return rowConsumed ;
157162 }
158163
159164 @ Test
@@ -183,13 +188,14 @@ public void do_test()
183188
184189 // Subscribe and then write data
185190 insert_data (System .currentTimeMillis ());
186- consume_data_noCommit (consumer , session_dest2 );
191+ // a leader change may cause a progress-rollback, resulting in consuming more data
192+ int dest2Consumed = consume_data_noCommit (consumer , session_dest2 );
187193 System .out .println ("src sql1: " + getCount (session_src , sql1 ));
188194 System .out .println ("dest sql1: " + getCount (session_dest , sql1 ));
189195 System .out .println ("dest2 sql1: " + getCount (session_dest2 , sql1 ));
190196 check_count (4 , sql1 , "dest consume subscription data 2:s_0" );
191- check_count2 (4 , sql1 , "dest2 consumption subscription data 2:s_0" );
192- check_count2 (4 , sql2 , "dest2 consumption subscription data 2:s_1" );
197+ check_count2 (dest2Consumed , sql1 , "dest2 consumption subscription data 2:s_0" );
198+ check_count2 (dest2Consumed , sql2 , "dest2 consumption subscription data 2:s_1" );
193199
194200 // insert_data(1706659300000L); //2024-01-31 08:00:00+08:00
195201 // Will consume again
@@ -199,6 +205,6 @@ public void do_test()
199205 System .out .println ("dest2 sql1: " + getCount (session_dest2 , sql1 ));
200206 check_count (4 , sql1 , "dest consumption subscription before data3:s_0" );
201207 check_count (4 , sql2 , "dest consume subscription before data3:s_1" );
202- check_count2 (4 , sql2 , "dest2 consumption subscription before count 3 data:s_1" );
208+ check_count2 (dest2Consumed , sql2 , "dest2 consumption subscription before count 3 data:s_1" );
203209 }
204210}
0 commit comments