Skip to content

Commit e2e9c33

Browse files
committed
test: add integration tests for CALL rewrite_data_files procedure
1 parent 38095c3 commit e2e9c33

1 file changed

Lines changed: 246 additions & 0 deletions

File tree

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet
21+
22+
import java.io.File
23+
import java.nio.file.Files
24+
25+
import org.apache.spark.SparkConf
26+
import org.apache.spark.sql.CometTestBase
27+
import org.apache.spark.sql.comet.CometNativeCompaction
28+
29+
/**
30+
* Integration tests for CALL rewrite_data_files() procedure intercepted by CometCompactionRule.
31+
* Verifies that the SQL procedure path routes through native compaction when enabled.
32+
*/
33+
class CometIcebergCompactionProcedureSuite extends CometTestBase {
34+
35+
private val icebergExtensions =
36+
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
37+
38+
override protected def sparkConf: SparkConf = {
39+
val conf = super.sparkConf
40+
val existing = conf.get("spark.sql.extensions", "")
41+
val extensions =
42+
if (existing.isEmpty) icebergExtensions
43+
else s"$existing,$icebergExtensions"
44+
conf.set("spark.sql.extensions", extensions)
45+
}
46+
47+
private def icebergAvailable: Boolean = {
48+
try {
49+
Class.forName("org.apache.iceberg.catalog.Catalog")
50+
true
51+
} catch {
52+
case _: ClassNotFoundException => false
53+
}
54+
}
55+
56+
private def withTempIcebergDir(f: File => Unit): Unit = {
57+
val dir = Files.createTempDirectory("comet-procedure-test").toFile
58+
try {
59+
f(dir)
60+
} finally {
61+
def deleteRecursively(file: File): Unit = {
62+
if (file.isDirectory) file.listFiles().foreach(deleteRecursively)
63+
file.delete()
64+
}
65+
deleteRecursively(dir)
66+
}
67+
}
68+
69+
private def catalogConf(warehouseDir: File, compactionEnabled: Boolean): Map[String, String] =
70+
Map(
71+
"spark.sql.catalog.proc_cat" -> "org.apache.iceberg.spark.SparkCatalog",
72+
"spark.sql.catalog.proc_cat.type" -> "hadoop",
73+
"spark.sql.catalog.proc_cat.warehouse" -> warehouseDir.getAbsolutePath,
74+
CometConf.COMET_ENABLED.key -> "true",
75+
CometConf.COMET_EXEC_ENABLED.key -> "true",
76+
CometConf.COMET_ICEBERG_COMPACTION_ENABLED.key -> compactionEnabled.toString)
77+
78+
private def createFragmentedTable(tableName: String, rowCount: Int): Unit = {
79+
spark.sql(s"""
80+
CREATE TABLE $tableName (
81+
id BIGINT,
82+
name STRING,
83+
value DOUBLE
84+
) USING iceberg
85+
""")
86+
for (i <- 1 to rowCount) {
87+
spark.sql(s"INSERT INTO $tableName VALUES ($i, 'name_$i', ${i * 1.5})")
88+
}
89+
}
90+
91+
// ============== SQL Procedure Tests ==============
92+
93+
test("CALL rewrite_data_files uses native compaction when enabled") {
94+
assume(icebergAvailable, "Iceberg not available")
95+
assume(CometNativeCompaction.isAvailable, "Native compaction not available")
96+
97+
withTempIcebergDir { warehouseDir =>
98+
withSQLConf(catalogConf(warehouseDir, compactionEnabled = true).toSeq: _*) {
99+
createFragmentedTable("proc_cat.db.proc_table", 10)
100+
101+
val filesBefore =
102+
spark.sql("SELECT file_path FROM proc_cat.db.proc_table.files").count()
103+
assert(filesBefore >= 5, s"Expected fragmented files, got $filesBefore")
104+
105+
val rowsBefore =
106+
spark.sql("SELECT count(*) FROM proc_cat.db.proc_table").collect()(0).getLong(0)
107+
108+
val result =
109+
spark.sql("CALL proc_cat.system.rewrite_data_files(table => 'db.proc_table')")
110+
val resultRow = result.collect()
111+
112+
assert(resultRow.length == 1, "Procedure should return one result row")
113+
val fields = result.schema.fieldNames.toSeq
114+
Seq("rewritten_data_files_count", "added_data_files_count", "rewritten_bytes_count")
115+
.foreach(f => assert(fields.contains(f), s"Missing field $f in $fields"))
116+
117+
val rewrittenCount = resultRow(0).getInt(0)
118+
val addedCount = resultRow(0).getInt(1)
119+
assert(rewrittenCount > 0, "Should rewrite files")
120+
assert(addedCount > 0, "Should add compacted files")
121+
122+
spark.sql("REFRESH TABLE proc_cat.db.proc_table")
123+
val rowsAfter =
124+
spark.sql("SELECT count(*) FROM proc_cat.db.proc_table").collect()(0).getLong(0)
125+
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
126+
127+
val filesAfter =
128+
spark.sql("SELECT file_path FROM proc_cat.db.proc_table.files").count()
129+
assert(
130+
filesAfter < filesBefore,
131+
s"File count should decrease: $filesBefore -> $filesAfter")
132+
133+
spark.sql("DROP TABLE proc_cat.db.proc_table")
134+
}
135+
}
136+
}
137+
138+
test("CALL rewrite_data_files falls back to Spark when config disabled") {
139+
assume(icebergAvailable, "Iceberg not available")
140+
141+
withTempIcebergDir { warehouseDir =>
142+
withSQLConf(catalogConf(warehouseDir, compactionEnabled = false).toSeq: _*) {
143+
createFragmentedTable("proc_cat.db.fallback_table", 10)
144+
145+
val rowsBefore =
146+
spark.sql("SELECT count(*) FROM proc_cat.db.fallback_table").collect()(0).getLong(0)
147+
148+
val result =
149+
spark.sql("CALL proc_cat.system.rewrite_data_files(table => 'db.fallback_table')")
150+
val resultRow = result.collect()
151+
152+
assert(resultRow.length == 1, "Spark procedure should return one result row")
153+
154+
spark.sql("REFRESH TABLE proc_cat.db.fallback_table")
155+
val rowsAfter =
156+
spark.sql("SELECT count(*) FROM proc_cat.db.fallback_table").collect()(0).getLong(0)
157+
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
158+
159+
spark.sql("DROP TABLE proc_cat.db.fallback_table")
160+
}
161+
}
162+
}
163+
164+
test("CALL rewrite_data_files with binpack strategy uses native compaction") {
165+
assume(icebergAvailable, "Iceberg not available")
166+
assume(CometNativeCompaction.isAvailable, "Native compaction not available")
167+
168+
withTempIcebergDir { warehouseDir =>
169+
withSQLConf(catalogConf(warehouseDir, compactionEnabled = true).toSeq: _*) {
170+
createFragmentedTable("proc_cat.db.binpack_table", 10)
171+
172+
val rowsBefore =
173+
spark.sql("SELECT count(*) FROM proc_cat.db.binpack_table").collect()(0).getLong(0)
174+
175+
val result = spark.sql(
176+
"CALL proc_cat.system.rewrite_data_files(table => 'db.binpack_table', strategy => 'binpack')")
177+
val resultRow = result.collect()
178+
179+
assert(resultRow.length == 1)
180+
assert(resultRow(0).getInt(0) > 0, "Should rewrite files with binpack")
181+
182+
spark.sql("REFRESH TABLE proc_cat.db.binpack_table")
183+
val rowsAfter =
184+
spark.sql("SELECT count(*) FROM proc_cat.db.binpack_table").collect()(0).getLong(0)
185+
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
186+
187+
spark.sql("DROP TABLE proc_cat.db.binpack_table")
188+
}
189+
}
190+
}
191+
192+
test("CALL rewrite_data_files with sort strategy falls back to Spark") {
193+
assume(icebergAvailable, "Iceberg not available")
194+
195+
withTempIcebergDir { warehouseDir =>
196+
withSQLConf(catalogConf(warehouseDir, compactionEnabled = true).toSeq: _*) {
197+
createFragmentedTable("proc_cat.db.sort_table", 10)
198+
199+
val rowsBefore =
200+
spark.sql("SELECT count(*) FROM proc_cat.db.sort_table").collect()(0).getLong(0)
201+
202+
// Sort strategy not supported by native compaction, should fall back to Spark
203+
val result = spark.sql(
204+
"CALL proc_cat.system.rewrite_data_files(table => 'db.sort_table', strategy => 'sort', sort_order => 'id')")
205+
val resultRow = result.collect()
206+
207+
assert(resultRow.length == 1, "Spark fallback should still return results")
208+
209+
spark.sql("REFRESH TABLE proc_cat.db.sort_table")
210+
val rowsAfter =
211+
spark.sql("SELECT count(*) FROM proc_cat.db.sort_table").collect()(0).getLong(0)
212+
assert(rowsAfter == rowsBefore, s"Row count changed: $rowsBefore -> $rowsAfter")
213+
214+
spark.sql("DROP TABLE proc_cat.db.sort_table")
215+
}
216+
}
217+
}
218+
219+
test("CALL rewrite_data_files preserves data correctness") {
220+
assume(icebergAvailable, "Iceberg not available")
221+
assume(CometNativeCompaction.isAvailable, "Native compaction not available")
222+
223+
withTempIcebergDir { warehouseDir =>
224+
withSQLConf(catalogConf(warehouseDir, compactionEnabled = true).toSeq: _*) {
225+
createFragmentedTable("proc_cat.db.correct_table", 15)
226+
227+
val dataBefore = spark
228+
.sql("SELECT id, name, value FROM proc_cat.db.correct_table ORDER BY id")
229+
.collect()
230+
.map(_.toString())
231+
232+
spark.sql("CALL proc_cat.system.rewrite_data_files(table => 'db.correct_table')")
233+
234+
spark.sql("REFRESH TABLE proc_cat.db.correct_table")
235+
val dataAfter = spark
236+
.sql("SELECT id, name, value FROM proc_cat.db.correct_table ORDER BY id")
237+
.collect()
238+
.map(_.toString())
239+
240+
assert(dataBefore.toSeq == dataAfter.toSeq, "Data must be identical after procedure call")
241+
242+
spark.sql("DROP TABLE proc_cat.db.correct_table")
243+
}
244+
}
245+
}
246+
}

0 commit comments

Comments
 (0)