Skip to content

Commit b0da63a

Browse files
Mohammad LinjawiMohammad Linjawi
authored andcommitted
[VL][Delta] Add JVM Delta DV scan handoff
1 parent d2b48f1 commit b0da63a

29 files changed

Lines changed: 2054 additions & 132 deletions

File tree

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.backendsapi.velox
18+
19+
import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
20+
import org.apache.gluten.sql.shims.SparkShimLoader
21+
22+
import org.apache.spark.sql.SparkSession
23+
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
24+
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap}
25+
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
26+
import org.apache.spark.sql.execution.datasources.PartitionedFile
27+
28+
import org.apache.hadoop.fs.Path
29+
30+
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
31+
32+
import scala.collection.JavaConverters._
33+
import scala.util.Try
34+
import scala.util.control.NonFatal
35+
36+
object VeloxDeltaMetadataUtils {
37+
val DeltaDvCardinality = "delta_dv_cardinality"
38+
val DeltaDvPayloadIndex = "delta_dv_payload_index"
39+
40+
private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded"
41+
private val RowIndexFilterType = "row_index_filter_type"
42+
private val RowIndexFilterTypeIfContained = "IF_CONTAINED"
43+
44+
final class NormalizedSplitMetadata(
45+
val otherMetadataColumns: JList[JMap[String, Object]],
46+
val deletionVectorPayloads: Array[Array[Byte]])
47+
extends Serializable
48+
49+
private def decodeDescriptor(
50+
normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = {
51+
Option(normalizedMetadata.get(RowIndexFilterIdEncoded))
52+
.map(_.toString)
53+
.filter(_.nonEmpty)
54+
.flatMap(encoded => Try(DeletionVectorDescriptor.deserializeFromBase64(encoded)).toOption)
55+
}
56+
57+
private def serializePayload(
58+
dvStore: HadoopFileSystemDVStore,
59+
tablePath: Path,
60+
descriptor: DeletionVectorDescriptor): Array[Byte] = {
61+
if (tablePath == null) {
62+
throw new IllegalStateException(
63+
"Unable to resolve Delta table path while materializing deletion vector payload")
64+
}
65+
StoredBitmap
66+
.create(descriptor, tablePath)
67+
.load(dvStore)
68+
.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
69+
}
70+
71+
private def normalizeMetadataWithDescriptor(
72+
metadata: JMap[String, Object],
73+
descriptor: DeletionVectorDescriptor): JMap[String, Object] = {
74+
val normalized = new JHashMap[String, Object]()
75+
if (metadata != null) {
76+
normalized.putAll(metadata)
77+
}
78+
normalized.put(DeltaDvCardinality, Long.box(descriptor.cardinality))
79+
normalized.remove(RowIndexFilterIdEncoded)
80+
if (!normalized.containsKey(RowIndexFilterType)) {
81+
normalized.put(RowIndexFilterType, RowIndexFilterTypeIfContained)
82+
}
83+
normalized
84+
}
85+
86+
def normalizeSplitMetadata(
87+
partitionColumnCount: Int,
88+
files: JList[PartitionedFile]): NormalizedSplitMetadata = {
89+
val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf())
90+
val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size())
91+
val deletionVectorPayloads = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
92+
93+
files.asScala.foreach {
94+
file =>
95+
val otherMetadata =
96+
SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
97+
val metadataWithDecodedPayload = new JHashMap[String, Object]()
98+
if (otherMetadata != null) {
99+
metadataWithDecodedPayload.putAll(otherMetadata)
100+
}
101+
102+
val descriptor = decodeDescriptor(metadataWithDecodedPayload)
103+
104+
descriptor match {
105+
case Some(descriptor) =>
106+
val normalized = normalizeMetadataWithDescriptor(metadataWithDecodedPayload, descriptor)
107+
val payloadTablePath = resolveTablePath(partitionColumnCount, file)
108+
val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor)
109+
normalized.put(DeltaDvPayloadIndex, Int.box(deletionVectorPayloads.length))
110+
deletionVectorPayloads += serializedPayload
111+
normalizedMetadataColumns.add(normalized)
112+
case None =>
113+
normalizedMetadataColumns.add(metadataWithDecodedPayload)
114+
}
115+
}
116+
117+
new NormalizedSplitMetadata(normalizedMetadataColumns, deletionVectorPayloads.toArray)
118+
}
119+
120+
private def activeSpark: SparkSession = {
121+
SparkSession.getActiveSession
122+
.orElse(SparkSession.getDefaultSession)
123+
.getOrElse {
124+
throw new IllegalStateException(
125+
"Active SparkSession is required to materialize Delta deletion vectors")
126+
}
127+
}
128+
129+
private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = {
130+
val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent
131+
var tablePath = fileParent
132+
for (_ <- 0 until partitionColumnCount) {
133+
tablePath = tablePath.getParent
134+
}
135+
val spark = activeSpark
136+
if (tablePath != null && isDeltaTablePath(spark, tablePath)) {
137+
return tablePath
138+
}
139+
140+
// Spark can report a partition column count that does not map 1:1 to path depth for
141+
// prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log.
142+
var candidate = fileParent
143+
while (candidate != null && !isDeltaTablePath(spark, candidate)) {
144+
candidate = candidate.getParent
145+
}
146+
if (candidate != null) candidate else tablePath
147+
}
148+
149+
private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = {
150+
val deltaLogPath = new Path(tablePath, "_delta_log")
151+
try {
152+
deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath)
153+
} catch {
154+
case NonFatal(_) => false
155+
}
156+
}
157+
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.delta
18+
19+
import org.apache.spark.sql.SparkSession
20+
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
21+
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
22+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
23+
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
24+
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
25+
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
26+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
27+
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
28+
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
29+
import org.apache.spark.sql.execution.datasources.LogicalRelation
30+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
31+
import org.apache.spark.sql.types.StructType
32+
33+
/**
34+
* Rewrites Delta scans over DV-enabled tables to request the backend-specific skip-row metadata
35+
* column only when the snapshot actually contains DVs.
36+
*/
37+
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
38+
def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
39+
transformWithSubqueries(plan) { case ScanWithDeletionVectors(dvScan) => dvScan }
40+
}
41+
}
42+
43+
object ScanWithDeletionVectors {
44+
def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
45+
case scan @ LogicalRelation(
46+
relation @ HadoopFsRelation(
47+
index: TahoeFileIndex,
48+
_,
49+
_,
50+
_,
51+
format: DeltaParquetFileFormat,
52+
_),
53+
_,
54+
_,
55+
_) =>
56+
dvEnabledScanFor(scan, relation, format, index)
57+
case scan @ LogicalRelation(
58+
relation @ HadoopFsRelation(
59+
index: TahoeFileIndex,
60+
_,
61+
_,
62+
_,
63+
format: GlutenDeltaParquetFileFormat,
64+
_),
65+
_,
66+
_,
67+
_) =>
68+
dvEnabledScanFor(scan, relation, format, index)
69+
case _ => None
70+
}
71+
72+
def dvEnabledScanFor(
73+
scan: LogicalRelation,
74+
hadoopRelation: HadoopFsRelation,
75+
fileFormat: DeltaParquetFileFormat,
76+
index: TahoeFileIndex): Option[LogicalPlan] = {
77+
if (!deletionVectorsReadable(index.protocol, index.metadata)) {
78+
return None
79+
}
80+
81+
require(
82+
!index.isInstanceOf[TahoeLogFileIndex],
83+
"Cannot work with a non-pinned table snapshot of the TahoeFileIndex")
84+
85+
if (fileFormat.hasTablePath) {
86+
return None
87+
}
88+
89+
val filesWithDVs = index
90+
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
91+
.filter(_.deletionVector != null)
92+
if (filesWithDVs.isEmpty) {
93+
return None
94+
}
95+
96+
val planOutput = scan.output
97+
val spark = SparkSession.getActiveSession.get
98+
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
99+
val rowIndexFilter = createRowIndexFilterNode(newScan)
100+
Some(Project(planOutput, rowIndexFilter))
101+
}
102+
103+
def dvEnabledScanFor(
104+
scan: LogicalRelation,
105+
hadoopRelation: HadoopFsRelation,
106+
fileFormat: GlutenDeltaParquetFileFormat,
107+
index: TahoeFileIndex): Option[LogicalPlan] = {
108+
if (!deletionVectorsReadable(index.protocol, index.metadata)) {
109+
return None
110+
}
111+
112+
require(
113+
!index.isInstanceOf[TahoeLogFileIndex],
114+
"Cannot work with a non-pinned table snapshot of the TahoeFileIndex")
115+
116+
if (fileFormat.hasTablePath) {
117+
return None
118+
}
119+
120+
val filesWithDVs = index
121+
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
122+
.filter(_.deletionVector != null)
123+
if (filesWithDVs.isEmpty) {
124+
return None
125+
}
126+
127+
val planOutput = scan.output
128+
val spark = SparkSession.getActiveSession.get
129+
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
130+
val rowIndexFilter = createRowIndexFilterNode(newScan)
131+
Some(Project(planOutput, rowIndexFilter))
132+
}
133+
134+
private def addRowIndexIfMissing(attribute: AttributeReference): AttributeReference = {
135+
require(attribute.name == METADATA_NAME)
136+
137+
val dataType = attribute.dataType.asInstanceOf[StructType]
138+
if (dataType.fieldNames.contains(ParquetFileFormat.ROW_INDEX)) {
139+
return attribute
140+
}
141+
142+
val newDatatype = dataType.add(ParquetFileFormat.ROW_INDEX_FIELD)
143+
attribute.copy(dataType = newDatatype)(
144+
exprId = attribute.exprId,
145+
qualifier = attribute.qualifier)
146+
}
147+
148+
private def createScanWithSkipRowColumn(
149+
spark: SparkSession,
150+
inputScan: LogicalRelation,
151+
fileFormat: DeltaParquetFileFormat,
152+
tahoeFileIndex: TahoeFileIndex,
153+
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
154+
val useMetadataRowIndex =
155+
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
156+
157+
val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
158+
val scanOutputWithMetadata = if (useMetadataRowIndex) {
159+
if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
160+
inputScan.output.collect {
161+
case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
162+
case o => o
163+
}
164+
} else {
165+
inputScan.output :+ fileFormat.createFileMetadataCol()
166+
}
167+
} else {
168+
inputScan.output
169+
}
170+
171+
val newScanOutput =
172+
scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
173+
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
174+
val newFileFormat = fileFormat.copyWithDVInfo(
175+
tablePath = tahoeFileIndex.path.toString,
176+
optimizationsEnabled = useMetadataRowIndex)
177+
178+
val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
179+
hadoopFsRelation.sparkSession)
180+
181+
inputScan.copy(relation = newRelation, output = newScanOutput)
182+
}
183+
184+
private def createScanWithSkipRowColumn(
185+
spark: SparkSession,
186+
inputScan: LogicalRelation,
187+
fileFormat: GlutenDeltaParquetFileFormat,
188+
tahoeFileIndex: TahoeFileIndex,
189+
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
190+
val useMetadataRowIndex =
191+
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
192+
193+
val skipRowField = GlutenDeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD
194+
val scanOutputWithMetadata = if (useMetadataRowIndex) {
195+
if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
196+
inputScan.output.collect {
197+
case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
198+
case o => o
199+
}
200+
} else {
201+
inputScan.output :+ fileFormat.createFileMetadataCol()
202+
}
203+
} else {
204+
inputScan.output
205+
}
206+
207+
val newScanOutput =
208+
scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
209+
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
210+
val newFileFormat = fileFormat.copyWithDVInfo(
211+
tablePath = tahoeFileIndex.path.toString,
212+
optimizationsEnabled = useMetadataRowIndex)
213+
214+
val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
215+
hadoopFsRelation.sparkSession)
216+
217+
inputScan.copy(relation = newRelation, output = newScanOutput)
218+
}
219+
220+
private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = {
221+
val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME)
222+
require(
223+
skipRowColumnRefs.size == 1,
224+
s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
225+
val skipRowColumnRef = skipRowColumnRefs.head
226+
Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
227+
}
228+
}

0 commit comments

Comments
 (0)