diff --git a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala index 0587e8b07f7..4c6254d273d 100644 --- a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala +++ b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala @@ -23,9 +23,15 @@ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform import org.apache.gluten.extension.columnar.validator.Validators import org.apache.gluten.extension.injector.Injector +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.SparkReflectionUtil class VeloxDeltaComponent extends Component { + private val deltaDvPreprocessRuleClassName = + "org.apache.gluten.extension.PreprocessDeltaScanWithDeletionVectors" + override def name(): String = "velox-delta" override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil @@ -36,6 +42,7 @@ class VeloxDeltaComponent extends Component { override def injectRules(injector: Injector): Unit = { val legacy = injector.gluten.legacy + injector.spark.injectOptimizerRule(deltaDvPreprocessRule) legacy.injectTransform { c => val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), OffloadDeltaFilter()) @@ -46,4 +53,22 @@ class VeloxDeltaComponent extends Component { } DeltaPostTransformRules.rules.foreach(r => legacy.injectPostTransform(_ => r)) } + + private def deltaDvPreprocessRule(spark: SparkSession): Rule[LogicalPlan] = { + if (!SparkReflectionUtil.isClassPresent(deltaDvPreprocessRuleClassName)) { + return VeloxDeltaComponent.IdentityRule + } + + Class + .forName(deltaDvPreprocessRuleClassName) + .getConstructor(classOf[SparkSession]) + .newInstance(spark) + .asInstanceOf[Rule[LogicalPlan]] + } +} + +object VeloxDeltaComponent { + private object IdentityRule extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan + } } diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala new file mode 100644 index 00000000000..d1733beb117 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.{DeltaFileReadOptions, RowIndexFilterType} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap} +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore +import org.apache.spark.sql.execution.datasources.PartitionedFile + +import org.apache.hadoop.fs.Path + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} + +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +object VeloxDeltaMetadataUtils { + private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded" + private val RowIndexFilterTypeKey = "row_index_filter_type" + private val RowIndexFilterTypeIfContained = "IF_CONTAINED" + private val RowIndexFilterTypeIfNotContained = "IF_NOT_CONTAINED" + + final class NormalizedSplitMetadata( + val otherMetadataColumns: JList[JMap[String, Object]], + val deltaReadOptions: JList[DeltaFileReadOptions]) + extends Serializable + + private def decodeDescriptor( + normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = { + Option(normalizedMetadata.get(RowIndexFilterIdEncoded)) + .map(_.toString) + .filter(_.nonEmpty) + .flatMap(encoded => Try(DeletionVectorDescriptor.deserializeFromBase64(encoded)).toOption) + } + + private def serializePayload( + dvStore: HadoopFileSystemDVStore, + tablePath: Path, + descriptor: DeletionVectorDescriptor): Array[Byte] = { + if (tablePath == null) { + throw new IllegalStateException( + "Unable to resolve Delta table path while materializing deletion vector payload") + } + StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + .serializeAsByteArray(RoaringBitmapArrayFormat.Portable) + } + + private def normalizeMetadata(metadata: JMap[String, Object]): JMap[String, Object] = { + val normalized = new JHashMap[String, Object]() + if (metadata != null) { + normalized.putAll(metadata) + } + normalized.remove(RowIndexFilterIdEncoded) + normalized.remove(RowIndexFilterTypeKey) + normalized + } + + private def parseRowIndexFilterType( + metadata: JMap[String, Object]): RowIndexFilterType = { + Option(metadata.get(RowIndexFilterTypeKey)).map(_.toString) match { + case Some(RowIndexFilterTypeIfContained) => RowIndexFilterType.IF_CONTAINED + case Some(RowIndexFilterTypeIfNotContained) => RowIndexFilterType.IF_NOT_CONTAINED + case _ => RowIndexFilterType.KEEP_ALL + } + } + + def normalizeSplitMetadata( + partitionColumnCount: Int, + files: JList[PartitionedFile]): NormalizedSplitMetadata = { + val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf()) + val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size()) + val deltaReadOptions = new JArrayList[DeltaFileReadOptions](files.size()) + var hasDeletionVectors = false + + files.asScala.foreach { + file => + val otherMetadata = + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + val metadataWithDecodedPayload = new JHashMap[String, Object]() + if (otherMetadata != null) { + metadataWithDecodedPayload.putAll(otherMetadata) + } + + val descriptor = decodeDescriptor(metadataWithDecodedPayload) + val rowIndexFilterType = parseRowIndexFilterType(metadataWithDecodedPayload) + val normalizedMetadata = normalizeMetadata(metadataWithDecodedPayload) + + descriptor match { + case Some(descriptor) => + hasDeletionVectors = true + val payloadTablePath = resolveTablePath(partitionColumnCount, file) + val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor) + deltaReadOptions.add( + new DeltaFileReadOptions( + rowIndexFilterType, + true, + descriptor.cardinality, + serializedPayload)) + normalizedMetadataColumns.add(normalizedMetadata) + case None => + deltaReadOptions.add( + new DeltaFileReadOptions(rowIndexFilterType, false, 0L, Array.emptyByteArray)) + normalizedMetadataColumns.add(normalizedMetadata) + } + } + + val deltaOptions = if (hasDeletionVectors) { + deltaReadOptions + } else { + new JArrayList[DeltaFileReadOptions]() + } + new NormalizedSplitMetadata(normalizedMetadataColumns, deltaOptions) + } + + private def activeSpark: SparkSession = { + SparkSession.getActiveSession + .orElse(SparkSession.getDefaultSession) + .getOrElse { + throw new IllegalStateException( + "Active SparkSession is required to materialize Delta deletion vectors") + } + } + + private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = { + val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent + var tablePath = fileParent + for (_ <- 0 until partitionColumnCount) { + tablePath = tablePath.getParent + } + val spark = activeSpark + if (tablePath != null && isDeltaTablePath(spark, tablePath)) { + return tablePath + } + + // Spark can report a partition column count that does not map 1:1 to path depth for + // prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log. + var candidate = fileParent + while (candidate != null && !isDeltaTablePath(spark, candidate)) { + candidate = candidate.getParent + } + if (candidate != null) candidate else tablePath + } + + private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = { + val deltaLogPath = new Path(tablePath, "_delta_log") + try { + deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath) + } catch { + case NonFatal(_) => false + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala new file mode 100644 index 00000000000..26a98865627 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.delta.PreprocessTableWithDVs + +/** + * Delta 3.3 compatibility rule for DV scan metadata. + * + * Delta's own PrepareDeltaScan still runs normally. This Gluten-scoped rule only adds the + * backend-visible DV metadata columns after Delta has prepared the scan, so the physical Delta scan + * handoff can materialize the per-file DV payload for Velox without replacing Delta classes. + */ +class PreprocessDeltaScanWithDeletionVectors(protected val spark: SparkSession) + extends Rule[LogicalPlan] + with PreprocessTableWithDVs { + + override def apply(plan: LogicalPlan): LogicalPlan = preprocessTablesWithDVs(plan) +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala new file mode 100644 index 00000000000..56bae355005 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.delta.DeltaParquetFileFormat._ +import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable +import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Rewrites Delta scans over DV-enabled tables to request the backend-specific skip-row metadata + * column only when the snapshot actually contains DVs. + */ +trait PreprocessTableWithDVs extends SubqueryTransformerHelper { + def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = { + plan.transformDown { case ScanWithDeletionVectors(dvScan) => dvScan } + } +} + +object ScanWithDeletionVectors { + def unapply(a: LogicalRelation): Option[LogicalPlan] = a match { + case scan @ LogicalRelation( + relation @ HadoopFsRelation( + index: TahoeFileIndex, + _, + _, + _, + format: DeltaParquetFileFormat, + _), + _, + _, + _) => + dvEnabledScanFor(scan, relation, format, index) + case scan @ LogicalRelation( + relation @ HadoopFsRelation( + index: TahoeFileIndex, + _, + _, + _, + format: GlutenDeltaParquetFileFormat, + _), + _, + _, + _) => + dvEnabledScanFor(scan, relation, format, index) + case _ => None + } + + def dvEnabledScanFor( + scan: LogicalRelation, + hadoopRelation: HadoopFsRelation, + fileFormat: DeltaParquetFileFormat, + index: TahoeFileIndex): Option[LogicalPlan] = { + if (!deletionVectorsReadable(index.protocol, index.metadata)) { + return None + } + + if (index.isInstanceOf[TahoeLogFileIndex]) { + return None + } + + if (fileFormat.hasTablePath) { + return None + } + + val filesWithDVs = index + .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) + .filter(_.deletionVector != null) + if (filesWithDVs.isEmpty) { + return None + } + + val planOutput = scan.output + val spark = SparkSession.getActiveSession.get + val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation) + val rowIndexFilter = createRowIndexFilterNode(newScan) + Some(Project(planOutput, rowIndexFilter)) + } + + def dvEnabledScanFor( + scan: LogicalRelation, + hadoopRelation: HadoopFsRelation, + fileFormat: GlutenDeltaParquetFileFormat, + index: TahoeFileIndex): Option[LogicalPlan] = { + if (!deletionVectorsReadable(index.protocol, index.metadata)) { + return None + } + + if (index.isInstanceOf[TahoeLogFileIndex]) { + return None + } + + if (fileFormat.hasTablePath) { + return None + } + + val filesWithDVs = index + .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) + .filter(_.deletionVector != null) + if (filesWithDVs.isEmpty) { + return None + } + + val planOutput = scan.output + val spark = SparkSession.getActiveSession.get + val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation) + val rowIndexFilter = createRowIndexFilterNode(newScan) + Some(Project(planOutput, rowIndexFilter)) + } + + private def addRowIndexIfMissing(attribute: AttributeReference): AttributeReference = { + require(attribute.name == METADATA_NAME) + + val dataType = attribute.dataType.asInstanceOf[StructType] + if (dataType.fieldNames.contains(ParquetFileFormat.ROW_INDEX)) { + return attribute + } + + val newDatatype = dataType.add(ParquetFileFormat.ROW_INDEX_FIELD) + attribute.copy(dataType = newDatatype)( + exprId = attribute.exprId, + qualifier = attribute.qualifier) + } + + private def createScanWithSkipRowColumn( + spark: SparkSession, + inputScan: LogicalRelation, + fileFormat: DeltaParquetFileFormat, + tahoeFileIndex: TahoeFileIndex, + hadoopFsRelation: HadoopFsRelation): LogicalRelation = { + val useMetadataRowIndex = + spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX) + + val skipRowField = IS_ROW_DELETED_STRUCT_FIELD + val scanOutputWithMetadata = if (useMetadataRowIndex) { + if (inputScan.output.map(_.name).contains(METADATA_NAME)) { + inputScan.output.collect { + case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a) + case o => o + } + } else { + inputScan.output :+ fileFormat.createFileMetadataCol() + } + } else { + inputScan.output + } + + val newScanOutput = + scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)() + val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) + val newFileFormat = fileFormat.copyWithDVInfo( + tablePath = tahoeFileIndex.path.toString, + optimizationsEnabled = useMetadataRowIndex) + + val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)( + hadoopFsRelation.sparkSession) + + inputScan.copy(relation = newRelation, output = newScanOutput) + } + + private def createScanWithSkipRowColumn( + spark: SparkSession, + inputScan: LogicalRelation, + fileFormat: GlutenDeltaParquetFileFormat, + tahoeFileIndex: TahoeFileIndex, + hadoopFsRelation: HadoopFsRelation): LogicalRelation = { + val useMetadataRowIndex = + spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX) + + val skipRowField = GlutenDeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD + val scanOutputWithMetadata = if (useMetadataRowIndex) { + if (inputScan.output.map(_.name).contains(METADATA_NAME)) { + inputScan.output.collect { + case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a) + case o => o + } + } else { + inputScan.output :+ fileFormat.createFileMetadataCol() + } + } else { + inputScan.output + } + + val newScanOutput = + scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)() + val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) + val newFileFormat = fileFormat.copyWithDVInfo( + tablePath = tahoeFileIndex.path.toString, + optimizationsEnabled = useMetadataRowIndex) + + val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)( + hadoopFsRelation.sparkSession) + + inputScan.copy(relation = newRelation, output = newScanOutput) + } + + private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = { + val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME) + require( + skipRowColumnRefs.size == 1, + s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME") + val skipRowColumnRef = skipRowColumnRefs.head + Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan) + } +} diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala new file mode 100644 index 00000000000..bc32b9b3e72 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils +import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.{RowIndexFilterType => GlutenRowIndexFilterType} + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.tags.ExtendedSQLTest + +import org.apache.hadoop.fs.Path + +import scala.collection.JavaConverters._ + +@ExtendedSQLTest +class DeltaDeletionVectorHandoffSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLTestUtils + with DeltaSQLCommandTest { + + import testImplicits._ + + test("Spark 3.5 Delta DV handoff should materialize serialized payloads from scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val basePartitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val partitionedFile = basePartitionedFile.copy( + otherConstantMetadataColumnValues = Map[String, Object]( + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED -> + dataFile.deletionVector.serializeToBase64(), + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED" + )) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + val deltaReadOptions = normalized.deltaReadOptions.get(0) + + assert(deltaReadOptions.hasDeletionVector()) + assert(deltaReadOptions.serializedDeletionVector().nonEmpty) + assert(deltaReadOptions.deletionVectorCardinality() == dataFile.deletionVector.cardinality) + assert(deltaReadOptions.rowIndexFilterType() == GlutenRowIndexFilterType.IF_CONTAINED) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE)) + } + } + + test("Spark 3.5 Delta DV handoff should skip payload materialization without scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val partitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + + assert(normalized.deltaReadOptions.isEmpty) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE)) + } + } +} diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index f265168ddbd..a60054031c6 100644 --- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -197,6 +197,34 @@ class DeltaSuite checkAnswer(data.toDF(), Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil) } + test("DV scan without metadata row index falls back and stays correct") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + + withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key -> "false") { + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + assert(log.update().allFiles.collect().exists(_.deletionVector != null)) + + val df = spark.read.format("delta").load(path) + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.isEmpty) + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + } + } + } + test("partitioned append - nulls") { val tempDir = Utils.createTempDir() Seq(Some(1), None) diff --git a/backends-velox/src-delta40/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala b/backends-velox/src-delta40/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala new file mode 100644 index 00000000000..af5f7c8b40d --- /dev/null +++ b/backends-velox/src-delta40/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.{DeltaFileReadOptions, RowIndexFilterType} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap} +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore +import org.apache.spark.sql.execution.datasources.PartitionedFile + +import org.apache.hadoop.fs.Path + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} + +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +object VeloxDeltaMetadataUtils { + private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded" + private val RowIndexFilterTypeKey = "row_index_filter_type" + private val RowIndexFilterTypeIfContained = "IF_CONTAINED" + private val RowIndexFilterTypeIfNotContained = "IF_NOT_CONTAINED" + + final class NormalizedSplitMetadata( + val otherMetadataColumns: JList[JMap[String, Object]], + val deltaReadOptions: JList[DeltaFileReadOptions]) + extends Serializable + + private def decodeDescriptor( + normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = { + Option(normalizedMetadata.get(RowIndexFilterIdEncoded)) + .map(_.toString) + .filter(_.nonEmpty) + .flatMap(parseDescriptor) + } + + private def parseDescriptor(encodedDescriptor: String): Option[DeletionVectorDescriptor] = { + val methods = Seq("deserializeFromBase64", "fromJson") + methods.iterator + .map { + methodName => + Try { + val method = DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String]) + method + .invoke(DeletionVectorDescriptor, encodedDescriptor) + .asInstanceOf[DeletionVectorDescriptor] + }.toOption + } + .collectFirst { case Some(descriptor) => descriptor } + } + + private def serializePayload( + dvStore: HadoopFileSystemDVStore, + tablePath: Path, + descriptor: DeletionVectorDescriptor): Array[Byte] = { + if (tablePath == null) { + throw new IllegalStateException( + "Unable to resolve Delta table path while materializing deletion vector payload") + } + StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + .serializeAsByteArray(RoaringBitmapArrayFormat.Portable) + } + + private def normalizeMetadata(metadata: JMap[String, Object]): JMap[String, Object] = { + val normalized = new JHashMap[String, Object]() + if (metadata != null) { + normalized.putAll(metadata) + } + normalized.remove(RowIndexFilterIdEncoded) + normalized.remove(RowIndexFilterTypeKey) + normalized + } + + private def parseRowIndexFilterType( + metadata: JMap[String, Object]): RowIndexFilterType = { + Option(metadata.get(RowIndexFilterTypeKey)).map(_.toString) match { + case Some(RowIndexFilterTypeIfContained) => RowIndexFilterType.IF_CONTAINED + case Some(RowIndexFilterTypeIfNotContained) => RowIndexFilterType.IF_NOT_CONTAINED + case _ => RowIndexFilterType.KEEP_ALL + } + } + + def normalizeSplitMetadata( + partitionColumnCount: Int, + files: JList[PartitionedFile]): NormalizedSplitMetadata = { + val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf()) + val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size()) + val deltaReadOptions = new JArrayList[DeltaFileReadOptions](files.size()) + var hasDeletionVectors = false + + files.asScala.foreach { + file => + val otherMetadata = + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + val metadataWithDecodedPayload = new JHashMap[String, Object]() + if (otherMetadata != null) { + metadataWithDecodedPayload.putAll(otherMetadata) + } + + val descriptor = decodeDescriptor(metadataWithDecodedPayload) + val rowIndexFilterType = parseRowIndexFilterType(metadataWithDecodedPayload) + val normalizedMetadata = normalizeMetadata(metadataWithDecodedPayload) + + descriptor match { + case Some(descriptor) => + hasDeletionVectors = true + val payloadTablePath = resolveTablePath(partitionColumnCount, file) + val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor) + deltaReadOptions.add( + new DeltaFileReadOptions( + rowIndexFilterType, + true, + descriptor.cardinality, + serializedPayload)) + normalizedMetadataColumns.add(normalizedMetadata) + case None => + deltaReadOptions.add( + new DeltaFileReadOptions(rowIndexFilterType, false, 0L, Array.emptyByteArray)) + normalizedMetadataColumns.add(normalizedMetadata) + } + } + + val deltaOptions = if (hasDeletionVectors) { + deltaReadOptions + } else { + new JArrayList[DeltaFileReadOptions]() + } + new NormalizedSplitMetadata(normalizedMetadataColumns, deltaOptions) + } + + private def activeSpark: SparkSession = { + SparkSession.getActiveSession + .orElse(SparkSession.getDefaultSession) + .getOrElse { + throw new IllegalStateException( + "Active SparkSession is required to materialize Delta deletion vectors") + } + } + + private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = { + val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent + var tablePath = fileParent + for (_ <- 0 until partitionColumnCount) { + tablePath = tablePath.getParent + } + val spark = activeSpark + if (tablePath != null && isDeltaTablePath(spark, tablePath)) { + return tablePath + } + + // Spark can report a partition column count that does not map 1:1 to path depth for + // prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log. + var candidate = fileParent + while (candidate != null && !isDeltaTablePath(spark, candidate)) { + candidate = candidate.getParent + } + if (candidate != null) candidate else tablePath + } + + private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = { + val deltaLogPath = new Path(tablePath, "_delta_log") + try { + deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath) + } catch { + case NonFatal(_) => false + } + } +} diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala new file mode 100644 index 00000000000..975d38384f9 --- /dev/null +++ b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils +import org.apache.gluten.execution.DeltaScanTransformer +import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.{RowIndexFilterType => GlutenRowIndexFilterType} + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.tags.ExtendedSQLTest + +import org.apache.hadoop.fs.Path + +import scala.collection.JavaConverters._ + +@ExtendedSQLTest +class DeltaDeletionVectorHandoffSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLTestUtils + with DeltaSQLCommandTest { + + import testImplicits._ + + test("Spark 4 Delta DV scan should fall back when metadata row index is disabled") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + assert(log.update().allFiles.collect().exists(_.deletionVector != null)) + + // This covers scan behavior over an existing DV. Keep the no-metadata-row-index + // path on Spark until the native path can prove the same contract for DML DVs. + withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key -> "false") { + val df = spark.read.format("delta").load(path) + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.isEmpty) + checkAnswer(df, Seq((1, "a"), (2, "b")).toDF()) + } + } + } + + test("Spark 4 Delta DV handoff should materialize serialized payloads from scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val basePartitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val partitionedFile = basePartitionedFile.copy( + otherConstantMetadataColumnValues = Map[String, Object]( + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED -> + dataFile.deletionVector.serializeToBase64(), + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED" + )) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + val deltaReadOptions = normalized.deltaReadOptions.get(0) + + assert(deltaReadOptions.hasDeletionVector()) + assert(deltaReadOptions.serializedDeletionVector().nonEmpty) + assert(deltaReadOptions.deletionVectorCardinality() == dataFile.deletionVector.cardinality) + assert(deltaReadOptions.rowIndexFilterType() == GlutenRowIndexFilterType.IF_CONTAINED) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE)) + + val df = spark.read.format("delta").load(path) + checkAnswer(df, Seq((1, "a"), (2, "b")).toDF()) + } + } + + test("Spark 4 Delta DV handoff should skip payload materialization without scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val partitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + + assert(normalized.deltaReadOptions.isEmpty) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE)) + } + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index d8b23b358fa..c116cadab57 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -24,7 +24,8 @@ import org.apache.gluten.iterator.Iterators import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} +import org.apache.gluten.substrait.rel.{DeltaLocalFilesBuilder, LocalFilesBuilder, LocalFilesNode, SplitInfo} +import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.vectorized._ @@ -49,6 +50,11 @@ import scala.collection.JavaConverters._ import scala.collection.mutable class VeloxIteratorApi extends IteratorApi with Logging { + private type NormalizedDeltaSplitMetadata = + (Seq[java.util.Map[String, Object]], Seq[DeltaFileReadOptions]) + + private val deltaMetadataUtilsClassName = + "org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils$" private def setFileSchemaForLocalFiles( localFilesNode: LocalFilesNode, @@ -94,10 +100,33 @@ class VeloxIteratorApi extends IteratorApi with Logging { val metadataColumns = partitionFiles .map( f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava) - val otherMetadataColumns = partitionFiles - .map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f)) + val (otherMetadataColumns, deltaReadOptions) = + normalizeDeltaSplitMetadata(partitionSchema.fields.length, partitionFiles) + .getOrElse { + ( + partitionFiles.map { + f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f) + }, + Seq.empty[DeltaFileReadOptions]) + } - setFileSchemaForLocalFiles( + val localFilesNode = if (deltaReadOptions.nonEmpty) { + DeltaLocalFilesBuilder.makeDeltaLocalFiles( + partitionIndex, + paths.asJava, + starts.asJava, + lengths.asJava, + fileSizes.asJava, + modificationTimes.asJava, + partitionColumns.map(_.asJava).asJava, + metadataColumns.asJava, + fileFormat, + locations.toList.asJava, + mapAsJavaMap(properties), + otherMetadataColumns.asJava, + deltaReadOptions.asJava + ) + } else { LocalFilesBuilder.makeLocalFiles( partitionIndex, paths.asJava, @@ -111,10 +140,16 @@ class VeloxIteratorApi extends IteratorApi with Logging { locations.toList.asJava, mapAsJavaMap(properties), otherMetadataColumns.asJava - ), + ) + } + + val localFiles = setFileSchemaForLocalFiles( + localFilesNode, dataSchema, fileFormat ) + + localFiles } /** Generate native row partition. */ @@ -179,6 +214,38 @@ class VeloxIteratorApi extends IteratorApi with Logging { NativePlanEvaluator.injectWriteFilesTempPath(path, fileName) } + private def normalizeDeltaSplitMetadata( + partitionColumnCount: Int, + partitionFiles: Seq[PartitionedFile]): Option[NormalizedDeltaSplitMetadata] = { + try { + // scalastyle:off classforname + val moduleClass = Class.forName(deltaMetadataUtilsClassName) + // scalastyle:on classforname + val module = moduleClass.getField("MODULE$").get(null) + val normalizeMethod = + moduleClass.getMethod("normalizeSplitMetadata", classOf[Int], classOf[java.util.List[_]]) + val normalized = + normalizeMethod.invoke(module, Int.box(partitionColumnCount), partitionFiles.asJava) + val metadataMethod = normalized.getClass.getMethod("otherMetadataColumns") + val deltaOptionsMethod = normalized.getClass.getMethod("deltaReadOptions") + Some( + metadataMethod + .invoke(normalized) + .asInstanceOf[java.util.List[java.util.Map[String, Object]]] + .asScala + .toSeq, + deltaOptionsMethod + .invoke(normalized) + .asInstanceOf[java.util.List[DeltaFileReadOptions]] + .asScala + .toSeq + ) + } catch { + case _: ClassNotFoundException | _: NoSuchMethodException => + None + } + } + /** Generate Iterator[ColumnarBatch] for first stage. */ override def genFirstStageIterator( inputPartition: BaseGlutenPartition, diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index a726d3be916..c0678b33d2c 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include "compute/Runtime.h" #include "config/GlutenConfig.h" diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index f3ffab59a6a..befa4e9dbd1 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -17,8 +17,13 @@ #include "VeloxPlanConverter.h" #include +#include +#include +#include +#include #include "config/GlutenConfig.h" +#include "delta/DeltaSplitInfo.h" #include "iceberg/IcebergPlanConverter.h" #include "operators/plannodes/IteratorSplit.h" @@ -48,6 +53,80 @@ VeloxPlanConverter::VeloxPlanConverter( } namespace { +std::optional unpackMetadataValue(const google::protobuf::Any& value) { + google::protobuf::BytesValue bytesValue; + if (value.UnpackTo(&bytesValue)) { + return bytesValue.value(); + } + + google::protobuf::StringValue stringValue; + if (value.UnpackTo(&stringValue)) { + return stringValue.value(); + } + + google::protobuf::Int32Value int32Value; + if (value.UnpackTo(&int32Value)) { + return std::to_string(int32Value.value()); + } + + google::protobuf::Int64Value int64Value; + if (value.UnpackTo(&int64Value)) { + return std::to_string(int64Value.value()); + } + + google::protobuf::DoubleValue doubleValue; + if (value.UnpackTo(&doubleValue)) { + return std::to_string(doubleValue.value()); + } + + return std::nullopt; +} + +delta::DeltaRowIndexFilterType parseDeltaRowIndexFilterType(int filterType) { + switch (filterType) { + case 1: + return delta::DeltaRowIndexFilterType::kIfContained; + case 2: + return delta::DeltaRowIndexFilterType::kIfNotContained; + case 0: + default: + return delta::DeltaRowIndexFilterType::kKeepAll; + } +} + +std::shared_ptr parseDeltaSplitInfo( + const substrait::ReadRel_LocalFiles_FileOrFiles& file, + std::shared_ptr splitInfo) { + auto deltaSplitInfo = std::dynamic_pointer_cast(splitInfo) + ? std::dynamic_pointer_cast(splitInfo) + : std::make_shared(*splitInfo); + + deltaSplitInfo->format = dwio::common::FileFormat::PARQUET; + const auto& deltaReadOptions = file.delta(); + deltaSplitInfo->rowIndexFilterTypes.emplace_back( + parseDeltaRowIndexFilterType(deltaReadOptions.row_index_filter_type())); + + if (!deltaReadOptions.has_deletion_vector()) { + deltaSplitInfo->deletionVectors.emplace_back(std::nullopt); + return deltaSplitInfo; + } + + auto serializedPayload = deltaReadOptions.serialized_deletion_vector(); + VELOX_USER_CHECK(!serializedPayload.empty(), "Delta split has a deletion vector without a serialized payload"); + VELOX_USER_CHECK_LE( + serializedPayload.size(), + static_cast(std::numeric_limits::max()), + "Delta deletion vector serialized payload is too large"); + const auto cardinality = static_cast(deltaReadOptions.deletion_vector_cardinality()); + auto payload = std::make_shared(std::move(serializedPayload)); + const SplitPayloadBufferView payloadView{ + reinterpret_cast(payload->data()), static_cast(payload->size())}; + deltaSplitInfo->deletionVectors.emplace_back( + delta::DeltaDeletionVectorDescriptor::serialized(cardinality, payloadView)); + deltaSplitInfo->deletionVectorPayloads.emplace_back(std::move(payload)); + return deltaSplitInfo; +} + std::shared_ptr parseScanSplitInfo( const facebook::velox::config::ConfigBase* veloxCfg, const google::protobuf::RepeatedPtrField& fileList) { @@ -75,6 +154,11 @@ std::shared_ptr parseScanSplitInfo( for (const auto& metadataColumn : file.metadata_columns()) { metadataColumnMap[metadataColumn.key()] = metadataColumn.value(); } + for (const auto& otherMetadataColumn : file.other_const_metadata_columns()) { + if (auto unpackedValue = unpackMetadataValue(otherMetadataColumn.value())) { + metadataColumnMap[otherMetadataColumn.key()] = std::move(*unpackedValue); + } + } splitInfo->metadataColumns.emplace_back(metadataColumnMap); splitInfo->paths.emplace_back(file.uri_file()); @@ -103,6 +187,9 @@ std::shared_ptr parseScanSplitInfo( case SubstraitFileFormatCase::kIceberg: splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo)); break; + case SubstraitFileFormatCase::kDelta: + splitInfo = parseDeltaSplitInfo(file, std::move(splitInfo)); + break; default: splitInfo->format = dwio::common::FileFormat::UNKNOWN; break; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c3ac095cdc7..8499d56b352 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -15,9 +15,13 @@ * limitations under the License. */ #include "WholeStageResultIterator.h" +#include #include "VeloxBackend.h" #include "VeloxPlanConverter.h" #include "VeloxRuntime.h" +#include "compute/delta/DeltaConnector.h" +#include "compute/delta/DeltaSplit.h" +#include "compute/delta/DeltaSplitInfo.h" #include "config/VeloxConfig.h" #include "utils/ConfigExtractor.h" #include "velox/connectors/hive/HiveConfig.h" @@ -69,6 +73,51 @@ const std::string kWriteIOTime = "writeIOWallNanos"; // others const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; +const std::string kDeltaTableFormat = "delta"; +const std::string kTableFormatKey = "table_format"; + +bool isDeltaMetadata(const std::unordered_map& metadata) { + auto tableFormatIt = metadata.find(kTableFormatKey); + return tableFormatIt != metadata.end() && tableFormatIt->second == kDeltaTableFormat; +} + +bool isDeltaScanInfo(const std::shared_ptr& splitInfo) { + for (const auto& metadata : splitInfo->metadataColumns) { + if (isDeltaMetadata(metadata)) { + return true; + } + } + return false; +} + +const velox::core::TableScanNode* findTableScanNodeById( + const std::shared_ptr& planNode, + const velox::core::PlanNodeId& nodeId) { + if (planNode == nullptr) { + return nullptr; + } + + if (planNode->id() == nodeId) { + return dynamic_cast(planNode.get()); + } + + for (const auto& source : planNode->sources()) { + if (const auto* found = findTableScanNodeById(source, nodeId)) { + return found; + } + } + return nullptr; +} + +std::string connectorIdForScanNode( + const std::shared_ptr& planNode, + const velox::core::PlanNodeId& nodeId) { + const auto* tableScanNode = findTableScanNodeById(planNode, nodeId); + if (tableScanNode == nullptr) { + return ""; + } + return tableScanNode->tableHandle()->connectorId(); +} } // namespace @@ -134,7 +183,8 @@ WholeStageResultIterator::WholeStageResultIterator( throw std::runtime_error("Invalid scan information."); } - for (const auto& scanInfo : scanInfos) { + for (size_t scanInfoIdx = 0; scanInfoIdx < scanInfos.size(); ++scanInfoIdx) { + const auto& scanInfo = scanInfos[scanInfoIdx]; // Get the information for TableScan. // Partition index in scan info is not used. const auto& paths = scanInfo->paths; @@ -144,6 +194,10 @@ WholeStageResultIterator::WholeStageResultIterator( const auto& format = scanInfo->format; const auto& partitionColumns = scanInfo->partitionColumns; const auto& metadataColumns = scanInfo->metadataColumns; + const auto scanNodeConnectorId = connectorIdForScanNode(veloxPlan_, scanNodeIds_[scanInfoIdx]); + const auto deltaSplitInfo = std::dynamic_pointer_cast(scanInfo); + const bool isDeltaScan = + scanNodeConnectorId == connectorIds_.delta || deltaSplitInfo != nullptr || isDeltaScanInfo(scanInfo); #ifdef GLUTEN_ENABLE_GPU // Under the pre-condition that all the split infos has same partition column and format. const auto canUseCudfConnector = scanInfo->canUseCudfConnector(); @@ -177,10 +231,37 @@ WholeStageResultIterator::WholeStageResultIterator( deleteFiles, metadataColumn, properties[idx]); + } else if (isDeltaScan) { + std::unordered_map customSplitInfo{{"table_format", kDeltaTableFormat}}; + std::optional deletionVector = std::nullopt; + auto rowIndexFilterType = gluten::delta::DeltaRowIndexFilterType::kKeepAll; + if (deltaSplitInfo != nullptr) { + VELOX_USER_CHECK_LT(idx, deltaSplitInfo->deletionVectors.size()); + VELOX_USER_CHECK_LT(idx, deltaSplitInfo->rowIndexFilterTypes.size()); + deletionVector = deltaSplitInfo->deletionVectors[idx]; + rowIndexFilterType = deltaSplitInfo->rowIndexFilterTypes[idx]; + } + split = std::make_shared( + connectorIds_.delta, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + std::unordered_map(), + true, + deletionVector, + std::nullopt, + rowIndexFilterType, + metadataColumn, + properties[idx]); } else { auto connectorId = connectorIds_.hive; #ifdef GLUTEN_ENABLE_GPU - if (canUseCudfConnector && enableCudf_ && + if (connectorId == connectorIds_.hive && canUseCudfConnector && enableCudf_ && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault)) { connectorId = connectorIds_.cudfHive; } diff --git a/cpp/velox/compute/delta/DeltaSplitInfo.h b/cpp/velox/compute/delta/DeltaSplitInfo.h new file mode 100644 index 00000000000..c02e52f6b88 --- /dev/null +++ b/cpp/velox/compute/delta/DeltaSplitInfo.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "compute/delta/DeltaSplit.h" +#include "substrait/SubstraitToVeloxPlan.h" + +namespace gluten { + +struct DeltaSplitInfo : SplitInfo { + std::vector> deletionVectorPayloads; + std::vector> deletionVectors; + std::vector rowIndexFilterTypes; + + DeltaSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) { + deletionVectors.reserve(splitInfo.paths.capacity()); + deletionVectorPayloads.reserve(splitInfo.paths.capacity()); + rowIndexFilterTypes.reserve(splitInfo.paths.capacity()); + + const auto previousFileCount = splitInfo.paths.empty() ? 0 : splitInfo.paths.size() - 1; + deletionVectors.resize(previousFileCount, std::nullopt); + rowIndexFilterTypes.resize(previousFileCount, delta::DeltaRowIndexFilterType::kKeepAll); + } +}; + +} // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 5477176ce85..7d577c12a63 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -19,6 +19,8 @@ #include "TypeUtils.h" #include "VariantToVectorConverter.h" +#include "compute/delta/DeltaConnector.h" +#include "compute/delta/DeltaSplitInfo.h" #include "jni/JniHashTable.h" #include "operators/hashjoin/HashTableBuilder.h" #include "operators/plannodes/RowVectorStream.h" @@ -46,6 +48,9 @@ using namespace cudf_velox::connector::hive; namespace gluten { namespace { +const std::string kDeltaTableFormat = "delta"; +const std::string kTableFormatKey = "table_format"; + bool useCudfTableHandle(const std::vector>& splitInfos) { #ifdef GLUTEN_ENABLE_GPU if (splitInfos.empty()) { @@ -57,6 +62,23 @@ bool useCudfTableHandle(const std::vector>& splitInfo #endif } +bool isDeltaMetadata(const std::unordered_map& metadata) { + auto tableFormatIt = metadata.find(kTableFormatKey); + return tableFormatIt != metadata.end() && tableFormatIt->second == kDeltaTableFormat; +} + +bool isDeltaSplitInfo(const std::shared_ptr& splitInfo) { + if (std::dynamic_pointer_cast(splitInfo) != nullptr) { + return true; + } + for (const auto& metadata : splitInfo->metadataColumns) { + if (isDeltaMetadata(metadata)) { + return true; + } + } + return false; +} + core::SortOrder toSortOrder(const ::substrait::SortField& sortField) { switch (sortField.direction()) { case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST: @@ -1573,8 +1595,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; - auto connectorId = connectorIds_.hive; - if (useCudfTableHandle(splitInfos_) && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && + auto connectorId = isDeltaSplitInfo(splitInfo) ? connectorIds_.delta : connectorIds_.hive; + if (connectorId == connectorIds_.hive && useCudfTableHandle(splitInfos_) && + veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { #ifdef GLUTEN_ENABLE_GPU connectorId = connectorIds_.cudfHive; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 373601916d4..65a6f8e0872 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -19,6 +19,7 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" +#include "compute/Runtime.h" #include "compute/VeloxConnectorIds.h" #include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala index 1be03dd404a..5f2a5300130 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala @@ -55,18 +55,6 @@ case class DeltaScanTransformer( override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat - override protected def doValidateInternal(): ValidationResult = { - if ( - requiredSchema.fields.exists( - _.name == "__delta_internal_is_row_deleted") || requiredSchema.fields.exists( - _.name == "__delta_internal_row_index") - ) { - return ValidationResult.failed(s"Deletion vector is not supported in native.") - } - - super.doValidateInternal() - } - override def doCanonicalize(): DeltaScanTransformer = { DeltaScanTransformer( relation, @@ -90,7 +78,6 @@ case class DeltaScanTransformer( } object DeltaScanTransformer { - def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = { new DeltaScanTransformer( scanExec.relation, diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala index e16a6d12fda..f6a414db0f3 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala @@ -16,28 +16,37 @@ */ package org.apache.gluten.extension -import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer} +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.{DeltaScanTransformer, FilterExecTransformerBase, ProjectExecTransformer} import org.apache.gluten.extension.columnar.transition.RemoveTransitions import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateNamedStruct, Expression, GetStructField, If, InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull, LambdaFunction, Literal, NamedLambdaVariable} -import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, TransformKeys, TransformValues} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping} -import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.FileFormat -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.types.StructType -import scala.collection.mutable import scala.collection.mutable.ListBuffer object DeltaPostTransformRules { def rules: Seq[Rule[SparkPlan]] = - RemoveTransitions :: pushDownInputFileExprRule :: columnMappingRule :: Nil + RemoveTransitions :: + nativeDeletionVectorRule :: + pushDownInputFileExprRule :: + columnMappingRule :: Nil + + private val deletionVectorDeletedRowColumnName = "__delta_internal_is_row_deleted" + private val deletionVectorRowIndexColumnName = "__delta_internal_row_index" + private val deletionVectorInternalColumnNames = + Set(deletionVectorDeletedRowColumnName, deletionVectorRowIndexColumnName) private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] = TreeNodeTag[String]("org.apache.gluten.delta.column.mapping") + private val PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG: TreeNodeTag[Boolean] = + TreeNodeTag[Boolean]("org.apache.gluten.delta.preserve.deletion.vector.row.index") private def notAppliedColumnMappingRule(plan: SparkPlan): Boolean = { plan.getTagValue(COLUMN_MAPPING_RULE_TAG).isEmpty @@ -65,6 +74,87 @@ object DeltaPostTransformRules { child.copy(output = p.output) } + /** + * Spark Delta injects synthetic deletion-vector predicates and columns into the plan. Those are + * needed for the JVM reader path, but for the native Delta scan path they must be stripped or + * they will be applied twice with incompatible semantics. + */ + val nativeDeletionVectorRule: Rule[SparkPlan] = (plan: SparkPlan) => { + tagRowIndexRequiredSubtrees(plan) + plan.transformUp { + case scan: DeltaScanTransformer => + val cleanedDataFilters = scan.dataFilters.flatMap(stripDeletionVectorPredicate) + val cleanedPushDownFilters = + scan.pushDownFilters.map(_.flatMap(stripDeletionVectorPredicate)) + val preserveRowIndex = shouldPreserveDeletionVectorRowIndex(scan) + val cleanedOutput = stripDeletionVectorInternalOutput(scan.output, preserveRowIndex) + val cleanedRequiredSchema = + stripDeletionVectorInternalSchema(scan.requiredSchema, preserveRowIndex) + if ( + cleanedDataFilters == scan.dataFilters && + cleanedPushDownFilters == scan.pushDownFilters && + cleanedOutput == scan.output && + cleanedRequiredSchema == scan.requiredSchema + ) { + scan + } else { + scan.copy( + output = cleanedOutput, + requiredSchema = cleanedRequiredSchema, + dataFilters = cleanedDataFilters, + pushDownFilters = cleanedPushDownFilters) + } + case project: ProjectExecTransformer if containsNativeDeltaScan(project.child) => + val cleanedProjectList = stripDeletionVectorInternalProjectList( + project.projectList, + shouldPreserveDeletionVectorRowIndex(project)) + if (cleanedProjectList == project.projectList) { + project + } else if (cleanedProjectList.isEmpty) { + project.child + } else { + ProjectExecTransformer(cleanedProjectList, project.child) + } + case project: ProjectExec if containsNativeDeltaScan(project.child) => + val cleanedProjectList = stripDeletionVectorInternalProjectList( + project.projectList, + shouldPreserveDeletionVectorRowIndex(project)) + if (cleanedProjectList == project.projectList) { + project + } else if (cleanedProjectList.isEmpty) { + project.child + } else { + ProjectExec(cleanedProjectList, project.child) + } + case filter: FilterExecTransformerBase if containsNativeDeltaScan(filter.child) => + stripDeletionVectorPredicate(filter.cond) match { + case Some(cleanCondition) if cleanCondition != filter.cond => + BackendsApiManager.getSparkPlanExecApiInstance + .genFilterExecTransformer(cleanCondition, filter.child) + case Some(_) => + filter + case None => + filter.child + } + case filter: FilterExec if containsNativeDeltaScan(filter.child) => + stripDeletionVectorPredicate(filter.condition) match { + case Some(cleanCondition) if cleanCondition != filter.condition => + FilterExec(cleanCondition, filter.child) + case Some(_) => + filter + case None => + filter.child + } + } + } + + private def containsNativeDeltaScan(plan: SparkPlan): Boolean = { + plan.exists { + case _: DeltaScanTransformer => true + case _ => false + } + } + private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match { case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping => true @@ -79,6 +169,82 @@ object DeltaPostTransformRules { } } + private def referencesDeletionVectorInternalColumn(expr: Expression): Boolean = { + expr.references.exists(attr => deletionVectorInternalColumnNames.contains(attr.name)) + } + + private def referencesDeletionVectorRowIndex(expr: Expression): Boolean = { + expr.references.exists(_.name == deletionVectorRowIndexColumnName) + } + + private def tagRowIndexRequiredSubtrees(plan: SparkPlan): Unit = { + def tagSubtree(subtree: SparkPlan): Unit = { + subtree.foreach(_.setTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG, true)) + } + + def visit(node: SparkPlan): Unit = { + val shouldPreserveRowIndex = + node.expressions.exists(containsIncrementMetricExpr) || + node.expressions.exists(referencesDeletionVectorRowIndex) + if (shouldPreserveRowIndex) { + node.children.foreach(tagSubtree) + } + node.children.foreach(visit) + } + + visit(plan) + } + + private def shouldPreserveDeletionVectorRowIndex(plan: SparkPlan): Boolean = { + plan.getTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG).contains(true) || + plan.expressions.exists(containsIncrementMetricExpr) || + plan.expressions.exists(referencesDeletionVectorRowIndex) + } + + private def shouldStripDeletionVectorInternalColumn( + columnName: String, + preserveRowIndex: Boolean): Boolean = { + columnName == deletionVectorDeletedRowColumnName || + (!preserveRowIndex && columnName == deletionVectorRowIndexColumnName) + } + + private def stripDeletionVectorInternalOutput( + output: Seq[Attribute], + preserveRowIndex: Boolean): Seq[Attribute] = { + output.filterNot(attr => shouldStripDeletionVectorInternalColumn(attr.name, preserveRowIndex)) + } + + private def stripDeletionVectorInternalProjectList( + projectList: Seq[NamedExpression], + preserveRowIndex: Boolean): Seq[NamedExpression] = { + projectList.filterNot( + expr => shouldStripDeletionVectorInternalColumn(expr.name, preserveRowIndex)) + } + + private def stripDeletionVectorInternalSchema( + schema: StructType, + preserveRowIndex: Boolean): StructType = { + StructType( + schema.filterNot( + field => shouldStripDeletionVectorInternalColumn(field.name, preserveRowIndex))) + } + + private def stripDeletionVectorPredicate(expr: Expression): Option[Expression] = { + expr match { + case And(left, right) => + (stripDeletionVectorPredicate(left), stripDeletionVectorPredicate(right)) match { + case (Some(cleanLeft), Some(cleanRight)) => Some(And(cleanLeft, cleanRight)) + case (Some(cleanLeft), None) => Some(cleanLeft) + case (None, Some(cleanRight)) => Some(cleanRight) + case (None, None) => None + } + case other if referencesDeletionVectorInternalColumn(other) => + None + case other => + Some(other) + } + } + private def isInputFileRelatedAttribute(attr: Attribute): Boolean = { attr match { case AttributeReference(name, _, _, _) => @@ -96,73 +262,6 @@ object DeltaPostTransformRules { } } - /** - * Checks whether two structurally compatible DataTypes have different struct field names at any - * nesting level. - */ - private def nestedFieldNamesDiffer(logical: DataType, physical: DataType): Boolean = { - (logical, physical) match { - case (l: StructType, p: StructType) if l.length == p.length => - l.zip(p).exists { - case (lf, pf) => - lf.name != pf.name || nestedFieldNamesDiffer(lf.dataType, pf.dataType) - } - case (l: ArrayType, p: ArrayType) => - nestedFieldNamesDiffer(l.elementType, p.elementType) - case (l: MapType, p: MapType) => - nestedFieldNamesDiffer(l.keyType, p.keyType) || - nestedFieldNamesDiffer(l.valueType, p.valueType) - case _ => false - } - } - - /** - * Rebuilds an expression tree so that nested struct field names match the logical schema. Uses - * positional extraction (GetStructField) and reconstruction (CreateNamedStruct) instead of Cast, - * so correctness does not depend on Velox's cast_match_struct_by_name config. - */ - private def reconcileFieldNames( - expr: Expression, - logical: DataType, - physical: DataType): Expression = { - (logical, physical) match { - case (l: StructType, p: StructType) if l.length == p.length => - val rebuiltFields = l.zip(p).zipWithIndex.flatMap { - case ((lf, pf), i) => - val extracted = GetStructField(expr, i, None) - val reconciled = reconcileFieldNames(extracted, lf.dataType, pf.dataType) - Seq(Literal(lf.name), reconciled) - } - val rebuilt = CreateNamedStruct(rebuiltFields) - If(IsNull(expr), Literal.create(null, l), rebuilt) - case (l: ArrayType, p: ArrayType) if nestedFieldNamesDiffer(l.elementType, p.elementType) => - val lambdaVar = NamedLambdaVariable("element", p.elementType, p.containsNull) - val body = reconcileFieldNames(lambdaVar, l.elementType, p.elementType) - ArrayTransform(expr, LambdaFunction(body, Seq(lambdaVar))) - case (l: MapType, p: MapType) => - val needKeys = nestedFieldNamesDiffer(l.keyType, p.keyType) - val needValues = nestedFieldNamesDiffer(l.valueType, p.valueType) - var result = expr - if (needValues) { - val keyVar = NamedLambdaVariable("key", p.keyType, false) - val valueVar = NamedLambdaVariable("value", p.valueType, p.valueContainsNull) - val body = reconcileFieldNames(valueVar, l.valueType, p.valueType) - result = TransformValues(result, LambdaFunction(body, Seq(keyVar, valueVar))) - } - if (needKeys) { - val keyVar = NamedLambdaVariable("key", p.keyType, false) - val valueVar = NamedLambdaVariable( - "value", - if (needValues) l.valueType else p.valueType, - p.valueContainsNull) - val body = reconcileFieldNames(keyVar, l.keyType, p.keyType) - result = TransformKeys(result, LambdaFunction(body, Seq(keyVar, valueVar))) - } - result - case _ => expr - } - } - /** * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) * transform the metadata of Delta into Parquet's, each plan should only be transformed once. @@ -185,9 +284,8 @@ object DeltaPostTransformRules { )(SparkSession.active) // transform output's name into physical name so Reader can read data correctly // should keep the columns order the same as the origin output - case class ColumnMapping(logicalName: String, logicalType: DataType, physicalAttr: Attribute) - val columnMappings = ListBuffer.empty[ColumnMapping] - val seenNames = mutable.Set.empty[String] + val originColumnNames = ListBuffer.empty[String] + val transformedAttrs = ListBuffer.empty[Attribute] def mapAttribute(attr: Attribute) = { val newAttr = if (plan.isMetadataColumn(attr)) { attr @@ -198,8 +296,9 @@ object DeltaPostTransformRules { .createPhysicalAttributes(Seq(attr), fmt.referenceSchema, fmt.columnMappingMode) .head } - if (seenNames.add(attr.name)) { - columnMappings += ColumnMapping(attr.name, attr.dataType, newAttr) + if (!originColumnNames.contains(attr.name)) { + transformedAttrs += newAttr + originColumnNames += attr.name } newAttr } @@ -239,20 +338,9 @@ object DeltaPostTransformRules { scanExecTransformer.copyTagsFrom(plan) tagColumnMappingRule(scanExecTransformer) - // Alias physical names back to logical names. For struct-typed columns, Delta column - // mapping renames internal field names to physical UUIDs. A top-level Alias only restores - // the column name, not the struct's internal field names. We rebuild the struct with - // logical field names using positional extraction (GetStructField/CreateNamedStruct) - // instead of Cast, so correctness does not depend on any Velox cast config. - val expr = columnMappings.map { - cm => - val projectedExpr: Expression = - if (nestedFieldNamesDiffer(cm.logicalType, cm.physicalAttr.dataType)) { - reconcileFieldNames(cm.physicalAttr, cm.logicalType, cm.physicalAttr.dataType) - } else { - cm.physicalAttr - } - Alias(projectedExpr, cm.logicalName)(exprId = cm.physicalAttr.exprId) + // alias physicalName into tableName + val expr = (transformedAttrs, originColumnNames).zipped.map { + (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId) } val projectExecTransformer = ProjectExecTransformer(expr.toSeq, scanExecTransformer) projectExecTransformer diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala index 5fe1b4ba86e..ebafb0c08c3 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala @@ -17,16 +17,93 @@ package org.apache.gluten.extension import org.apache.gluten.execution.DeltaScanTransformer +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.delta.DeltaParquetFileFormat +import org.apache.spark.sql.delta.SnapshotDescriptor +import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.util.SparkVersionUtil case class OffloadDeltaScan() extends OffloadSingleNode { + private val DeletionVectorsUseMetadataRowIndexKey = + "spark.databricks.delta.deletionVectors.useMetadataRowIndex" + override def offload(plan: SparkPlan): SparkPlan = plan match { + case scan: FileSourceScanExec if isDeltaLogScan(scan) => + FallbackTags.add(scan, "fallback Delta _delta_log scan") + scan + case scan: FileSourceScanExec if shouldFallbackSpark34DeletionVectorScan(scan) => + FallbackTags.add(scan, "fallback Spark 3.4 Delta DV scan") + scan case scan: FileSourceScanExec - if scan.relation.fileFormat.getClass == classOf[DeltaParquetFileFormat] => + if shouldFallbackDeletionVectorScanWithoutMetadataRowIndex(scan) => + FallbackTags.add(scan, "fallback Delta DV scan without metadata row index") + scan + case scan: FileSourceScanExec if isDeltaScan(scan) => DeltaScanTransformer(scan) case other => other } + + private def isDeltaScan(scan: FileSourceScanExec): Boolean = { + isDeltaFileIndex(scan) || isDeltaParquetScan(scan) + } + + private def isDeltaParquetScan(scan: FileSourceScanExec): Boolean = { + val fileFormatClass = scan.relation.fileFormat.getClass + fileFormatClass == classOf[DeltaParquetFileFormat] || + fileFormatClass.getSimpleName == "GlutenDeltaParquetFileFormat" + } + + private def isDeltaFileIndex(scan: FileSourceScanExec): Boolean = { + scan.relation.location.isInstanceOf[TahoeFileIndex] || + scan.relation.location.isInstanceOf[PreparedDeltaFileIndex] + } + + private def isDeltaLogScan(scan: FileSourceScanExec): Boolean = { + scan.relation.location.rootPaths.exists { + path => + val root = path.toString + root.contains("/_delta_log") || root.contains("\\_delta_log") || root.endsWith("_delta_log") + } + } + + private def shouldFallbackSpark34DeletionVectorScan(scan: FileSourceScanExec): Boolean = { + if (SparkVersionUtil.gteSpark35) { + return false + } + + containsDeletionVector(scan) + } + + private def shouldFallbackDeletionVectorScanWithoutMetadataRowIndex( + scan: FileSourceScanExec): Boolean = { + if (!SparkVersionUtil.gteSpark35) { + return false + } + + // Delta DML tests force this path and rely on Spark's injected + // row-index filter column for correctness. Keep it on Spark until the native path can + // prove the same contract for DML-generated DVs. + val useMetadataRowIndex = + scan.relation.sparkSession.sessionState.conf + .getConfString(DeletionVectorsUseMetadataRowIndexKey, "true") + .toBoolean + !useMetadataRowIndex && containsDeletionVector(scan) + } + + private def containsDeletionVector(scan: FileSourceScanExec): Boolean = { + scan.relation.location match { + case preparedIndex: PreparedDeltaFileIndex => + preparedIndex.preparedScan.files.exists(_.deletionVector != null) + case index: TahoeFileIndex => + val snapshot = index.asInstanceOf[SnapshotDescriptor] + deletionVectorsReadable(snapshot.protocol, snapshot.metadata) + case _ => + false + } + } } diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala index 031bf460347..fda594ef84d 100644 --- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala +++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala @@ -18,7 +18,10 @@ package org.apache.gluten.execution import org.apache.spark.SparkConf import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.types._ +import org.apache.spark.util.SparkVersionUtil import scala.collection.JavaConverters._ @@ -37,6 +40,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { .set("spark.memory.offHeap.size", "2g") .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.ansi.enabled", "false") .set("spark.sql.sources.useV1SourceList", "avro") .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") @@ -209,16 +213,40 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") checkAnswer(spark.read.format("delta").load(path), df1.union(df2)) spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${values2.mkString(", ")})") - import org.apache.spark.sql.execution.GlutenImplicits._ val df = spark.read.format("delta").load(path) - assert( - df.fallbackSummary.fallbackNodeToReason - .flatMap(_.values) - .exists(_.contains("Deletion vector is not supported in native"))) + val executedPlan = df.queryExecution.executedPlan + if (SparkVersionUtil.gteSpark35) { + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.nonEmpty) + val planText = executedPlan.toString() + assert(!planText.contains("__delta_internal_is_row_deleted")) + assert(!planText.contains("__delta_internal_row_index")) + } else { + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.isEmpty) + } checkAnswer(df, df1) } } + testWithMinSparkVersion("delta: _delta_log scan should fallback", "3.4") { + withTempPath { + p => + import testImplicits._ + val path = p.getCanonicalPath + Seq((1, "a"), (2, "b")).toDF("id", "value").write.format("delta").save(path) + + val deltaLogDf = spark.read.json(s"$path/_delta_log/*.json") + val executedPlan = deltaLogDf.queryExecution.executedPlan + + assert(executedPlan.collect { case _: FileSourceScanExecTransformerBase => true }.isEmpty) + assert(executedPlan.collect { case _: BatchScanExecTransformerBase => true }.isEmpty) + assert(executedPlan.collect { + case _: FileSourceScanExec => true + case _: BatchScanExec => true + }.nonEmpty) + assert(deltaLogDf.count() > 0) + } + } + testWithMinSparkVersion("delta: push down input_file_name expression", "3.2") { withTable("source_table") { withTable("target_table") { @@ -320,13 +348,13 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { withSQLConf("spark.gluten.sql.columnar.scanOnly" -> "true") { withTable("delta_pf") { spark.sql(s""" - |create table test (id int, name string) using delta + |create table delta_pf (id int, name string) using delta |""".stripMargin) spark.sql(s""" - |insert into test values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") + |insert into delta_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") |""".stripMargin) runQueryAndCompare( - "select id from test where name > 'v1'", + "select id from delta_pf where name > 'v1'", compareResult = true, noFallBack = false) { df => diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java new file mode 100644 index 00000000000..fc75285eddb --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.rel; + +import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions; + +import java.util.List; +import java.util.Map; + +public class DeltaLocalFilesBuilder { + private DeltaLocalFilesBuilder() {} + + public static DeltaLocalFilesNode makeDeltaLocalFiles( + Integer index, + List paths, + List starts, + List lengths, + List fileSizes, + List modificationTimes, + List> partitionColumns, + List> metadataColumns, + LocalFilesNode.ReadFileFormat fileFormat, + List preferredLocations, + Map properties, + List> otherMetadataColumns, + List deltaReadOptions) { + return new DeltaLocalFilesNode( + index, + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + fileFormat, + preferredLocations, + properties, + otherMetadataColumns, + deltaReadOptions); + } +} diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java new file mode 100644 index 00000000000..dd34838261c --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.rel; + +import com.google.protobuf.ByteString; +import io.substrait.proto.ReadRel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DeltaLocalFilesNode extends LocalFilesNode { + private final List deltaReadOptions = new ArrayList<>(); + + DeltaLocalFilesNode( + Integer index, + List paths, + List starts, + List lengths, + List fileSizes, + List modificationTimes, + List> partitionColumns, + List> metadataColumns, + ReadFileFormat fileFormat, + List preferredLocations, + Map properties, + List> otherMetadataColumns, + List deltaReadOptions) { + super( + index, + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + fileFormat, + preferredLocations, + properties, + otherMetadataColumns); + this.deltaReadOptions.addAll(deltaReadOptions); + } + + @Override + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) { + DeltaFileReadOptions options = deltaReadOptions.get(index); + ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.Builder deltaBuilder = + ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.newBuilder() + .setRowIndexFilterType(toProtoRowIndexFilterType(options.rowIndexFilterType())) + .setHasDeletionVector(options.hasDeletionVector()); + + if (options.hasDeletionVector()) { + deltaBuilder + .setDeletionVectorCardinality(options.deletionVectorCardinality()) + .setSerializedDeletionVector(ByteString.copyFrom(options.serializedDeletionVector())); + } + + fileBuilder.setDelta(deltaBuilder.build()); + } + + private static ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType + toProtoRowIndexFilterType(RowIndexFilterType rowIndexFilterType) { + switch (rowIndexFilterType) { + case IF_CONTAINED: + return ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.IF_CONTAINED; + case IF_NOT_CONTAINED: + return ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.IF_NOT_CONTAINED; + case KEEP_ALL: + default: + return ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.KEEP_ALL; + } + } + + public enum RowIndexFilterType { + KEEP_ALL, + IF_CONTAINED, + IF_NOT_CONTAINED + } + + public static class DeltaFileReadOptions implements Serializable { + private static final long serialVersionUID = 1L; + + private final RowIndexFilterType rowIndexFilterType; + private final boolean hasDeletionVector; + private final long deletionVectorCardinality; + private final byte[] serializedDeletionVector; + + public DeltaFileReadOptions( + RowIndexFilterType rowIndexFilterType, + boolean hasDeletionVector, + long deletionVectorCardinality, + byte[] serializedDeletionVector) { + this.rowIndexFilterType = rowIndexFilterType; + this.hasDeletionVector = hasDeletionVector; + this.deletionVectorCardinality = deletionVectorCardinality; + this.serializedDeletionVector = + serializedDeletionVector == null ? new byte[0] : serializedDeletionVector; + } + + public RowIndexFilterType rowIndexFilterType() { + return rowIndexFilterType; + } + + public boolean hasDeletionVector() { + return hasDeletionVector; + } + + public long deletionVectorCardinality() { + return deletionVectorCardinality; + } + + public byte[] serializedDeletionVector() { + return serializedDeletionVector; + } + } +} diff --git a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto index 2bfb68e0979..02c7f4cc5c6 100644 --- a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto @@ -197,6 +197,18 @@ message ReadRel { repeated DeleteFile delete_files = 3; } + message DeltaReadOptions { + enum RowIndexFilterType { + KEEP_ALL = 0; + IF_CONTAINED = 1; + IF_NOT_CONTAINED = 2; + } + RowIndexFilterType row_index_filter_type = 1; + bool has_deletion_vector = 2; + uint64 deletion_vector_cardinality = 3; + bytes serialized_deletion_vector = 4; + } + // File reading options oneof file_format { ParquetReadOptions parquet = 9; @@ -207,6 +219,7 @@ message ReadRel { TextReadOptions text = 14; JsonReadOptions json = 15; IcebergReadOptions iceberg = 16; + DeltaReadOptions delta = 22; } message partitionColumn { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index 0ef0b6a28c3..2b36cac94b7 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -39,6 +39,9 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession) if (!glutenConf.enableFallbackReport) { return plan } + if (GlutenFallbackReporter.containsInternalDeltaLogScan(plan)) { + return plan + } printFallbackReason(plan) if (GlutenUIUtils.uiEnabled(spark.sparkContext)) { postFallbackReason(plan) @@ -96,3 +99,20 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession) GlutenUIUtils.postEvent(sc, event) } } + +object GlutenFallbackReporter { + private[execution] def containsInternalDeltaLogScan(plan: SparkPlan): Boolean = { + plan.exists { + case scan: FileSourceScanExec => + scan.relation.location.rootPaths.exists { + path => + val root = path.toString + root.contains("/_delta_log") || + root.contains("\\_delta_log") || + root.endsWith("_delta_log") + } + case _ => + false + } + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala index 30aac6a8f38..f2529da82f1 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala @@ -22,6 +22,7 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.catalyst.plans.logical.CommandResult import org.apache.spark.sql.execution.ui.{GlutenUIUtils, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} import scala.collection.mutable @@ -79,6 +80,9 @@ class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener with if (!enabledAtStart) { return } + if (shouldSkipInternalDeltaLogQuery(qe)) { + return + } val summary = GlutenImplicits.collectQueryExecutionFallbackSummary(qe.sparkSession, qe) @@ -107,6 +111,15 @@ class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener with e) } } + + private def shouldSkipInternalDeltaLogQuery(qe: QueryExecution): Boolean = { + qe.commandExecuted.exists { + case r: CommandResult => + GlutenFallbackReporter.containsInternalDeltaLogScan(r.commandPhysicalPlan) + case _ => + false + } || GlutenFallbackReporter.containsInternalDeltaLogScan(qe.executedPlan) + } } object GlutenQueryExecutionListener {