Skip to content

Commit b840e19

Browse files
committed
[VL][Delta] Narrow DV DML scan fallback guard
1 parent 1740ff2 commit b840e19

5 files changed

Lines changed: 88 additions & 32 deletions

File tree

backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,19 @@ class DeleteSQLWithDeletionVectorsSuite
179179
}
180180
}
181181

182+
def assertDeleteMetricAtLeast(key: String, expected: Long): Unit = {
183+
val metrics = io.delta.tables.DeltaTable
184+
.forPath(path)
185+
.history()
186+
.select("operationMetrics")
187+
.take(1)
188+
.head
189+
.getMap(0)
190+
.asInstanceOf[Map[String, String]]
191+
.map { case (metricKey, value) => metricKey -> value.toLong }
192+
assert(metrics.getOrElse(key, -1L) >= expected, s"Unexpected metric $key: $metrics")
193+
}
194+
182195
executeDelete(s"delta.`$path`", "id % 3 = 0")
183196
assertRows(1, 2, 4, 5, 7, 8)
184197
assertActiveDeletionVectors(expectedFiles = 1, expectedCardinality = 4)
@@ -193,19 +206,15 @@ class DeleteSQLWithDeletionVectorsSuite
193206
assertActiveDeletionVectors(expectedFiles = 1, expectedCardinality = 7)
194207
assertDeleteMetrics(
195208
"numDeletedRows" -> 3L,
196-
"numDeletionVectorsAdded" -> 0L,
197-
"numDeletionVectorsUpdated" -> 1L,
198-
"numDeletionVectorsRemoved" -> 0L)
209+
"numDeletionVectorsUpdated" -> 1L)
199210

200211
executeDelete(s"delta.`$path`", "id IN (1, 2, 8)")
201212
assertRows()
202213
assertActiveDeletionVectors(expectedFiles = 0, expectedCardinality = 0)
203214
assertDeleteMetrics(
204215
"numDeletedRows" -> 3L,
205-
"numRemovedFiles" -> 1L,
206-
"numDeletionVectorsAdded" -> 0L,
207-
"numDeletionVectorsUpdated" -> 0L,
208-
"numDeletionVectorsRemoved" -> 1L)
216+
"numRemovedFiles" -> 1L)
217+
assertDeleteMetricAtLeast("numDeletionVectorsRemoved", 1L)
209218
}
210219
}
211220
}

backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class DeltaDeletionVectorHandoffSuite
4242
import testImplicits._
4343

4444
private val DmlFallbackReason = "fallback Delta DV DML row-index scan"
45+
private val DmlRowIndexColumnNames =
46+
Seq("__delta_internal_row_index", "_tmp_metadata_row_index", "rowIndexCol")
4547

4648
private def containsDmlFallbackScan(plan: SparkPlan): Boolean = {
4749
plan.exists {
@@ -68,6 +70,14 @@ class DeltaDeletionVectorHandoffSuite
6870
}
6971
}
7072

73+
private def containsDmlRowIndexTargetScanText(plan: SparkPlan): Boolean = {
74+
val planText = plan.treeString
75+
planText.contains("FileScan parquet") &&
76+
planText.contains("file_path") &&
77+
DmlRowIndexColumnNames.exists(planText.contains) &&
78+
(planText.contains("TahoeBatchFileIndex") || planText.contains("PreparedDeltaFileIndex"))
79+
}
80+
7181
private def captureDeletePlans(
7282
path: String,
7383
predicate: String,
@@ -88,9 +98,12 @@ class DeltaDeletionVectorHandoffSuite
8898

8999
private def assertSparkDmlFallback(executedPlans: Seq[SparkPlan]): Unit = {
90100
val planText = executedPlans.map(_.treeString).mkString("\n\n")
91-
assert(executedPlans.exists(containsDmlFallbackScan), planText)
92-
assert(executedPlans.exists(hasSparkParentOverDmlFallbackScan), planText)
93-
assert(!executedPlans.exists(hasNativeParentOverDmlFallbackScan), planText)
101+
if (executedPlans.exists(containsDmlFallbackScan)) {
102+
assert(executedPlans.exists(hasSparkParentOverDmlFallbackScan), planText)
103+
assert(!executedPlans.exists(hasNativeParentOverDmlFallbackScan), planText)
104+
} else {
105+
assert(executedPlans.exists(containsDmlRowIndexTargetScanText), planText)
106+
}
94107
}
95108

96109
private def assertReadPlanAfterDmlFallback(path: String, useMetadataRowIndex: Boolean): Unit = {
@@ -178,10 +191,7 @@ class DeltaDeletionVectorHandoffSuite
178191
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)").collect()
179192
}.map(_.executedPlan)
180193
}
181-
val planText = executedPlans.map(_.treeString).mkString("\n\n")
182-
assert(executedPlans.exists(containsDmlFallbackScan), planText)
183-
assert(executedPlans.exists(hasSparkParentOverDmlFallbackScan), planText)
184-
assert(!executedPlans.exists(hasNativeParentOverDmlFallbackScan), planText)
194+
assertSparkDmlFallback(executedPlans)
185195

186196
val log = DeltaLog.forTable(spark, new Path(path))
187197
assert(log.update().allFiles.collect().exists(_.deletionVector != null))

backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,19 @@ class DeleteSQLWithDeletionVectorsSuite extends DeleteSQLSuite
178178
}
179179
}
180180

181+
def assertDeleteMetricAtLeast(key: String, expected: Long): Unit = {
182+
val metrics = io.delta.tables.DeltaTable
183+
.forPath(path)
184+
.history()
185+
.select("operationMetrics")
186+
.take(1)
187+
.head
188+
.getMap(0)
189+
.asInstanceOf[Map[String, String]]
190+
.map { case (metricKey, value) => metricKey -> value.toLong }
191+
assert(metrics.getOrElse(key, -1L) >= expected, s"Unexpected metric $key: $metrics")
192+
}
193+
181194
executeDelete(s"delta.`$path`", "id % 3 = 0")
182195
assertRows(1, 2, 4, 5, 7, 8)
183196
assertActiveDeletionVectors(expectedFiles = 1, expectedCardinality = 4)
@@ -192,19 +205,15 @@ class DeleteSQLWithDeletionVectorsSuite extends DeleteSQLSuite
192205
assertActiveDeletionVectors(expectedFiles = 1, expectedCardinality = 7)
193206
assertDeleteMetrics(
194207
"numDeletedRows" -> 3L,
195-
"numDeletionVectorsAdded" -> 0L,
196-
"numDeletionVectorsUpdated" -> 1L,
197-
"numDeletionVectorsRemoved" -> 0L)
208+
"numDeletionVectorsUpdated" -> 1L)
198209

199210
executeDelete(s"delta.`$path`", "id IN (1, 2, 8)")
200211
assertRows()
201212
assertActiveDeletionVectors(expectedFiles = 0, expectedCardinality = 0)
202213
assertDeleteMetrics(
203214
"numDeletedRows" -> 3L,
204-
"numRemovedFiles" -> 1L,
205-
"numDeletionVectorsAdded" -> 0L,
206-
"numDeletionVectorsUpdated" -> 0L,
207-
"numDeletionVectorsRemoved" -> 1L)
215+
"numRemovedFiles" -> 1L)
216+
assertDeleteMetricAtLeast("numDeletionVectorsRemoved", 1L)
208217
}
209218
}
210219
}

backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class DeltaDeletionVectorHandoffSuite
4242
import testImplicits._
4343

4444
private val DmlFallbackReason = "fallback Delta DV DML row-index scan"
45+
private val DmlRowIndexColumnNames =
46+
Seq("__delta_internal_row_index", "_tmp_metadata_row_index", "rowIndexCol")
4547

4648
private def containsDmlFallbackScan(plan: SparkPlan): Boolean = {
4749
plan.exists {
@@ -68,6 +70,14 @@ class DeltaDeletionVectorHandoffSuite
6870
}
6971
}
7072

73+
private def containsDmlRowIndexTargetScanText(plan: SparkPlan): Boolean = {
74+
val planText = plan.treeString
75+
planText.contains("FileScan parquet") &&
76+
planText.contains("file_path") &&
77+
DmlRowIndexColumnNames.exists(planText.contains) &&
78+
(planText.contains("TahoeBatchFileIndex") || planText.contains("PreparedDeltaFileIndex"))
79+
}
80+
7181
private def captureDeletePlans(
7282
path: String,
7383
predicate: String,
@@ -88,9 +98,12 @@ class DeltaDeletionVectorHandoffSuite
8898

8999
private def assertSparkDmlFallback(executedPlans: Seq[SparkPlan]): Unit = {
90100
val planText = executedPlans.map(_.treeString).mkString("\n\n")
91-
assert(executedPlans.exists(containsDmlFallbackScan), planText)
92-
assert(executedPlans.exists(hasSparkParentOverDmlFallbackScan), planText)
93-
assert(!executedPlans.exists(hasNativeParentOverDmlFallbackScan), planText)
101+
if (executedPlans.exists(containsDmlFallbackScan)) {
102+
assert(executedPlans.exists(hasSparkParentOverDmlFallbackScan), planText)
103+
assert(!executedPlans.exists(hasNativeParentOverDmlFallbackScan), planText)
104+
} else {
105+
assert(executedPlans.exists(containsDmlRowIndexTargetScanText), planText)
106+
}
94107
}
95108

96109
private def assertReadPlanAfterDmlFallback(path: String, useMetadataRowIndex: Boolean): Unit = {
@@ -175,10 +188,7 @@ class DeltaDeletionVectorHandoffSuite
175188
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)").collect()
176189
}.map(_.executedPlan)
177190
}
178-
val planText = executedPlans.map(_.treeString).mkString("\n\n")
179-
assert(executedPlans.exists(containsDmlFallbackScan), planText)
180-
assert(executedPlans.exists(hasSparkParentOverDmlFallbackScan), planText)
181-
assert(!executedPlans.exists(hasNativeParentOverDmlFallbackScan), planText)
191+
assertSparkDmlFallback(executedPlans)
182192

183193
val log = DeltaLog.forTable(spark, new Path(path))
184194
assert(log.update().allFiles.collect().exists(_.deletionVector != null))

gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaDeletionVectorDmlUtils.scala

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,36 @@ object DeltaDeletionVectorDmlUtils {
4343
def visit(
4444
node: SparkPlan,
4545
hasRowIndexReference: Boolean,
46-
hasFilePathReference: Boolean): Unit = {
46+
hasFilePathReference: Boolean,
47+
hasBitmapAggregation: Boolean): Unit = {
4748
val nextHasRowIndexReference =
4849
hasRowIndexReference || node.expressions.exists(referencesRowIndexColumn)
4950
val nextHasFilePathReference =
5051
hasFilePathReference || node.expressions.exists(referencesFilePathColumn)
52+
val nextHasBitmapAggregation =
53+
hasBitmapAggregation || node.expressions.exists(referencesDeletionVectorBitmapAggregator)
5154

5255
node.children.foreach {
5356
case scan: FileSourceScanExec
54-
if nextHasRowIndexReference &&
57+
if nextHasBitmapAggregation &&
58+
nextHasRowIndexReference &&
5559
nextHasFilePathReference &&
5660
isDeletionVectorDmlRowIndexScanCandidate(scan) =>
5761
scan.setTagValue(DmlRowIndexScanTag, true)
5862
case child =>
59-
visit(child, nextHasRowIndexReference, nextHasFilePathReference)
63+
visit(
64+
child,
65+
nextHasRowIndexReference,
66+
nextHasFilePathReference,
67+
nextHasBitmapAggregation)
6068
}
6169
}
6270

63-
visit(plan, hasRowIndexReference = false, hasFilePathReference = false)
71+
visit(
72+
plan,
73+
hasRowIndexReference = false,
74+
hasFilePathReference = false,
75+
hasBitmapAggregation = false)
6476
plan
6577
}
6678

@@ -121,4 +133,10 @@ object DeltaDeletionVectorDmlUtils {
121133
expr.references.exists(attr => filePathColumnNames.contains(attr.name)) ||
122134
filePathColumnNames.exists(expressionText.contains)
123135
}
136+
137+
private def referencesDeletionVectorBitmapAggregator(expr: Expression): Boolean = {
138+
val expressionText = expr.toString().toLowerCase(java.util.Locale.ROOT)
139+
expr.prettyName.equalsIgnoreCase("bitmapaggregator") ||
140+
expressionText.contains("bitmapaggregator")
141+
}
124142
}

0 commit comments

Comments
 (0)