@@ -66,6 +66,7 @@ public void setUp() {
6666 protected void setupConfig () {
6767 super .setupConfig ();
6868 senderEnv .getConfig ().getDataNodeConfig ().setEnableRestService (true );
69+ senderEnv .getConfig ().getCommonConfig ().setPipeAutoSplitFullEnabled (true );
6970 }
7071
7172 @ Test
@@ -106,9 +107,6 @@ public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
106107
107108 Assert .assertEquals (TSStatusCode .SUCCESS_STATUS .getStatusCode (), status .getCode ());
108109
109- Assert .assertEquals (
110- TSStatusCode .SUCCESS_STATUS .getStatusCode (), client .startPipe ("testPipe" ).getCode ());
111-
112110 // Do not fail if the failure has nothing to do with pipe
113111 // Because the failures will randomly generate due to resource limitation
114112 TestUtils .executeNonQueries (
@@ -173,9 +171,6 @@ private void testSinkFormat(final String format) throws Exception {
173171 .setProcessorAttributes (processorAttributes ))
174172 .getCode ());
175173
176- Assert .assertEquals (
177- TSStatusCode .SUCCESS_STATUS .getStatusCode (), client .startPipe ("testPipe" ).getCode ());
178-
179174 // Do not fail if the failure has nothing to do with pipe
180175 // Because the failures will randomly generate due to resource limitation
181176 TestUtils .executeNonQueries (
@@ -237,9 +232,6 @@ public void testLegacySink() throws Exception {
237232
238233 try (final SyncConfigNodeIServiceClient client =
239234 (SyncConfigNodeIServiceClient ) senderEnv .getLeaderConfigNodeConnection ()) {
240- Assert .assertEquals (
241- TSStatusCode .SUCCESS_STATUS .getStatusCode (), client .startPipe ("testPipe" ).getCode ());
242-
243235 // Do not fail if the failure has nothing to do with pipe
244236 // Because the failures will randomly generate due to resource limitation
245237 TestUtils .executeNonQueries (
@@ -417,9 +409,6 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
417409 .setProcessorAttributes (processorAttributes ))
418410 .getCode ());
419411
420- Assert .assertEquals (
421- TSStatusCode .SUCCESS_STATUS .getStatusCode (), client .startPipe ("testPipe" ).getCode ());
422-
423412 // Do not fail if the failure has nothing to do with pipe
424413 // Because the failures will randomly generate due to resource limitation
425414 TestUtils .executeNonQueries (
@@ -513,9 +502,6 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
513502 .setProcessorAttributes (processorAttributes ))
514503 .getCode ());
515504
516- Assert .assertEquals (
517- TSStatusCode .SUCCESS_STATUS .getStatusCode (), client .startPipe ("testPipe" ).getCode ());
518-
519505 TestUtils .executeNonQueries (
520506 senderEnv ,
521507 Arrays .asList ("insert into root.vehicle.d0(time, s1) values (2, 1)" , "flush" ),
@@ -577,4 +563,62 @@ public void testSpecialPartialInsert() throws Exception {
577563 "1635232151960,null,null,2.0,2.1,null," ,
578564 "1635232143960,6.0,4.0,null,null,null," )));
579565 }
566+
567+ @ Test
568+ public void testTransferMods () {
569+ try {
570+ TestUtils .executeNonQueries (
571+ senderEnv ,
572+ Arrays .asList (
573+ "create database root.sg_nonAligned" ,
574+ "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 with datatype=boolean, encoding=RLE,compressor=snappy" ,
575+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 with datatype=int32, encoding=PLAIN,compressor=LZ4" ,
576+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 with datatype=int64,encoding=gorilla,compressor=uncompressed" ,
577+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 with datatype=float,encoding=chimp,compressor=gzip" ,
578+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 with datatype=double,encoding=ts_2diff,compressor=zstd" ,
579+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 with datatype=text,encoding=dictionary,compressor=lzma2" ,
580+ "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, s2,s3,s4,s0,s5) values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.39,85.51234,false,'第9条device_nonAligned'),(91,10,100,5.39,95.51234,false,'第10条device_nonAligned'),(101,11,110,5.39,105.51234,false,'第11条device_nonAligned'),(111,12,120,5.39,115.51234,false,'第12条device_nonAligned'),(121,13,130,5.39,125.51234,false,'第13条device_nonAligned'),(131,14,140,5.39,135.51234,false,'第14条device_nonAligned'),(141,15,150,5.39,145.51234,false,'第15条'),(151,16,160,5.39,155.51234,false,'第16条'),(161,17,170,5.39,165.51234,false,'第17条'),(171,18,180,5.39,175.51234,false,'第18条'),(181,19,190,5.39,185.51234,false,'第19条'),(191,20,200,5.39,195.51234,false,'第20条'),(201,21,210,5.39,null,false,'第21条')" ,
581+ "flush" ,
582+ "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0" ,
583+ String .format (
584+ "create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')" ,
585+ receiverEnv .getDataNodeWrapperList ().get (0 ).getIpAndPortString ())));
586+
587+ TestUtils .assertDataEventuallyOnEnv (
588+ receiverEnv ,
589+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*" ,
590+ "count(timeseries)," ,
591+ Collections .singleton ("5," ));
592+
593+ TestUtils .executeNonQueries (
594+ senderEnv , Arrays .asList ("drop pipe test_history" , "drop pipe test_realtime" ));
595+
596+ TestUtils .executeNonQuery (receiverEnv , "drop database root.**" );
597+
598+ TestUtils .executeNonQueries (
599+ senderEnv ,
600+ Arrays .asList (
601+ "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1" ,
602+ String .format (
603+ "create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')" ,
604+ receiverEnv .getDataNodeWrapperList ().get (0 ).getIpAndPortString ())));
605+
606+ TestUtils .assertDataEventuallyOnEnv (
607+ receiverEnv ,
608+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*" ,
609+ "count(timeseries)," ,
610+ Collections .singleton ("4," ),
611+ 15 );
612+ TestUtils .assertDataAlwaysOnEnv (
613+ receiverEnv ,
614+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*" ,
615+ "count(timeseries)," ,
616+ Collections .singleton ("4," ));
617+ } finally {
618+ TestUtils .executeNonQueries (
619+ senderEnv ,
620+ Arrays .asList (
621+ "drop pipe test_history" , "drop pipe test_realtime" , "drop database root.**" ));
622+ }
623+ }
580624}
0 commit comments