Skip to content

Commit 784be96

Browse files
[SPARK-57590][SQL] Address review: use CANNOT_MERGE_SCHEMAS for archive merge conflicts and cover mergeSchema inference
1 parent a831273 commit 784be96

2 files changed

Lines changed: 27 additions & 3 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GRO
3939
import org.apache.parquet.hadoop._
4040
import org.apache.parquet.hadoop.util.HadoopInputFile
4141

42-
import org.apache.spark.TaskContext
42+
import org.apache.spark.{SparkException, TaskContext}
4343
import org.apache.spark.internal.Logging
4444
import org.apache.spark.internal.LogKeys.{PATH, SCHEMA}
4545
import org.apache.spark.sql.SparkSession
@@ -749,8 +749,8 @@ object ParquetFileFormat extends Logging {
749749
fileSourceOptions.ignoreMissingFiles).foreach { schema =>
750750
merged = Some(merged.fold(schema) { acc =>
751751
try acc.merge(schema, caseSensitive) catch {
752-
case e: Throwable =>
753-
throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(acc, schema, e)
752+
case cause: SparkException =>
753+
throw QueryExecutionErrors.failedMergingSchemaError(acc, schema, cause)
754754
}
755755
})
756756
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetTarArchiveReadSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.sql.execution.datasources
1919

20+
import java.io.File
21+
import java.nio.file.Files
22+
2023
import org.apache.spark.sql.functions.input_file_name
2124
import org.apache.spark.sql.internal.SQLConf
2225

@@ -75,4 +78,25 @@ class ParquetTarArchiveReadSuite
7578
assertArchiveMatchesDir(
7679
Seq(entryName(0) -> encodeFile(withName), entryName(1) -> encodeFile(idOnly)))
7780
}
81+
82+
test("archive inference unions differing fields across entries with mergeSchema=true") {
83+
// Parquet does not union schemas during default inference, but `mergeSchema=true` folds every
84+
// entry's schema; over an archive that folds each unpacked entry one at a time. The unioned
85+
// schema must match a directory read of the same files under the same option.
86+
val withName = sampleDf((1, "Alice"), (2, "Bob"))
87+
val idExtra = Seq((3, 30)).toDF("id", "extra")
88+
val entries = Seq(entryName(0) -> encodeFile(withName), entryName(1) -> encodeFile(idExtra))
89+
val merge = Map("mergeSchema" -> "true")
90+
withArchiveFile() { archive =>
91+
writeArchive(archive, entries)
92+
val archiveSchema = inferredSchema(Seq(archive.getCanonicalPath), merge)
93+
withTempDir { dir =>
94+
entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, b) }
95+
assert(archiveSchema.fieldNames.toSet == Set("id", "name", "extra"),
96+
s"expected the union of entry fields, got $archiveSchema")
97+
assert(archiveSchema == inferredSchema(Seq(dir.getCanonicalPath), merge),
98+
s"archive mergeSchema inference diverged from a directory read; got $archiveSchema")
99+
}
100+
}
101+
}
78102
}

0 commit comments

Comments
 (0)