@@ -6863,6 +6863,52 @@ public void test_MultithreadedTableWriter_Dimension_enableActualSendTime_true()
68636863 assertEquals (true , sendTime .getNanoTimestamp ().getNano () > now .getNano ());
68646864 assertEquals (true , sendTime .getNanoTimestamp ().getNano () < now1 .getNano ());
68656865 }
6866+
6867+ @ Test (timeout = 120000 )//server support setStreamTableTimestamp
6868+ public void test_MultithreadedTableWriter_enableActualSendTime_setStreamTableTimestamp () throws Exception {
6869+ StringBuilder sb = new StringBuilder ();
6870+ sb .append ("t = streamTable(1000:0, `id`enableActualSendTime`streamTableTimestamp," +
6871+ "[INT,NANOTIMESTAMP,NANOTIMESTAMP]);" +
6872+ "setStreamTableTimestamp(t, `streamTableTimestamp);" +
6873+ "share t as t1;" );
6874+ conn .run (sb .toString ());
6875+ mutithreadTableWriter_ = new MultithreadedTableWriter (HOST , PORT , "admin" , "123456" ,
6876+ "" , "t1" , false , false , null , 1 , 1 ,
6877+ 1 , "id" ,true );
6878+
6879+ ErrorCodeInfo pErrorInfo = mutithreadTableWriter_ .insert ( 1 );
6880+ assertEquals ("code= info=" ,pErrorInfo .toString ());
6881+ mutithreadTableWriter_ .waitForThreadCompletion ();
6882+ BasicTable bt = (BasicTable ) conn .run ("select * from t1;" );
6883+ assertEquals (1 , bt .rows ());
6884+ System .out .println (bt .getString ());
6885+ Entity re = conn .run ("t[`enableActualSendTime][0]<t[`streamTableTimestamp][0]" );
6886+ assertEquals ("true" , re .getString ());
6887+ conn .run ("undef(`t1,SHARED)" );
6888+ }
6889+
6890+ //@Test(timeout = 120000)//server not support setStreamTableTimestamp
6891+ public void test_MultithreadedTableWriter_enableActualSendTime_not_support_setStreamTableTimestamp () throws Exception {
6892+ DBConnection conn = new DBConnection ();
6893+ conn .connect (HOST , 8878 , "admin" , "123456" );
6894+ StringBuilder sb = new StringBuilder ();
6895+ sb .append ("t = streamTable(1000:0, `id`enableActualSendTime`streamTableTimestamp," +
6896+ "[INT,NANOTIMESTAMP,NANOTIMESTAMP]);" +
6897+ "setStreamTableTimestamp(t, `streamTableTimestamp);" +
6898+ "share t as t1;" );
6899+ conn .run (sb .toString ());
6900+ mutithreadTableWriter_ = new MultithreadedTableWriter (HOST , 8878 , "admin" , "123456" ,
6901+ "" , "t1" , false , false , null , 1 , 1 ,
6902+ 1 , "id" ,true );
6903+
6904+ ErrorCodeInfo pErrorInfo = mutithreadTableWriter_ .insert ( 1 );
6905+ assertEquals ("code=A2 info=Column counts don't match." ,pErrorInfo .toString ());
6906+ // mutithreadTableWriter_.waitForThreadCompletion();
6907+ // BasicTable bt = (BasicTable) conn.run("select * from t1;");
6908+ // assertEquals(1, bt.rows());
6909+ conn .run ("undef(`t1,SHARED)" );
6910+ }
6911+
68666912 //@Test(timeout = 120000)
68676913 public void test_MultithreadedTableWriter_allDataType_null () throws Exception {
68686914 List <String > colNames = new ArrayList <String >();
@@ -7029,32 +7075,64 @@ public void test_MultithreadedTableWriter_allDataType_array_null() throws Except
70297075// mutithreadTableWriter_.waitForThreadCompletion();
70307076// }
70317077
7032- // @Test(timeout = 120000)
7033- // public void test_mtw_enableHighAvailability_true() throws Exception {
7034- // DBConnection conn1 = new DBConnection();
7035- // conn1.connect(HOST,8802,"admin","123456");
7036- // conn1.run("share table(10:0,`id`price`val,[INT,DOUBLE,INT]) as table1;\n");
7037- //
7038- // DBConnection conn2 = new DBConnection();
7039- // conn2.connect(HOST,8803,"admin","123456");
7040- // conn2.run("share table(10:0,`id`price`val,[INT,DOUBLE,INT]) as table1;\n");
7041- //
7042- // //System.out.println("节点断掉");
7043- // //Thread.sleep(10000);
7044- // MultithreadedTableWriter mtw1 = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "", "table1",
7045- // false, true, ipports, 1000, 0.001f, 10, "id");
7046- // //检查线程连接情况
7047- // for(int i = 0;i <10000;i++) {
7048- // int tmp =5;
7049- // mtw1.insert(tmp, (double) tmp, 1);
7050- // Thread.sleep(100);
7051- // }
7052- // mtw1.waitForThreadCompletion();
7053- // //BasicInt writedData1 = (BasicInt) conn1.run("(exec count(*) from table1 where val = 1)[0]");
7054- // BasicInt writedData2 = (BasicInt) conn2.run("(exec count(*) from table1 where val = 1)[0]");
7055- // //System.out.println(writedData1);
7056- // System.out.println(writedData2);
7057- // }
7078+ //@Test(timeout = 120000)
7079+ public void test_mtw_enableHighAvailability_true () throws Exception {
7080+ DBConnection conn1 = new DBConnection ();
7081+ conn1 .connect (HOST ,8802 ,"admin" ,"123456" );
7082+ conn1 .run ("share table(10:0,`id`price`val,[INT,DOUBLE,INT]) as table1;\n " );
7083+
7084+ DBConnection conn2 = new DBConnection ();
7085+ conn2 .connect (HOST ,8803 ,"admin" ,"123456" );
7086+ conn2 .run ("share table(10:0,`id`price`val,[INT,DOUBLE,INT]) as table1;\n " );
7087+
7088+ System .out .println ("节点断掉" );
7089+ Thread .sleep (1000 );
7090+ MultithreadedTableWriter mtw1 = new MultithreadedTableWriter (HOST , PORT , "admin" , "123456" , "" , "table1" ,
7091+ false , true , ipports , 1000 , 0.001f , 1 , "id" );
7092+ //检查线程连接情况
7093+ for (int i = 0 ;i <1000 ;i ++) {
7094+ int tmp =5 ;
7095+ mtw1 .insert (tmp , (double ) tmp , 1 );
7096+ Thread .sleep (100 );
7097+ System .out .println ("循环次数:" +i );
7098+ }
7099+ mtw1 .waitForThreadCompletion ();
7100+ //BasicInt writedData1 = (BasicInt) conn1.run("(exec count(*) from table1 where val = 1)[0]");
7101+ BasicInt writedData2 = (BasicInt ) conn2 .run ("(exec count(*) from table1 where val = 1)[0]" );
7102+ //System.out.println(writedData1);
7103+ System .out .println (writedData2 );
7104+ }
7105+
7106+ //@Test//AJ-856
7107+ public void test_mtw_enableHighAvailability_true_append_dfs () throws Exception {
7108+ DBConnection conn1 = new DBConnection ();
7109+ conn1 .connect ("192.168.0.69" ,8802 ,"admin" ,"123456" );
7110+ conn1 .run ("t1= table(10:0,`id`price`val,[INT,DOUBLE,INT])\n " +
7111+ "dbPath = \" dfs://TSDB_mtw\" \n " +
7112+ "if(existsDatabase(dbPath)){dropDatabase(dbPath)}\n " +
7113+ "db = database(dbPath, VALUE, 0..1000, engine='TSDB')\n " +
7114+ "pt=db.createPartitionedTable(t1, \" pt\" ,`id, , [`id])" );
7115+
7116+ DBConnection conn2 = new DBConnection ();
7117+ conn2 .connect ("192.168.0.69" ,8803 ,"admin" ,"123456" );
7118+ DBConnection conn3 = new DBConnection ();
7119+ conn3 .connect ("192.168.0.69" ,8800 ,"admin" ,"123456" );
7120+
7121+ System .out .println ("节点断掉" );
7122+ Thread .sleep (1000 );
7123+ MultithreadedTableWriter mtw1 = new MultithreadedTableWriter ("192.168.0.69" , 8802 , "admin" , "123456" , "dfs://TSDB_mtw" , "pt" ,
7124+ false , true , new String []{"192.168.0.69:8802" ,"192.168.0.69:8803" }, 1 , 0.001f , 1 , "id" );
7125+ //检查线程连接情况
7126+ for (int i = 0 ;i <1000 ;i ++) {
7127+ int tmp =5 ;
7128+ mtw1 .insert (tmp , (double ) tmp , 1 );
7129+ Thread .sleep (100 );
7130+ System .out .println ("循环次数:" +i );
7131+ }
7132+ mtw1 .waitForThreadCompletion ();
7133+ BasicInt writedData2 = (BasicInt ) conn3 .run ("(exec count(*) from table1 where val = 1)[0]" );
7134+ System .out .println (writedData2 );
7135+ }
70587136
70597137// @Test not support
70607138// public void Test_MultithreadedTableWriter_iotAnyVector() throws Exception {
0 commit comments