Skip to content

Commit ad88f6e

Browse files
committed
test: add file count validation and Spark vs Native comparison test
1 parent 1df0011 commit ad88f6e

1 file changed

Lines changed: 202 additions & 42 deletions

File tree

spark/src/test/scala/org/apache/comet/CometIcebergCompactionSuite.scala

Lines changed: 202 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -440,16 +440,33 @@ class CometIcebergCompactionSuite extends CometTestBase {
440440
spark.sql(s"INSERT INTO compact_cat.db.bucket_table VALUES ($i, 'cat_$i', ${i * 1.5})")
441441
}
442442

443-
val rowsBefore =
444-
spark.sql("SELECT count(*) FROM compact_cat.db.bucket_table").collect()(0).getLong(0)
443+
val filesBefore =
444+
spark.sql("SELECT file_path FROM compact_cat.db.bucket_table.files").count()
445+
assert(filesBefore >= 10, s"Expected multiple files, got $filesBefore")
446+
447+
val dataBefore = spark
448+
.sql("SELECT id, category, value FROM compact_cat.db.bucket_table ORDER BY id")
449+
.collect()
445450

446451
val icebergTable = loadIcebergTable("compact_cat.db.bucket_table")
447-
CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
452+
val summary = CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
453+
454+
assert(summary.filesDeleted > 0, "Should delete files")
455+
assert(summary.filesAdded > 0, "Should add files")
456+
assert(summary.filesAdded < summary.filesDeleted, "Should reduce file count")
448457

449458
spark.sql("REFRESH TABLE compact_cat.db.bucket_table")
450-
val rowsAfter =
451-
spark.sql("SELECT count(*) FROM compact_cat.db.bucket_table").collect()(0).getLong(0)
452-
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
459+
460+
val filesAfter =
461+
spark.sql("SELECT file_path FROM compact_cat.db.bucket_table.files").count()
462+
assert(filesAfter < filesBefore, s"Expected fewer files: $filesBefore -> $filesAfter")
463+
464+
val dataAfter = spark
465+
.sql("SELECT id, category, value FROM compact_cat.db.bucket_table ORDER BY id")
466+
.collect()
467+
assert(
468+
dataBefore.map(_.toString()).toSeq == dataAfter.map(_.toString()).toSeq,
469+
"Data must be identical after compaction")
453470

454471
spark.sql("DROP TABLE compact_cat.db.bucket_table")
455472
}
@@ -476,16 +493,32 @@ class CometIcebergCompactionSuite extends CometTestBase {
476493
s"INSERT INTO compact_cat.db.truncate_table VALUES ($i, 'name_$i', ${i * 1.5})")
477494
}
478495

479-
val rowsBefore =
480-
spark.sql("SELECT count(*) FROM compact_cat.db.truncate_table").collect()(0).getLong(0)
496+
val filesBefore =
497+
spark.sql("SELECT file_path FROM compact_cat.db.truncate_table.files").count()
498+
assert(filesBefore >= 10, s"Expected multiple files, got $filesBefore")
499+
500+
val dataBefore = spark
501+
.sql("SELECT id, name, value FROM compact_cat.db.truncate_table ORDER BY id")
502+
.collect()
481503

482504
val icebergTable = loadIcebergTable("compact_cat.db.truncate_table")
483-
CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
505+
val summary = CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
506+
507+
assert(summary.filesDeleted > 0, "Should delete files")
508+
assert(summary.filesAdded > 0, "Should add files")
484509

485510
spark.sql("REFRESH TABLE compact_cat.db.truncate_table")
486-
val rowsAfter =
487-
spark.sql("SELECT count(*) FROM compact_cat.db.truncate_table").collect()(0).getLong(0)
488-
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
511+
512+
val filesAfter =
513+
spark.sql("SELECT file_path FROM compact_cat.db.truncate_table.files").count()
514+
assert(filesAfter < filesBefore, s"Expected fewer files: $filesBefore -> $filesAfter")
515+
516+
val dataAfter = spark
517+
.sql("SELECT id, name, value FROM compact_cat.db.truncate_table ORDER BY id")
518+
.collect()
519+
assert(
520+
dataBefore.map(_.toString()).toSeq == dataAfter.map(_.toString()).toSeq,
521+
"Data must be identical after compaction")
489522

490523
spark.sql("DROP TABLE compact_cat.db.truncate_table")
491524
}
@@ -515,22 +548,32 @@ class CometIcebergCompactionSuite extends CometTestBase {
515548
""")
516549
}
517550

518-
val rowsBefore =
519-
spark
520-
.sql("SELECT count(*) FROM compact_cat.db.month_part_table")
521-
.collect()(0)
522-
.getLong(0)
551+
val filesBefore =
552+
spark.sql("SELECT file_path FROM compact_cat.db.month_part_table.files").count()
553+
assert(filesBefore >= 5, s"Expected multiple files, got $filesBefore")
554+
555+
val dataBefore = spark
556+
.sql("SELECT id, event_ts, data FROM compact_cat.db.month_part_table ORDER BY id")
557+
.collect()
523558

524559
val icebergTable = loadIcebergTable("compact_cat.db.month_part_table")
525-
CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
560+
val summary = CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
561+
562+
assert(summary.filesDeleted > 0, "Should delete files")
563+
assert(summary.filesAdded > 0, "Should add files")
526564

527565
spark.sql("REFRESH TABLE compact_cat.db.month_part_table")
528-
val rowsAfter =
529-
spark
530-
.sql("SELECT count(*) FROM compact_cat.db.month_part_table")
531-
.collect()(0)
532-
.getLong(0)
533-
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
566+
567+
val filesAfter =
568+
spark.sql("SELECT file_path FROM compact_cat.db.month_part_table.files").count()
569+
assert(filesAfter < filesBefore, s"Expected fewer files: $filesBefore -> $filesAfter")
570+
571+
val dataAfter = spark
572+
.sql("SELECT id, event_ts, data FROM compact_cat.db.month_part_table ORDER BY id")
573+
.collect()
574+
assert(
575+
dataBefore.map(_.toString()).toSeq == dataAfter.map(_.toString()).toSeq,
576+
"Data must be identical after compaction")
534577

535578
spark.sql("DROP TABLE compact_cat.db.month_part_table")
536579
}
@@ -560,16 +603,32 @@ class CometIcebergCompactionSuite extends CometTestBase {
560603
""")
561604
}
562605

563-
val rowsBefore =
564-
spark.sql("SELECT count(*) FROM compact_cat.db.hour_part_table").collect()(0).getLong(0)
606+
val filesBefore =
607+
spark.sql("SELECT file_path FROM compact_cat.db.hour_part_table.files").count()
608+
assert(filesBefore >= 5, s"Expected multiple files, got $filesBefore")
609+
610+
val dataBefore = spark
611+
.sql("SELECT id, event_ts, data FROM compact_cat.db.hour_part_table ORDER BY id")
612+
.collect()
565613

566614
val icebergTable = loadIcebergTable("compact_cat.db.hour_part_table")
567-
CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
615+
val summary = CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
616+
617+
assert(summary.filesDeleted > 0, "Should delete files")
618+
assert(summary.filesAdded > 0, "Should add files")
568619

569620
spark.sql("REFRESH TABLE compact_cat.db.hour_part_table")
570-
val rowsAfter =
571-
spark.sql("SELECT count(*) FROM compact_cat.db.hour_part_table").collect()(0).getLong(0)
572-
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
621+
622+
val filesAfter =
623+
spark.sql("SELECT file_path FROM compact_cat.db.hour_part_table.files").count()
624+
assert(filesAfter < filesBefore, s"Expected fewer files: $filesBefore -> $filesAfter")
625+
626+
val dataAfter = spark
627+
.sql("SELECT id, event_ts, data FROM compact_cat.db.hour_part_table ORDER BY id")
628+
.collect()
629+
assert(
630+
dataBefore.map(_.toString()).toSeq == dataAfter.map(_.toString()).toSeq,
631+
"Data must be identical after compaction")
573632

574633
spark.sql("DROP TABLE compact_cat.db.hour_part_table")
575634
}
@@ -603,22 +662,32 @@ class CometIcebergCompactionSuite extends CometTestBase {
603662
""")
604663
}
605664

606-
val rowsBefore =
607-
spark
608-
.sql("SELECT count(*) FROM compact_cat.db.multi_part_table")
609-
.collect()(0)
610-
.getLong(0)
665+
val filesBefore =
666+
spark.sql("SELECT file_path FROM compact_cat.db.multi_part_table.files").count()
667+
assert(filesBefore >= 5, s"Expected multiple files, got $filesBefore")
668+
669+
val dataBefore = spark
670+
.sql("SELECT id, region, event_date, value FROM compact_cat.db.multi_part_table ORDER BY id")
671+
.collect()
611672

612673
val icebergTable = loadIcebergTable("compact_cat.db.multi_part_table")
613-
CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
674+
val summary = CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
675+
676+
assert(summary.filesDeleted > 0, "Should delete files")
677+
assert(summary.filesAdded > 0, "Should add files")
614678

615679
spark.sql("REFRESH TABLE compact_cat.db.multi_part_table")
616-
val rowsAfter =
617-
spark
618-
.sql("SELECT count(*) FROM compact_cat.db.multi_part_table")
619-
.collect()(0)
620-
.getLong(0)
621-
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
680+
681+
val filesAfter =
682+
spark.sql("SELECT file_path FROM compact_cat.db.multi_part_table.files").count()
683+
assert(filesAfter < filesBefore, s"Expected fewer files: $filesBefore -> $filesAfter")
684+
685+
val dataAfter = spark
686+
.sql("SELECT id, region, event_date, value FROM compact_cat.db.multi_part_table ORDER BY id")
687+
.collect()
688+
assert(
689+
dataBefore.map(_.toString()).toSeq == dataAfter.map(_.toString()).toSeq,
690+
"Data must be identical after compaction")
622691

623692
spark.sql("DROP TABLE compact_cat.db.multi_part_table")
624693
}
@@ -917,4 +986,95 @@ class CometIcebergCompactionSuite extends CometTestBase {
917986
}
918987
}
919988
}
989+
990+
// ============== Spark Default vs Native Compaction Comparison ==============
991+
992+
test("native compaction produces same result as Spark default compaction") {
993+
assume(icebergAvailable, "Iceberg not available in classpath")
994+
assume(CometNativeCompaction.isAvailable, "Native compaction not available")
995+
996+
withTempIcebergDir { warehouseDir =>
997+
withSQLConf(icebergCatalogConf(warehouseDir).toSeq: _*) {
998+
// Create two identical tables
999+
spark.sql("""
1000+
CREATE TABLE compact_cat.db.spark_table (
1001+
id BIGINT,
1002+
name STRING,
1003+
value DOUBLE
1004+
) USING iceberg
1005+
""")
1006+
1007+
spark.sql("""
1008+
CREATE TABLE compact_cat.db.native_table (
1009+
id BIGINT,
1010+
name STRING,
1011+
value DOUBLE
1012+
) USING iceberg
1013+
""")
1014+
1015+
// Insert same data into both tables
1016+
for (i <- 1 to 15) {
1017+
spark.sql(s"INSERT INTO compact_cat.db.spark_table VALUES ($i, 'name_$i', ${i * 1.5})")
1018+
spark.sql(s"INSERT INTO compact_cat.db.native_table VALUES ($i, 'name_$i', ${i * 1.5})")
1019+
}
1020+
1021+
// Verify both tables have same fragmented state
1022+
val sparkFilesBefore =
1023+
spark.sql("SELECT file_path FROM compact_cat.db.spark_table.files").count()
1024+
val nativeFilesBefore =
1025+
spark.sql("SELECT file_path FROM compact_cat.db.native_table.files").count()
1026+
assert(sparkFilesBefore == nativeFilesBefore, "Both tables should start with same files")
1027+
1028+
// Run Spark default compaction
1029+
val sparkTable = loadIcebergTable("compact_cat.db.spark_table")
1030+
import org.apache.iceberg.spark.actions.SparkActions
1031+
SparkActions.get(spark).rewriteDataFiles(sparkTable).execute()
1032+
1033+
// Run Native compaction
1034+
val nativeTable = loadIcebergTable("compact_cat.db.native_table")
1035+
CometNativeCompaction(spark).rewriteDataFiles(nativeTable)
1036+
1037+
// Refresh tables
1038+
spark.sql("REFRESH TABLE compact_cat.db.spark_table")
1039+
spark.sql("REFRESH TABLE compact_cat.db.native_table")
1040+
1041+
// Compare file counts after compaction
1042+
val sparkFilesAfter =
1043+
spark.sql("SELECT file_path FROM compact_cat.db.spark_table.files").count()
1044+
val nativeFilesAfter =
1045+
spark.sql("SELECT file_path FROM compact_cat.db.native_table.files").count()
1046+
1047+
assert(sparkFilesAfter < sparkFilesBefore, "Spark compaction should reduce files")
1048+
assert(nativeFilesAfter < nativeFilesBefore, "Native compaction should reduce files")
1049+
1050+
// Compare data - both should produce identical results
1051+
val sparkData = spark
1052+
.sql("SELECT id, name, value FROM compact_cat.db.spark_table ORDER BY id")
1053+
.collect()
1054+
.map(_.toString())
1055+
1056+
val nativeData = spark
1057+
.sql("SELECT id, name, value FROM compact_cat.db.native_table ORDER BY id")
1058+
.collect()
1059+
.map(_.toString())
1060+
1061+
assert(
1062+
sparkData.toSeq == nativeData.toSeq,
1063+
"Spark and Native compaction should produce identical data")
1064+
1065+
// Verify row counts match
1066+
val sparkRows =
1067+
spark.sql("SELECT count(*) FROM compact_cat.db.spark_table").collect()(0).getLong(0)
1068+
val nativeRows =
1069+
spark.sql("SELECT count(*) FROM compact_cat.db.native_table").collect()(0).getLong(0)
1070+
assert(
1071+
sparkRows == nativeRows,
1072+
s"Row counts differ: Spark=$sparkRows, Native=$nativeRows")
1073+
assert(sparkRows == 15, s"Expected 15 rows, got $sparkRows")
1074+
1075+
spark.sql("DROP TABLE compact_cat.db.spark_table")
1076+
spark.sql("DROP TABLE compact_cat.db.native_table")
1077+
}
1078+
}
1079+
}
9201080
}

0 commit comments

Comments
 (0)