Skip to content

Commit e705eb8

Browse files
committed
[VL][Delta] Adapt DELETE DV helper call for Delta 4.1
1 parent bbb971f commit e705eb8

1 file changed

Lines changed: 52 additions & 3 deletions

File tree

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/commands/GlutenDeleteCommand.scala

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
2121
import org.apache.spark.sql.catalyst.expressions.Expression
2222
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2323
import org.apache.spark.sql.delta.{DeltaLog, DeltaTableUtils, NumRecordsStats, OptimisticTransaction}
24-
import org.apache.spark.sql.delta.actions.Action
24+
import org.apache.spark.sql.delta.actions.{Action, Metadata}
2525
import org.apache.spark.sql.delta.commands.MergeIntoCommandBase.totalBytesAndDistinctPartitionValues
2626
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
2727
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2828

2929
object GlutenDeleteCommand {
30+
private val processUnmodifiedDataMethodName = "processUnmodifiedData"
31+
3032
def apply(delegate: DeleteCommand): GlutenDeleteCommand =
3133
new GlutenDeleteCommand(
3234
delegate.deltaLog,
@@ -51,6 +53,53 @@ object GlutenDeleteCommand {
5153
dataPredicates.nonEmpty && DeletionVectorUtils.deletionVectorsWritable(snapshot)
5254
}
5355
}
56+
57+
private def processUnmodifiedData(
58+
sparkSession: SparkSession,
59+
touchedFiles: Seq[TouchedFileWithDV],
60+
txn: OptimisticTransaction): (Seq[Action], Map[String, Long]) = {
61+
val helper = DMLWithDeletionVectorsHelper
62+
val method = helper.getClass.getMethods.find {
63+
method => method.getName == processUnmodifiedDataMethodName && method.getParameterCount == 4
64+
}.getOrElse {
65+
helper.getClass.getMethods.find {
66+
method => method.getName == processUnmodifiedDataMethodName && method.getParameterCount == 3
67+
}.getOrElse {
68+
throw new IllegalStateException(
69+
s"Unable to find $processUnmodifiedDataMethodName on ${helper.getClass.getName}")
70+
}
71+
}
72+
73+
val result =
74+
if (method.getParameterCount == 4) {
75+
method.invoke(
76+
helper,
77+
sparkSession,
78+
touchedFiles,
79+
txn.snapshot,
80+
Int.box(dataSkippingStringPrefixLength(sparkSession, txn.metadata)))
81+
} else {
82+
method.invoke(
83+
helper,
84+
sparkSession,
85+
touchedFiles,
86+
txn.snapshot)
87+
}
88+
89+
result.asInstanceOf[(Seq[Action], Map[String, Long])]
90+
}
91+
92+
private def dataSkippingStringPrefixLength(
93+
sparkSession: SparkSession,
94+
metadata: Metadata): Int = {
95+
val statsCollectionUtilsClass =
96+
Class.forName("org.apache.spark.sql.delta.stats.StatsCollectionUtils$")
97+
val statsCollectionUtils = statsCollectionUtilsClass.getField("MODULE$").get(null)
98+
statsCollectionUtilsClass
99+
.getMethod("getDataSkippingStringPrefixLength", classOf[SparkSession], classOf[Metadata])
100+
.invoke(statsCollectionUtils, sparkSession, metadata)
101+
.asInstanceOf[Int]
102+
}
54103
}
55104

56105
@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
@@ -142,10 +191,10 @@ class GlutenDeleteCommand(
142191
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
143192
val deleteActions =
144193
if (touchedFiles.nonEmpty) {
145-
val (actions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
194+
val (actions, metricMap) = GlutenDeleteCommand.processUnmodifiedData(
146195
sparkSession,
147196
touchedFiles,
148-
txn.snapshot)
197+
txn)
149198
metrics("numDeletedRows").set(metricMap("numModifiedRows"))
150199
numDeletedRows = Some(metricMap("numModifiedRows"))
151200
numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded")

0 commit comments

Comments
 (0)