@@ -26,13 +26,15 @@ import scala.concurrent.{Await, Future}
2626import scala .language .postfixOps
2727import scala .reflect .io .Path
2828import scala .util .{Failure , Success , Try }
29+
2930import com .gemstone .gemfire .internal .cache .PartitionedRegion
3031import com .pivotal .gemfirexd .internal .engine .Misc
3132import io .snappydata .core .{TestData , TestData2 }
32- import io .snappydata .test .dunit .{AvailablePortHelper , SerializableRunnable }
33+ import io .snappydata .test .dunit .{AvailablePortHelper , DistributedTestBase , SerializableRunnable }
3334import io .snappydata .util .TestUtils
3435import io .snappydata .{ColumnUpdateDeleteTests , ConcurrentOpsTests , Property , SnappyTableStatsProviderService }
3536import org .junit .Assert
37+
3638import org .apache .spark .rdd .RDD
3739import org .apache .spark .sql ._
3840import org .apache .spark .sql .catalyst .encoders .{ExpressionEncoder , RowEncoder }
@@ -254,7 +256,7 @@ class SplitSnappyClusterDUnitTest(s: String)
254256 if (jars.count() > 0 ) {
255257 var str = msg
256258 jars.collect().foreach(x => str += s " $x, " )
257- assert(false , str)
259+ assert(assertion = false , str)
258260 }
259261 }
260262
@@ -400,12 +402,12 @@ class SplitSnappyClusterDUnitTest(s: String)
400402
401403 try {
402404 // wait till the smart connector job perform at-least one putInto operation
403- var count = 0
404- while (snc.table( " T5 " ).count() == 0 && count < 10 ) {
405- Thread .sleep( 4000 )
406- count += 1
407- }
408- assert(count != 10 , " Smart connector application not performing putInto as expected. " )
405+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
406+
407+ override def description () : String = " waiting for putInto from smart connector "
408+
409+ override def done () : Boolean = snc.table( " T5 " ).count() > 0
410+ }, 60000 , 500 , true )
409411
410412 // perform DDL
411413 snc.sql(s " CREATE TABLE T6(COL1 STRING, COL2 STRING) " +
@@ -435,12 +437,12 @@ class SplitSnappyClusterDUnitTest(s: String)
435437 startArgs :+ Int .box(locatorClientPort) :+ testTempDirectory)
436438 }
437439 try {
438- var attempts = 0
439- while ( ! Files .exists( Paths .get(testTempDirectory, " file0 " )) && attempts < 15 ) {
440- Thread .sleep( 4000 )
441- attempts += 1
442- }
443- assert(attempts < 14 , " No data ingested by streaming application. " )
440+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
441+
442+ override def description () : String = " no data ingested by streaming application "
443+
444+ override def done () : Boolean = Files .exists( Paths .get(testTempDirectory, " file0 " ))
445+ }, 60000 , 500 , true )
444446
445447 // perform DDL leading to stale catalog in smart connector application
446448 snc.sql(s " CREATE TABLE SYNC_TABLE(COL1 STRING) " + s " USING column " )
@@ -470,12 +472,12 @@ class SplitSnappyClusterDUnitTest(s: String)
470472
471473 try {
472474 // wait till the smart connector job perform at-least one putInto operation
473- var count = 0
474- while (snc.table( " T5 " ).count() == 3 && count < 10 ) {
475- Thread .sleep( 4000 )
476- count += 1
477- }
478- assert(count != 10 , " Smart connector application not performing putInto as expected. " )
475+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
476+
477+ override def description () : String = " waiting for putInto from smart connector "
478+
479+ override def done () : Boolean = snc.table( " T5 " ).count() != 3
480+ }, 60000 , 500 , true )
479481
480482 // perform DDL
481483 snc.sql(s " CREATE TABLE T6(COL1 STRING, COL2 STRING) " +
@@ -504,12 +506,12 @@ class SplitSnappyClusterDUnitTest(s: String)
504506
505507 try {
506508 // wait till the smart connector job perform at-least one putInto operation
507- var count = 0
508- while (snc.table( " T5 " ).count() == 3 && count < 10 ) {
509- Thread .sleep( 4000 )
510- count += 1
511- }
512- assert(count != 10 , " Smart connector application not performing putInto as expected. " )
509+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
510+
511+ override def description () : String = " waiting for putInto from smart connector "
512+
513+ override def done () : Boolean = snc.table( " T5 " ).count() != 3
514+ }, 60000 , 500 , true )
513515
514516 // rebalance the buckets
515517 snc.sql(s " CALL SYS.REBALANCE_ALL_BUCKETS() " )
@@ -551,12 +553,12 @@ class SplitSnappyClusterDUnitTest(s: String)
551553
552554 try {
553555 // wait till the smart connector job perform at-least one putInto operation
554- var count = 0
555- while (snc.table( " T5 " ).count() == 3 && count < 10 ) {
556- Thread .sleep( 4000 )
557- count += 1
558- }
559- assert(count != 10 , " Smart connector application not performing insert as expected. " )
556+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
557+
558+ override def description () : String = " waiting for insertInto from smart connector "
559+
560+ override def done () : Boolean = snc.table( " T5 " ).count() != 3
561+ }, 60000 , 500 , true )
560562
561563 logInfo(" testInsertQueryAfterStaleCatalog dropping table t5" )
562564 // drop the table and create a table with same name and different schema
@@ -592,12 +594,12 @@ class SplitSnappyClusterDUnitTest(s: String)
592594
593595 try {
594596 // wait till the smart connector job perform at-least one putInto operation
595- var count = 0
596- while (snc.table( " T6 " ).count() == 3 && count < 10 ) {
597- Thread .sleep( 4000 )
598- count += 1
599- }
600- assert(count != 10 , " Smart connector application not performing delete as expected. " )
597+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
598+
599+ override def description () : String = " waiting for delete from smart connector "
600+
601+ override def done () : Boolean = snc.table( " T6 " ).count() != 3
602+ }, 60000 , 500 , true )
601603
602604 logInfo(" testDeleteAfterStaleCatalog dropping table t6" )
603605 snc.sql(" drop table t6" )
@@ -628,12 +630,12 @@ class SplitSnappyClusterDUnitTest(s: String)
628630
629631 try {
630632 // wait till the smart connector job perform at-least one putInto operation
631- var count = 0
632- while (snc.table( " T7 " ).count() == 3 && count < 10 ) {
633- Thread .sleep( 4000 )
634- count += 1
635- }
636- assert(count != 10 , " Smart connector application not performing delete as expected. " )
633+ DistributedTestBase .waitForCriterion( new DistributedTestBase . WaitCriterion {
634+
635+ override def description () : String = " waiting for delete from smart connector "
636+
637+ override def done () : Boolean = snc.table( " T7 " ).count() != 3
638+ }, 60000 , 500 , true )
637639
638640 snc.sql(s " CREATE TABLE T8(COL1 DATE, COL2 DATE) USING column OPTIONS " +
639641 s " (key_columns 'COL1', PARTITION_BY 'COL1', COLUMN_MAX_DELTA_ROWS '1') " )
@@ -665,7 +667,7 @@ object SplitSnappyClusterDUnitTest
665667 s " cached Hive catalog " )
666668 } catch {
667669 // expected exception
668- case _ : org.apache.spark.sql. TableNotFoundException =>
670+ case _ : AnalysisException =>
669671 }
670672 }
671673
@@ -1289,7 +1291,7 @@ object SplitSnappyClusterDUnitTest
12891291 Thread .sleep(6000 )
12901292 try {
12911293 for (_ <- 1 to 20 ) {
1292- Thread .sleep(500 )
1294+ Thread .sleep(200 )
12931295 logInfo(" calling dataFrame.write.insertInto(\" T5\" )" )
12941296 logInfo(" 2. schema is = " + snc.table(" T5" ).schema)
12951297 dataFrame2.write.insertInto(" T5" )
@@ -1315,9 +1317,8 @@ object SplitSnappyClusterDUnitTest
13151317 success = true
13161318 } catch {
13171319 // if table is not created yet on embedded cluster,
1318- // TableNotFoundException can be seen; retry in
1319- // such a case
1320- case t : TableNotFoundException =>
1320+ // table may not be found ; retry in such a case
1321+ case t : AnalysisException =>
13211322 retryCount = retryCount + 1
13221323 if (retryCount == maxRetryAttempts) {
13231324 throw t
@@ -1338,7 +1339,7 @@ object SplitSnappyClusterDUnitTest
13381339 Thread .sleep(6000 )
13391340 try {
13401341 for (_ <- 1 to 20 ) {
1341- Thread .sleep(500 )
1342+ Thread .sleep(200 )
13421343 snc.sql(" delete from t6 where col1 like '2%'" )
13431344 }
13441345 Assert .fail(" Should have thrown CatalogStaleException." )
@@ -1362,7 +1363,7 @@ object SplitSnappyClusterDUnitTest
13621363 Thread .sleep(6000 )
13631364 try {
13641365 for (_ <- 1 to 20 ) {
1365- Thread .sleep(500 )
1366+ Thread .sleep(200 )
13661367 snc.sql(" update t7 set col2 = '22' where col1 = '2'" )
13671368 }
13681369 Assert .fail(" Should have thrown CatalogStaleException." )
@@ -1424,13 +1425,12 @@ object SplitSnappyClusterDUnitTest
14241425 }
14251426
14261427 // wait till DDL is fired on snappy cluster which will lead to stale smart-connector catalog
1427- var attempts = 0
1428- while (! Files .exists(Paths .get(testTempDir, " file1" )) && attempts < 15 ) {
1429- Thread .sleep(4000 )
1430- attempts += 1
1431- }
1428+ DistributedTestBase .waitForCriterion(new DistributedTestBase .WaitCriterion {
1429+
1430+ override def description (): String = " waiting for stale catalog timed out"
14321431
1433- assert(attempts < 14 , " Waiting for stale catalog timed out" )
1432+ override def done (): Boolean = Files .exists(Paths .get(testTempDir, " file1" ))
1433+ }, 60000 , 500 , true )
14341434
14351435 // produce second batch of data
14361436 val dataBatch2 = Seq (Seq (3 , " name3" , 20 ))
0 commit comments