Skip to content

Commit 82bf3ae

Browse files
authored
feat: surface native parquet read failures as FAILED_READ_FILE (#4536)
1 parent dfc1588 commit 82bf3ae

12 files changed

Lines changed: 635 additions & 67 deletions

File tree

native/common/src/error.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,15 @@ pub enum SparkError {
215215
spark_type: String,
216216
},
217217

218+
/// A per-file read failure (corrupt footer/page, truncated/empty file, deleted file) raised by
219+
/// the native parquet reader / object_store. Classified by typed `DataFusionError` variant (no
220+
/// message matching) and translated by the JVM shim into Spark's `FAILED_READ_FILE`
221+
/// (`QueryExecutionErrors.cannotReadFilesError`). `file_path` may be empty when the underlying
222+
/// error doesn't carry it (only `object_store::Error::NotFound` does); the JVM side then fills
223+
/// it from the per-task file list.
224+
#[error("Encountered error while reading file {file_path}: {message}")]
225+
CannotReadFile { file_path: String, message: String },
226+
218227
#[error("ArrowError: {0}.")]
219228
Arrow(Arc<ArrowError>),
220229

@@ -291,6 +300,7 @@ impl SparkError {
291300
SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId",
292301
SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds",
293302
SparkError::ParquetSchemaConvert { .. } => "ParquetSchemaConvert",
303+
SparkError::CannotReadFile { .. } => "CannotReadFile",
294304
SparkError::Arrow(_) => "Arrow",
295305
SparkError::Internal(_) => "Internal",
296306
}
@@ -528,6 +538,12 @@ impl SparkError {
528538
"sparkType": spark_type,
529539
})
530540
}
541+
SparkError::CannotReadFile { file_path, message } => {
542+
serde_json::json!({
543+
"filePath": file_path,
544+
"message": message,
545+
})
546+
}
531547
SparkError::Arrow(e) => {
532548
serde_json::json!({
533549
"message": e.to_string(),
@@ -617,6 +633,10 @@ impl SparkError {
617633
"org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException"
618634
}
619635

636+
// CannotReadFile - converted to a FAILED_READ_FILE SparkException by the shim
637+
// (QueryExecutionErrors.cannotReadFilesError).
638+
SparkError::CannotReadFile { .. } => "org/apache/spark/SparkException",
639+
620640
// Generic errors
621641
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
622642
}
@@ -707,6 +727,10 @@ impl SparkError {
707727
// SparkException error class, so no error class is exposed here.
708728
SparkError::ParquetSchemaConvert { .. } => None,
709729

730+
// CannotReadFile — the JVM shim wraps it via cannotReadFilesError, which supplies the
731+
// FAILED_READ_FILE error class, so none is exposed here.
732+
SparkError::CannotReadFile { .. } => None,
733+
710734
// Generic errors (no error class)
711735
SparkError::Arrow(_) | SparkError::Internal(_) => None,
712736
}

native/jni-bridge/src/errors.rs

Lines changed: 378 additions & 38 deletions
Large diffs are not rendered by default.

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ class CometExecIterator(
7171
partitionIndex: Int,
7272
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
7373
encryptedFilePaths: Seq[String] = Seq.empty,
74-
shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty)
74+
shuffleBlockIterators: Map[Int, CometShuffleBlockIterator] = Map.empty,
75+
taskFilePaths: Seq[String] = Seq.empty)
7576
extends Iterator[ColumnarBatch]
7677
with Logging {
7778

@@ -164,29 +165,14 @@ class CometExecIterator(
164165
// Handle CometQueryExecutionException with JSON payload first
165166
case e: CometQueryExecutionException =>
166167
logError(s"Native execution for task $taskAttemptId failed", e)
167-
throw SparkErrorConverter.convertToSparkException(e)
168+
throw SparkErrorConverter.convertToSparkException(e, taskFilePaths)
168169

169170
case e: CometNativeException =>
170171
// it is generally considered bad practice to log and then rethrow an
171172
// exception, but it really helps debugging to be able to see which task
172173
// threw the exception, so we log the exception with taskAttemptId here
173174
logError(s"Native execution for task $taskAttemptId failed", e)
174-
175-
val parquetError: scala.util.matching.Regex =
176-
"""^Parquet error: (?:.*)$""".r
177-
e.getMessage match {
178-
case parquetError() =>
179-
// See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
180-
// See org.apache.parquet.hadoop.ParquetFileReader for error message.
181-
// _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks
182-
// parameters and raises INTERNAL_ERROR if any are passed.
183-
throw new SparkException(
184-
errorClass = "_LEGACY_ERROR_TEMP_2254",
185-
messageParameters = Map.empty,
186-
cause = new SparkException("File is not a Parquet file.", e))
187-
case _ =>
188-
throw e
189-
}
175+
throw e
190176
case e: Throwable =>
191177
throw e
192178
}

spark/src/main/scala/org/apache/comet/SparkErrorConverter.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ object SparkErrorConverter extends ShimSparkErrorConverter {
6969
* @return
7070
* the corresponding Spark exception, or the original exception if parsing fails
7171
*/
72-
def convertToSparkException(e: CometQueryExecutionException): Throwable = {
72+
def convertToSparkException(
73+
e: CometQueryExecutionException,
74+
taskFilePaths: Seq[String] = Seq.empty): Throwable = {
7375
try {
7476
if (!e.isJsonMessage()) {
7577
// Not JSON, return original exception
@@ -83,7 +85,18 @@ object SparkErrorConverter extends ShimSparkErrorConverter {
8385

8486
val json = parse(e.getMessage)
8587
val errorJson = json.extract[ErrorJson]
86-
val params = errorJson.params.getOrElse(Map.empty)
88+
val rawParams = errorJson.params.getOrElse(Map.empty)
89+
// CannotReadFile carries the offending file path natively only for the object_store NotFound
90+
// case; for corrupt/truncated parquet the native error has no path, so fall back to the
91+
// per-task file list threaded in from CometExecIterator.
92+
val params =
93+
if (errorJson.errorType == "CannotReadFile"
94+
&& rawParams.get("filePath").forall(p => p == null || p.toString.isEmpty)
95+
&& taskFilePaths.nonEmpty) {
96+
rawParams + ("filePath" -> taskFilePaths.mkString(","))
97+
} else {
98+
rawParams
99+
}
87100
val errorClass =
88101
errorJson.errorClass.map(_.trim).filter(_.nonEmpty).getOrElse(UNKNOWN_ERROR)
89102

spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ import org.apache.comet.serde.OperatorOuterClass
3737
private[spark] class CometExecPartition(
3838
override val index: Int,
3939
val inputPartitions: Array[Partition],
40-
val planDataByKey: Map[String, Array[Byte]])
40+
val planDataByKey: Map[String, Array[Byte]],
41+
val filePaths: Seq[String] = Seq.empty)
4142
extends Partition
4243

4344
/**
@@ -65,7 +66,8 @@ private[spark] class CometExecRDD(
6566
subqueries: Seq[ScalarSubquery],
6667
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
6768
encryptedFilePaths: Seq[String] = Seq.empty,
68-
shuffleScanIndices: Set[Int] = Set.empty)
69+
shuffleScanIndices: Set[Int] = Set.empty,
70+
@transient perPartitionFilePaths: Array[Seq[String]] = Array.empty)
6971
extends RDD[ColumnarBatch](sc, inputRDDs.map(rdd => new OneToOneDependency(rdd))) {
7072

7173
// Determine partition count: from inputs if available, otherwise from parameter
@@ -89,7 +91,9 @@ private[spark] class CometExecRDD(
8991
(0 until numPartitions).map { idx =>
9092
val inputParts = inputRDDs.map(_.partitions(idx)).toArray
9193
val planData = perPartitionByKey.map { case (key, arr) => key -> arr(idx) }
92-
new CometExecPartition(idx, inputParts, planData)
94+
val fp =
95+
if (perPartitionFilePaths.length > idx) perPartitionFilePaths(idx) else Seq.empty[String]
96+
new CometExecPartition(idx, inputParts, planData, fp)
9397
}.toArray
9498
}
9599

@@ -123,7 +127,8 @@ private[spark] class CometExecRDD(
123127
partition.index,
124128
broadcastedHadoopConfForEncryption,
125129
encryptedFilePaths,
126-
shuffleBlockIters)
130+
shuffleBlockIters,
131+
taskFilePaths = partition.filePaths)
127132

128133
// Register ScalarSubqueries so native code can look them up
129134
subqueries.foreach(sub => CometScalarSubquery.setSubquery(it.id, sub))
@@ -214,7 +219,8 @@ object CometExecRDD {
214219
subqueries: Seq[ScalarSubquery],
215220
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
216221
encryptedFilePaths: Seq[String] = Seq.empty,
217-
shuffleScanIndices: Set[Int] = Set.empty): CometExecRDD = {
222+
shuffleScanIndices: Set[Int] = Set.empty,
223+
perPartitionFilePaths: Array[Seq[String]] = Array.empty): CometExecRDD = {
218224
// scalastyle:on
219225

220226
new CometExecRDD(
@@ -229,6 +235,7 @@ object CometExecRDD {
229235
subqueries,
230236
broadcastedHadoopConfForEncryption,
231237
encryptedFilePaths,
232-
shuffleScanIndices)
238+
shuffleScanIndices,
239+
perPartitionFilePaths)
233240
}
234241
}

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ case class CometNativeScanExec(
154154
* all files for all partitions in the driver, we serialize only common metadata (once) and each
155155
* partition's files (lazily, as tasks are scheduled).
156156
*/
157-
@transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
157+
@transient private lazy val serializedPartitionData
158+
: (Array[Byte], Array[Array[Byte]], Array[Seq[String]]) = {
158159
// Outer partitionFilters (wrapper) DPP is resolved by Spark's standard
159160
// prepare -> waitForSubqueries lifecycle, triggered explicitly via
160161
// CometLeafExec.ensureSubqueriesResolved called from
@@ -225,13 +226,20 @@ case class CometNativeScanExec(
225226
partitionNativeScan.toByteArray
226227
}.toArray
227228

228-
(commonBytes, perPartitionBytes)
229+
// File paths per partition -- threaded through CometExecRDD to CometExecIterator so a native
230+
// CannotReadFile error that lacks a path (corrupt/truncated parquet) can be surfaced as
231+
// FAILED_READ_FILE listing this partition's candidate file(s), comma-joined; the failing file is not singled out (see SparkErrorConverter.convertToSparkException).
232+
val perPartitionPaths = filePartitions.map(_.files.map(_.filePath.toString).toSeq).toArray
233+
234+
(commonBytes, perPartitionBytes, perPartitionPaths)
229235
}
230236

231237
def commonData: Array[Byte] = serializedPartitionData._1
232238

233239
def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2
234240

241+
def perPartitionFilePaths: Array[Seq[String]] = serializedPartitionData._3
242+
235243
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
236244
val nativeMetrics = CometMetricNode.fromCometPlan(this)
237245
val serializedPlan = CometExec.serializeNativePlan(nativeOp)
@@ -259,7 +267,8 @@ case class CometNativeScanExec(
259267
nativeMetrics,
260268
Seq.empty,
261269
broadcastedHadoopConfForEncryption,
262-
encryptedFilePaths) {
270+
encryptedFilePaths,
271+
perPartitionFilePaths = perPartitionFilePaths) {
263272
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
264273
val res = super.compute(split, context)
265274

spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,16 @@ trait ShimSparkErrorConverter {
341341
QueryExecutionErrors.readCurrentFileNotFoundError(
342342
new FileNotFoundException(s"File $path does not exist")))
343343

344+
case "CannotReadFile" =>
345+
// A per-file read failure of a readable-but-broken file (corrupt/truncated parquet,
346+
// object_store, IO) classified by typed DataFusionError variant on the native side. Wrap
347+
// in the FAILED_READ_FILE SparkException Spark itself produces when its own parquet reader
348+
// fails. (A genuinely-missing file is "FileNotFound" above.) `filePath` is filled by
349+
// SparkErrorConverter from the per-task file list when the native error carried none.
350+
val message = params.get("message").map(_.toString).getOrElse("")
351+
val filePath = params.get("filePath").map(_.toString).getOrElse("")
352+
Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath))
353+
344354
case _ =>
345355
None
346356
}

spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,16 @@ trait ShimSparkErrorConverter {
336336
QueryExecutionErrors.readCurrentFileNotFoundError(
337337
new FileNotFoundException(s"File $path does not exist")))
338338

339+
case "CannotReadFile" =>
340+
// A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified
341+
// by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE
342+
// SparkException Spark itself produces when its own parquet reader fails. `filePath` is
343+
// supplied by the native object_store NotFound error or, when empty, filled by
344+
// SparkErrorConverter from the per-task file list.
345+
val message = params.get("message").map(_.toString).getOrElse("")
346+
val filePath = params.get("filePath").map(_.toString).getOrElse("")
347+
Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath))
348+
339349
case _ =>
340350
None
341351
}

spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,16 @@ trait ShimSparkErrorConverter {
351351
QueryExecutionErrors
352352
.fileNotExistError(path, new FileNotFoundException(s"File $path does not exist")))
353353

354+
case "CannotReadFile" =>
355+
// A per-file read failure (corrupt/truncated/deleted parquet, object_store, IO) classified
356+
// by typed DataFusionError variant on the native side. Wrap in the FAILED_READ_FILE
357+
// SparkException Spark itself produces when its own parquet reader fails. `filePath` is
358+
// supplied by the native object_store NotFound error or, when empty, filled by
359+
// SparkErrorConverter from the per-task file list.
360+
val message = params.get("message").map(_.toString).getOrElse("")
361+
val filePath = params.get("filePath").map(_.toString).getOrElse("")
362+
Some(QueryExecutionErrors.cannotReadFilesError(new SparkException(message), filePath))
363+
354364
case _ =>
355365
// Unknown error type - return None to trigger fallback
356366
None

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3159,4 +3159,45 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
31593159
}
31603160
}
31613161

3162+
test("pushed-down predicate divide-by-zero is not surfaced as FAILED_READ_FILE") {
3163+
// A throwing expression in a WHERE clause can be pushed into the parquet row filter (reachable
3164+
// by default once Scala UDF codegen dispatch routes the UDF natively). DataFusion's row filter
3165+
// returns the failure as an ArrowError wrapped by the parquet reader as ParquetError::External,
3166+
// which must NOT be misclassified as a corrupt-file read (FAILED_READ_FILE / cannotReadFiles).
3167+
// The divide-by-zero must surface as a divide-by-zero. Regression for the FAILED_READ_FILE
3168+
// classifier over-matching (see jni-bridge errors.rs parquet_external_wraps_arrow_error).
3169+
withSQLConf(
3170+
"spark.sql.ansi.enabled" -> "true",
3171+
CometConf.COMET_ENABLED.key -> "true",
3172+
CometConf.COMET_EXEC_ENABLED.key -> "true",
3173+
CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "true") {
3174+
val ident = org.apache.spark.sql.functions.udf((x: Int) => x)
3175+
spark.udf.register("comet_test_identity", ident)
3176+
withParquetTable(Seq(Tuple1(0), Tuple1(1)), "t") {
3177+
val ex = intercept[Exception] {
3178+
sql("SELECT 1 AS one FROM t WHERE 1 / comet_test_identity(_1) = 1").collect()
3179+
}
3180+
val chain = {
3181+
val sb = new StringBuilder
3182+
var c: Throwable = ex
3183+
var depth = 0
3184+
while (c != null && depth < 12) {
3185+
sb.append(c.getClass.getName).append(": ").append(String.valueOf(c.getMessage))
3186+
c = c.getCause
3187+
depth += 1
3188+
}
3189+
sb.toString
3190+
}
3191+
assert(
3192+
!chain.contains("Encountered error while reading file") &&
3193+
!chain.contains("FAILED_READ_FILE") &&
3194+
!chain.contains("cannotReadFiles"),
3195+
s"divide-by-zero must not surface as a file-read error, but got:\n$chain")
3196+
assert(
3197+
chain.contains("DivideByZero") || chain.contains("DIVIDE_BY_ZERO"),
3198+
s"expected a divide-by-zero error, but got:\n$chain")
3199+
}
3200+
}
3201+
}
3202+
31623203
}

0 commit comments

Comments
 (0)