Skip to content

Commit 2425f22

Browse files
authored
Merge branch 'main' into coalesce_broadcast
2 parents 3fdd14b + 420dd89 commit 2425f22

8 files changed

Lines changed: 128 additions & 126 deletions

File tree

dev/diffs/3.5.8.diff

Lines changed: 12 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -502,35 +502,18 @@ index a206e97c353..79813d8e259 100644
502502

503503
test("SPARK-35884: Explain Formatted") {
504504
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
505-
index 93275487f29..510e3087e0f 100644
505+
index 93275487f29..ca79ad8b6d9 100644
506506
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
507507
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
508-
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
509-
510-
import scala.collection.mutable
511-
512-
+import org.apache.comet.CometConf
513-
import org.apache.hadoop.conf.Configuration
514-
import org.apache.hadoop.fs.{LocalFileSystem, Path}
515-
516-
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
508+
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
517509
import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
518510
import org.apache.spark.sql.catalyst.plans.logical.Filter
519511
import org.apache.spark.sql.catalyst.types.DataTypeUtils
520512
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
521513
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
522514
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
523515
import org.apache.spark.sql.execution.datasources.FilePartition
524-
@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
525-
case "" => "_LEGACY_ERROR_TEMP_2062"
526-
case _ => "_LEGACY_ERROR_TEMP_2055"
527-
}
528-
+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
529-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
530-
checkErrorMatchPVals(
531-
exception = intercept[SparkException] {
532-
testIgnoreMissingFiles(options)
533-
@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
516+
@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
534517
}
535518

536519
Seq("parquet", "orc").foreach { format =>
@@ -540,31 +523,31 @@ index 93275487f29..510e3087e0f 100644
540523
withTempDir { dir =>
541524
val tableName = s"spark_25132_${format}_native"
542525
val tableDir = dir.getCanonicalPath + s"/$tableName"
543-
@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
526+
@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
544527
assert(bJoinExec.isEmpty)
545528
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
546529
case smJoin: SortMergeJoinExec => smJoin
547530
+ case smJoin: CometSortMergeJoinExec => smJoin
548531
}
549532
assert(smJoinExec.nonEmpty)
550533
}
551-
@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
534+
@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest
552535

553536
val fileScan = df.queryExecution.executedPlan collectFirst {
554537
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
555538
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
556539
}
557540
assert(fileScan.nonEmpty)
558541
assert(fileScan.get.partitionFilters.nonEmpty)
559-
@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
542+
@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest
560543

561544
val fileScan = df.queryExecution.executedPlan collectFirst {
562545
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
563546
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
564547
}
565548
assert(fileScan.nonEmpty)
566549
assert(fileScan.get.partitionFilters.isEmpty)
567-
@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
550+
@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
568551
val filters = df.queryExecution.executedPlan.collect {
569552
case f: FileSourceScanLike => f.dataFilters
570553
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -965,7 +948,7 @@ index 3cf2bfd17ab..49728c35c42 100644
965948
SQLConf.ANSI_ENABLED.key -> "true") {
966949
withTable("t") {
967950
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
968-
index fa1a64460fc..1d2e215d6a3 100644
951+
index fa1a64460fc..134f0db1fb8 100644
969952
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
970953
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
971954
@@ -17,6 +17,8 @@
@@ -1134,31 +1117,18 @@ index d269290e616..13726a31e07 100644
11341117
}
11351118
}
11361119
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
1137-
index cfc8b2cc845..b7c234e1437 100644
1120+
index cfc8b2cc845..c4be7eb3731 100644
11381121
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
11391122
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
1140-
@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
1141-
import scala.collection.mutable.ArrayBuffer
1142-
1123+
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
11431124
import org.apache.spark.SparkConf
1144-
-import org.apache.spark.sql.{AnalysisException, QueryTest}
1145-
+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest}
1125+
import org.apache.spark.sql.{AnalysisException, QueryTest}
11461126
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
11471127
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
11481128
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
11491129
import org.apache.spark.sql.connector.read.ScanBuilder
11501130
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
1151-
@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
1152-
}
1153-
}
1154-
1155-
- test("Fallback Parquet V2 to V1") {
1156-
+ test("Fallback Parquet V2 to V1",
1157-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
1158-
Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format =>
1159-
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
1160-
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
1161-
@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
1131+
@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
11621132
val df = spark.read.format(format).load(path.getCanonicalPath)
11631133
checkAnswer(df, inputData.toDF())
11641134
assert(
@@ -1422,28 +1392,6 @@ index 47679ed7865..9ffbaecb98e 100644
14221392
}.length == hashAggCount)
14231393
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
14241394
}
1425-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
1426-
index a1147c16cc8..c7a29496328 100644
1427-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
1428-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
1429-
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
1430-
1431-
import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException}
1432-
import org.apache.spark.sql._
1433-
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
1434-
import org.apache.spark.sql.catalyst.TableIdentifier
1435-
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
1436-
import org.apache.spark.sql.catalyst.parser.ParseException
1437-
@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
1438-
}
1439-
}
1440-
1441-
- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
1442-
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
1443-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
1444-
withTable("t") {
1445-
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
1446-
withView("v1") {
14471395
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
14481396
index eec396b2e39..bf3f1c769d6 100644
14491397
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -2930,39 +2878,6 @@ index aad91601758..201083bd621 100644
29302878
})
29312879
}
29322880

2933-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
2934-
index b5cf13a9c12..ac17603fb7f 100644
2935-
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
2936-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
2937-
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
2938-
2939-
import org.apache.spark.{SparkException, TestUtils}
2940-
import org.apache.spark.internal.Logging
2941-
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
2942-
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode}
2943-
import org.apache.spark.sql.catalyst.InternalRow
2944-
import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
2945-
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation}
2946-
@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
2947-
)
2948-
}
2949-
2950-
- test("SPARK-41198: input row calculation with CTE") {
2951-
+ test("SPARK-41198: input row calculation with CTE",
2952-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
2953-
withTable("parquet_tbl", "parquet_streaming_tbl") {
2954-
spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
2955-
.write.format("parquet").saveAsTable("parquet_tbl")
2956-
@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
2957-
}
2958-
}
2959-
2960-
- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
2961-
+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources",
2962-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
2963-
withTable("parquet_streaming_tbl") {
2964-
val streamInput = MemoryStream[Int]
2965-
val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
29662881
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
29672882
index 8f099c31e6b..ce4b7ad25b3 100644
29682883
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala

native/core/src/errors.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
413413
// Fall back to plain SparkError (no context)
414414
throw_spark_error_as_json(env, spark_error)
415415
} else {
416-
// Not a SparkError, use generic exception
416+
// Check for file-not-found errors from object store
417+
let error_msg = e.to_string();
418+
if error_msg.contains("not found")
419+
&& error_msg.contains("No such file or directory")
420+
{
421+
let spark_error = SparkError::FileNotFound { message: error_msg };
422+
throw_spark_error_as_json(env, &spark_error)
423+
} else {
424+
// Not a SparkError, use generic exception
425+
let exception = error.to_exception();
426+
match backtrace {
427+
Some(backtrace_string) => env.throw_new(
428+
exception.class,
429+
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
430+
),
431+
_ => env.throw_new(exception.class, exception.msg),
432+
}
433+
}
434+
}
435+
}
436+
// Handle direct SparkError - serialize to JSON
437+
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
438+
_ => {
439+
// Check for file-not-found errors that may arrive through other wrapping paths
440+
let error_msg = error.to_string();
441+
if error_msg.contains("not found")
442+
&& error_msg.contains("No such file or directory")
443+
{
444+
let spark_error = SparkError::FileNotFound { message: error_msg };
445+
throw_spark_error_as_json(env, &spark_error)
446+
} else {
417447
let exception = error.to_exception();
418448
match backtrace {
419449
Some(backtrace_string) => env.throw_new(
@@ -424,18 +454,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
424454
}
425455
}
426456
}
427-
// Handle direct SparkError - serialize to JSON
428-
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
429-
_ => {
430-
let exception = error.to_exception();
431-
match backtrace {
432-
Some(backtrace_string) => env.throw_new(
433-
exception.class,
434-
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
435-
),
436-
_ => env.throw_new(exception.class, exception.msg),
437-
}
438-
}
439457
}
440458
.expect("Thrown exception")
441459
}

native/spark-expr/src/error.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ pub enum SparkError {
166166
#[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")]
167167
ScalarSubqueryTooManyRows,
168168

169+
#[error("{message}")]
170+
FileNotFound { message: String },
171+
169172
#[error("ArrowError: {0}.")]
170173
Arrow(Arc<ArrowError>),
171174

@@ -236,6 +239,7 @@ impl SparkError {
236239
SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex",
237240
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
238241
SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
242+
SparkError::FileNotFound { .. } => "FileNotFound",
239243
SparkError::Arrow(_) => "Arrow",
240244
SparkError::Internal(_) => "Internal",
241245
}
@@ -421,6 +425,11 @@ impl SparkError {
421425
"dataType": data_type,
422426
})
423427
}
428+
SparkError::FileNotFound { message } => {
429+
serde_json::json!({
430+
"message": message,
431+
})
432+
}
424433
SparkError::Arrow(e) => {
425434
serde_json::json!({
426435
"message": e.to_string(),
@@ -487,6 +496,9 @@ impl SparkError {
487496
SparkError::DatatypeCannotOrder { .. }
488497
| SparkError::InvalidUtf8String { .. } => "org/apache/spark/SparkIllegalArgumentException",
489498

499+
// FileNotFound - will be converted to SparkFileNotFoundException by the shim
500+
SparkError::FileNotFound { .. } => "org/apache/spark/SparkException",
501+
490502
// Generic errors
491503
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
492504
}
@@ -559,6 +571,9 @@ impl SparkError {
559571
// Subquery errors
560572
SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"),
561573

574+
// File not found
575+
SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),
576+
562577
// Generic errors (no error class)
563578
SparkError::Arrow(_) | SparkError::Internal(_) => None,
564579
}

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@
1919

2020
package org.apache.comet
2121

22-
import java.io.FileNotFoundException
2322
import java.lang.management.ManagementFactory
2423

25-
import scala.util.matching.Regex
26-
2724
import org.apache.hadoop.conf.Configuration
2825
import org.apache.spark._
2926
import org.apache.spark.broadcast.Broadcast
@@ -163,19 +160,9 @@ class CometExecIterator(
163160
// threw the exception, so we log the exception with taskAttemptId here
164161
logError(s"Native execution for task $taskAttemptId failed", e)
165162

166-
val fileNotFoundPattern: Regex =
167-
("""^External: Object at location (.+?) not found: No such file or directory """ +
168-
"""\(os error \d+\)$""").r
169-
val parquetError: Regex =
163+
val parquetError: scala.util.matching.Regex =
170164
"""^Parquet error: (?:.*)$""".r
171165
e.getMessage match {
172-
case fileNotFoundPattern(filePath) =>
173-
// See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError
174-
throw new SparkException(
175-
errorClass = "_LEGACY_ERROR_TEMP_2055",
176-
messageParameters = Map("message" -> e.getMessage),
177-
cause = new FileNotFoundException(filePath)
178-
) // Can't use SparkFileNotFoundException because it's private.
179166
case parquetError() =>
180167
// See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
181168
// See org.apache.parquet.hadoop.ParquetFileReader for error message.

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,14 @@ case class CometNativeScanExec(
202202

203203
override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
204204

205-
override lazy val metrics: Map[String, SQLMetric] =
206-
CometMetricNode.nativeScanMetrics(session.sparkContext)
205+
override lazy val metrics: Map[String, SQLMetric] = {
206+
val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
207+
// Map native metric names to Spark metric names
208+
nativeMetrics.get("output_rows") match {
209+
case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
210+
case None => nativeMetrics
211+
}
212+
}
207213

208214
/**
209215
* See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only used for tests.

spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,20 @@
1919

2020
package org.apache.spark.sql.comet.shims
2121

22+
import java.io.FileNotFoundException
23+
24+
import scala.util.matching.Regex
25+
2226
import org.apache.spark.{QueryContext, SparkException}
2327
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
2428
import org.apache.spark.sql.errors.QueryExecutionErrors
2529
import org.apache.spark.sql.types._
2630
import org.apache.spark.unsafe.types.UTF8String
2731

32+
object ShimSparkErrorConverter {
33+
val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r
34+
}
35+
2836
/**
2937
* Spark 3.4 implementation for converting error types to proper Spark exceptions.
3038
*
@@ -243,6 +251,18 @@ trait ShimSparkErrorConverter {
243251
QueryExecutionErrors
244252
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))
245253

254+
case "FileNotFound" =>
255+
val msg = params("message").toString
256+
// Extract file path from native error message and format like Hadoop's
257+
// FileNotFoundException: "File <path> does not exist"
258+
val path = ShimSparkErrorConverter.ObjectLocationPattern
259+
.findFirstMatchIn(msg)
260+
.map(_.group(1))
261+
.getOrElse(msg)
262+
Some(
263+
QueryExecutionErrors.readCurrentFileNotFoundError(
264+
new FileNotFoundException(s"File $path does not exist")))
265+
246266
case _ =>
247267
None
248268
}

0 commit comments

Comments
 (0)