Draft
Changes from all commits
30 commits
069a1d3
[VL][Delta] Add native DV read-only infrastructure
Apr 9, 2026
4aac1cd
[VL][Delta] Split DV read foundation
Apr 9, 2026
1e9af3c
[VL][Delta] Fix DV read foundation CI
Apr 9, 2026
d4ad1f4
[VL][CI] Adding docker image for maven cache (#11655)
sh-shamsan Apr 2, 2026
f4d90ba
[VL] Refine logs in BHJ optimization (#11870)
JkSelf Apr 2, 2026
e984dc0
[GLUTEN-11872][VL] Fix docker build (#11873)
zhouyuan Apr 3, 2026
799ed03
[GLUTEN-6887][VL] Daily Update Velox Version (2026_04_01) (#11860)
GlutenPerfBot Apr 3, 2026
912f781
[GLUTEN-11678][VL] Native validation should check CrossRelNode's expr…
wecharyu Apr 3, 2026
c183749
[GLUTEN-11872][VL] Fix docker metadata action version (#11875)
zhouyuan Apr 6, 2026
81c04ad
[VL] Use UnboundedBlockingQueue when create threads pool (#11877)
FelixYBW Apr 7, 2026
fff5256
[MINOR] Expose Gluten and component build information to SparkConf (#…
jiangjiangtian Apr 7, 2026
88c5375
[GLUTEN-1433] [VL] Add config to disable TimestampNTZ validation fall…
Mariamalmesfer Apr 7, 2026
4d03d61
[GLUTEN-11849][CH] Fix diff for var_samp returns NaN instead of NULL …
exmy Apr 7, 2026
cdf092a
[GLUTEN-11838] Enable 'Eliminate two aggregate joins with attribute r…
zzcclp Apr 7, 2026
b813208
[GLUTEN-9219][VL] Update on function support docs (#11881)
GlutenPerfBot Apr 7, 2026
037e2ef
[MINOR] Remove Spark 3.2 tests (#11887)
philo-he Apr 8, 2026
9dedf04
[GLUTEN-11888][VL]Remove the synchronized lock in VeloxBroadcastBuild…
JkSelf Apr 8, 2026
2af5396
[GLUTEN-11888] [VL] Parallel build hash table to improve bhj performa…
JkSelf Apr 9, 2026
4350557
[VL][Delta] Export roaring C++ headers
Apr 9, 2026
9842fd9
[VL][Delta] Normalize CMake ASF headers
Apr 9, 2026
c56a6aa
[VL][Delta] Export roaring C headers
Apr 9, 2026
82a3fe7
[VL][Delta] Harden bundled Velox lib loading
Apr 9, 2026
9b95547
[VL][Delta] Normalize Delta test CMake header
Apr 9, 2026
5ed9f7d
Merge branch 'main' into delta-dv-read-foundation
malinjawi Apr 9, 2026
af947c0
Merge branch 'main' into delta-dv-read-foundation
malinjawi Apr 13, 2026
637294f
[VL][Delta] Fix Velox file split reader API
Apr 13, 2026
ab2e26a
Merge branch 'main' into delta-dv-read-foundation
malinjawi Apr 13, 2026
0caeb7e
[VL][Delta] Fix CI for roaring PIC and row index shims
Apr 13, 2026
d2d279e
[VL][Delta] Fix roaring link propagation
Apr 13, 2026
6a17c21
[VL][Delta] Fix Delta scan CI regressions
Apr 13, 2026
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.backendsapi.velox

import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.execution.datasources.PartitionedFile

import org.apache.hadoop.fs.Path

import java.util.{HashMap => JHashMap, Map => JMap}

/**
 * Maps a Delta deletion-vector descriptor, carried in the per-file metadata map under an
 * encoded row-index-filter key, onto flat key/value pairs the native Velox reader can consume.
 * Inline DVs keep their "i" storage type; on-disk DVs are resolved to an absolute path and
 * normalized to storage type "p".
 */
object VeloxDeltaMetadataUtils {
  val DeltaDvStorageType = "delta_dv_storage_type"
  val DeltaDvPathOrInline = "delta_dv_path_or_inline"
  val DeltaDvOffset = "delta_dv_offset"
  val DeltaDvSizeInBytes = "delta_dv_size_in_bytes"
  val DeltaDvCardinality = "delta_dv_cardinality"

  private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded"

  def normalizeOtherMetadataColumns(
      partitionColumnCount: Int,
      file: PartitionedFile,
      otherConstantMetadataColumnValues: JMap[String, Object]): JMap[String, Object] = {
    val normalized = new JHashMap[String, Object]()
    if (otherConstantMetadataColumnValues != null) {
      normalized.putAll(otherConstantMetadataColumnValues)
    }

    Option(normalized.get(RowIndexFilterIdEncoded)).map(_.toString).foreach {
      encodedDescriptor =>
        val descriptor = DeletionVectorDescriptor.deserializeFromBase64(encodedDescriptor)
        descriptor.storageType match {
          case "i" =>
            normalized.put(DeltaDvStorageType, descriptor.storageType)
            normalized.put(DeltaDvPathOrInline, descriptor.pathOrInlineDv)
          case _ =>
            val absolutePath =
              descriptor.absolutePath(resolveTablePath(partitionColumnCount, file))
            normalized.put(DeltaDvStorageType, "p")
            normalized.put(DeltaDvPathOrInline, absolutePath.toUri.toASCIIString)
        }
        descriptor.offset.foreach(offset => normalized.put(DeltaDvOffset, Int.box(offset)))
        normalized.put(DeltaDvSizeInBytes, Int.box(descriptor.sizeInBytes))
        normalized.put(DeltaDvCardinality, Long.box(descriptor.cardinality))
        normalized.remove(RowIndexFilterIdEncoded)
    }

    normalized
  }

  /** Walks up from the data file past one directory level per partition column to the table root. */
  private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = {
    var tablePath = new Path(file.filePath.toString).getParent
    for (_ <- 0 until partitionColumnCount) {
      tablePath = tablePath.getParent
    }
    tablePath
  }
}
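For orientation, a minimal sketch (not part of this patch) of how a consumer on the native-glue side could read the normalized keys back out of the per-file metadata map; the NativeDvInfo case class and readDvInfo helper are hypothetical names used only for illustration:

import java.util.{Map => JMap}

import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils._

// Illustrative holder for the flattened DV fields a native reader would need.
case class NativeDvInfo(
    storageType: String, // "i" = inline in the log, "p" = absolute on-disk path
    pathOrInline: String,
    offset: Option[Int],
    sizeInBytes: Int,
    cardinality: Long)

// Returns None when the file has no deletion vector attached.
def readDvInfo(metadata: JMap[String, Object]): Option[NativeDvInfo] =
  Option(metadata.get(DeltaDvStorageType)).map {
    storageType =>
      NativeDvInfo(
        storageType = storageType.toString,
        pathOrInline = metadata.get(DeltaDvPathOrInline).toString,
        offset = Option(metadata.get(DeltaDvOffset)).map(_.asInstanceOf[Int]),
        sizeInBytes = metadata.get(DeltaDvSizeInBytes).asInstanceOf[Int],
        cardinality = metadata.get(DeltaDvCardinality).asInstanceOf[Long])
  }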
@@ -0,0 +1,228 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.delta

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

/**
 * Rewrites Delta scans over DV-enabled tables to request the backend-specific skip-row metadata
 * column only when the snapshot actually contains DVs.
 */
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
  def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
    transformWithSubqueries(plan) { case ScanWithDeletionVectors(dvScan) => dvScan }
  }
}

object ScanWithDeletionVectors {
  def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
    case scan @ LogicalRelation(
          relation @ HadoopFsRelation(
            index: TahoeFileIndex,
            _,
            _,
            _,
            format: DeltaParquetFileFormat,
            _),
          _,
          _,
          _) =>
      dvEnabledScanFor(scan, relation, format, index)
    case scan @ LogicalRelation(
          relation @ HadoopFsRelation(
            index: TahoeFileIndex,
            _,
            _,
            _,
            format: GlutenDeltaParquetFileFormat,
            _),
          _,
          _,
          _) =>
      dvEnabledScanFor(scan, relation, format, index)
    case _ => None
  }

  def dvEnabledScanFor(
      scan: LogicalRelation,
      hadoopRelation: HadoopFsRelation,
      fileFormat: DeltaParquetFileFormat,
      index: TahoeFileIndex): Option[LogicalPlan] = {
    if (!deletionVectorsReadable(index.protocol, index.metadata)) {
      return None
    }

    require(
      !index.isInstanceOf[TahoeLogFileIndex],
      "Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

    // A table path on the format means copyWithDVInfo already ran, i.e. the scan
    // was rewritten by an earlier pass.
    if (fileFormat.hasTablePath) {
      return None
    }

    val filesWithDVs = index
      .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
      .filter(_.deletionVector != null)
    if (filesWithDVs.isEmpty) {
      return None
    }

    val planOutput = scan.output
    val spark = SparkSession.getActiveSession.get
    val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
    val rowIndexFilter = createRowIndexFilterNode(newScan)
    Some(Project(planOutput, rowIndexFilter))
  }

  // Parallel overload for GlutenDeltaParquetFileFormat, which carries its own
  // skip-row struct field and copyWithDVInfo.
  def dvEnabledScanFor(
      scan: LogicalRelation,
      hadoopRelation: HadoopFsRelation,
      fileFormat: GlutenDeltaParquetFileFormat,
      index: TahoeFileIndex): Option[LogicalPlan] = {
    if (!deletionVectorsReadable(index.protocol, index.metadata)) {
      return None
    }

    require(
      !index.isInstanceOf[TahoeLogFileIndex],
      "Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

    if (fileFormat.hasTablePath) {
      return None
    }

    val filesWithDVs = index
      .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
      .filter(_.deletionVector != null)
    if (filesWithDVs.isEmpty) {
      return None
    }

    val planOutput = scan.output
    val spark = SparkSession.getActiveSession.get
    val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
    val rowIndexFilter = createRowIndexFilterNode(newScan)
    Some(Project(planOutput, rowIndexFilter))
  }

  private def addRowIndexIfMissing(attribute: AttributeReference): AttributeReference = {
    require(attribute.name == METADATA_NAME)

    val dataType = attribute.dataType.asInstanceOf[StructType]
    if (dataType.fieldNames.contains(ParquetFileFormat.ROW_INDEX)) {
      return attribute
    }

    val newDatatype = dataType.add(ParquetFileFormat.ROW_INDEX_FIELD)
    attribute.copy(dataType = newDatatype)(
      exprId = attribute.exprId,
      qualifier = attribute.qualifier)
  }

  private def createScanWithSkipRowColumn(
      spark: SparkSession,
      inputScan: LogicalRelation,
      fileFormat: DeltaParquetFileFormat,
      tahoeFileIndex: TahoeFileIndex,
      hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
    val useMetadataRowIndex =
      spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)

    val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
    val scanOutputWithMetadata = if (useMetadataRowIndex) {
      if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
        inputScan.output.collect {
          case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
          case o => o
        }
      } else {
        inputScan.output :+ fileFormat.createFileMetadataCol()
      }
    } else {
      inputScan.output
    }

    val newScanOutput =
      scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
    val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
    val newFileFormat = fileFormat.copyWithDVInfo(
      tablePath = tahoeFileIndex.path.toString,
      optimizationsEnabled = useMetadataRowIndex)

    val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
      hadoopFsRelation.sparkSession)

    inputScan.copy(relation = newRelation, output = newScanOutput)
  }

  private def createScanWithSkipRowColumn(
      spark: SparkSession,
      inputScan: LogicalRelation,
      fileFormat: GlutenDeltaParquetFileFormat,
      tahoeFileIndex: TahoeFileIndex,
      hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
    val useMetadataRowIndex =
      spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)

    val skipRowField = GlutenDeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD
    val scanOutputWithMetadata = if (useMetadataRowIndex) {
      if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
        inputScan.output.collect {
          case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
          case o => o
        }
      } else {
        inputScan.output :+ fileFormat.createFileMetadataCol()
      }
    } else {
      inputScan.output
    }

    val newScanOutput =
      scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
    val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
    val newFileFormat = fileFormat.copyWithDVInfo(
      tablePath = tahoeFileIndex.path.toString,
      optimizationsEnabled = useMetadataRowIndex)

    val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
      hadoopFsRelation.sparkSession)

    inputScan.copy(relation = newRelation, output = newScanOutput)
  }

  private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = {
    val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME)
    require(
      skipRowColumnRefs.size == 1,
      s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
    val skipRowColumnRef = skipRowColumnRefs.head
    Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
  }
}
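Taken together, the rewrite turns a DV-backed scan into a Project over a Filter on the injected skip-row column. A minimal sketch of the resulting plan shape, assuming an active SparkSession named spark and a hypothetical DV-enabled table at /tmp/dv_table; output columns a and b are illustrative (Delta's IS_ROW_DELETED_COLUMN_NAME renders as __delta_internal_is_row_deleted, and RowIndexFilter.KEEP_ROW_VALUE is 0):

// Hypothetical table path; reading it goes through the rewritten scan.
val df = spark.read.format("delta").load("/tmp/dv_table")
df.queryExecution.optimizedPlan
// Roughly:
//   Project [a, b]
//   +- Filter (__delta_internal_is_row_deleted = 0)   // KEEP_ROW_VALUE == 0 keeps the row
//      +- Relation [a, b, ..., __delta_internal_is_row_deleted] parquet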
@@ -0,0 +1,58 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.delta.stats

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.{DeltaTable, OptimisticTransaction, PreprocessTableWithDVs}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

/** Shadows Delta's PrepareDeltaScan to inject backend-specific DV preprocessing. */
class PrepareDeltaScan(protected val spark: SparkSession)
  extends Rule[LogicalPlan]
  with PrepareDeltaScanBase
  with PreprocessTableWithDVs {

  override def apply(plan0: LogicalPlan): LogicalPlan = {
    var plan = plan0

    val isSubquery = isSubqueryRoot(plan)
    val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
    if (isSubquery || isDataSourceV2) {
      return plan
    }

    val updatedPlan = if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)) {
      if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED)) {
        plan = optimizeQueryWithMetadata(plan)
      }
      prepareDeltaScan(plan)
    } else {
      // Without stats-based skipping, still pin the read set: if an active
      // transaction reads this table, mark the whole table as read.
      OptimisticTransaction.getActive.foreach {
        txn =>
          val logsInPlan = plan.collect { case DeltaTable(fileIndex) => fileIndex.deltaLog }
          if (logsInPlan.exists(_.isSameLogAs(txn.deltaLog))) {
            txn.readWholeTable()
          }
      }
      plan
    }

    preprocessTablesWithDVs(updatedPlan)
  }
}
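Because this class shares the fully qualified name of Delta's own rule (org.apache.spark.sql.delta.stats.PrepareDeltaScan), it is picked up by whatever session-extension wiring registers the upstream rule. For context, a hedged sketch of equivalent wiring, assuming Spark's SparkSessionExtensions.injectPreCBORule (available since Spark 3.1); ExampleExtension is a hypothetical name:

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.delta.stats.PrepareDeltaScan

// Hypothetical extension mirroring how a scan-preparation rule is registered.
class ExampleExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Pre-CBO rules run after the main optimizer batches and before cost-based
    // join reordering, which is where Delta prepares its scans.
    extensions.injectPreCBORule(session => new PrepareDeltaScan(session))
  }
}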