From 3830969bdb41e403dfe7815a6cf9a02ef4d4205e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Feb 2026 11:22:02 -0700 Subject: [PATCH 01/10] docs: fix inaccurate claim about mutable buffers in parquet scan documentation The documentation incorrectly claimed that native_iceberg_compat "removes the use of reusable mutable-buffers". In reality, both native_comet and native_iceberg_compat use reusable mutable buffers when transferring data via Arrow FFI. This commit: - Removes the inaccurate claim - Replaces it with accurate description of Parquet decoding delegation - Adds a note explaining the actual mutable buffer behavior - Links to the FFI documentation for details on arrow_ffi_safe flag Co-Authored-By: Claude Opus 4.5 --- docs/source/contributor-guide/parquet_scans.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index 626b65b4eb..babbd3cb2c 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -37,9 +37,15 @@ implementation: - Leverages the DataFusion community's ongoing improvements to `DataSourceExec` - Provides support for reading complex types (structs, arrays, and maps) -- Removes the use of reusable mutable-buffers in Comet, which is complex to maintain +- Delegates Parquet decoding to native Rust code rather than JVM-side decoding - Improves performance +> **Note on mutable buffers:** Both `native_comet` and `native_iceberg_compat` use reusable mutable buffers +> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation +> still reuses `CometVector` arrays across batches in `NativeBatchReader`. This means native operators that +> buffer data (such as `SortExec` or `ShuffleWriterExec`) must perform deep copies to avoid data corruption. +> See the [FFI documentation](ffi.md) for details on the `arrow_ffi_safe` flag and ownership semantics. + The `native_datafusion` and `native_iceberg_compat` scans share the following limitations: - When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8` From 64632730d809a6da03f5cf98eaa15c4f5ca3f558 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Feb 2026 14:13:12 -0700 Subject: [PATCH 02/10] Update parquet_scans.md with mutable buffers note Clarified note on mutable buffers and updated details on `native_iceberg_compat` implementation. --- docs/source/contributor-guide/parquet_scans.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index babbd3cb2c..04f517315a 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -41,8 +41,7 @@ implementation: - Improves performance > **Note on mutable buffers:** Both `native_comet` and `native_iceberg_compat` use reusable mutable buffers -> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation -> still reuses `CometVector` arrays across batches in `NativeBatchReader`. This means native operators that +> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation uses DataFusion's native Parquet reader for data columns, bypassing Comet's mutable buffer infrastructure entirely. However, partition columns still use `ConstantColumnReader`, which relies on Comet's mutable buffers that are reused across batches. This means native operators that > buffer data (such as `SortExec` or `ShuffleWriterExec`) must perform deep copies to avoid data corruption. > See the [FFI documentation](ffi.md) for details on the `arrow_ffi_safe` flag and ownership semantics. From 0cc8fbeddee5c7b9fd5dcb7dcceeb2aa3f833d0f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Feb 2026 14:18:14 -0700 Subject: [PATCH 03/10] format --- docs/source/contributor-guide/parquet_scans.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index 04f517315a..bbacff4d93 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -41,8 +41,7 @@ implementation: - Improves performance > **Note on mutable buffers:** Both `native_comet` and `native_iceberg_compat` use reusable mutable buffers -> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation uses DataFusion's native Parquet reader for data columns, bypassing Comet's mutable buffer infrastructure entirely. However, partition columns still use `ConstantColumnReader`, which relies on Comet's mutable buffers that are reused across batches. This means native operators that -> buffer data (such as `SortExec` or `ShuffleWriterExec`) must perform deep copies to avoid data corruption. +> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation uses DataFusion's native Parquet reader for data columns, bypassing Comet's mutable buffer infrastructure entirely. However, partition columns still use `ConstantColumnReader`, which relies on Comet's mutable buffers that are reused across batches. This means native operators that buffer data (such as `SortExec` or `ShuffleWriterExec`) must perform deep copies to avoid data corruption. > See the [FFI documentation](ffi.md) for details on the `arrow_ffi_safe` flag and ownership semantics. The `native_datafusion` and `native_iceberg_compat` scans share the following limitations: From 33d87cd4c12a28b16abe6f2764e1e6c97b4bc345 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Feb 2026 16:05:10 -0700 Subject: [PATCH 04/10] Replace mutable buffers with immutable Arrow vectors in NativeBatchReader Add ImmutableConstantColumnReader that creates Arrow vectors directly in Java without using native Rust mutable buffers. This is used for partition columns and missing columns in NativeBatchReader. Key changes: - New ImmutableConstantColumnReader creates Arrow vectors using Arrow Java APIs, supporting primitive types (Boolean, Byte, Short, Integer, Long, Float, Double, String, Binary, Date, Timestamp, Decimal, Null) - NativeBatchReader now uses ImmutableConstantColumnReader instead of ConstantColumnReader for partition and missing columns - CometScanRule checks partition column types at planning time and falls back to Spark if complex types (StructType, ArrayType, MapType) are used, since ImmutableConstantColumnReader only supports primitives Co-Authored-By: Claude Opus 4.5 --- .../ImmutableConstantColumnReader.java | 358 ++++++++++++++++++ .../comet/parquet/NativeBatchReader.java | 9 +- .../apache/comet/rules/CometScanRule.scala | 32 +- 3 files changed, 392 insertions(+), 7 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java diff --git a/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java new file mode 100644 index 0000000000..55f75446f9 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.*; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +import org.apache.comet.vector.CometPlainVector; +import org.apache.comet.vector.CometVector; + +/** + * A column reader that returns constant vectors without using native mutable buffers. This is used + * for reading partition columns and missing columns in NativeBatchReader. + * + *

Unlike {@link ConstantColumnReader} which uses native Rust code with mutable buffers, this + * implementation creates Arrow vectors directly in Java using Arrow's immutable buffer APIs. + */ +public class ImmutableConstantColumnReader extends AbstractColumnReader { + + /** + * Checks if the given Spark DataType is supported by this reader. This is used at query planning + * time to determine if NativeBatchReader can handle the partition schema or if it should fall + * back to Spark. + * + * @param type the Spark DataType to check + * @return true if the type is supported, false otherwise + */ + public static boolean isTypeSupported(DataType type) { + if (type == DataTypes.BooleanType + || type == DataTypes.ByteType + || type == DataTypes.ShortType + || type == DataTypes.IntegerType + || type == DataTypes.LongType + || type == DataTypes.FloatType + || type == DataTypes.DoubleType + || type == DataTypes.StringType + || type == DataTypes.BinaryType + || type == DataTypes.DateType + || type == DataTypes.TimestampType + || type == TimestampNTZType$.MODULE$ + || type == DataTypes.NullType + || type instanceof DecimalType) { + return true; + } + // Complex types (StructType, ArrayType, MapType) and other types are not supported + return false; + } + + private final BufferAllocator allocator = new RootAllocator(); + + /** Whether all the values in this constant column are nulls */ + private boolean isNull; + + /** The constant value */ + private Object value; + + /** The current vector */ + private CometVector vector; + + /** The Arrow field type for this column */ + private final Field arrowField; + + /** Constructor for missing columns with default values */ + ImmutableConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) { + super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false); + this.batchSize = batchSize; + this.arrowField = toArrowField(field); + this.value = + ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[ + 0]; + this.isNull = (this.value == null); + } + + /** Constructor for partition columns */ + ImmutableConstantColumnReader( + StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) { + super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false); + this.batchSize = batchSize; + this.arrowField = toArrowField(field); + this.value = values.get(index, field.dataType()); + this.isNull = (this.value == null); + } + + @Override + public void setBatchSize(int batchSize) { + close(); + this.batchSize = batchSize; + } + + @Override + public void readBatch(int total) { + if (vector != null) { + vector.close(); + vector = null; + } + vector = createConstantVector(total); + } + + @Override + public CometVector currentBatch() { + return vector; + } + + @Override + public void close() { + if (vector != null) { + vector.close(); + vector = null; + } + } + + @Override + protected void initNative() { + // No native initialization needed - we create vectors purely in Java + nativeHandle = 0; + } + + /** Creates a constant Arrow vector with the specified number of rows. */ + private CometVector createConstantVector(int numRows) { + ValueVector arrowVector = createArrowVector(numRows); + return new CometPlainVector(arrowVector, useDecimal128); + } + + /** Creates an Arrow vector filled with constant values. */ + private ValueVector createArrowVector(int numRows) { + if (isNull) { + return createNullVector(numRows); + } + + if (type == DataTypes.BooleanType) { + return createBooleanVector(numRows, (Boolean) value); + } else if (type == DataTypes.ByteType) { + return createByteVector(numRows, (Byte) value); + } else if (type == DataTypes.ShortType) { + return createShortVector(numRows, (Short) value); + } else if (type == DataTypes.IntegerType) { + return createIntVector(numRows, (Integer) value); + } else if (type == DataTypes.LongType) { + return createLongVector(numRows, (Long) value); + } else if (type == DataTypes.FloatType) { + return createFloatVector(numRows, (Float) value); + } else if (type == DataTypes.DoubleType) { + return createDoubleVector(numRows, (Double) value); + } else if (type == DataTypes.StringType) { + return createStringVector(numRows, (UTF8String) value); + } else if (type == DataTypes.BinaryType) { + return createBinaryVector(numRows, (byte[]) value); + } else if (type == DataTypes.DateType) { + return createDateVector(numRows, (Integer) value); + } else if (type == DataTypes.TimestampType || type == TimestampNTZType$.MODULE$) { + return createTimestampVector(numRows, (Long) value); + } else if (type instanceof DecimalType) { + return createDecimalVector(numRows, (Decimal) value, (DecimalType) type); + } else { + throw new UnsupportedOperationException("Unsupported Spark type: " + type); + } + } + + private ValueVector createNullVector(int numRows) { + NullVector vector = new NullVector(arrowField.getName(), numRows); + return vector; + } + + private ValueVector createBooleanVector(int numRows, boolean value) { + BitVector vector = new BitVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value ? 1 : 0); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createByteVector(int numRows, byte value) { + TinyIntVector vector = new TinyIntVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createShortVector(int numRows, short value) { + SmallIntVector vector = new SmallIntVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createIntVector(int numRows, int value) { + IntVector vector = new IntVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createLongVector(int numRows, long value) { + BigIntVector vector = new BigIntVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createFloatVector(int numRows, float value) { + Float4Vector vector = new Float4Vector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createDoubleVector(int numRows, double value) { + Float8Vector vector = new Float8Vector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createStringVector(int numRows, UTF8String value) { + VarCharVector vector = new VarCharVector(arrowField, allocator); + byte[] bytes = value.getBytes(); + vector.allocateNew((long) bytes.length * numRows, numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, bytes); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createBinaryVector(int numRows, byte[] value) { + VarBinaryVector vector = new VarBinaryVector(arrowField, allocator); + vector.allocateNew((long) value.length * numRows, numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createDateVector(int numRows, int value) { + DateDayVector vector = new DateDayVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createTimestampVector(int numRows, long value) { + TimeStampMicroTZVector vector = new TimeStampMicroTZVector(arrowField, allocator); + vector.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + vector.set(i, value); + } + vector.setValueCount(numRows); + return vector; + } + + private ValueVector createDecimalVector(int numRows, Decimal value, DecimalType dt) { + DecimalVector vector = + new DecimalVector(arrowField.getName(), allocator, dt.precision(), dt.scale()); + vector.allocateNew(numRows); + + java.math.BigDecimal bigDecimal = value.toJavaBigDecimal(); + for (int i = 0; i < numRows; i++) { + vector.set(i, bigDecimal); + } + vector.setValueCount(numRows); + return vector; + } + + /** Converts a Spark StructField to an Arrow Field. */ + private Field toArrowField(StructField field) { + ArrowType arrowType = toArrowType(field.dataType()); + FieldType fieldType = new FieldType(field.nullable(), arrowType, null); + return new Field(field.name(), fieldType, null); + } + + /** Converts a Spark DataType to an Arrow ArrowType. */ + private ArrowType toArrowType(DataType type) { + if (type == DataTypes.BooleanType) { + return ArrowType.Bool.INSTANCE; + } else if (type == DataTypes.ByteType) { + return new ArrowType.Int(8, true); + } else if (type == DataTypes.ShortType) { + return new ArrowType.Int(16, true); + } else if (type == DataTypes.IntegerType) { + return new ArrowType.Int(32, true); + } else if (type == DataTypes.LongType) { + return new ArrowType.Int(64, true); + } else if (type == DataTypes.FloatType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + } else if (type == DataTypes.DoubleType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + } else if (type == DataTypes.StringType) { + return ArrowType.Utf8.INSTANCE; + } else if (type == DataTypes.BinaryType) { + return ArrowType.Binary.INSTANCE; + } else if (type == DataTypes.DateType) { + return new ArrowType.Date(DateUnit.DAY); + } else if (type == DataTypes.TimestampType) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + } else if (type == TimestampNTZType$.MODULE$) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else if (type instanceof DecimalType) { + DecimalType dt = (DecimalType) type; + return new ArrowType.Decimal(dt.precision(), dt.scale(), 128); + } else if (type == DataTypes.NullType) { + return ArrowType.Null.INSTANCE; + } else { + throw new UnsupportedOperationException("Unsupported Spark type: " + type); + } + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index d10a8932be..e49aad383c 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -473,8 +473,8 @@ public void init() throws Throwable { + filePath); } if (field.isPrimitive()) { - ConstantColumnReader reader = - new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128); + ImmutableConstantColumnReader reader = + new ImmutableConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128); columnReaders[i] = reader; missingColumns[i] = true; } else { @@ -492,8 +492,9 @@ public void init() throws Throwable { for (int i = fields.size(); i < columnReaders.length; i++) { int fieldIndex = i - fields.size(); StructField field = partitionFields[fieldIndex]; - ConstantColumnReader reader = - new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128); + ImmutableConstantColumnReader reader = + new ImmutableConstantColumnReader( + field, capacity, partitionValues, fieldIndex, useDecimal128); columnReaders[i] = reader; } } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 4291e3fb65..5f3cb890bb 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -47,7 +47,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, wi import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection} import org.apache.comet.objectstore.NativeConfig -import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet} +import org.apache.comet.parquet.{CometParquetScan, ImmutableConstantColumnReader, Native, SupportsComet} import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported} import org.apache.comet.serde.operator.CometNativeScan import org.apache.comet.shims.CometTypeShim @@ -660,7 +660,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com val schemaSupported = typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons) val partitionSchemaSupported = - typeChecker.isSchemaSupported(partitionSchema, fallbackReasons) + typeChecker.isPartitionSchemaSupported(partitionSchema, fallbackReasons) val cometExecEnabled = COMET_EXEC_ENABLED.get() if (!cometExecEnabled) { @@ -698,7 +698,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com return false } val partitionSchemaSupported = - typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons) + typeChecker.isPartitionSchemaSupported(r.partitionSchema, fallbackReasons) if (!partitionSchemaSupported) { withInfo( scanExec, @@ -741,6 +741,32 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C super.isTypeSupported(dt, name, fallbackReasons) } } + + /** + * Checks if the partition schema is supported for constant column readers. For + * native_iceberg_compat scan, partition columns use ImmutableConstantColumnReader which only + * supports primitive types. + */ + def isPartitionSchemaSupported( + partitionSchema: StructType, + fallbackReasons: ListBuffer[String]): Boolean = { + if (scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { + // For native_iceberg_compat, partition columns must be supported by + // ImmutableConstantColumnReader which only supports primitive types + partitionSchema.fields.forall { field => + if (ImmutableConstantColumnReader.isTypeSupported(field.dataType)) { + true + } else { + fallbackReasons += s"Partition column '${field.name}' has unsupported type " + + s"${field.dataType} for ImmutableConstantColumnReader in $scanImpl scan" + false + } + } + } else { + // For other scan implementations, use the standard type check + isSchemaSupported(partitionSchema, fallbackReasons) + } + } } object CometScanRule extends Logging { From 6bdfc9ec1778493da30dcdbef9ba48a7d51320bf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 3 Feb 2026 16:12:10 -0700 Subject: [PATCH 05/10] Revert documentation changes to parquet_scans.md Co-Authored-By: Claude Opus 4.5 --- docs/source/contributor-guide/parquet_scans.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index bbacff4d93..626b65b4eb 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -37,13 +37,9 @@ implementation: - Leverages the DataFusion community's ongoing improvements to `DataSourceExec` - Provides support for reading complex types (structs, arrays, and maps) -- Delegates Parquet decoding to native Rust code rather than JVM-side decoding +- Removes the use of reusable mutable-buffers in Comet, which is complex to maintain - Improves performance -> **Note on mutable buffers:** Both `native_comet` and `native_iceberg_compat` use reusable mutable buffers -> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation uses DataFusion's native Parquet reader for data columns, bypassing Comet's mutable buffer infrastructure entirely. However, partition columns still use `ConstantColumnReader`, which relies on Comet's mutable buffers that are reused across batches. This means native operators that buffer data (such as `SortExec` or `ShuffleWriterExec`) must perform deep copies to avoid data corruption. -> See the [FFI documentation](ffi.md) for details on the `arrow_ffi_safe` flag and ownership semantics. - The `native_datafusion` and `native_iceberg_compat` scans share the following limitations: - When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8` From 1921e6c00a4899a1ad6ea6f642a9f1ccfab48c68 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 07:55:57 -0700 Subject: [PATCH 06/10] Export constant columns as 1-element scalar arrays for native expansion Instead of materializing full N-element Arrow arrays for partition and missing columns, export 1-element arrays from JVM and expand them on the native side using ScalarValue. This avoids O(N) memory allocation and copying for constant columns. - Add CometConstantVector: stores a single value, lazily creates a 1-element Arrow vector for FFI export, returns constant for all rowIds - Modify ImmutableConstantColumnReader to produce CometConstantVector - Add CometConstantVector case in NativeUtil.exportBatch() to skip row count validation for 1-element vectors - In scan.rs, detect 1-element arrays and expand via ScalarValue when actual_num_rows > 1; skip take() for scalar columns with selection vectors since constants are unaffected by row deletion Co-Authored-By: Claude Opus 4.5 --- .../ImmutableConstantColumnReader.java | 177 +---------- .../comet/vector/CometConstantVector.java | 285 ++++++++++++++++++ .../org/apache/comet/vector/NativeUtil.scala | 14 + native/core/src/execution/operators/scan.rs | 38 ++- 4 files changed, 331 insertions(+), 183 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/vector/CometConstantVector.java diff --git a/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java index 55f75446f9..3cb475cd74 100644 --- a/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ImmutableConstantColumnReader.java @@ -19,9 +19,6 @@ package org.apache.comet.parquet; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.*; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; @@ -31,9 +28,8 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import org.apache.spark.sql.types.*; -import org.apache.spark.unsafe.types.UTF8String; -import org.apache.comet.vector.CometPlainVector; +import org.apache.comet.vector.CometConstantVector; import org.apache.comet.vector.CometVector; /** @@ -74,8 +70,6 @@ public static boolean isTypeSupported(DataType type) { return false; } - private final BufferAllocator allocator = new RootAllocator(); - /** Whether all the values in this constant column are nulls */ private boolean isNull; @@ -143,174 +137,9 @@ protected void initNative() { nativeHandle = 0; } - /** Creates a constant Arrow vector with the specified number of rows. */ + /** Creates a constant vector with the specified logical row count. */ private CometVector createConstantVector(int numRows) { - ValueVector arrowVector = createArrowVector(numRows); - return new CometPlainVector(arrowVector, useDecimal128); - } - - /** Creates an Arrow vector filled with constant values. */ - private ValueVector createArrowVector(int numRows) { - if (isNull) { - return createNullVector(numRows); - } - - if (type == DataTypes.BooleanType) { - return createBooleanVector(numRows, (Boolean) value); - } else if (type == DataTypes.ByteType) { - return createByteVector(numRows, (Byte) value); - } else if (type == DataTypes.ShortType) { - return createShortVector(numRows, (Short) value); - } else if (type == DataTypes.IntegerType) { - return createIntVector(numRows, (Integer) value); - } else if (type == DataTypes.LongType) { - return createLongVector(numRows, (Long) value); - } else if (type == DataTypes.FloatType) { - return createFloatVector(numRows, (Float) value); - } else if (type == DataTypes.DoubleType) { - return createDoubleVector(numRows, (Double) value); - } else if (type == DataTypes.StringType) { - return createStringVector(numRows, (UTF8String) value); - } else if (type == DataTypes.BinaryType) { - return createBinaryVector(numRows, (byte[]) value); - } else if (type == DataTypes.DateType) { - return createDateVector(numRows, (Integer) value); - } else if (type == DataTypes.TimestampType || type == TimestampNTZType$.MODULE$) { - return createTimestampVector(numRows, (Long) value); - } else if (type instanceof DecimalType) { - return createDecimalVector(numRows, (Decimal) value, (DecimalType) type); - } else { - throw new UnsupportedOperationException("Unsupported Spark type: " + type); - } - } - - private ValueVector createNullVector(int numRows) { - NullVector vector = new NullVector(arrowField.getName(), numRows); - return vector; - } - - private ValueVector createBooleanVector(int numRows, boolean value) { - BitVector vector = new BitVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value ? 1 : 0); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createByteVector(int numRows, byte value) { - TinyIntVector vector = new TinyIntVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createShortVector(int numRows, short value) { - SmallIntVector vector = new SmallIntVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createIntVector(int numRows, int value) { - IntVector vector = new IntVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createLongVector(int numRows, long value) { - BigIntVector vector = new BigIntVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createFloatVector(int numRows, float value) { - Float4Vector vector = new Float4Vector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createDoubleVector(int numRows, double value) { - Float8Vector vector = new Float8Vector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createStringVector(int numRows, UTF8String value) { - VarCharVector vector = new VarCharVector(arrowField, allocator); - byte[] bytes = value.getBytes(); - vector.allocateNew((long) bytes.length * numRows, numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, bytes); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createBinaryVector(int numRows, byte[] value) { - VarBinaryVector vector = new VarBinaryVector(arrowField, allocator); - vector.allocateNew((long) value.length * numRows, numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createDateVector(int numRows, int value) { - DateDayVector vector = new DateDayVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createTimestampVector(int numRows, long value) { - TimeStampMicroTZVector vector = new TimeStampMicroTZVector(arrowField, allocator); - vector.allocateNew(numRows); - for (int i = 0; i < numRows; i++) { - vector.set(i, value); - } - vector.setValueCount(numRows); - return vector; - } - - private ValueVector createDecimalVector(int numRows, Decimal value, DecimalType dt) { - DecimalVector vector = - new DecimalVector(arrowField.getName(), allocator, dt.precision(), dt.scale()); - vector.allocateNew(numRows); - - java.math.BigDecimal bigDecimal = value.toJavaBigDecimal(); - for (int i = 0; i < numRows; i++) { - vector.set(i, bigDecimal); - } - vector.setValueCount(numRows); - return vector; + return new CometConstantVector(type, arrowField, useDecimal128, value, isNull, numRows); } /** Converts a Spark StructField to an Arrow Field. */ diff --git a/common/src/main/java/org/apache/comet/vector/CometConstantVector.java b/common/src/main/java/org/apache/comet/vector/CometConstantVector.java new file mode 100644 index 0000000000..c82af431b4 --- /dev/null +++ b/common/src/main/java/org/apache/comet/vector/CometConstantVector.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import java.math.BigDecimal; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.*; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A CometVector that stores a single constant value. For native export, it lazily creates a + * 1-element Arrow vector, avoiding the cost of materializing N identical elements. The native side + * detects the 1-element array and expands it to the actual batch size. + * + *

For Spark-direct consumption (e.g., ColumnarToRow), all getters return the constant value + * regardless of rowId. + */ +public class CometConstantVector extends CometVector { + private final BufferAllocator allocator = new RootAllocator(); + + /** Whether the constant value is null */ + private final boolean isNull; + + /** The constant value (null if isNull is true) */ + private final Object value; + + /** The Spark data type */ + private final DataType sparkType; + + /** The Arrow field for creating the 1-element vector */ + private final Field arrowField; + + /** Logical number of rows this vector represents */ + private int numValues; + + /** Lazily created 1-element Arrow vector for native export */ + private ValueVector lazyVector; + + public CometConstantVector( + DataType sparkType, + Field arrowField, + boolean useDecimal128, + Object value, + boolean isNull, + int numValues) { + super(sparkType, useDecimal128); + this.sparkType = sparkType; + this.arrowField = arrowField; + this.value = value; + this.isNull = isNull; + this.numValues = numValues; + } + + @Override + public void setNumNulls(int numNulls) { + // No-op: null status is determined by the constant isNull flag + } + + @Override + public void setNumValues(int numValues) { + this.numValues = numValues; + } + + @Override + public int numValues() { + return numValues; + } + + @Override + public boolean hasNull() { + return isNull; + } + + @Override + public int numNulls() { + return isNull ? numValues : 0; + } + + @Override + public boolean isNullAt(int rowId) { + return isNull; + } + + @Override + public boolean isFixedLength() { + return !(sparkType == DataTypes.StringType || sparkType == DataTypes.BinaryType); + } + + @Override + public boolean getBoolean(int rowId) { + return (Boolean) value; + } + + @Override + public byte getByte(int rowId) { + return (Byte) value; + } + + @Override + public short getShort(int rowId) { + return (Short) value; + } + + @Override + public int getInt(int rowId) { + return (Integer) value; + } + + @Override + public long getLong(int rowId) { + return (Long) value; + } + + @Override + public long getLongDecimal(int rowId) { + return (Long) value; + } + + @Override + public float getFloat(int rowId) { + return (Float) value; + } + + @Override + public double getDouble(int rowId) { + return (Double) value; + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNull) return null; + return (UTF8String) value; + } + + @Override + public byte[] getBinary(int rowId) { + if (isNull) return null; + return (byte[]) value; + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNull) return null; + return (Decimal) value; + } + + @Override + public byte[] copyBinaryDecimal(int i, byte[] dest) { + // Override to avoid the memory-address code path in CometVector. + // For constant decimals, we go through getDecimal() instead. + throw new UnsupportedOperationException( + "CometConstantVector does not support copyBinaryDecimal; use getDecimal() instead"); + } + + @Override + public ValueVector getValueVector() { + if (lazyVector == null) { + lazyVector = createOneElementVector(); + } + return lazyVector; + } + + @Override + public CometVector slice(int offset, int length) { + return new CometConstantVector(sparkType, arrowField, useDecimal128, value, isNull, length); + } + + @Override + public void close() { + if (lazyVector != null) { + lazyVector.close(); + lazyVector = null; + } + allocator.close(); + } + + /** Creates a 1-element Arrow vector holding the constant value. */ + private ValueVector createOneElementVector() { + if (isNull) { + return new NullVector(arrowField.getName(), 1); + } + + if (sparkType == DataTypes.BooleanType) { + BitVector v = new BitVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Boolean) value ? 1 : 0); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.ByteType) { + TinyIntVector v = new TinyIntVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Byte) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.ShortType) { + SmallIntVector v = new SmallIntVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Short) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.IntegerType) { + IntVector v = new IntVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Integer) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.LongType) { + BigIntVector v = new BigIntVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Long) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.FloatType) { + Float4Vector v = new Float4Vector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Float) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.DoubleType) { + Float8Vector v = new Float8Vector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Double) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.StringType) { + VarCharVector v = new VarCharVector(arrowField, allocator); + byte[] bytes = ((UTF8String) value).getBytes(); + v.allocateNew((long) bytes.length, 1); + v.set(0, bytes); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.BinaryType) { + VarBinaryVector v = new VarBinaryVector(arrowField, allocator); + byte[] bytes = (byte[]) value; + v.allocateNew((long) bytes.length, 1); + v.set(0, bytes); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.DateType) { + DateDayVector v = new DateDayVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Integer) value); + v.setValueCount(1); + return v; + } else if (sparkType == DataTypes.TimestampType || sparkType == TimestampNTZType$.MODULE$) { + TimeStampMicroTZVector v = new TimeStampMicroTZVector(arrowField, allocator); + v.allocateNew(1); + v.set(0, (Long) value); + v.setValueCount(1); + return v; + } else if (sparkType instanceof DecimalType) { + DecimalType dt = (DecimalType) sparkType; + DecimalVector v = + new DecimalVector(arrowField.getName(), allocator, dt.precision(), dt.scale()); + v.allocateNew(1); + BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal(); + v.set(0, bigDecimal); + v.setValueCount(1); + return v; + } else { + throw new UnsupportedOperationException("Unsupported Spark type: " + sparkType); + } + } +} diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 45245121a0..c6bbb8e405 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -140,6 +140,20 @@ class NativeUtil { provider, arrowArray, arrowSchema) + case constantVector: CometConstantVector => + // Export the 1-element Arrow vector for scalar constant columns. + // Skip adding its value count to numRows validation since it's 1, not N. + // The native side will detect and expand 1-element arrays to the actual batch size. + val valueVector = constantVector.getValueVector + + val arrowSchema = ArrowSchema.wrap(schemaAddrs(index)) + val arrowArray = ArrowArray.wrap(arrayAddrs(index)) + Data.exportVector( + allocator, + getFieldVector(valueVector, "export"), + null, + arrowArray, + arrowSchema) case a: CometVector => val valueVector = a.getValueVector diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2543705fb0..ee4de21963 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -28,6 +28,7 @@ use arrow::compute::{cast_with_options, take, CastOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::ffi::FFI_ArrowArray; use arrow::ffi::FFI_ArrowSchema; +use datafusion::common::ScalarValue; use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ @@ -209,16 +210,23 @@ impl ScanExec { let array = make_array(array_data); - // Apply selection if selection vectors exist (applies to all columns) + // Apply selection if selection vectors exist (applies to all columns). + // Skip take() for 1-element scalar constant arrays since they represent + // constant values unaffected by row deletion. let array = if let Some(ref selection_arrays) = selection_indices_arrays { - let indices = &selection_arrays[i]; - // Apply the selection using Arrow's take kernel - match take(&*array, &**indices, None) { - Ok(selected_array) => selected_array, - Err(e) => { - return Err(CometError::from(ExecutionError::ArrowError(format!( - "Failed to apply selection for column {i}: {e}", - )))); + if array.len() == 1 { + // Scalar constant column - skip selection, will be expanded later + array + } else { + let indices = &selection_arrays[i]; + // Apply the selection using Arrow's take kernel + match take(&*array, &**indices, None) { + Ok(selected_array) => selected_array, + Err(e) => { + return Err(CometError::from(ExecutionError::ArrowError(format!( + "Failed to apply selection for column {i}: {e}", + )))); + } } } } else { @@ -256,6 +264,18 @@ impl ScanExec { num_rows as usize }; + // Expand 1-element scalar constant columns to the actual batch size. + // The JVM side exports constant columns (partition/missing) as 1-element arrays + // to avoid materializing N identical values. We detect and expand them here. + if actual_num_rows > 1 { + for i in 0..inputs.len() { + if inputs[i].len() == 1 { + let scalar = ScalarValue::try_from_array(&inputs[i], 0)?; + inputs[i] = scalar.to_array_of_size(actual_num_rows)?; + } + } + } + Ok(InputBatch::new(inputs, Some(actual_num_rows))) } From 19b12c17bacc4446bb2039662f7fd48ec7dc1698 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 08:00:28 -0700 Subject: [PATCH 07/10] Add partition column scan benchmark to CometReadBenchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a benchmark that writes a partitioned parquet table and measures scan performance with 1 and 5 partition columns. Tests both reading data columns alongside partitions and reading partition columns themselves. This exercises the CometConstantVector → native scalar expansion path. Co-Authored-By: Claude Opus 4.5 --- .../sql/benchmark/CometReadBenchmark.scala | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala index 9b2dd186dd..25dab067fb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala @@ -620,6 +620,63 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { } } + def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = { + val sqlBenchmark = new Benchmark( + s"Partitioned Scan with $numPartitionCols partition column(s)", + values, + output = output) + + withTempPath { dir => + withTempTable("parquetV1Table") { + // Create a table with data columns and partition columns + val partCols = (1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ") + val partNames = (1 to numPartitionCols).map(i => s"p$i").mkString(", ") + prepareTable(dir, spark.sql(s"SELECT value as id, $partCols FROM $tbl"), Some(partNames)) + + sqlBenchmark.addCase("SQL Parquet - Spark") { _ => + spark.sql("select sum(id) from parquetV1Table").noop() + } + + sqlBenchmark.addCase("SQL Parquet - Comet (Scan Only)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) { + spark.sql("select sum(id) from parquetV1Table").noop() + } + } + + sqlBenchmark.addCase("SQL Parquet - Comet (Scan + Exec)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { + spark.sql("select sum(id) from parquetV1Table").noop() + } + } + + // Also benchmark reading partition columns themselves + val partSumExpr = (1 to numPartitionCols) + .map(i => s"sum(length(p$i))") + .mkString(", ") + + sqlBenchmark.addCase("SQL Parquet - Spark (read partition cols)") { _ => + spark.sql(s"select $partSumExpr from parquetV1Table").noop() + } + + sqlBenchmark.addCase("SQL Parquet - Comet (read partition cols)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) { + spark.sql(s"select $partSumExpr from parquetV1Table").noop() + } + } + + sqlBenchmark.run() + } + } + } + def sortedLgStrFilterScanBenchmark(values: Int, fractionOfZeros: Double): Unit = { val percentageOfZeros = fractionOfZeros * 100 val benchmark = @@ -751,6 +808,12 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { sortedLgStrFilterScanBenchmark(v, fractionOfZeros) } } + + runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v => + for (numPartCols <- List(1, 5)) { + partitionColumnScanBenchmark(v, numPartCols) + } + } } } From befc755097774086ca6e77c4eab45c42c8e7d6bb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 08:31:01 -0700 Subject: [PATCH 08/10] Fix clippy warning: use iterator instead of index loop Co-Authored-By: Claude Opus 4.5 --- native/core/src/execution/operators/scan.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index ee4de21963..a5ab7878a4 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -268,10 +268,10 @@ impl ScanExec { // The JVM side exports constant columns (partition/missing) as 1-element arrays // to avoid materializing N identical values. We detect and expand them here. if actual_num_rows > 1 { - for i in 0..inputs.len() { - if inputs[i].len() == 1 { - let scalar = ScalarValue::try_from_array(&inputs[i], 0)?; - inputs[i] = scalar.to_array_of_size(actual_num_rows)?; + for col in &mut inputs { + if col.len() == 1 { + let scalar = ScalarValue::try_from_array(col, 0)?; + *col = scalar.to_array_of_size(actual_num_rows)?; } } } From bd6eb62f5d5dc1e957cbe82e1d545a107ce9523d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 08:32:50 -0700 Subject: [PATCH 09/10] Fix partitionBy to pass column names as separate arguments Co-Authored-By: Claude Opus 4.5 --- .../sql/benchmark/CometPartitionColumnBenchmark.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala index c79cac2878..1e4a8cda4d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala @@ -49,8 +49,15 @@ object CometPartitionColumnBenchmark extends CometBenchmarkBase { withTempTable("parquetV1Table") { val partCols = (1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ") - val partNames = (1 to numPartitionCols).map(i => s"p$i").mkString(", ") - prepareTable(dir, spark.sql(s"SELECT value as id, $partCols FROM $tbl"), Some(partNames)) + val partNames = (1 to numPartitionCols).map(i => s"p$i") + val df = spark.sql(s"SELECT value as id, $partCols FROM $tbl") + val parquetDir = dir.getCanonicalPath + "/parquetV1" + df.write + .partitionBy(partNames: _*) + .mode("overwrite") + .option("compression", "snappy") + .parquet(parquetDir) + spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table") sqlBenchmark.addCase("SQL Parquet - Spark") { _ => spark.sql("select sum(id) from parquetV1Table").noop() From b2ded2a88e0a0882f66cb16e11d2c0dbe722327b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 5 Feb 2026 08:38:35 -0700 Subject: [PATCH 10/10] Benchmark native_datafusion and native_iceberg_compat only Co-Authored-By: Claude Opus 4.5 --- .../CometPartitionColumnBenchmark.scala | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala index 1e4a8cda4d..9f52ba5ef0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala @@ -22,7 +22,7 @@ package org.apache.spark.sql.benchmark import org.apache.spark.benchmark.Benchmark import org.apache.comet.CometConf -import org.apache.comet.CometConf.{SCAN_NATIVE_COMET, SCAN_NATIVE_ICEBERG_COMPAT} +import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT} /** * Benchmark to measure partition column scan performance. This exercises the CometConstantVector @@ -63,15 +63,16 @@ object CometPartitionColumnBenchmark extends CometBenchmarkBase { spark.sql("select sum(id) from parquetV1Table").noop() } - sqlBenchmark.addCase("SQL Parquet - Comet (Scan Only)") { _ => + sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET) { + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { spark.sql("select sum(id) from parquetV1Table").noop() } } - sqlBenchmark.addCase("SQL Parquet - Comet (Scan + Exec)") { _ => + sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", @@ -88,7 +89,16 @@ object CometPartitionColumnBenchmark extends CometBenchmarkBase { spark.sql(s"select $partSumExpr from parquetV1Table").noop() } - sqlBenchmark.addCase("SQL Parquet - Comet (read partition cols)") { _ => + sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion (partition cols)") { _ => + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) { + spark.sql(s"select $partSumExpr from parquetV1Table").noop() + } + } + + sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat (partition cols)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true",