Skip to content

Commit 88fc313

Browse files
Merge branch 'apache:main' into main
2 parents 160a817 + 1d01b7d commit 88fc313

2,874 files changed

Lines changed: 2191 additions & 554314 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/iceberg_spark_test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ jobs:
7777
# Use CI profile for faster builds (no LTO) and to share cache with pr_build_linux.yml.
7878
run: |
7979
cd native && cargo build --profile ci
80+
env:
81+
RUSTFLAGS: "-Ctarget-cpu=x86-64-v3"
8082

8183
- name: Save Cargo cache
8284
uses: actions/cache/save@v5

.github/workflows/pr_build_linux.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ jobs:
9494
# CI profile: same overflow behavior as release, but faster compilation
9595
# (no LTO, parallel codegen)
9696
cargo build --profile ci
97+
env:
98+
RUSTFLAGS: "-Ctarget-cpu=x86-64-v3"
9799

98100
- name: Upload native library
99101
uses: actions/upload-artifact@v6
@@ -260,6 +262,7 @@ jobs:
260262
org.apache.comet.CometStringExpressionSuite
261263
org.apache.comet.CometBitwiseExpressionSuite
262264
org.apache.comet.CometMapExpressionSuite
265+
org.apache.comet.CometCsvExpressionSuite
263266
org.apache.comet.CometJsonExpressionSuite
264267
org.apache.comet.expressions.conditional.CometIfSuite
265268
org.apache.comet.expressions.conditional.CometCoalesceSuite

.github/workflows/pr_build_macos.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ jobs:
9494
# CI profile: same overflow behavior as release, but faster compilation
9595
# (no LTO, parallel codegen)
9696
cargo build --profile ci
97+
env:
98+
RUSTFLAGS: "-Ctarget-cpu=apple-m1"
9799

98100
- name: Upload native library
99101
uses: actions/upload-artifact@v6
@@ -204,6 +206,7 @@ jobs:
204206
org.apache.comet.CometBitwiseExpressionSuite
205207
org.apache.comet.CometMapExpressionSuite
206208
org.apache.comet.CometJsonExpressionSuite
209+
org.apache.comet.CometCsvExpressionSuite
207210
org.apache.comet.expressions.conditional.CometIfSuite
208211
org.apache.comet.expressions.conditional.CometCoalesceSuite
209212
org.apache.comet.expressions.conditional.CometCaseWhenSuite

.github/workflows/spark_sql_test.yml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ jobs:
8383
run: |
8484
cd native
8585
cargo build --profile ci
86+
env:
87+
RUSTFLAGS: "-Ctarget-cpu=x86-64-v3"
8688

8789
- name: Upload native library
8890
uses: actions/upload-artifact@v6
@@ -118,13 +120,13 @@ jobs:
118120
# - auto scan: all Spark versions (3.4, 3.5, 4.0)
119121
# - native_iceberg_compat: Spark 3.5 only
120122
config:
121-
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto', scan-env: ''}
122-
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto', scan-env: ''}
123-
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_iceberg_compat', scan-env: 'COMET_PARQUET_SCAN_IMPL=native_iceberg_compat'}
124-
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
123+
- {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto'}
124+
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'}
125+
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'}
126+
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'}
125127
# Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
126128
exclude:
127-
- config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto', scan-env: ''}
129+
- config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'}
128130
module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
129131
fail-fast: false
130132
name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name }}/spark-${{ matrix.config.spark-full }}
@@ -153,7 +155,7 @@ jobs:
153155
run: |
154156
cd apache-spark
155157
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
156-
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ${{ matrix.config.scan-env }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
158+
NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
157159
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
158160
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
159161
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log

dev/diffs/3.4.3.diff

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,7 +1150,7 @@ index 02990a7a40d..bddf5e1ccc2 100644
11501150
}
11511151
}
11521152
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
1153-
index cfc8b2cc845..c6fcfd7bd08 100644
1153+
index cfc8b2cc845..c4be7eb3731 100644
11541154
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
11551155
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
11561156
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -2385,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
23852385
import testImplicits._
23862386

23872387
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
2388-
index 266bb343526..e58a2f49eb9 100644
2388+
index 266bb343526..f8ad838e2b2 100644
23892389
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23902390
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23912391
@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2409,7 +2409,7 @@ index 266bb343526..e58a2f49eb9 100644
24092409
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
24102410
import org.apache.spark.sql.functions._
24112411
import org.apache.spark.sql.internal.SQLConf
2412-
@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2412+
@@ -101,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24132413
}
24142414
}
24152415

@@ -2419,6 +2419,7 @@ index 266bb343526..e58a2f49eb9 100644
24192419
+ val fileScan = collect(plan) {
24202420
+ case f: FileSourceScanExec => f
24212421
+ case f: CometScanExec => f
2422+
+ case f: CometNativeScanExec => f
24222423
+ }
24232424
assert(fileScan.nonEmpty, plan)
24242425
fileScan.head
@@ -2427,12 +2428,13 @@ index 266bb343526..e58a2f49eb9 100644
24272428
+ private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) match {
24282429
+ case fs: FileSourceScanExec => fs.bucketedScan
24292430
+ case bs: CometScanExec => bs.bucketedScan
2431+
+ case ns: CometNativeScanExec => ns.bucketedScan
24302432
+ }
24312433
+
24322434
// To verify if the bucket pruning works, this function checks two conditions:
24332435
// 1) Check if the pruned buckets (before filtering) are empty.
24342436
// 2) Verify the final result is the same as the expected one
2435-
@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2437+
@@ -155,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24362438
val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
24372439
.queryExecution.executedPlan
24382440
val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2442,7 +2444,7 @@ index 266bb343526..e58a2f49eb9 100644
24422444

24432445
val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
24442446
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
2445-
@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2447+
@@ -451,28 +465,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24462448
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
24472449
val executedPlan =
24482450
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2505,7 +2507,7 @@ index 266bb343526..e58a2f49eb9 100644
25052507
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
25062508

25072509
// check the output partitioning
2508-
@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2510+
@@ -835,11 +875,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25092511
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
25102512

25112513
val scanDF = spark.table("bucketed_table").select("j")
@@ -2519,7 +2521,7 @@ index 266bb343526..e58a2f49eb9 100644
25192521
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
25202522
}
25212523
}
2522-
@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2524+
@@ -894,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25232525
}
25242526

25252527
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2530,7 +2532,7 @@ index 266bb343526..e58a2f49eb9 100644
25302532
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
25312533
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
25322534
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
2533-
@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2535+
@@ -913,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25342536
}
25352537

25362538
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2541,7 +2543,7 @@ index 266bb343526..e58a2f49eb9 100644
25412543
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
25422544
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
25432545

2544-
@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2546+
@@ -943,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25452547
}
25462548

25472549
test("bucket coalescing eliminates shuffle") {
@@ -2552,7 +2554,7 @@ index 266bb343526..e58a2f49eb9 100644
25522554
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
25532555
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
25542556
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
2555-
@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2557+
@@ -1026,15 +1075,26 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25562558
expectedNumShuffles: Int,
25572559
expectedCoalescedNumBuckets: Option[Int]): Unit = {
25582560
val plan = sql(query).queryExecution.executedPlan
@@ -2565,6 +2567,7 @@ index 266bb343526..e58a2f49eb9 100644
25652567
val scans = plan.collect {
25662568
case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
25672569
+ case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined => b
2570+
+ case b: CometNativeScanExec if b.optionalNumCoalescedBuckets.isDefined => b
25682571
}
25692572
if (expectedCoalescedNumBuckets.isDefined) {
25702573
assert(scans.length == 1)
@@ -2574,6 +2577,8 @@ index 266bb343526..e58a2f49eb9 100644
25742577
+ assert(f.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
25752578
+ case b: CometScanExec =>
25762579
+ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
2580+
+ case b: CometNativeScanExec =>
2581+
+ assert(b.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
25772582
+ }
25782583
} else {
25792584
assert(scans.isEmpty)
@@ -2604,25 +2609,26 @@ index b5f6d2f9f68..277784a92af 100644
26042609

26052610
protected override lazy val sql = spark.sql _
26062611
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
2607-
index 1f55742cd67..42377f7cf26 100644
2612+
index 1f55742cd67..f20129d9dd8 100644
26082613
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
26092614
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
26102615
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
26112616
import org.apache.spark.sql.QueryTest
26122617
import org.apache.spark.sql.catalyst.expressions.AttributeReference
26132618
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
2614-
+import org.apache.spark.sql.comet.CometScanExec
2619+
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
26152620
import org.apache.spark.sql.execution.FileSourceScanExec
26162621
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
26172622
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
2618-
@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
2623+
@@ -71,7 +72,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
26192624

26202625
def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
26212626
val plan = sql(query).queryExecution.executedPlan
26222627
- val bucketedScan = collect(plan) { case s: FileSourceScanExec if s.bucketedScan => s }
26232628
+ val bucketedScan = collect(plan) {
26242629
+ case s: FileSourceScanExec if s.bucketedScan => s
26252630
+ case s: CometScanExec if s.bucketedScan => s
2631+
+ case s: CometNativeScanExec if s.bucketedScan => s
26262632
+ }
26272633
assert(bucketedScan.length == expectedNumBucketedScan)
26282634
}

0 commit comments

Comments
 (0)