From 7735744dec7026214b1ac8ca434557b8b7fa8bfb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 08:56:10 -0700 Subject: [PATCH 1/4] savE --- .../CometPartitionColumnBenchmark.scala | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala new file mode 100644 index 0000000000..9f52ba5ef0 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.benchmark.Benchmark + +import org.apache.comet.CometConf +import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT} + +/** + * Benchmark to measure partition column scan performance. This exercises the CometConstantVector + * path where constant columns are exported as 1-element Arrow arrays and expanded on the native + * side. + * + * To run this benchmark: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make \ + * benchmark-org.apache.spark.sql.benchmark.CometPartitionColumnBenchmark + * }}} + * + * Results will be written to "spark/benchmarks/CometPartitionColumnBenchmark-**results.txt". + */ +object CometPartitionColumnBenchmark extends CometBenchmarkBase { + + def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = { + val sqlBenchmark = new Benchmark( + s"Partitioned Scan with $numPartitionCols partition column(s)", + values, + output = output) + + withTempPath { dir => + withTempTable("parquetV1Table") { + val partCols = + (1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ") + val partNames = (1 to numPartitionCols).map(i => s"p$i") + val df = spark.sql(s"SELECT value as id, $partCols FROM $tbl") + val parquetDir = dir.getCanonicalPath + "/parquetV1" + df.write + .partitionBy(partNames: _*) + .mode("overwrite") + .option("compression", "snappy") + .parquet(parquetDir) + spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table") + + sqlBenchmark.addCase("SQL Parquet - Spark") { _ => + spark.sql("select sum(id) from parquetV1Table").noop() + } + + sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { + spark.sql("select sum(id) from parquetV1Table").noop() + } + } + + sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { + spark.sql("select sum(id) from parquetV1Table").noop() + } + } + + // Also benchmark reading partition columns themselves + val partSumExpr = + (1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ") + + sqlBenchmark.addCase("SQL Parquet - Spark (read partition cols)") { _ => + spark.sql(s"select $partSumExpr from parquetV1Table").noop() + } + + sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion (partition cols)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { + spark.sql(s"select $partSumExpr from parquetV1Table").noop() + } + } + + sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat (partition cols)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { + spark.sql(s"select $partSumExpr from parquetV1Table").noop() + } + } + + sqlBenchmark.run() + } + } + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v => + for (numPartCols <- List(1, 5)) { + partitionColumnScanBenchmark(v, numPartCols) + } + } + } +} From 02e8ca4671997cb384d4e8b93962b0fdb50e8be5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 09:07:26 -0700 Subject: [PATCH 2/4] refactor: Split read benchmarks and add addParquetScanCases helper Extract iceberg benchmarks into CometIcebergReadBenchmark and add addParquetScanCases helper to CometBenchmarkBase to eliminate the repeated 3-case pattern (Spark / native_datafusion / native_iceberg_compat) across all parquet benchmarks. Co-Authored-By: Claude Opus 4.5 --- .../sql/benchmark/CometBenchmarkBase.scala | 27 ++ .../benchmark/CometIcebergReadBenchmark.scala | 81 +++++ .../CometPartitionColumnBenchmark.scala | 50 +-- .../sql/benchmark/CometReadBenchmark.scala | 305 ++---------------- 4 files changed, 137 insertions(+), 326 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala index 5d1d0c5718..2a81316c95 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DecimalType import org.apache.comet.CometConf +import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT} import org.apache.comet.CometSparkSessionExtensions trait CometBenchmarkBase @@ -164,6 +165,32 @@ trait CometBenchmarkBase benchmark.run() } + protected def addParquetScanCases( + benchmark: Benchmark, + query: String, + caseSuffix: String = "", + extraConf: Map[String, String] = Map.empty): Unit = { + val suffix = if (caseSuffix.nonEmpty) s" ($caseSuffix)" else "" + + benchmark.addCase(s"SQL Parquet - Spark$suffix") { _ => + withSQLConf(extraConf.toSeq: _*) { + spark.sql(query).noop() + } + } + + for (scanImpl <- Seq(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT)) { + benchmark.addCase(s"SQL Parquet - Comet ($scanImpl)$suffix") { _ => + withSQLConf( + (extraConf ++ Map( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl)).toSeq: _*) { + spark.sql(query).noop() + } + } + } + } + protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = { val testDf = if (partition.isDefined) { df.write.partitionBy(partition.get) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala new file mode 100644 index 0000000000..b90b893712 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergReadBenchmark.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.types._ + +import org.apache.comet.CometConf + +/** + * Benchmark to measure Comet Iceberg read performance. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make + * benchmark-org.apache.spark.sql.benchmark.CometIcebergReadBenchmark` Results will be written to + * "spark/benchmarks/CometIcebergReadBenchmark-**results.txt". + */ +object CometIcebergReadBenchmark extends CometBenchmarkBase { + + def icebergScanBenchmark(values: Int, dataType: DataType): Unit = { + val sqlBenchmark = + new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output) + + withTempPath { dir => + withTempTable("icebergTable") { + prepareIcebergTable( + dir, + spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"), + "icebergTable") + + val query = dataType match { + case BooleanType => "sum(cast(id as bigint))" + case _ => "sum(id)" + } + + sqlBenchmark.addCase("SQL Iceberg - Spark") { _ => + withSQLConf( + "spark.memory.offHeap.enabled" -> "true", + "spark.memory.offHeap.size" -> "10g") { + spark.sql(s"select $query from icebergTable").noop() + } + } + + sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + "spark.memory.offHeap.enabled" -> "true", + "spark.memory.offHeap.size" -> "10g", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + spark.sql(s"select $query from icebergTable").noop() + } + } + + sqlBenchmark.run() + } + } + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v => + Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType) + .foreach(icebergScanBenchmark(v, _)) + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala index 9f52ba5ef0..a7d170057f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala @@ -21,9 +21,6 @@ package org.apache.spark.sql.benchmark import org.apache.spark.benchmark.Benchmark -import org.apache.comet.CometConf -import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT} - /** * Benchmark to measure partition column scan performance. This exercises the CometConstantVector * path where constant columns are exported as 1-element Arrow arrays and expanded on the native @@ -59,53 +56,16 @@ object CometPartitionColumnBenchmark extends CometBenchmarkBase { .parquet(parquetDir) spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table") - sqlBenchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql("select sum(id) from parquetV1Table").noop() - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql("select sum(id) from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql("select sum(id) from parquetV1Table").noop() - } - } + addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table") // Also benchmark reading partition columns themselves val partSumExpr = (1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ") - sqlBenchmark.addCase("SQL Parquet - Spark (read partition cols)") { _ => - spark.sql(s"select $partSumExpr from parquetV1Table").noop() - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion (partition cols)") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql(s"select $partSumExpr from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat (partition cols)") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql(s"select $partSumExpr from parquetV1Table").noop() - } - } + addParquetScanCases( + sqlBenchmark, + s"select $partSumExpr from parquetV1Table", + caseSuffix = "partition cols") sqlBenchmark.run() } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala index 3bfbdee91a..a2f196a4fc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector import org.apache.comet.{CometConf, WithHdfsCluster} -import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT} import org.apache.comet.parquet.BatchReader /** @@ -50,7 +49,6 @@ import org.apache.comet.parquet.BatchReader class CometReadBaseBenchmark extends CometBenchmarkBase { def numericScanBenchmark(values: Int, dataType: DataType): Unit = { - // Benchmarks running through spark sql. val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values, output = output) @@ -63,76 +61,13 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { case _ => "sum(id)" } - sqlBenchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql(s"select $query from parquetV1Table").noop() - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql(s"select $query from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql(s"select $query from parquetV1Table").noop() - } - } - - sqlBenchmark.run() - } - } - } - - def icebergScanBenchmark(values: Int, dataType: DataType): Unit = { - // Benchmarks running through spark sql. - val sqlBenchmark = - new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output) - - withTempPath { dir => - withTempTable("icebergTable") { - prepareIcebergTable( - dir, - spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"), - "icebergTable") - - val query = dataType match { - case BooleanType => "sum(cast(id as bigint))" - case _ => "sum(id)" - } - - sqlBenchmark.addCase("SQL Iceberg - Spark") { _ => - withSQLConf( - "spark.memory.offHeap.enabled" -> "true", - "spark.memory.offHeap.size" -> "10g") { - spark.sql(s"select $query from icebergTable").noop() - } - } - - sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - "spark.memory.offHeap.enabled" -> "true", - "spark.memory.offHeap.size" -> "10g", - CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { - spark.sql(s"select $query from icebergTable").noop() - } - } - + addParquetScanCases(sqlBenchmark, s"select $query from parquetV1Table") sqlBenchmark.run() } } } def encryptedScanBenchmark(values: Int, dataType: DataType): Unit = { - // Benchmarks running through spark sql. val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Encrypted Column Scan", values, output = output) @@ -143,6 +78,15 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { val cryptoFactoryClass = "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory" + val cryptoConf = Map( + "spark.memory.offHeap.enabled" -> "true", + "spark.memory.offHeap.size" -> "10g", + DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass, + KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME -> + "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS", + InMemoryKMS.KEY_LIST_PROPERTY_NAME -> + s"footerKey: ${footerKey}, key1: ${key1}") + withTempPath { dir => withTempTable("parquetV1Table") { prepareEncryptedTable( @@ -154,51 +98,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { case _ => "sum(id)" } - sqlBenchmark.addCase("SQL Parquet - Spark") { _ => - withSQLConf( - "spark.memory.offHeap.enabled" -> "true", - "spark.memory.offHeap.size" -> "10g", - DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass, - KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME -> - "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS", - InMemoryKMS.KEY_LIST_PROPERTY_NAME -> - s"footerKey: ${footerKey}, key1: ${key1}") { - spark.sql(s"select $query from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - "spark.memory.offHeap.enabled" -> "true", - "spark.memory.offHeap.size" -> "10g", - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION, - DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass, - KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME -> - "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS", - InMemoryKMS.KEY_LIST_PROPERTY_NAME -> - s"footerKey: ${footerKey}, key1: ${key1}") { - spark.sql(s"select $query from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - "spark.memory.offHeap.enabled" -> "true", - "spark.memory.offHeap.size" -> "10g", - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT, - DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass, - KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME -> - "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS", - InMemoryKMS.KEY_LIST_PROPERTY_NAME -> - s"footerKey: ${footerKey}, key1: ${key1}") { - spark.sql(s"select $query from parquetV1Table").noop() - } - } - + addParquetScanCases( + sqlBenchmark, + s"select $query from parquetV1Table", + extraConf = cryptoConf) sqlBenchmark.run() } } @@ -218,28 +121,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { s"SELECT CAST(value / 10000000.0 as DECIMAL($precision, $scale)) " + s"id FROM $tbl")) - sqlBenchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql("select sum(id) from parquetV1Table").noop() - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql("select sum(id) from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql("select sum(id) from parquetV1Table").noop() - } - } - + addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table") sqlBenchmark.run() } } @@ -338,28 +220,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, value AS c2 FROM " + s"$tbl")) - benchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop() - } - - benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop() - } - } - - benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop() - } - } - + addParquetScanCases(benchmark, "select sum(c2) from parquetV1Table where c1 + 1 > 0") benchmark.run() } } @@ -388,28 +249,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { |FROM tmp |""".stripMargin)) - sqlBenchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql("select sum(length(id)) from parquetV1Table").noop() - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql("select sum(length(id)) from parquetV1Table").noop() - } - } - - sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql("select sum(length(id)) from parquetV1Table").noop() - } - } - + addParquetScanCases(sqlBenchmark, "select sum(length(id)) from parquetV1Table") sqlBenchmark.run() } } @@ -428,37 +268,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { s"SELECT IF(RAND(1) < $fractionOfNulls, NULL, CAST(value as STRING)) AS c1, " + s"IF(RAND(2) < $fractionOfNulls, NULL, CAST(value as STRING)) AS c2 FROM $tbl")) - benchmark.addCase("SQL Parquet - Spark") { _ => - spark - .sql("select sum(length(c2)) from parquetV1Table where c1 is " + - "not NULL and c2 is not NULL") - .noop() - } - - benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark - .sql("select sum(length(c2)) from parquetV1Table where c1 is " + - "not NULL and c2 is not NULL") - .noop() - } - } - - benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark - .sql("select sum(length(c2)) from parquetV1Table where c1 is " + - "not NULL and c2 is not NULL") - .noop() - } - } - + addParquetScanCases( + benchmark, + "select sum(length(c2)) from parquetV1Table where c1 is " + + "not NULL and c2 is not NULL") benchmark.run() } } @@ -476,28 +289,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { prepareTable(dir, spark.sql("SELECT * FROM t1")) - benchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop() - } - - benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop() - } - } - - benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop() - } - } - + addParquetScanCases(benchmark, s"SELECT sum(c$middle) FROM parquetV1Table") benchmark.run() } } @@ -519,28 +311,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " + s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl")) - benchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop() - } - - benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop() - } - } - - benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop() - } - } - + addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0") benchmark.run() } } @@ -562,28 +333,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { s"SELECT IF(RAND(1) < $fractionOfZeros, -1, value) AS c1, " + s"REPEAT(CAST(value AS STRING), 100) AS c2 FROM $tbl ORDER BY c1, c2")) - benchmark.addCase("SQL Parquet - Spark") { _ => - spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop() - } - - benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { - spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop() - } - } - - benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { - spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop() - } - } - + addParquetScanCases(benchmark, "SELECT * FROM parquetV1Table WHERE c1 + 1 > 0") benchmark.run() } } @@ -611,13 +361,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { } } - runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v => - Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType) - .foreach { dataType => - icebergScanBenchmark(v, dataType) - } - } - runBenchmarkWithTable("SQL Single Numeric Encrypted Column Scan", 1024 * 1024 * 128) { v => Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType) .foreach { dataType => From 75750af74e06ecfe19c9505c11bc7ac9aedce974 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 10:44:48 -0700 Subject: [PATCH 3/4] feat: Native batch passthrough for native_iceberg_compat V1 scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminate the JVM data round trip for data columns in native_iceberg_compat scans. Data columns are read directly from the native BatchContext via zero-copy Arc::clone, while only partition columns cross the JVM boundary via Arrow FFI. Previously, data made a wasteful round trip: Rust ParquetSource → per-column JNI export to JVM → JVM wraps as CometVector → JVM exports ALL cols back to Rust via Arrow FFI → Rust ScanExec deep-copies every column Now in passthrough mode: Rust ParquetSource → batch stays in native BatchContext → Rust ScanExec reads data cols directly (zero-copy) → Only partition cols imported from JVM FFI (small, constant) Co-Authored-By: Claude Opus 4.5 --- .../comet/parquet/NativeBatchReader.java | 16 +++ native/core/src/execution/operators/scan.rs | 123 +++++++++++++++++- native/core/src/execution/planner.rs | 12 ++ native/core/src/jvm_bridge/batch_iterator.rs | 16 +++ native/core/src/parquet/mod.rs | 6 +- native/proto/src/proto/operator.proto | 6 + .../org/apache/comet/CometBatchIterator.java | 42 ++++++ .../comet/serde/operator/CometSink.scala | 13 +- 8 files changed, 224 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index d10a8932be..17a758ffb3 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -154,6 +154,13 @@ public URI pathUri() throws URISyntaxException { protected static final BufferAllocator ALLOCATOR = new RootAllocator(); private NativeUtil nativeUtil = new NativeUtil(); + /** + * Thread-local holding the native BatchContext handle of the current reader. Set during + * nextBatch() in passthrough mode so that CometBatchIterator.advancePassthrough() can retrieve + * it. + */ + public static final ThreadLocal CURRENT_READER_HANDLE = ThreadLocal.withInitial(() -> 0L); + protected Configuration conf; protected int capacity; protected boolean isCaseSensitive; @@ -888,6 +895,10 @@ private boolean containsPath(Type parquetType, String[] path, int depth) { return false; } + public long getHandle() { + return this.handle; + } + public void setSparkSchema(StructType schema) { this.sparkSchema = schema; } @@ -956,6 +967,11 @@ public boolean nextBatch() throws IOException { if (batchSize == 0) return false; + // Set the thread-local handle so CometBatchIterator.advancePassthrough() can retrieve it. + // This is always set after a successful loadNextBatch() regardless of whether passthrough + // mode will be used — the Rust ScanExec decides whether to use it. + CURRENT_READER_HANDLE.set(this.handle); + long totalDecodeTime = 0, totalLoadTime = 0; for (int i = 0; i < columnReaders.length; i++) { AbstractColumnReader reader = columnReaders[i]; diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2543705fb0..723656b05a 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -16,6 +16,7 @@ // under the License. use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode}; +use crate::parquet::get_batch_context; use crate::{ errors::CometError, execution::{ @@ -79,6 +80,11 @@ pub struct ScanExec { baseline_metrics: BaselineMetrics, /// Whether native code can assume ownership of batches that it receives arrow_ffi_safe: bool, + /// When true, data columns are read directly from the native reader's + /// BatchContext instead of through JVM FFI (zero-copy). + native_batch_passthrough: bool, + /// Number of data columns from native reader. Remaining are partition columns. + num_data_columns: usize, } impl ScanExec { @@ -88,6 +94,8 @@ impl ScanExec { input_source_description: &str, data_types: Vec, arrow_ffi_safe: bool, + native_batch_passthrough: bool, + num_data_columns: usize, ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); @@ -115,6 +123,8 @@ impl ScanExec { baseline_metrics, schema, arrow_ffi_safe, + native_batch_passthrough, + num_data_columns, }) } @@ -143,12 +153,21 @@ impl ScanExec { let mut current_batch = self.batch.try_lock().unwrap(); if current_batch.is_none() { - let next_batch = ScanExec::get_next( - self.exec_context_id, - self.input_source.as_ref().unwrap().as_obj(), - self.data_types.len(), - self.arrow_ffi_safe, - )?; + let next_batch = if self.native_batch_passthrough { + ScanExec::get_next_passthrough( + self.exec_context_id, + self.input_source.as_ref().unwrap().as_obj(), + self.num_data_columns, + self.data_types.len(), + )? + } else { + ScanExec::get_next( + self.exec_context_id, + self.input_source.as_ref().unwrap().as_obj(), + self.data_types.len(), + self.arrow_ffi_safe, + )? + }; *current_batch = Some(next_batch); } @@ -259,6 +278,98 @@ impl ScanExec { Ok(InputBatch::new(inputs, Some(actual_num_rows))) } + /// Passthrough mode: data columns are read directly from native BatchContext + /// (zero-copy Arc::clone). Only partition columns are imported from JVM via FFI. + fn get_next_passthrough( + exec_context_id: i64, + iter: &JObject, + num_data_cols: usize, + num_total_cols: usize, + ) -> Result { + if exec_context_id == TEST_EXEC_CONTEXT_ID { + return Ok(InputBatch::EOF); + } + + if iter.is_null() { + return Err(CometError::from(ExecutionError::GeneralError(format!( + "Null batch iterator object. Plan id: {exec_context_id}" + )))); + } + + let mut env = JVMClasses::get_env()?; + + // 1. Advance reader; get native batch handle (data stays in Rust) + let handle: i64 = unsafe { + jni_call!(&mut env, + comet_batch_iterator(iter).advance_passthrough() -> i64)? + }; + if handle == 0 { + return Ok(InputBatch::EOF); + } + + // 2. Get data columns from native BatchContext (zero-copy) + let context = get_batch_context(handle)?; + let batch = context.current_batch.as_ref().ok_or_else(|| { + CometError::from(ExecutionError::GeneralError( + "No current batch in BatchContext".to_string(), + )) + })?; + + let num_rows = batch.num_rows(); + let mut inputs: Vec = Vec::with_capacity(num_total_cols); + + for i in 0..num_data_cols { + // Zero-copy: just increment the Arc reference count + inputs.push(Arc::clone(batch.column(i))); + } + + // 3. Import partition columns from JVM FFI (if any) + let num_partition_cols = num_total_cols - num_data_cols; + if num_partition_cols > 0 { + let mut array_addrs = Vec::with_capacity(num_partition_cols); + let mut schema_addrs = Vec::with_capacity(num_partition_cols); + + for _ in 0..num_partition_cols { + let arrow_array = Rc::new(FFI_ArrowArray::empty()); + let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); + array_addrs.push(Rc::into_raw(arrow_array) as i64); + schema_addrs.push(Rc::into_raw(arrow_schema) as i64); + } + + let long_array_addrs = env.new_long_array(num_partition_cols as jsize)?; + let long_schema_addrs = env.new_long_array(num_partition_cols as jsize)?; + env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?; + env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?; + + let array_obj = JObject::from(long_array_addrs); + let schema_obj = JObject::from(long_schema_addrs); + let num_data_cols_jint = num_data_cols as i32; + + let _part_rows: i32 = unsafe { + jni_call!(&mut env, + comet_batch_iterator(iter).next_partition_columns_only( + JValueGen::Object(array_obj.as_ref()), + JValueGen::Object(schema_obj.as_ref()), + JValueGen::Int(num_data_cols_jint) + ) -> i32)? + }; + + for i in 0..num_partition_cols { + let array_data = ArrayData::from_spark((array_addrs[i], schema_addrs[i]))?; + let array = make_array(array_data); + // Partition columns come from JVM mutable buffers, must copy + inputs.push(copy_array(&array)); + + unsafe { + Rc::from_raw(array_addrs[i] as *const FFI_ArrowArray); + Rc::from_raw(schema_addrs[i] as *const FFI_ArrowSchema); + } + } + } + + Ok(InputBatch::new(inputs, Some(num_rows))) + } + /// Allocates Arrow FFI structures and calls JNI to get the next batch data. /// Returns the number of rows and the allocated array/schema addresses. fn allocate_and_fetch_batch( diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 44ff20a44f..b9be681394 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1124,6 +1124,8 @@ impl PhysicalPlanner { &scan.source, data_types, scan.arrow_ffi_safe, + scan.native_batch_passthrough, + scan.num_data_columns as usize, )?; Ok(( @@ -3473,6 +3475,8 @@ mod tests { }], source: "".to_string(), arrow_ffi_safe: false, + native_batch_passthrough: false, + num_data_columns: 0, })), }; @@ -3547,6 +3551,8 @@ mod tests { }], source: "".to_string(), arrow_ffi_safe: false, + native_batch_passthrough: false, + num_data_columns: 0, })), }; @@ -3754,6 +3760,8 @@ mod tests { fields: vec![create_proto_datatype()], source: "".to_string(), arrow_ffi_safe: false, + native_batch_passthrough: false, + num_data_columns: 0, })), } } @@ -3797,6 +3805,8 @@ mod tests { ], source: "".to_string(), arrow_ffi_safe: false, + native_batch_passthrough: false, + num_data_columns: 0, })), }; @@ -3913,6 +3923,8 @@ mod tests { ], source: "".to_string(), arrow_ffi_safe: false, + native_batch_passthrough: false, + num_data_columns: 0, })), }; diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index 2824bdbfc6..daa5837873 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -35,6 +35,10 @@ pub struct CometBatchIterator<'a> { pub method_has_selection_vectors_ret: ReturnType, pub method_export_selection_indices: JMethodID, pub method_export_selection_indices_ret: ReturnType, + pub method_advance_passthrough: JMethodID, + pub method_advance_passthrough_ret: ReturnType, + pub method_next_partition_columns_only: JMethodID, + pub method_next_partition_columns_only_ret: ReturnType, } impl<'a> CometBatchIterator<'a> { @@ -61,6 +65,18 @@ impl<'a> CometBatchIterator<'a> { "([J[J)I", )?, method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int), + method_advance_passthrough: env.get_method_id( + Self::JVM_CLASS, + "advancePassthrough", + "()J", + )?, + method_advance_passthrough_ret: ReturnType::Primitive(Primitive::Long), + method_next_partition_columns_only: env.get_method_id( + Self::JVM_CLASS, + "nextPartitionColumnsOnly", + "([J[JI)I", + )?, + method_next_partition_columns_only_ret: ReturnType::Primitive(Primitive::Int), }) } } diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index c8a480e97a..595137bb19 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -601,16 +601,16 @@ enum ParquetReaderState { Complete, } /// Parquet read context maintained across multiple JNI calls. -struct BatchContext { +pub struct BatchContext { native_plan: Arc, metrics_node: Arc, batch_stream: Option, - current_batch: Option, + pub current_batch: Option, reader_state: ParquetReaderState, } #[inline] -fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometError> { +pub fn get_batch_context<'a>(handle: i64) -> Result<&'a mut BatchContext, CometError> { unsafe { (handle as *mut BatchContext) .as_mut() diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 73c087cf36..af2e226988 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -83,6 +83,12 @@ message Scan { string source = 2; // Whether native code can assume ownership of batches that it receives bool arrow_ffi_safe = 3; + // When true, data columns are read directly from the native reader's + // BatchContext instead of through JVM FFI. Only partition columns + // cross the JVM boundary. + bool native_batch_passthrough = 4; + // Number of data columns (from native reader). Remaining columns are partition cols. + int32 num_data_columns = 5; } message NativeScan { diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java index 4f45f98a6b..05f8225f84 100644 --- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java +++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java @@ -23,7 +23,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.comet.parquet.NativeBatchReader; import org.apache.comet.vector.CometSelectionVector; +import org.apache.comet.vector.CometVector; import org.apache.comet.vector.NativeUtil; /** @@ -111,6 +113,46 @@ public boolean hasSelectionVectors() { return true; } + /** + * Advance to next batch in passthrough mode. Data columns stay in native BatchContext; only + * partition columns are exported via FFI. + * + * @return native reader handle, or 0 for EOF + */ + public long advancePassthrough() { + previousBatch = null; + + if (currentBatch == null) { + if (input.hasNext()) { + currentBatch = input.next(); + } + } + if (currentBatch == null) { + return 0; // EOF + } + long handle = NativeBatchReader.CURRENT_READER_HANDLE.get(); + previousBatch = currentBatch; + currentBatch = null; + return handle; + } + + /** + * Export only partition columns (columns at indices >= numDataCols). + * + * @param arrayAddrs The addresses of the ArrowArray structures for partition columns + * @param schemaAddrs The addresses of the ArrowSchema structures for partition columns + * @param numDataCols Number of data columns to skip + * @return the number of rows, or -1 if no batch + */ + public int nextPartitionColumnsOnly(long[] arrayAddrs, long[] schemaAddrs, int numDataCols) { + if (previousBatch == null) return -1; + for (int i = numDataCols; i < previousBatch.numCols(); i++) { + CometVector vec = (CometVector) previousBatch.column(i); + nativeUtil.exportSingleVector(vec, arrayAddrs[i - numDataCols], schemaAddrs[i - numDataCols]); + } + return previousBatch.numRows(); + } + /** * Export selection indices for all columns when they are selection vectors. * diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala index ca9dbdad7c..1a2c5b6206 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala @@ -21,9 +21,10 @@ package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.comet.{CometNativeExec, CometSinkPlaceHolder} +import org.apache.spark.sql.comet.{CometNativeExec, CometScanExec, CometSinkPlaceHolder} import org.apache.spark.sql.execution.SparkPlan +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.ConfigEntry import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} @@ -63,6 +64,16 @@ abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] { } scanBuilder.setArrowFfiSafe(isFfiSafe) + // Enable native batch passthrough for native_iceberg_compat V1 scans + op match { + case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => + scanBuilder.setNativeBatchPassthrough(true) + val numDataCols = + scan.output.length - scan.relation.partitionSchema.length + scanBuilder.setNumDataColumns(numDataCols) + case _ => + } + val scanTypes = op.output.flatten { attr => serializeDataType(attr.dataType) } From fef0576d8cead3fcf46a71b00ef1673f6b5c58ca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 11:07:42 -0700 Subject: [PATCH 4/4] Revert "feat: Native batch passthrough for native_iceberg_compat V1 scans" This reverts commit 75750af74e06ecfe19c9505c11bc7ac9aedce974. --- .../comet/parquet/NativeBatchReader.java | 16 --- native/core/src/execution/operators/scan.rs | 123 +----------------- native/core/src/execution/planner.rs | 12 -- native/core/src/jvm_bridge/batch_iterator.rs | 16 --- native/core/src/parquet/mod.rs | 6 +- native/proto/src/proto/operator.proto | 6 - .../org/apache/comet/CometBatchIterator.java | 42 ------ .../comet/serde/operator/CometSink.scala | 13 +- 8 files changed, 10 insertions(+), 224 deletions(-) diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index 17a758ffb3..d10a8932be 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -154,13 +154,6 @@ public URI pathUri() throws URISyntaxException { protected static final BufferAllocator ALLOCATOR = new RootAllocator(); private NativeUtil nativeUtil = new NativeUtil(); - /** - * Thread-local holding the native BatchContext handle of the current reader. Set during - * nextBatch() in passthrough mode so that CometBatchIterator.advancePassthrough() can retrieve - * it. - */ - public static final ThreadLocal CURRENT_READER_HANDLE = ThreadLocal.withInitial(() -> 0L); - protected Configuration conf; protected int capacity; protected boolean isCaseSensitive; @@ -895,10 +888,6 @@ private boolean containsPath(Type parquetType, String[] path, int depth) { return false; } - public long getHandle() { - return this.handle; - } - public void setSparkSchema(StructType schema) { this.sparkSchema = schema; } @@ -967,11 +956,6 @@ public boolean nextBatch() throws IOException { if (batchSize == 0) return false; - // Set the thread-local handle so CometBatchIterator.advancePassthrough() can retrieve it. - // This is always set after a successful loadNextBatch() regardless of whether passthrough - // mode will be used — the Rust ScanExec decides whether to use it. - CURRENT_READER_HANDLE.set(this.handle); - long totalDecodeTime = 0, totalLoadTime = 0; for (int i = 0; i < columnReaders.length; i++) { AbstractColumnReader reader = columnReaders[i]; diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 723656b05a..2543705fb0 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -16,7 +16,6 @@ // under the License. use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode}; -use crate::parquet::get_batch_context; use crate::{ errors::CometError, execution::{ @@ -80,11 +79,6 @@ pub struct ScanExec { baseline_metrics: BaselineMetrics, /// Whether native code can assume ownership of batches that it receives arrow_ffi_safe: bool, - /// When true, data columns are read directly from the native reader's - /// BatchContext instead of through JVM FFI (zero-copy). - native_batch_passthrough: bool, - /// Number of data columns from native reader. Remaining are partition columns. - num_data_columns: usize, } impl ScanExec { @@ -94,8 +88,6 @@ impl ScanExec { input_source_description: &str, data_types: Vec, arrow_ffi_safe: bool, - native_batch_passthrough: bool, - num_data_columns: usize, ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); @@ -123,8 +115,6 @@ impl ScanExec { baseline_metrics, schema, arrow_ffi_safe, - native_batch_passthrough, - num_data_columns, }) } @@ -153,21 +143,12 @@ impl ScanExec { let mut current_batch = self.batch.try_lock().unwrap(); if current_batch.is_none() { - let next_batch = if self.native_batch_passthrough { - ScanExec::get_next_passthrough( - self.exec_context_id, - self.input_source.as_ref().unwrap().as_obj(), - self.num_data_columns, - self.data_types.len(), - )? - } else { - ScanExec::get_next( - self.exec_context_id, - self.input_source.as_ref().unwrap().as_obj(), - self.data_types.len(), - self.arrow_ffi_safe, - )? - }; + let next_batch = ScanExec::get_next( + self.exec_context_id, + self.input_source.as_ref().unwrap().as_obj(), + self.data_types.len(), + self.arrow_ffi_safe, + )?; *current_batch = Some(next_batch); } @@ -278,98 +259,6 @@ impl ScanExec { Ok(InputBatch::new(inputs, Some(actual_num_rows))) } - /// Passthrough mode: data columns are read directly from native BatchContext - /// (zero-copy Arc::clone). Only partition columns are imported from JVM via FFI. - fn get_next_passthrough( - exec_context_id: i64, - iter: &JObject, - num_data_cols: usize, - num_total_cols: usize, - ) -> Result { - if exec_context_id == TEST_EXEC_CONTEXT_ID { - return Ok(InputBatch::EOF); - } - - if iter.is_null() { - return Err(CometError::from(ExecutionError::GeneralError(format!( - "Null batch iterator object. Plan id: {exec_context_id}" - )))); - } - - let mut env = JVMClasses::get_env()?; - - // 1. Advance reader; get native batch handle (data stays in Rust) - let handle: i64 = unsafe { - jni_call!(&mut env, - comet_batch_iterator(iter).advance_passthrough() -> i64)? - }; - if handle == 0 { - return Ok(InputBatch::EOF); - } - - // 2. Get data columns from native BatchContext (zero-copy) - let context = get_batch_context(handle)?; - let batch = context.current_batch.as_ref().ok_or_else(|| { - CometError::from(ExecutionError::GeneralError( - "No current batch in BatchContext".to_string(), - )) - })?; - - let num_rows = batch.num_rows(); - let mut inputs: Vec = Vec::with_capacity(num_total_cols); - - for i in 0..num_data_cols { - // Zero-copy: just increment the Arc reference count - inputs.push(Arc::clone(batch.column(i))); - } - - // 3. Import partition columns from JVM FFI (if any) - let num_partition_cols = num_total_cols - num_data_cols; - if num_partition_cols > 0 { - let mut array_addrs = Vec::with_capacity(num_partition_cols); - let mut schema_addrs = Vec::with_capacity(num_partition_cols); - - for _ in 0..num_partition_cols { - let arrow_array = Rc::new(FFI_ArrowArray::empty()); - let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); - array_addrs.push(Rc::into_raw(arrow_array) as i64); - schema_addrs.push(Rc::into_raw(arrow_schema) as i64); - } - - let long_array_addrs = env.new_long_array(num_partition_cols as jsize)?; - let long_schema_addrs = env.new_long_array(num_partition_cols as jsize)?; - env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?; - env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?; - - let array_obj = JObject::from(long_array_addrs); - let schema_obj = JObject::from(long_schema_addrs); - let num_data_cols_jint = num_data_cols as i32; - - let _part_rows: i32 = unsafe { - jni_call!(&mut env, - comet_batch_iterator(iter).next_partition_columns_only( - JValueGen::Object(array_obj.as_ref()), - JValueGen::Object(schema_obj.as_ref()), - JValueGen::Int(num_data_cols_jint) - ) -> i32)? - }; - - for i in 0..num_partition_cols { - let array_data = ArrayData::from_spark((array_addrs[i], schema_addrs[i]))?; - let array = make_array(array_data); - // Partition columns come from JVM mutable buffers, must copy - inputs.push(copy_array(&array)); - - unsafe { - Rc::from_raw(array_addrs[i] as *const FFI_ArrowArray); - Rc::from_raw(schema_addrs[i] as *const FFI_ArrowSchema); - } - } - } - - Ok(InputBatch::new(inputs, Some(num_rows))) - } - /// Allocates Arrow FFI structures and calls JNI to get the next batch data. /// Returns the number of rows and the allocated array/schema addresses. fn allocate_and_fetch_batch( diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b9be681394..44ff20a44f 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1124,8 +1124,6 @@ impl PhysicalPlanner { &scan.source, data_types, scan.arrow_ffi_safe, - scan.native_batch_passthrough, - scan.num_data_columns as usize, )?; Ok(( @@ -3475,8 +3473,6 @@ mod tests { }], source: "".to_string(), arrow_ffi_safe: false, - native_batch_passthrough: false, - num_data_columns: 0, })), }; @@ -3551,8 +3547,6 @@ mod tests { }], source: "".to_string(), arrow_ffi_safe: false, - native_batch_passthrough: false, - num_data_columns: 0, })), }; @@ -3760,8 +3754,6 @@ mod tests { fields: vec![create_proto_datatype()], source: "".to_string(), arrow_ffi_safe: false, - native_batch_passthrough: false, - num_data_columns: 0, })), } } @@ -3805,8 +3797,6 @@ mod tests { ], source: "".to_string(), arrow_ffi_safe: false, - native_batch_passthrough: false, - num_data_columns: 0, })), }; @@ -3923,8 +3913,6 @@ mod tests { ], source: "".to_string(), arrow_ffi_safe: false, - native_batch_passthrough: false, - num_data_columns: 0, })), }; diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index daa5837873..2824bdbfc6 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -35,10 +35,6 @@ pub struct CometBatchIterator<'a> { pub method_has_selection_vectors_ret: ReturnType, pub method_export_selection_indices: JMethodID, pub method_export_selection_indices_ret: ReturnType, - pub method_advance_passthrough: JMethodID, - pub method_advance_passthrough_ret: ReturnType, - pub method_next_partition_columns_only: JMethodID, - pub method_next_partition_columns_only_ret: ReturnType, } impl<'a> CometBatchIterator<'a> { @@ -65,18 +61,6 @@ impl<'a> CometBatchIterator<'a> { "([J[J)I", )?, method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int), - method_advance_passthrough: env.get_method_id( - Self::JVM_CLASS, - "advancePassthrough", - "()J", - )?, - method_advance_passthrough_ret: ReturnType::Primitive(Primitive::Long), - method_next_partition_columns_only: env.get_method_id( - Self::JVM_CLASS, - "nextPartitionColumnsOnly", - "([J[JI)I", - )?, - method_next_partition_columns_only_ret: ReturnType::Primitive(Primitive::Int), }) } } diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 595137bb19..c8a480e97a 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -601,16 +601,16 @@ enum ParquetReaderState { Complete, } /// Parquet read context maintained across multiple JNI calls. -pub struct BatchContext { +struct BatchContext { native_plan: Arc, metrics_node: Arc, batch_stream: Option, - pub current_batch: Option, + current_batch: Option, reader_state: ParquetReaderState, } #[inline] -pub fn get_batch_context<'a>(handle: i64) -> Result<&'a mut BatchContext, CometError> { +fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometError> { unsafe { (handle as *mut BatchContext) .as_mut() diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index af2e226988..73c087cf36 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -83,12 +83,6 @@ message Scan { string source = 2; // Whether native code can assume ownership of batches that it receives bool arrow_ffi_safe = 3; - // When true, data columns are read directly from the native reader's - // BatchContext instead of through JVM FFI. Only partition columns - // cross the JVM boundary. - bool native_batch_passthrough = 4; - // Number of data columns (from native reader). Remaining columns are partition cols. - int32 num_data_columns = 5; } message NativeScan { diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java index 05f8225f84..4f45f98a6b 100644 --- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java +++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java @@ -23,9 +23,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.comet.parquet.NativeBatchReader; import org.apache.comet.vector.CometSelectionVector; -import org.apache.comet.vector.CometVector; import org.apache.comet.vector.NativeUtil; /** @@ -113,46 +111,6 @@ public boolean hasSelectionVectors() { return true; } - /** - * Advance to next batch in passthrough mode. Data columns stay in native BatchContext; only - * partition columns are exported via FFI. - * - * @return native reader handle, or 0 for EOF - */ - public long advancePassthrough() { - previousBatch = null; - - if (currentBatch == null) { - if (input.hasNext()) { - currentBatch = input.next(); - } - } - if (currentBatch == null) { - return 0; // EOF - } - long handle = NativeBatchReader.CURRENT_READER_HANDLE.get(); - previousBatch = currentBatch; - currentBatch = null; - return handle; - } - - /** - * Export only partition columns (columns at indices >= numDataCols). - * - * @param arrayAddrs The addresses of the ArrowArray structures for partition columns - * @param schemaAddrs The addresses of the ArrowSchema structures for partition columns - * @param numDataCols Number of data columns to skip - * @return the number of rows, or -1 if no batch - */ - public int nextPartitionColumnsOnly(long[] arrayAddrs, long[] schemaAddrs, int numDataCols) { - if (previousBatch == null) return -1; - for (int i = numDataCols; i < previousBatch.numCols(); i++) { - CometVector vec = (CometVector) previousBatch.column(i); - nativeUtil.exportSingleVector(vec, arrayAddrs[i - numDataCols], schemaAddrs[i - numDataCols]); - } - return previousBatch.numRows(); - } - /** * Export selection indices for all columns when they are selection vectors. * diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala index 1a2c5b6206..ca9dbdad7c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala @@ -21,10 +21,9 @@ package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.comet.{CometNativeExec, CometScanExec, CometSinkPlaceHolder} +import org.apache.spark.sql.comet.{CometNativeExec, CometSinkPlaceHolder} import org.apache.spark.sql.execution.SparkPlan -import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.ConfigEntry import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} @@ -64,16 +63,6 @@ abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] { } scanBuilder.setArrowFfiSafe(isFfiSafe) - // Enable native batch passthrough for native_iceberg_compat V1 scans - op match { - case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => - scanBuilder.setNativeBatchPassthrough(true) - val numDataCols = - scan.output.length - scan.relation.partitionSchema.length - scanBuilder.setNumDataColumns(numDataCols) - case _ => - } - val scanTypes = op.output.flatten { attr => serializeDataType(attr.dataType) }