Skip to content

Commit a8e63e2

Browse files
committed
Fix CTAS with UNION in Spark 4.x native writer (#3429)
1 parent d3ea9fd commit a8e63e2

2 files changed

Lines changed: 163 additions & 3 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -483,15 +483,16 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
483483
val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
484484
if (isOperatorEnabled(serde, op)) {
485485
// For operators that require native children (like writes), check if all data-producing
486-
// children are CometNativeExec. This prevents runtime failures when the native operator
487-
// expects Arrow arrays but receives non-Arrow data (e.g., OnHeapColumnVector).
486+
// children are CometExec (which includes CometNativeExec and sink operators like
487+
// CometUnionExec, CometCoalesceExec, etc.). This prevents runtime failures when the
488+
// native operator expects Arrow arrays but receives non-Arrow data.
488489
if (serde.requiresNativeChildren && op.children.nonEmpty) {
489490
// Get the actual data-producing children (unwrap WriteFilesExec if present)
490491
val dataProducingChildren = op.children.flatMap {
491492
case writeFiles: WriteFilesExec => Seq(writeFiles.child)
492493
case other => Seq(other)
493494
}
494-
if (!dataProducingChildren.forall(_.isInstanceOf[CometNativeExec])) {
495+
if (!dataProducingChildren.forall(_.isInstanceOf[CometExec])) {
495496
withInfo(op, "Cannot perform native operation because input is not in Arrow format")
496497
return None
497498
}

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,165 @@ class CometParquetWriterSuite extends CometTestBase {
140140
}
141141
}
142142

143+
// Test for issue #3429: CTAS with UNION fails in Spark 4.x with native writer
144+
test("parquet write with union - CTAS style") {
  withTempPath { dir =>
    val targetPath = new File(dir, "output.parquet").getAbsolutePath

    // Configuration pairs handed to withSQLConf for the body of this test.
    val nativeWriteConf = Seq(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true")

    withSQLConf(nativeWriteConf: _*) {
      // Two disjoint id ranges: 1..4 (4 rows) and 10..14 (5 rows), combined with UNION.
      // Writing the union straight to Parquet mimics the CTAS-with-UNION pattern.
      val firstIds = spark.range(1, 5).toDF("id")
      val secondIds = spark.range(10, 15).toDF("id")
      val unioned = firstIds.union(secondIds)

      // Run the write while capturing the executed plan.
      val plan = captureWritePlan(unioned.write.parquet(_), targetPath)

      // Read the output back and check that every input row was written.
      val written = spark.read.parquet(targetPath)
      assert(written.count() == 9, "Expected 9 rows (4 + 5)")

      // The captured plan must contain the native Comet write operator.
      assertHasCometNativeWriteExec(plan)
    }
  }
}
170+
171+
// Corner case: UNION with multiple (3+) DataFrames
172+
test("parquet write with multiple unions") {
  withTempPath { dir =>
    val targetPath = new File(dir, "output.parquet").getAbsolutePath

    // Configuration pairs handed to withSQLConf for the body of this test.
    val nativeWriteConf = Seq(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true")

    withSQLConf(nativeWriteConf: _*) {
      // Four three-row inputs, chained left-to-right: ((a union b) union c) union d.
      val inputs = Seq(
        spark.range(1, 4).toDF("id"),
        spark.range(10, 13).toDF("id"),
        spark.range(20, 23).toDF("id"),
        spark.range(30, 33).toDF("id"))
      val unioned = inputs.reduceLeft((acc, next) => acc.union(next))

      // Run the write while capturing the executed plan.
      val plan = captureWritePlan(unioned.write.parquet(_), targetPath)

      // Read the output back and check that every input row was written.
      val written = spark.read.parquet(targetPath)
      assert(written.count() == 12, "Expected 12 rows (3 + 3 + 3 + 3)")

      // The captured plan must contain the native Comet write operator.
      assertHasCometNativeWriteExec(plan)
    }
  }
}
196+
197+
// Corner case: UNION followed by coalesce
198+
test("parquet write with union and coalesce") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output.parquet").getAbsolutePath

    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true") {

      // spark.range(start, end) excludes `end`:
      //   range(1, 50)    -> ids 1..49    = 49 rows
      //   range(100, 150) -> ids 100..149 = 50 rows
      val df1 = spark.range(1, 50).toDF("id")
      val df2 = spark.range(100, 150).toDF("id")

      // coalesce(2) sits between the union and the writer.
      val unionDf = df1.union(df2).coalesce(2)

      // Run the write while capturing the executed plan.
      val plan = captureWritePlan(path => unionDf.write.parquet(path), outputPath)

      // The union keeps every row from both inputs, so 49 + 50 = 99 rows must be written.
      // (The previous expectation of 98 undercounted df2 by one row.)
      val result = spark.read.parquet(outputPath)
      assert(result.count() == 99, "Expected 99 rows (49 + 50)")

      // The captured plan must contain the native Comet write operator.
      assertHasCometNativeWriteExec(plan)
    }
  }
}
220+
221+
// Corner case: UNION with filter
222+
test("parquet write with union and filter") {
  withTempPath { dir =>
    val targetPath = new File(dir, "output.parquet").getAbsolutePath

    // Configuration pairs handed to withSQLConf for the body of this test.
    val nativeWriteConf = Seq(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true")

    withSQLConf(nativeWriteConf: _*) {
      // Union ids 1..9 with ids 20..29, then keep only the even ids.
      val lowIds = spark.range(1, 10).toDF("id")
      val highIds = spark.range(20, 30).toDF("id")
      val evenIds = lowIds.union(highIds).filter("id % 2 = 0")

      // Run the write while capturing the executed plan.
      val plan = captureWritePlan(evenIds.write.parquet(_), targetPath)

      // Surviving ids: 2, 4, 6, 8 (4 rows) and 20, 22, 24, 26, 28 (5 rows) -> 9 rows.
      val written = spark.read.parquet(targetPath)
      assert(written.count() == 9, "Expected 9 even rows")

      // The captured plan must contain the native Comet write operator.
      assertHasCometNativeWriteExec(plan)
    }
  }
}
245+
246+
// Corner case: UNION with complex types (struct)
247+
test("parquet write with union of structs") {
  withTempPath { dir =>
    val targetPath = new File(dir, "output.parquet").getAbsolutePath

    // Configuration pairs handed to withSQLConf for the body of this test.
    val nativeWriteConf = Seq(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true")

    withSQLConf(nativeWriteConf: _*) {
      // Each UNION ALL branch yields one row carrying a struct column (`person`).
      val structUnion = spark.sql("""
        SELECT 1 as id, named_struct('name', 'Alice', 'age', 30) as person
        UNION ALL
        SELECT 2 as id, named_struct('name', 'Bob', 'age', 25) as person
      """)

      // Run the write while capturing the executed plan.
      val plan = captureWritePlan(structUnion.write.parquet(_), targetPath)

      // One row per branch must have been written.
      val written = spark.read.parquet(targetPath)
      assert(written.count() == 2)

      // The captured plan must contain the native Comet write operator.
      assertHasCometNativeWriteExec(plan)
    }
  }
}
271+
272+
// Corner case: Nested UNION (UNION inside subquery)
273+
test("parquet write with nested union in SQL") {
  withTempPath { dir =>
    val targetPath = new File(dir, "output.parquet").getAbsolutePath

    // Configuration pairs handed to withSQLConf for the body of this test.
    val nativeWriteConf = Seq(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true")

    withSQLConf(nativeWriteConf: _*) {
      // Outer UNION ALL over two subqueries, each of which is itself a UNION ALL.
      val nestedUnion = spark.sql("""
        SELECT * FROM (
          SELECT 1 as id UNION ALL SELECT 2 as id
        )
        UNION ALL
        SELECT * FROM (
          SELECT 3 as id UNION ALL SELECT 4 as id
        )
      """)

      // Run the write while capturing the executed plan.
      val plan = captureWritePlan(nestedUnion.write.parquet(_), targetPath)

      // Two rows from each subquery must have been written.
      val written = spark.read.parquet(targetPath)
      assert(written.count() == 4)

      // The captured plan must contain the native Comet write operator.
      assertHasCometNativeWriteExec(plan)
    }
  }
}
301+
143302
test("parquet write with map type") {
144303
withTempPath { dir =>
145304
val outputPath = new File(dir, "output.parquet").getAbsolutePath

0 commit comments

Comments
 (0)