Skip to content

Commit f451f4b

Browse files
author
sonal
committed
- Changing the way we compare dataframes, use dataframe apis.
1 parent ed512e7 commit f451f4b

4 files changed

Lines changed: 58 additions & 55 deletions

File tree

dtests/src/test/java/io/snappydata/hydra/adAnalytics/SnappyAdAnalyticsTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -375,15 +375,14 @@ protected void executeSnappyStreamingJob(Vector jobClassNames, String logFileNam
375375
Log.getLogWriter().info("JobID is : " + jobID);
376376
SnappyBB.getBB().getSharedMap().put(appName, jobID);
377377
for (int j = 0; j < 3; j++) {
378+
if (!getJobStatus(jobID)) {
379+
throw new TestException("Got Exception while executing streaming job. Please check " +
380+
"the job status output.");
381+
}
378382
try {
379383
Thread.sleep(10 * 1000);
380384
} catch (InterruptedException ie) {
381385
}
382-
getJobStatus(jobID);
383-
}
384-
if(!checkJobStatus(jobID)){
385-
throw new TestException("Got Exception while executing streaming job. Please check " +
386-
"the job status output.");
387386
}
388387
}
389388
}
@@ -596,7 +595,7 @@ public boolean getJobStatus(String jobID){
596595
return false;
597596
break;
598597
}
599-
} try { Thread.sleep(10*1000);} catch(InterruptedException ie) { }
598+
}
600599
} catch (IOException ie){
601600
Log.getLogWriter().info("Got exception while accessing current dir");
602601
}

dtests/src/test/java/io/snappydata/hydra/testDMLOps/SnappyDMLOpsUtil.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -970,8 +970,6 @@ public void performUpdate() {
970970
if (stmt.toUpperCase().contains("SELECT"))
971971
getAndExecuteSelect(dConn,stmt,true);
972972
Log.getLogWriter().info("Executing " + stmt + " on derby.");
973-
if (stmt.toUpperCase().contains("SELECT"))
974-
getAndExecuteSelect(dConn, stmt, true);
975973
int derbyRows = dConn.createStatement().executeUpdate(stmt);
976974
Log.getLogWriter().info("Updated " + derbyRows + " rows in derby.");
977975
if (numRows != derbyRows) {
@@ -1021,8 +1019,6 @@ public void performDelete() {
10211019
if (stmt.toUpperCase().contains("SELECT"))
10221020
getAndExecuteSelect(dConn,stmt,true);
10231021
Log.getLogWriter().info("Executing " + stmt + " on derby.");
1024-
if (stmt.toUpperCase().contains("SELECT"))
1025-
getAndExecuteSelect(dConn, stmt, true);
10261022
int derbyRows = dConn.createStatement().executeUpdate(stmt);
10271023
Log.getLogWriter().info("Deleted " + derbyRows + " rows in derby.");
10281024
if (numRows != derbyRows) {

dtests/src/test/scala/io/snappydata/hydra/SnappyTestUtils.scala

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ object SnappyTestUtils {
8282
pw: PrintWriter, sqlContext: SQLContext): Boolean = {
8383
var validationFailed = false
8484
var snappyDF: DataFrame = null
85-
snappyDF = snc.sql(sqlString)
85+
snappyDF = snc.sql(sqlString).cache()
8686
val snappyDFCount = snappyDF.count
8787
// scalastyle:off println
8888
pw.println(s"\n${logTime} Executing Query $queryNum ...")
@@ -99,7 +99,7 @@ object SnappyTestUtils {
9999
}
100100
var fullRSValidationFailed: Boolean = false
101101
if (validateFullResultSet) {
102-
val sparkDF = sqlContext.sql(sqlString)
102+
val sparkDF = sqlContext.sql(sqlString).cache()
103103
val sparkDFCount = sparkDF.count()
104104
if(snappyDFCount != sparkDFCount) {
105105
pw.println(s"Count difference observed in snappy and spark resultset for query " +
@@ -125,18 +125,16 @@ object SnappyTestUtils {
125125
def assertQuery(snc: SnappyContext, snappyDF: DataFrame, sparkDF: DataFrame, queryNum: String,
126126
pw: PrintWriter): Boolean = {
127127
var fullRSValidationFailed = false
128-
val snappyQueryFileName = s"Snappy_${queryNum}"
129-
val snappyDest: String = getQueryResultDir("snappyResults") +
130-
File.separator + snappyQueryFileName
128+
129+
val snappyResFileName = s"Snappy_${queryNum}"
130+
val snappyDest: String = getQueryResultDir("snappyResults") + File.separator + snappyResFileName
131131
// scalastyle:off println
132-
// pw.println(s"Snappy query results are at : ${snappyDest}")
133132
val snappyFile: File = new java.io.File(snappyDest)
134133

135-
val sparkQueryFileName = s"Spark_${queryNum}"
136-
val sparkDest: String = getQueryResultDir("sparkResults") + File.separator +
137-
sparkQueryFileName
138-
// pw.println(s"Spark query results are at : ${sparkDest}")
134+
val sparkResFileName = s"Spark_${queryNum}"
135+
val sparkDest: String = getQueryResultDir("sparkResults") + File.separator + sparkResFileName
139136
val sparkFile: File = new java.io.File(sparkDest)
137+
140138
try {
141139
if (!snappyFile.exists()) {
142140
// val snap_col1 = snappyDF.schema.fieldNames(0)
@@ -145,27 +143,52 @@ object SnappyTestUtils {
145143
writeToFile(snappyDF.repartition((1)), snappyDest, snc)
146144
pw.println(s"${logTime} Snappy result collected in : ${snappyDest}")
147145
}
146+
148147
if (!sparkFile.exists()) {
149148
// val col1 = sparkDF.schema.fieldNames(0)
150149
// val col = sparkDF.schema.fieldNames.filter(!_.equals(col1)).toSeq
151150
// sparkDF.repartition(1).sortWithinPartitions(col1, col: _*)
152151
writeToFile(sparkDF.repartition(1), sparkDest, snc)
153152
pw.println(s"${logTime} Spark result collected in : ${sparkDest}")
154153
}
155-
val missingDF = sparkDF.except(snappyDF).collectAsList()
156-
val unexpectedDF = snappyDF.except(sparkDF).collectAsList()
157-
if(missingDF.size() > 0 || unexpectedDF.size() > 0) {
158-
fullRSValidationFailed = true
159-
pw.println("Found mismatch in resultset")
160-
if(missingDF.size() > 0) {
161-
pw.println(s"The following ${missingDF.size} rows were missing in snappyDF:\n ")
162-
for(i <- 0 to missingDF.size())
163-
pw.println(missingDF.get(i))
154+
val expectedFile = sparkFile.listFiles.filter(_.getName.endsWith(".csv"))
155+
val sparkDF2 = snc.read.format("com.databricks.spark.csv")
156+
.option("header", "false")
157+
.option("inferSchema", "false")
158+
.option("nullValue", "NULL")
159+
.option("maxCharsPerColumn", "4096")
160+
.load(s"${expectedFile}")
161+
162+
val missingDF: Array[Row] = sparkDF2.except(snappyDF).sort(sparkDF2.columns(0)).collect()
163+
val unexpectedDF: Array[Row] = snappyDF.except(sparkDF2).sort(sparkDF2.columns(0)).collect()
164+
165+
val aStr = new StringBuilder
166+
if(missingDF.length > 0 || unexpectedDF.length > 0) {
167+
pw.println(s"Found mismatch in resultset for query ${queryNum}... ")
168+
if(missingDF.length > 0) {
169+
aStr.append(s"The following ${missingDF.size} rows were missing in snappyDF:\n ")
170+
for(i <- 0 to missingDF.size)
171+
aStr.append(missingDF(i) + "\n")
164172
}
165-
if(unexpectedDF.size() > 0) {
166-
pw.println(s"The following ${unexpectedDF.size} rows were unexpected in snappyDF:\n")
167-
for(i <- 0 to unexpectedDF.size())
168-
pw.println(unexpectedDF.get(i))
173+
if(unexpectedDF.length > 0) {
174+
aStr.append(s"The following ${unexpectedDF.size} rows were unexpected in snappyDF:\n")
175+
for(i <- 0 to unexpectedDF.size)
176+
aStr.append(unexpectedDF(i) + "\n")
177+
}
178+
179+
// check if the mismatch is due to decimal, and can be ignored
180+
if (unexpectedDF.length == missingDF.length) {
181+
for (i <- 0 until missingDF.size) {
182+
if (!isIgnorable(missingDF(i).toString, unexpectedDF(i).toString)) {
183+
fullRSValidationFailed = true
184+
}
185+
}
186+
pw.println("This mismatch can be ignored.")
187+
aStr.setLength(0) // data mismatch can be ignored
188+
}
189+
if(aStr.length > 0) {
190+
pw.println(aStr)
191+
fullRSValidationFailed = true
169192
}
170193
}
171194
// fullRSValidationFailed
@@ -182,7 +205,6 @@ object SnappyTestUtils {
182205
fullRSValidationFailed
183206
}
184207

185-
186208
def dataTypeConverter(row: Row): Row = {
187209
val md = row.toSeq.map {
188210
// case d: Double => "%18.1f".format(d).trim().toDouble
@@ -216,7 +238,7 @@ object SnappyTestUtils {
216238
})
217239
sb.toString()
218240
}).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option(
219-
"header", false).save(dest)
241+
"header", true).save(dest)
220242
}
221243

222244
/*

dtests/src/test/scala/io/snappydata/hydra/spva/SPVATestUtil.scala

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,34 +29,26 @@ object SPVATestUtil {
2929
def createAndLoadReplicatedTables(snc: SnappyContext): Unit = {
3030

3131
snc.sql(SPVAQueries.patients_table)
32-
SPVAQueries.patients(snc).write.insertInto("patient")
3332

3433
snc.sql(SPVAQueries.encounters_table)
35-
SPVAQueries.encounters(snc).write.insertInto("encounters")
3634

3735
snc.sql(SPVAQueries.allergies_table)
38-
SPVAQueries.allergies(snc).write.insertInto("allergies")
3936

4037
snc.sql(SPVAQueries.careplans_table)
41-
SPVAQueries.careplans(snc).write.insertInto("careplans")
4238

4339
snc.sql(SPVAQueries.conditions_table)
44-
SPVAQueries.conditions(snc).write.insertInto("conditions")
4540

4641
snc.sql(SPVAQueries.imaging_studies_table)
47-
SPVAQueries.imaging_studies(snc).write.insertInto("imaging_studies")
4842

4943
snc.sql(SPVAQueries.immunizations_table)
50-
SPVAQueries.immunizations(snc).write.insertInto("immunizations")
5144

5245
snc.sql(SPVAQueries.medications_table)
53-
SPVAQueries.medications(snc).write.insertInto("medications")
5446

5547
snc.sql(SPVAQueries.observations_table)
56-
SPVAQueries.observations(snc).write.insertInto("observations")
5748

5849
snc.sql(SPVAQueries.procedures_table)
59-
SPVAQueries.procedures(snc).write.insertInto("procedures")
50+
51+
loadTables(snc)
6052
}
6153

6254
def createAndLoadPartitionedTables(snc: SnappyContext): Unit = {
@@ -91,16 +83,7 @@ object SPVATestUtil {
9183
" colocate_with 'PATIENTS', buckets '12', redundancy '1', PERSISTENT 'sync', " +
9284
" EVICTION_BY 'LRUHEAPPERCENT')")
9385

94-
SPVAQueries.patients(snc).write.insertInto("patients")
95-
SPVAQueries.encounters(snc).write.insertInto("encounters")
96-
SPVAQueries.allergies(snc).write.insertInto("allergies")
97-
SPVAQueries.careplans(snc).write.insertInto("careplans")
98-
SPVAQueries.conditions(snc).write.insertInto("conditions")
99-
SPVAQueries.imaging_studies(snc).write.insertInto("imaging_studies")
100-
SPVAQueries.immunizations(snc).write.insertInto("immunizations")
101-
SPVAQueries.medications(snc).write.insertInto("medications")
102-
SPVAQueries.observations(snc).write.insertInto("observations")
103-
SPVAQueries.procedures(snc).write.insertInto("procedures")
86+
loadTables(snc)
10487
}
10588

10689
def createAndLoadColumnTables(snc: SnappyContext): Unit = {
@@ -134,7 +117,10 @@ object SPVATestUtil {
134117
snc.sql(SPVAQueries.procedures_table + " using column options(PARTITION_BY 'PATIENT', " +
135118
" colocate_with 'PATIENTS', buckets '12', redundancy '1', PERSISTENT 'sync', " +
136119
" EVICTION_BY 'LRUHEAPPERCENT')")
120+
loadTables(snc)
121+
}
137122

123+
def loadTables(snc: SnappyContext): Unit = {
138124
SPVAQueries.patients(snc).write.insertInto("patients")
139125
SPVAQueries.encounters(snc).write.insertInto("encounters")
140126
SPVAQueries.allergies(snc).write.insertInto("allergies")

0 commit comments

Comments (0)