Skip to content

Commit bbb971f

Browse files
committed
[VL][Delta] Add persistent DV DELETE correctness path
Route Delta DELETE commands with persistent deletion vectors through the Gluten-specific command while leaving metadata-only, full-table, and non-DV cases on the existing Delta path. Add Delta 3.3 and Delta 4.0 coverage for persistent DV DELETE routing and repeated deletion-vector updates. Validation: git diff --cached --check; mvn test-compile -pl backends-velox -am -Pjava-17,spark-3.5,backends-velox,hadoop-3.3,spark-ut,delta -DskipTests; mvn test-compile -pl backends-velox -am -Pjava-17,spark-4.0,scala-2.13,backends-velox,hadoop-3.3,spark-ut,delta -DskipTests.
1 parent f313556 commit bbb971f

9 files changed

Lines changed: 884 additions & 2 deletions

File tree

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.delta.commands
18+
19+
import org.apache.spark.sql.{DataFrame, SparkSession}
20+
import org.apache.spark.sql.catalyst.expressions.Expression
21+
import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransaction}
22+
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
23+
24+
/**
 * Gluten-specific entry point for locating the files touched by a DML command that records
 * its changes through deletion vectors.
 */
object GlutenDMLWithDeletionVectorsHelper extends DeltaCommand {

  /**
   * Computes the [[TouchedFileWithDV]] entries for the add-files held by `fileIndex`.
   *
   * Row index sets are built for the candidate files against `condition`, and are then
   * resolved to touched files through `DMLWithDeletionVectorsHelper.findFilesWithMatchingRows`.
   * The whole lookup is recorded as the Delta operation `"<opName>.findTouchedFiles.gluten"`.
   *
   * @param sparkSession  session forwarded to the row-index-set builder
   * @param txn           transaction forwarded to the helpers; its `deltaLog.dataPath` is the
   *                      base path used when keying the candidate files by name
   * @param hasDVsEnabled forwarded unchanged to the row-index-set builder
   * @param deltaLog      log against which the Delta operation is recorded
   * @param targetDf      DataFrame forwarded to the row-index-set builder
   * @param fileIndex     batch file index whose `addFiles` are the candidate files
   * @param condition     predicate forwarded to the row-index-set builder
   * @param opName        must be one of `DMLWithDeletionVectorsHelper.SUPPORTED_DML_COMMANDS`
   * @return the result of `DMLWithDeletionVectorsHelper.findFilesWithMatchingRows`
   * @throws IllegalArgumentException if `opName` is not a supported DML command
   */
  def findTouchedFiles(
      sparkSession: SparkSession,
      txn: OptimisticTransaction,
      hasDVsEnabled: Boolean,
      deltaLog: DeltaLog,
      targetDf: DataFrame,
      fileIndex: TahoeBatchFileIndex,
      condition: Expression,
      opName: String): Seq[TouchedFileWithDV] = {
    val supportedCommands = DMLWithDeletionVectorsHelper.SUPPORTED_DML_COMMANDS
    require(
      supportedCommands.contains(opName),
      s"Expecting opName to be one of ${supportedCommands.mkString(", ")}, but got '$opName'.")

    recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles.gluten") {
      val addFiles = fileIndex.addFiles
      val rowIndexSets =
        DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition(
          sparkSession,
          txn,
          hasDVsEnabled,
          targetDf,
          addFiles,
          condition)

      val addFilesByName = generateCandidateFileMap(txn.deltaLog.dataPath, addFiles)
      DMLWithDeletionVectorsHelper.findFilesWithMatchingRows(txn, addFilesByName, rowIndexSets)
    }
  }
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.delta.commands
18+
19+
import org.apache.spark.sql.SparkSession
20+
import org.apache.spark.sql.catalyst.catalog.CatalogTable
21+
import org.apache.spark.sql.catalyst.expressions.Expression
22+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23+
import org.apache.spark.sql.delta.{DeltaLog, DeltaTableUtils, NumRecordsStats, OptimisticTransaction}
24+
import org.apache.spark.sql.delta.actions.Action
25+
import org.apache.spark.sql.delta.commands.MergeIntoCommandBase.totalBytesAndDistinctPartitionValues
26+
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
27+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
28+
29+
/** Factory and routing predicate for [[GlutenDeleteCommand]]. */
object GlutenDeleteCommand {

  /** Creates a [[GlutenDeleteCommand]] from the constructor arguments of `delegate`. */
  def apply(delegate: DeleteCommand): GlutenDeleteCommand = {
    new GlutenDeleteCommand(
      delegate.deltaLog,
      delegate.catalogTable,
      delegate.target,
      delegate.condition)
  }

  /**
   * Decides whether `delegate` should be replaced by a [[GlutenDeleteCommand]].
   *
   * Returns true only when all of the following hold:
   *  - `DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS` is enabled in the session,
   *  - the command has a delete condition,
   *  - splitting that condition against the partition columns leaves at least one data
   *    predicate, and
   *  - `DeletionVectorUtils.deletionVectorsWritable` accepts the updated snapshot.
   *
   * The Delta log is only updated when the configuration check passes and a condition exists.
   */
  def shouldOffload(delegate: DeleteCommand, sparkSession: SparkSession): Boolean = {
    def hasDataPredicatesOnWritableTable(deleteCondition: Expression): Boolean = {
      val snapshot = delegate.deltaLog.update()
      val (_, dataPredicates) = DeltaTableUtils.splitMetadataAndDataPredicates(
        deleteCondition,
        snapshot.metadata.partitionColumns,
        sparkSession)
      dataPredicates.nonEmpty && DeletionVectorUtils.deletionVectorsWritable(snapshot)
    }

    val sqlConf = sparkSession.sessionState.conf
    sqlConf.getConf(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS) &&
      delegate.condition.exists(hasDataPredicatesOnWritableTable)
  }
}
55+
56+
/**
 * A [[DeleteCommand]] whose `performDelete` takes its own route for conditional deletes that
 * are written as persistent deletion vectors, and defers to the parent implementation in
 * every other case.
 */
@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
class GlutenDeleteCommand(
    override val deltaLog: DeltaLog,
    override val catalogTable: Option[CatalogTable],
    override val target: LogicalPlan,
    override val condition: Option[Expression])
  extends DeleteCommand(deltaLog, catalogTable, target, condition) {

  /**
   * Delegates to `DeleteCommand.performDelete` unless the command has a condition that
   * contains data predicates and `shouldWritePersistentDeletionVectors` holds for the
   * transaction; in that case the delete is carried out by the deletion-vector path below.
   *
   * @return the actions to commit together with the collected [[DeleteMetric]]
   */
  override def performDelete(
      sparkSession: SparkSession,
      deltaLog: DeltaLog,
      txn: OptimisticTransaction): (Seq[Action], DeleteMetric) = {
    condition match {
      case Some(deleteCondition) =>
        val (metadataPredicates, otherPredicates) =
          DeltaTableUtils.splitMetadataAndDataPredicates(
            deleteCondition,
            txn.metadata.partitionColumns,
            sparkSession)
        val writeDeletionVectors = otherPredicates.nonEmpty &&
          shouldWritePersistentDeletionVectors(sparkSession, txn)
        if (writeDeletionVectors) {
          performDeleteWithDeletionVectors(
            sparkSession,
            deltaLog,
            txn,
            deleteCondition,
            metadataPredicates,
            otherPredicates)
        } else {
          super.performDelete(sparkSession, deltaLog, txn)
        }
      case None =>
        super.performDelete(sparkSession, deltaLog, txn)
    }
  }

  /**
   * Deletion-vector path: finds the touched files through
   * [[GlutenDMLWithDeletionVectorsHelper]], turns them into actions with
   * `DMLWithDeletionVectorsHelper.processUnmodifiedData`, and publishes the SQL metrics.
   */
  private def performDeleteWithDeletionVectors(
      sparkSession: SparkSession,
      deltaLog: DeltaLog,
      txn: OptimisticTransaction,
      deleteCondition: Expression,
      metadataPredicates: Seq[Expression],
      otherPredicates: Seq[Expression]): (Seq[Action], DeleteMetric) = {
    // This path always reports these quantities as zero / absent.
    val numAddedFiles: Long = 0
    val rewriteTimeMs: Long = 0
    val numAddedBytes: Long = 0
    val changeFileBytes: Long = 0
    val numRemovedBytes: Long = 0
    val numPartitionsRemovedFrom: Option[Long] = None
    val numPartitionsAddedTo: Option[Long] = None
    val numCopiedRows: Option[Long] = None

    // Snapshot totals are read before the timer starts so that they are not part of the
    // measured scan/execution time.
    val numFilesBeforeSkipping: Long = txn.snapshot.numOfFiles
    val numBytesBeforeSkipping: Long = txn.snapshot.sizeInBytes

    val startTime = System.nanoTime()
    val numFilesTotal = txn.snapshot.numOfFiles
    val candidateFiles = txn.filterFiles(
      metadataPredicates ++ otherPredicates,
      keepNumRecords = true)

    val numFilesAfterSkipping: Long = candidateFiles.size
    val (numCandidateBytes, numCandidatePartitions) =
      totalBytesAndDistinctPartitionValues(candidateFiles)
    val numBytesAfterSkipping: Long = numCandidateBytes
    // The partition count is only reported for partitioned tables.
    val numPartitionsAfterSkipping: Option[Long] =
      if (txn.metadata.partitionColumns.nonEmpty) Some(numCandidatePartitions) else None

    val fileIndex = new TahoeBatchFileIndex(
      sparkSession,
      "delete",
      candidateFiles,
      deltaLog,
      deltaLog.dataPath,
      txn.snapshot)
    val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
      sparkSession,
      target,
      fileIndex)
    val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
    val touchedFiles = GlutenDMLWithDeletionVectorsHelper.findTouchedFiles(
      sparkSession,
      txn,
      mustReadDeletionVectors,
      deltaLog,
      targetDf,
      fileIndex,
      deleteCondition,
      opName = "DELETE")

    val scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

    // Defined only when at least one file was touched: (actions, metric map).
    val dvResult =
      if (touchedFiles.nonEmpty) {
        Some(
          DMLWithDeletionVectorsHelper.processUnmodifiedData(
            sparkSession,
            touchedFiles,
            txn.snapshot))
      } else {
        None
      }
    val dvMetrics = dvResult.map(_._2)

    // Looks up a counter produced by processUnmodifiedData; zero when nothing was touched.
    def dvCount(key: String): Long = dvMetrics match {
      case Some(metricMap) => metricMap(key)
      case None => 0
    }

    // "numDeletedRows" is only written to the SQL metrics when files were touched.
    dvMetrics.foreach(metricMap => metrics("numDeletedRows").set(metricMap("numModifiedRows")))
    val numDeletedRows: Option[Long] = dvMetrics.map(metricMap => metricMap("numModifiedRows"))
    val numDeletionVectorsAdded: Long = dvCount("numDeletionVectorsAdded")
    val numDeletionVectorsRemoved: Long = dvCount("numDeletionVectorsRemoved")
    val numDeletionVectorsUpdated: Long = dvCount("numDeletionVectorsUpdated")
    val numRemovedFiles: Long = dvCount("numRemovedFiles")
    val deleteActions = dvResult.map(_._1).getOrElse(Nil)

    metrics("numRemovedFiles").set(numRemovedFiles)
    metrics("numAddedFiles").set(numAddedFiles)
    val executionTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
    metrics("executionTimeMs").set(executionTimeMs)
    metrics("scanTimeMs").set(scanTimeMs)
    metrics("rewriteTimeMs").set(rewriteTimeMs)
    metrics("numAddedChangeFiles").set(0L)
    metrics("changeFileBytes").set(changeFileBytes)
    metrics("numAddedBytes").set(numAddedBytes)
    metrics("numRemovedBytes").set(numRemovedBytes)
    metrics("numFilesBeforeSkipping").set(numFilesBeforeSkipping)
    metrics("numBytesBeforeSkipping").set(numBytesBeforeSkipping)
    metrics("numFilesAfterSkipping").set(numFilesAfterSkipping)
    metrics("numBytesAfterSkipping").set(numBytesAfterSkipping)
    metrics("numDeletionVectorsAdded").set(numDeletionVectorsAdded)
    metrics("numDeletionVectorsRemoved").set(numDeletionVectorsRemoved)
    metrics("numDeletionVectorsUpdated").set(numDeletionVectorsUpdated)
    numPartitionsAfterSkipping.foreach(metrics("numPartitionsAfterSkipping").set)
    numPartitionsAddedTo.foreach(metrics("numPartitionsAddedTo").set)
    numPartitionsRemovedFrom.foreach(metrics("numPartitionsRemovedFrom").set)
    numCopiedRows.foreach(metrics("numCopiedRows").set)
    txn.registerSQLMetrics(sparkSession, metrics)
    sendDriverMetrics(sparkSession, metrics)

    val numRecordsStats = NumRecordsStats.fromActions(deleteActions)
    val deleteMetric = DeleteMetric(
      condition = condition.map(_.sql).getOrElse("true"),
      numFilesTotal,
      numFilesAfterSkipping,
      numAddedFiles,
      numRemovedFiles,
      numAddedFiles,
      numAddedChangeFiles = 0L,
      numFilesBeforeSkipping,
      numBytesBeforeSkipping,
      numFilesAfterSkipping,
      numBytesAfterSkipping,
      numPartitionsAfterSkipping,
      numPartitionsAddedTo,
      numPartitionsRemovedFrom,
      numCopiedRows,
      numDeletedRows,
      numAddedBytes,
      numRemovedBytes,
      changeFileBytes = changeFileBytes,
      scanTimeMs,
      rewriteTimeMs,
      numDeletionVectorsAdded,
      numDeletionVectorsRemoved,
      numDeletionVectorsUpdated,
      numLogicalRecordsAdded = numRecordsStats.numLogicalRecordsAdded,
      numLogicalRecordsRemoved = numRecordsStats.numLogicalRecordsRemoved
    )

    // A set-transaction action is only prepended when there is something to commit.
    val actionsToCommit =
      if (deleteActions.isEmpty) {
        Seq.empty
      } else {
        createSetTransaction(sparkSession, deltaLog).toSeq ++ deleteActions
      }
    (actionsToCommit, deleteMetric)
  }
}

backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.datasources.v2
1919
import org.apache.gluten.config.VeloxDeltaConfig
2020
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
2121

22+
import org.apache.spark.sql.SparkSession
2223
import org.apache.spark.sql.delta.catalog.DeltaCatalog
23-
import org.apache.spark.sql.delta.commands.{DeleteCommand, DeltaCommand, OptimizeTableCommand, UpdateCommand}
24+
import org.apache.spark.sql.delta.commands.{DeleteCommand, DeltaCommand, GlutenDeleteCommand, OptimizeTableCommand, UpdateCommand}
2425
import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
2526
import org.apache.spark.sql.delta.sources.DeltaDataSource
2627
import org.apache.spark.sql.execution.SparkPlan
@@ -35,6 +36,9 @@ case class OffloadDeltaCommand() extends OffloadSingleNode with DeltaCommand {
3536
plan match {
3637
case ExecutedCommandExec(uc: UpdateCommand) =>
3738
ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(uc))
39+
case ExecutedCommandExec(dc: DeleteCommand)
40+
if GlutenDeleteCommand.shouldOffload(dc, SparkSession.active) =>
41+
ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(GlutenDeleteCommand(dc)))
3842
case ExecutedCommandExec(dc: DeleteCommand) =>
3943
ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
4044
case ExecutedCommandExec(optimize: OptimizeTableCommand) if shouldOffloadOptimize(optimize) =>

backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,71 @@ class DeleteSQLWithDeletionVectorsSuite
143143
text = "SELECT key, value, 1 FROM tab",
144144
expectResult = Row(0, 3, 1) :: Nil)
145145
}
146+
147+
// Runs three DELETEs against a single-file table of ids 0..9 and checks, after each one,
// the remaining rows, the active deletion vectors, and the operation metrics of the newest
// commit: the first DELETE adds a deletion vector, the second updates it, and the third
// removes it together with the file.
test("repeated DELETE produces, updates, and removes persistent deletion vectors") {
  withTempDir {
    dir =>
      val path = dir.getCanonicalPath
      spark.range(0, 10, 1, numPartitions = 1).toDF("id").write.format("delta").save(path)
      val log = DeltaLog.forTable(spark, path)
      val tableId = s"delta.`$path`"

      // Asserts the ids still visible in the table.
      def assertRows(expected: Long*): Unit = {
        checkAnswer(
          sql(s"SELECT id FROM $tableId ORDER BY id"),
          expected.map(id => Row(id)))
      }

      // Asserts how many files carry a deletion vector and their summed cardinality.
      def assertActiveDeletionVectors(expectedFiles: Int, expectedCardinality: Long): Unit = {
        val filesWithDVs = getFilesWithDeletionVectors(log)
        assert(filesWithDVs.size === expectedFiles)
        assert(filesWithDVs.map(_.deletionVector.cardinality).sum === expectedCardinality)
      }

      // Asserts selected operation metrics of the most recent commit in the table history.
      def assertDeleteMetrics(expected: (String, Long)*): Unit = {
        // history(1) limits the history to the latest commit; the typed getMap accessor
        // avoids an unchecked cast of the map column.
        val metrics = io.delta.tables.DeltaTable
          .forPath(path)
          .history(1)
          .select("operationMetrics")
          .head()
          .getMap[String, String](0)
          .map { case (key, value) => key -> value.toLong }
        expected.foreach {
          case (key, value) =>
            assert(metrics.getOrElse(key, -1L) === value, s"Unexpected metric $key: $metrics")
        }
      }

      // ids 0, 3, 6, 9 are deleted: a new deletion vector is added.
      executeDelete(tableId, "id % 3 = 0")
      assertRows(1, 2, 4, 5, 7, 8)
      assertActiveDeletionVectors(expectedFiles = 1, expectedCardinality = 4)
      assertDeleteMetrics(
        "numDeletedRows" -> 4L,
        "numDeletionVectorsAdded" -> 1L,
        "numDeletionVectorsUpdated" -> 0L,
        "numDeletionVectorsRemoved" -> 0L)

      // Three more rows are deleted: the existing deletion vector is updated.
      executeDelete(tableId, "id IN (4, 5, 7)")
      assertRows(1, 2, 8)
      assertActiveDeletionVectors(expectedFiles = 1, expectedCardinality = 7)
      assertDeleteMetrics(
        "numDeletedRows" -> 3L,
        "numDeletionVectorsAdded" -> 0L,
        "numDeletionVectorsUpdated" -> 1L,
        "numDeletionVectorsRemoved" -> 0L)

      // The remaining rows are deleted: the file and its deletion vector are removed.
      executeDelete(tableId, "id IN (1, 2, 8)")
      assertRows()
      assertActiveDeletionVectors(expectedFiles = 0, expectedCardinality = 0)
      assertDeleteMetrics(
        "numDeletedRows" -> 3L,
        "numRemovedFiles" -> 1L,
        "numDeletionVectorsAdded" -> 0L,
        "numDeletionVectorsUpdated" -> 0L,
        "numDeletionVectorsRemoved" -> 1L)
  }
}
146211
}
147212

148213
@ExtendedSQLTest

0 commit comments

Comments
 (0)